# PySpark Tutorial

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
#using pandas
df_pd = pd.read_csv('original.csv')
df_pd.head()

Unnamed: 0,id,first_name,last_name,gender,City,JobTitle,Salary,Latitude,Longitude
0,1,Melinde,Shilburne,Female,Nowa Ruda,Assistant Professor,$57438.18,50.577407,16.496718
1,2,Kimberly,Von Welden,Female,Bulgan,Programmer II,$62846.60,48.823157,103.52182
2,3,Alvera,Di Boldi,Female,,,$57576.52,39.994746,116.339772
3,4,Shannon,O'Griffin,Male,Divnomorskoye,Budget/Accounting Analyst II,$61489.23,44.504721,38.130017
4,5,Sherwood,Macieja,Male,Mytishchi,VP Sales,$63863.09,,37.648995


In [3]:
# Spark Session
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [4]:
spark

In [5]:
#df_spark = spark.read.format("csv").option("header","true").load("original.csv")
df_spark = spark.read.option("header","true").csv('original.csv')

In [6]:
df_spark.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [7]:
print(type(df_pd))
print(type(df_spark))

<class 'pandas.core.frame.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [8]:
df_spark.printSchema()

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Every column is treated as string

In [9]:
df = spark.read.option('header','true').csv('original.csv',inferSchema=True)
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [10]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [11]:
#another way of doing it
df= spark.read.csv('original.csv',header=True,inferSchema=True)
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [12]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [13]:
#to print columns
df.columns

['id',
 'first_name',
 'last_name',
 'gender',
 'City',
 'JobTitle',
 'Salary',
 'Latitude',
 'Longitude']

In [15]:
df.head(3)

[Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude=50.5774075, Longitude=16.4967184),
 Row(id=2, first_name='Kimberly', last_name='Von Welden', gender='Female', City='Bulgan', JobTitle='Programmer II', Salary='$62846.60', Latitude=48.8231572, Longitude=103.5218199),
 Row(id=3, first_name='Alvera', last_name='Di Boldi', gender='Female', City=None, JobTitle=None, Salary='$57576.52', Latitude=39.9947462, Longitude=116.3397725)]

In [16]:
# To Select Column
df.select(['first_name','JobTitle']).show()

+----------+--------------------+
|first_name|            JobTitle|
+----------+--------------------+
|   Melinde| Assistant Professor|
|  Kimberly|       Programmer II|
|    Alvera|                null|
|   Shannon|Budget/Accounting...|
|  Sherwood|            VP Sales|
|     Maris|      Civil Engineer|
|     Masha|                null|
|   Goddart|Desktop Support T...|
|      Roth|VP Product Manage...|
|      Bran|Mechanical System...|
|    Kylynn|Nuclear Power Eng...|
|       Rey|Systems Administr...|
|      Kerr|Compensation Analyst|
|    Mickie|Assistant Media P...|
|    Kaspar|  Analyst Programmer|
|    Norbie|              Editor|
|    Claude|Research Assistan...|
|     Thain|     Design Engineer|
|  Tiffanie|Senior Financial ...|
|    Ettore| Staff Accountant IV|
+----------+--------------------+
only showing top 20 rows



In [17]:
# To see datatypes of columns
df.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'double'),
 ('Longitude', 'double')]

In [18]:
df.describe()

DataFrame[summary: string, id: string, first_name: string, last_name: string, gender: string, City: string, JobTitle: string, Salary: string, Latitude: string, Longitude: string]

In [19]:
df.describe().show()

+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+-----------------+------------------+
|summary|               id|first_name|last_name|gender|               City|           JobTitle|   Salary|         Latitude|         Longitude|
+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+-----------------+------------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|     1000|              999|              1000|
|   mean|            500.5|      null|     null|  null|               null|               null|     null|25.43151724234234|43.337564614499996|
| stddev|288.8194360957494|      null|     null|  null|               null|               null|     null| 24.5790825486909| 69.42064539970089|
|    min|                1|   Abagail|    Abbay|Female|             Abéché|Account Coordinator|$10101.92|       -54.281149|      -123.0419607|

### Playing with Columns

In [22]:
df.select(['first_name','JobTitle']).show()

