In [111]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
import openpyxl

In [112]:
# Start a Spark session (or reuse the running one) for every cell below,
# then display it so the notebook shows the session details.
spark = (
    SparkSession.builder
    .appName("DataFrame")
    .getOrCreate()
)
spark

In [113]:
# Load the sample CSV: the first row supplies column names and Spark infers
# column types from the data (the schema is printed in the next cell).
# Pass real booleans rather than the strings 'true'/'true'.
df_pyspark = spark.read.csv("test.csv", header=True, inferSchema=True)
df_pyspark.show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Rushil|24.0|         2|Engineering| 60000|
| Manav|27.0|         5|         HR| 75000|
| Ruchi|30.0|         8|    Finance| 80000|
|Akshat|28.0|         6|Engineering| 70000|
|Shivam|35.0|        10|         HR| 90000|
+------+----+----------+-----------+------+



In [114]:
# Inspect the schema Spark inferred from the CSV: column names, types and nullability.
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- experience: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)



In [115]:
# Drop every row that contains at least one null value. This sample has no
# nulls, so all five rows survive (see output).
df_pyspark.na.drop(how="any").show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Rushil|24.0|         2|Engineering| 60000|
| Manav|27.0|         5|         HR| 75000|
| Ruchi|30.0|         8|    Finance| 80000|
|Akshat|28.0|         6|Engineering| 70000|
|Shivam|35.0|        10|         HR| 90000|
+------+----+----------+-----------+------+



In [116]:
# Keep only rows with at least `thresh` non-null values; thresh=1 drops rows
# that are entirely null, i.e. the same as how="all". When `thresh` is given
# it overrides `how`, so `how` is deliberately omitted here.
df_pyspark.na.drop(thresh=1).show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Rushil|24.0|         2|Engineering| 60000|
| Manav|27.0|         5|         HR| 75000|
| Ruchi|30.0|         8|    Finance| 80000|
|Akshat|28.0|         6|Engineering| 70000|
|Shivam|35.0|        10|         HR| 90000|
+------+----+----------+-----------+------+



In [117]:
# Replace nulls column-by-column: the string column `name` gets "Missing",
# the numeric column `salary` gets 0. The source frame is left unchanged.
df_filled = (
    df_pyspark
    .na.fill("Missing", subset=["name"])
    .na.fill(0, subset=["salary"])
)

df_filled.show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Rushil|24.0|         2|Engineering| 60000|
| Manav|27.0|         5|         HR| 75000|
| Ruchi|30.0|         8|    Finance| 80000|
|Akshat|28.0|         6|Engineering| 70000|
|Shivam|35.0|        10|         HR| 90000|
+------+----+----------+-----------+------+



In [118]:
from pyspark.ml.feature import Imputer

# Numeric columns to impute. A single list drives both the input columns and
# the derived "<col>_imputed" output names, so the two can never drift apart.
impute_cols = ["age", "experience", "salary"]

# Median-impute missing values into new columns, keeping the originals intact.
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{col}_imputed" for col in impute_cols],
    strategy="median",
)

In [119]:
# Fit the imputer (computes each column's median) and apply it to the same
# frame. The *_imputed columns equal the originals here because the sample
# contains no missing values (see output).
imputer.fit(df_pyspark).transform(df_pyspark).show()

+------+----+----------+-----------+------+-----------+------------------+--------------+
|  name| age|experience| department|salary|age_imputed|experience_imputed|salary_imputed|
+------+----+----------+-----------+------+-----------+------------------+--------------+
|Rushil|24.0|         2|Engineering| 60000|       24.0|                 2|         60000|
| Manav|27.0|         5|         HR| 75000|       27.0|                 5|         75000|
| Ruchi|30.0|         8|    Finance| 80000|       30.0|                 8|         80000|
|Akshat|28.0|         6|Engineering| 70000|       28.0|                 6|         70000|
|Shivam|35.0|        10|         HR| 90000|       35.0|                10|         90000|
+------+----+----------+-----------+------+-----------+------------------+--------------+



In [120]:
# Employees whose salary is at most 70000.
df_pyspark.where(df_pyspark["salary"] <= 70000).show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Rushil|24.0|         2|Engineering| 60000|
|Akshat|28.0|         6|Engineering| 70000|
+------+----+----------+-----------+------+



In [121]:
# Same salary filter, projected down to the name and age columns.
df_pyspark.where(df_pyspark["salary"] <= 70000).select("name", "age").show()

+------+----+
|  name| age|
+------+----+
|Rushil|24.0|
|Akshat|28.0|
+------+----+



In [122]:
# Salary at most 70000 AND at least 5 years of experience. Each comparison is
# parenthesised because Python's `&` binds tighter than `<=` / `>=`.
df_pyspark.where(
    (df_pyspark["salary"] <= 70000) & (df_pyspark["experience"] >= 5)
).show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Akshat|28.0|         6|Engineering| 70000|
+------+----+----------+-----------+------+



In [123]:
# Per-person totals of every numeric column. Each name occurs once in this
# sample, so the sums equal the original row values (see output).
# "Name" resolves to the `name` column because Spark matches column names
# case-insensitively by default (spark.sql.caseSensitive=false); the output
# header echoes the spelling used here. Row order of a groupBy result is not
# guaranteed.
df_pyspark.groupBy("Name").sum().show()

