In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=f514ccddbba9eda486934110cb922d51aa3d1b8970f7ede7c0bcfe0da786df19
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import requests

In [4]:
def download_file(url, filename, timeout=30):
    """Download ``url`` with an HTTP GET and save the body to ``filename``.

    Errors raised by ``requests`` are reported with ``print`` and not
    re-raised, so the function returns ``None`` whether or not the file was
    written. Errors from opening or writing ``filename`` are not caught here
    and propagate to the caller.

    Args:
        url: Address to fetch.
        filename: Local path to create or overwrite; opened in binary mode.
        timeout: Seconds to wait on the server before giving up. ``requests``
            applies no timeout unless one is passed, so without this a
            stalled server would block the cell indefinitely.
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Raise on 4xx/5xx so an error page is never saved as the data file.
        response.raise_for_status()
        with open(filename, 'wb') as f:
            f.write(response.content)
        print(f"File downloaded successfully and saved as {filename}")
    except requests.exceptions.HTTPError as errh:
        print(f"HTTP Error: {errh}")
    # Timeout is checked before ConnectionError: a connect timeout derives
    # from both classes and should be reported as a timeout.
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error: {errt}")
    except requests.exceptions.ConnectionError as errc:
        print(f"Error Connecting: {errc}")
    except requests.exceptions.RequestException as err:
        print(f"OOps: Something Else: {err}")

In [10]:
# Remote CSV to fetch and the local path it is saved under.
filename = 'data.csv'
url = 'https://raw.githubusercontent.com/prasertcbs/basic-dataset/master/Employee%20data.csv'

download_file(url=url, filename=filename)

File downloaded successfully and saved as data.csv


In [11]:
from pyspark.sql import SparkSession

In [12]:
# Create the Spark session, or reuse one that is already running in this kernel.
spark = (
    SparkSession.builder
    .appName('DataFrame')
    .getOrCreate()
)
# Bare expression on the last line so the notebook renders the session summary.
spark

In [13]:
# Read the downloaded CSV: first row is the header, column types are inferred.
df_pyspark = spark.read.csv(
    'data.csv',
    header=True,
    inferSchema=True,
)
# Preview the first five rows.
df_pyspark.show(5)

+---+------+----------+----+--------+-------+--------+-------+-------+--------+
| id|gender|     bdate|educ|  jobcat| salary|salbegin|jobtime|prevexp|minority|
+---+------+----------+----+--------+-------+--------+-------+-------+--------+
|1.0|  Male|1952-02-03|  15| Manager|57000.0| 27000.0|   98.0|  144.0|      No|
|2.0|  Male|1958-05-23|  16|Clerical|40200.0| 18750.0|   98.0|   36.0|      No|
|3.0|Female|1929-07-26|  12|Clerical|21450.0| 12000.0|   98.0|  381.0|      No|
|4.0|Female|1947-04-15|   8|Clerical|21900.0| 13200.0|   98.0|  190.0|      No|
|5.0|  Male|1955-02-09|  15|Clerical|45000.0| 21000.0|   98.0|  138.0|      No|
+---+------+----------+----+--------+-------+--------+-------+-------+--------+
only showing top 5 rows



In [14]:
# Print the column names and the types Spark inferred while reading the CSV.
# NOTE(review): the recorded output lists `prevexp` as string although the
# previewed values look numeric — presumably some rows hold non-numeric text;
# verify and cast before using it as a feature.
df_pyspark.printSchema()

root
 |-- id: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- bdate: date (nullable = true)
 |-- educ: integer (nullable = true)
 |-- jobcat: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- salbegin: double (nullable = true)
 |-- jobtime: double (nullable = true)
 |-- prevexp: string (nullable = true)
 |-- minority: string (nullable = true)



Remove rows that contain null values

In [16]:
# Drop every row that has a null in any column (dropna() is the DataFrame
# alias of na.drop()); the name is rebound because later cells read df_pyspark.
df_pyspark = df_pyspark.dropna()
df_pyspark.show(5)

+---+------+----------+----+--------+-------+--------+-------+-------+--------+
| id|gender|     bdate|educ|  jobcat| salary|salbegin|jobtime|prevexp|minority|
+---+------+----------+----+--------+-------+--------+-------+-------+--------+
|1.0|  Male|1952-02-03|  15| Manager|57000.0| 27000.0|   98.0|  144.0|      No|
|2.0|  Male|1958-05-23|  16|Clerical|40200.0| 18750.0|   98.0|   36.0|      No|
|3.0|Female|1929-07-26|  12|Clerical|21450.0| 12000.0|   98.0|  381.0|      No|
|4.0|Female|1947-04-15|   8|Clerical|21900.0| 13200.0|   98.0|  190.0|      No|
|5.0|  Male|1955-02-09|  15|Clerical|45000.0| 21000.0|   98.0|  138.0|      No|
+---+------+----------+----+--------+-------+--------+-------+-------+--------+
only showing top 5 rows



## Vector Assembler

`VectorAssembler` combines several feature columns into a single vector column.

In [18]:
from pyspark.ml.feature import VectorAssembler

It takes two main arguments:
1. inputCols
2. outputCol

In [40]:
# Build the assembler that packs the three predictor columns into one
# vector column named "Independent_Features".
feature_assembler = VectorAssembler(
    inputCols=["educ", "salbegin", "jobtime"],
    outputCol="Independent_Features",
)

In [41]:
# Apply the assembler: returns the input frame with the assembled
# "Independent_Features" vector column appended.
output = feature_assembler.transform(df_pyspark)

In [43]:
# Preview the assembled frame.
# NOTE(review): the recorded output contains a `prevexp-int` column that no
# cell visible in this notebook creates — likely left over from a deleted
# cell. Restart the kernel and run all cells to confirm the real schema.
output.show(5)

+---+------+----------+----+--------+-------+--------+-------+-------+--------+-----------+--------------------+
| id|gender|     bdate|educ|  jobcat| salary|salbegin|jobtime|prevexp|minority|prevexp-int|Independent_Features|
+---+------+----------+----+--------+-------+--------+-------+-------+--------+-----------+--------------------+
|1.0|  Male|1952-02-03|  15| Manager|57000.0| 27000.0|   98.0|  144.0|      No|        144| [15.0,27000.0,98.0]|
|2.0|  Male|1958-05-23|  16|Clerical|40200.0| 18750.0|   98.0|   36.0|      No|         36| [16.0,18750.0,98.0]|
|3.0|Female|1929-07-26|  12|Clerical|21450.0| 12000.0|   98.0|  381.0|      No|        381| [12.0,12000.0,98.0]|
|4.0|Female|1947-04-15|   8|Clerical|21900.0| 13200.0|   98.0|  190.0|      No|        190|  [8.0,13200.0,98.0]|
|5.0|  Male|1955-02-09|  15|Clerical|45000.0| 21000.0|   98.0|  138.0|      No|        138| [15.0,21000.0,98.0]|
+---+------+----------+----+--------+-------+--------+-------+-------+--------+-----------+--------------------+

Keep only the columns the model needs.

-> Predict `salary` from `Independent_Features`.

In [45]:
# Narrow the frame to the feature vector and the label column.
finalized_data = output.select(
    "Independent_Features",
    "salary",
)
finalized_data.show(5)

+--------------------+-------+
|Independent_Features| salary|
+--------------------+-------+
| [15.0,27000.0,98.0]|57000.0|
| [16.0,18750.0,98.0]|40200.0|
| [12.0,12000.0,98.0]|21450.0|
|  [8.0,13200.0,98.0]|21900.0|
| [15.0,21000.0,98.0]|45000.0|
+--------------------+-------+
only showing top 5 rows



## Train Test split

In [47]:
from pyspark.ml.regression import LinearRegression

In [50]:
# 75/25 train/test split. The seed is fixed so the split — and therefore the
# fitted coefficients and predictions below — is the same on every run.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

In [51]:
# Keep the unfitted estimator and the fitted model under separate names;
# previously `regressor` was first the estimator and then the model.
# `regressor` remains the fitted model, which is what the later cells read.
linear_regression = LinearRegression(featuresCol='Independent_Features', labelCol='salary')
regressor = linear_regression.fit(train_data)

Coefficients and intercept of the fitted model

In [52]:
# Fitted weights of the linear model — presumably one per assembled feature,
# in the assembler's inputCols order (educ, salbegin, jobtime).
regressor.coefficients

DenseVector([924.4474, 1.7515, 156.3267])

In [53]:
# Constant term of the fitted linear model.
regressor.intercept

-20478.107892954664

In [56]:
# Score the fitted model on the held-out split; the result object exposes the
# per-row predictions used in the next cell.
pred_results=regressor.evaluate(test_data)

In [57]:
# Show test rows with the model's prediction next to the actual salary.
pred_results.predictions.show()

+--------------------+-------+------------------+
|Independent_Features| salary|        prediction|
+--------------------+-------+------------------+
|  [8.0,10950.0,81.0]|22500.0|18758.657454794833|
|  [8.0,10950.0,92.0]|24000.0|20478.251484842105|
|  [8.0,11250.0,96.0]|31350.0|21629.002823072733|
|  [8.0,12450.0,74.0]|21750.0|20291.592435831975|
|  [8.0,13200.0,88.0]|20100.0| 23793.77770142575|
|  [8.0,13500.0,65.0]|21600.0| 20723.70732954036|
|  [8.0,14250.0,95.0]|29250.0|26727.120275202884|
|  [8.0,15000.0,87.0]|30750.0|26790.117480702116|
|  [8.0,15750.0,67.0]|30000.0|24977.193926149783|
|  [8.0,15750.0,74.0]|31950.0|26071.481036179866|
|  [8.0,15750.0,78.0]|24000.0|26696.787956197055|
|  [12.0,9000.0,91.0]|30750.0|20604.325512755764|
| [12.0,10200.0,66.0]|16350.0| 18797.93493550211|
| [12.0,10200.0,72.0]|19950.0|19735.895315527894|
| [12.0,10200.0,84.0]|16500.0| 21611.81607557946|
| [12.0,10500.0,92.0]|28500.0|23387.874333827283|
| [12.0,10950.0,81.0]|19650.0| 22456.44693110018|