+----------+--------------------+
|first_name|            JobTitle|
+----------+--------------------+
|   Melinde| Assistant Professor|
|  Kimberly|       Programmer II|
|    Alvera|                null|
|   Shannon|Budget/Accounting...|
|  Sherwood|            VP Sales|
|     Maris|      Civil Engineer|
|     Masha|                null|
|   Goddart|Desktop Support T...|
|      Roth|VP Product Manage...|
|      Bran|Mechanical System...|
|    Kylynn|Nuclear Power Eng...|
|       Rey|Systems Administr...|
|      Kerr|Compensation Analyst|
|    Mickie|Assistant Media P...|
|    Kaspar|  Analyst Programmer|
|    Norbie|              Editor|
|    Claude|Research Assistan...|
|     Thain|     Design Engineer|
|  Tiffanie|Senior Financial ...|
|    Ettore| Staff Accountant IV|
+----------+--------------------+
only showing top 20 rows



Add column

In [26]:
df=df.withcColumn('Incremented Salary',df['Salary']+2000)

Delete column

In [28]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------------+---------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|Incremented Salary|Full Name|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------------+---------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|              null|     null|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|              null|     null|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|              null|     null|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|              null|     null|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            

In [30]:
df=df.drop('Incremented Salary')

In [31]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|Full Name|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|     null|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|     null|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|     null|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|     null|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|     null|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16

In [33]:
df=df.drop('Full Name','gender')

In [34]:
df.show()

+---+----------+----------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|         Dachun|                null|$25090.87| 24.879416| 118.930111|
|  8|   Goddart|    

## Filter operations

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [29]:
df = spark.read.csv("test1.csv",header=True,inferSchema=True)
df.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [5]:
df.filter("Salary>=20000").show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
+---------+---+----------+------+



In [6]:
df.filter("Salary>=20000").select(["Name","age"]).show()

+---------+---+
|     Name|age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
+---------+---+



In [14]:
df.filter((df.Salary>=20000) & (df.age >25)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
+---------+---+----------+------+



## GroupBy & Aggregate Functions

In [36]:
df=spark.read.csv('test3.csv',header=True,inferSchema=True)
df.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [37]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [39]:
df.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [40]:
df.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [41]:
df.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



## Handling Missing Value

In [30]:
df = spark.read.csv('original.csv',header=True,inferSchema=True)
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [16]:
df.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [31]:
from pyspark.ml.feature import Imputer

In [32]:
cols=['Latitude','Longitude']
imputer = Imputer(
    inputCols=cols, 
    outputCols=["{}_imputed".format(c) for c in cols]
    ).setStrategy("median")

In [33]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [34]:
imputer.fit(df).transform(df).show()
#pyspark imputer does not take integer cols, it takes double or float

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|Latitude_imputed|Longitude_imputed|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      50.5774075|       16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|      48.8231572|      103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|      39.9947462|      116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|      44.5047212|       38.1300171|
|  5|  Sherwood|   M

## Pyspark ML

In [2]:
from pyspark.sql import SparkSession

In [12]:
spark = SparkSession.builder.appName("ml").getOrCreate()

In [15]:
spark

In [16]:
training = spark.read.csv('test1.csv',header=True,inferSchema=True)
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [17]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [18]:
from pyspark.ml.feature import VectorAssembler
# we have to create independent features
independent_cols=["age","Experience"]
feature_assembler = VectorAssembler(inputCols=independent_cols,outputCol="Independent Features")
output = feature_assembler.transform(training)

In [19]:
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [22]:
finalised_data = output.select("Independent Features","Salary")

In [23]:
finalised_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [36]:
from pyspark.ml.regression import LinearRegression

In [37]:
train_data , test_data = finalised_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol= 'Independent Features',labelCol="Salary")

In [39]:
regressor=regressor.fit(train_data)

In [40]:
regressor.coefficients

DenseVector([-90.5483, 1608.7819])

In [41]:
test_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|          [23.0,2.0]| 18000|
+--------------------+------+



In [42]:
pred_results=regressor.evaluate(test_data)

In [43]:
pred_results.predictions.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [23.0,2.0]| 18000|17214.09079632846|
+--------------------+------+-----------------+



In [44]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(785.909203671541, 617653.2764156357)