+------+--------+---------------+-----------+
|  Name|sum(age)|sum(experience)|sum(salary)|
+------+--------+---------------+-----------+
|Rushil|    24.0|              2|      60000|
| Ruchi|    30.0|              8|      80000|
|Akshat|    28.0|              6|      70000|
|Shivam|    35.0|             10|      90000|
| Manav|    27.0|              5|      75000|
+------+--------+---------------+-----------+



In [124]:
# Totals of every numeric column (age, experience, salary) per department.
# "Department" resolves to the `department` column via Spark's default
# case-insensitive column-name matching.
df_pyspark.groupBy("Department").sum().show()

+-----------+--------+---------------+-----------+
| Department|sum(age)|sum(experience)|sum(salary)|
+-----------+--------+---------------+-----------+
|Engineering|    52.0|              8|     130000|
|         HR|    62.0|             15|     165000|
|    Finance|    30.0|              8|      80000|
+-----------+--------+---------------+-----------+



In [125]:
# Per-department average of every numeric column (reported as avg(<col>)).
df_pyspark.groupBy("Department").mean().show()

+-----------+--------+---------------+-----------+
| Department|avg(age)|avg(experience)|avg(salary)|
+-----------+--------+---------------+-----------+
|Engineering|    26.0|            4.0|    65000.0|
|         HR|    31.0|            7.5|    82500.0|
|    Finance|    30.0|            8.0|    80000.0|
+-----------+--------+---------------+-----------+



In [126]:
# Number of employees (rows) in each department.
df_pyspark.groupBy("Department").count().show()

+-----------+-----+
| Department|count|
+-----------+-----+
|Engineering|    2|
|         HR|    2|
|    Finance|    1|
+-----------+-----+



In [127]:
# Whole-table aggregate with no grouping; the dict maps column -> aggregate
# function name, producing a single sum(salary) row.
df_pyspark.agg({'salary': 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|     375000|
+-----------+



In [129]:
# Re-display the raw frame before feature engineering. No earlier cell
# reassigns df_pyspark, so it is still the data read from test.csv.
df_pyspark.show()

+------+----+----------+-----------+------+
|  name| age|experience| department|salary|
+------+----+----------+-----------+------+
|Rushil|24.0|         2|Engineering| 60000|
| Manav|27.0|         5|         HR| 75000|
| Ruchi|30.0|         8|    Finance| 80000|
|Akshat|28.0|         6|Engineering| 70000|
|Shivam|35.0|        10|         HR| 90000|
+------+----+----------+-----------+------+



In [130]:
from pyspark.ml.feature import VectorAssembler

# Pack age and experience (in that order) into a single vector column,
# the feature format pyspark.ml estimators consume.
vectorAssembler = VectorAssembler(
    outputCol="Independent Features",
    inputCols=["age", "experience"],
)

In [131]:
# Build a new frame with the assembled "Independent Features" column appended;
# df_pyspark itself is not modified.
output = vectorAssembler.transform(df_pyspark)

In [132]:
# Original columns plus the [age, experience] feature vector (values shown as doubles).
output.show()

+------+----+----------+-----------+------+--------------------+
|  name| age|experience| department|salary|Independent Features|
+------+----+----------+-----------+------+--------------------+
|Rushil|24.0|         2|Engineering| 60000|          [24.0,2.0]|
| Manav|27.0|         5|         HR| 75000|          [27.0,5.0]|
| Ruchi|30.0|         8|    Finance| 80000|          [30.0,8.0]|
|Akshat|28.0|         6|Engineering| 70000|          [28.0,6.0]|
|Shivam|35.0|        10|         HR| 90000|         [35.0,10.0]|
+------+----+----------+-----------+------+--------------------+



In [133]:
# Keep only what the regressor needs: the feature vector and the label (salary).
finalized_data = output.select("Independent Features", "salary")

In [134]:
# Model-ready frame: one feature-vector column and the salary label.
finalized_data.show()

+--------------------+------+
|Independent Features|salary|
+--------------------+------+
|          [24.0,2.0]| 60000|
|          [27.0,5.0]| 75000|
|          [30.0,8.0]| 80000|
|          [28.0,6.0]| 70000|
|         [35.0,10.0]| 90000|
+--------------------+------+



In [135]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so Restart & Run All reproduces the same train/test split and
# therefore the same fitted coefficients and predictions. With only five rows,
# the exact split sizes still depend on the seed.
SPLIT_SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# `regressor` holds the *fitted* LinearRegressionModel (not the estimator):
# later cells read `regressor.coefficients` and call `regressor.evaluate(...)`.
regressor = LinearRegression(
    featuresCol="Independent Features",
    labelCol="salary",
).fit(train_data)

25/10/03 16:23:33 WARN Instrumentation: [71513e99] regParam is zero, which might cause numerical instability and overfitting.
25/10/03 16:23:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/10/03 16:23:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [136]:
# Fitted weights, one per feature in the VectorAssembler input order: [age, experience].
regressor.coefficients

DenseVector([-0.0, 5000.0])

In [137]:
# Score the held-out rows. The returned summary's `.predictions` frame carries
# the feature vector, the true salary and the model's prediction.
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()

+--------------------+------+-----------------+
|Independent Features|salary|       prediction|
+--------------------+------+-----------------+
|          [24.0,2.0]| 60000|49999.99999999482|
|          [27.0,5.0]| 75000|64999.99999999815|
+--------------------+------+-----------------+

