# PYSPARK TUTORIAL

Source: https://www.youtube.com/watch?v=_C8kWso4ne4&t=753s

## Install and Warm Up

In [1]:
!pip install pyspark



In [2]:
import pyspark  



In [7]:
import pandas as pd
# Preview the raw CSV with pandas before loading the same file into Spark
pd.read_csv('data.csv')

Unnamed: 0,Name,Age,Experience,Salary
0,Krish,31.0,10.0,30000.0
1,Sudhansh,30.0,8.0,25000.0
2,Sunny,29.0,4.0,20000.0
3,Paul,24.0,3.0,20000.0
4,Harsha,21.0,1.0,15000.0
5,Shubham,23.0,2.0,18000.0
6,Mahesh,,,40000.0
7,,34.0,10.0,38000.0
8,,36.0,,


In [8]:
from pyspark.sql import SparkSession

In [9]:
# Start a Spark session named 'Practice', or reuse one that already exists
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/17 11:35:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Display the session object
spark

In [11]:
# Read the CSV with the reader's default options
df_pyspark=spark.read.csv('data.csv')

In [12]:
# With default options the header line is read as a data row and the
# columns get generated names (_c0, _c1, ...)
df_pyspark.show()

+--------+----+----------+------+
|     _c0| _c1|       _c2|   _c3|
+--------+----+----------+------+
|    Name| Age|Experience|Salary|
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [13]:
# Re-read the file, this time telling Spark that the first line holds the column names
df_pyspark = (
    spark.read
    .option('header', 'true')
    .csv('data.csv')
)

In [14]:
# The first line of the file is now used for the column names
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [15]:
# Confirm that the reader returned a Spark DataFrame (not a pandas one)
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [16]:
# Without schema inference every column, including the numeric ones, is read as a string
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



## Basic Operations

1. Read data in two different ways
2. Get column names
3. Slice the dataframe
4. Get summary statistics
5. Add column
6. Drop column
7. Rename column

In [17]:
# Read the CSV, using its first line as the column names
df_pyspark = spark.read.option('header','True').csv('data.csv')

In [18]:
# Show the loaded rows
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [19]:
# Check the schema: every column is still typed as string
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [20]:
# Let Spark infer each column's type from the data instead of reading everything as string
df_pyspark = (
    spark.read
    .option('header', 'true')
    .csv('data.csv', inferSchema=True)
)

In [21]:
# Age, Experience and Salary are now inferred as integers; Name stays a string
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [22]:
# another way of reading a csv file with headers: pass the options as keyword arguments
df_pyspark = spark.read.csv(
    'data.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [23]:
# Both ways of reading produce the same kind of object: a Spark DataFrame
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [24]:
# get column names (returned as a plain Python list of strings)
df_pyspark.columns

['Name', 'Age', 'Experience', 'Salary']

In [25]:
# slice the dataframe: select a single column by name
df_pyspark.select('Name').show()

+--------+
|    Name|
+--------+
|   Krish|
|Sudhansh|
|   Sunny|
|    Paul|
|  Harsha|
| Shubham|
|  Mahesh|
|    NULL|
|    NULL|
+--------+



In [26]:
# select() returns a DataFrame, even when only one column is selected
type(df_pyspark.select('Name'))

pyspark.sql.dataframe.DataFrame

In [27]:
# slice Name and Experience columns (names passed as separate arguments)
df_pyspark.select('Name', 'Experience').show()

+--------+----------+
|    Name|Experience|
+--------+----------+
|   Krish|        10|
|Sudhansh|         8|
|   Sunny|         4|
|    Paul|         3|
|  Harsha|         1|
| Shubham|         2|
|  Mahesh|      NULL|
|    NULL|        10|
|    NULL|      NULL|
+--------+----------+



In [28]:
# pandas-style indexing does not return data: it returns a Column object that
# only refers to the column. Use df_pyspark.select('Name').show() to see the values.
df_pyspark['Name']

Column<'Name'>

In [29]:
# print data types of columns. Similar to df.printSchema() but less detailed:
# a list of (name, type) tuples without the nullability information
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [30]:
# get summary statistics (count, mean, stddev, min, max) for every column;
# count only includes non-null values
df_pyspark.describe().show()

24/03/17 11:35:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------+------------------+------------------+-----------------+
|summary|  Name|               Age|        Experience|           Salary|
+-------+------+------------------+------------------+-----------------+
|  count|     7|                 8|                 7|                8|
|   mean|  NULL|              28.5| 5.428571428571429|          25750.0|
| stddev|  NULL|5.3718844791323335|3.8234863173611093|9361.776388210581|
|    min|Harsha|                21|                 1|            15000|
|    max| Sunny|                36|                10|            40000|
+-------+------+------------------+------------------+-----------------+



In [31]:
# add column to the dataframe: Experience plus 2 (stays NULL where Experience is NULL)
df_pyspark = df_pyspark.withColumn(
    'Experience after 2 years',
    df_pyspark['Experience'] + 2,
)
df_pyspark.show()

+--------+----+----------+------+------------------------+
|    Name| Age|Experience|Salary|Experience after 2 years|
+--------+----+----------+------+------------------------+
|   Krish|  31|        10| 30000|                      12|
|Sudhansh|  30|         8| 25000|                      10|
|   Sunny|  29|         4| 20000|                       6|
|    Paul|  24|         3| 20000|                       5|
|  Harsha|  21|         1| 15000|                       3|
| Shubham|  23|         2| 18000|                       4|
|  Mahesh|NULL|      NULL| 40000|                    NULL|
|    NULL|  34|        10| 38000|                      12|
|    NULL|  36|      NULL|  NULL|                    NULL|
+--------+----+----------+------+------------------------+



In [32]:
# drop columns: remove 'Experience after 2 years' and assign the result back to df_pyspark
df_pyspark = df_pyspark.drop('Experience after 2 years')
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [33]:
# rename column; the renamed result is only displayed here, not assigned back to df_pyspark
df_pyspark.withColumnRenamed('Name','Forename').show()

+--------+----+----------+------+
|Forename| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



## Handling Missing Data

In [34]:
# Reload the data from the file so this section starts from the original rows
df_pyspark = spark.read.csv('data.csv',header=True,inferSchema=True)
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [35]:
# drop every row that contains at least one null value
df_pyspark.na.drop().show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
+--------+---+----------+------+



In [36]:
# drop only rows in which every value is null
# (no such row exists in this data, so nothing is removed)
df_pyspark.na.drop(how='all').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [37]:
# keep rows that have at least two non-null values and drop the other rows
# (`how` is omitted on purpose: when `thresh` is given it overrides `how`)
df_pyspark.na.drop(thresh=2).show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
+--------+----+----------+------+



In [38]:
# drop rows with null values in a specific column or columns (here: Experience);
# nulls in the other columns are ignored
df_pyspark.na.drop(how='any',subset=['Experience']).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
|    NULL| 34|        10| 38000|
+--------+---+----------+------+



In [39]:
# Fill missing values with a string; only nulls in string columns (here Name) are replaced
df_pyspark.na.fill('Missing').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|      NULL| 40000|
| Missing|  34|        10| 38000|
| Missing|  36|      NULL|  NULL|
+--------+----+----------+------+



Pay attention to which values were filled in: because the fill value is a string, only the nulls in string-typed columns (here, Name) were replaced. The numeric columns still contain NULL.

In [40]:
# Fill missing values in the Experience column only, using 50 as the replacement
df_pyspark.na.fill(value=50, subset='Experience').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|        50| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|        50|  NULL|
+--------+----+----------+------+



In [41]:
# another function that does the same thing: DataFrame.fillna
df_pyspark.fillna(50,'Experience').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|NULL|        50| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|        50|  NULL|
+--------+----+----------+------+



In [42]:
# fill null values with the mean of a particular column
from pyspark.ml.feature import Imputer

# Each input column gets a companion "<name>_imputed" output column.
imputer = Imputer(
    strategy='mean',
    inputCols=['Age', 'Experience', 'Salary'],
    outputCols=['Age_imputed', 'Experience_imputed', 'Salary_imputed'],
)

In [43]:
# add imputation cols to df: fit() learns the fill value of each input column,
# transform() appends the *_imputed columns with the nulls replaced.
# The filled values appear as integers here (e.g. the Age mean 28.5 shows up as 28).
imputer.fit(df_pyspark).transform(df_pyspark).show()

+--------+----+----------+------+-----------+------------------+--------------+
|    Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|   Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhansh|  30|         8| 25000|         30|                 8|         25000|
|   Sunny|  29|         4| 20000|         29|                 4|         20000|
|    Paul|  24|         3| 20000|         24|                 3|         20000|
|  Harsha|  21|         1| 15000|         21|                 1|         15000|
| Shubham|  23|         2| 18000|         23|                 2|         18000|
|  Mahesh|NULL|      NULL| 40000|         28|                 5|         40000|
|    NULL|  34|        10| 38000|         34|                10|         38000|
|    NULL|  36|      NULL|  NULL|         36|                 5|         25750|
+--------+----+----------+------+-------

## Filter Operations

In [45]:
# Keep only the rows without any null value; the filter examples below use this frame
df_pyspark_nonull = df_pyspark.dropna(how='any')
df_pyspark_nonull.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
+--------+---+----------+------+



In [46]:
# filter people with salaries of at most 20,000 (the condition is <=, so 20,000 itself is included)
df_pyspark_nonull.filter("Salary <= 20000").show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [47]:
# filter people with salaries of at most 20,000 and select only Name and Age columns
df_pyspark_nonull.filter("Salary <= 20000").select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [49]:
# another way to filter: build the condition from a Column instead of a SQL string
df_pyspark_nonull.filter(df_pyspark_nonull['Salary'] <= 20_000).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [52]:
# filter for multiple conditions; each comparison needs its own parentheses
# because & binds more tightly than <= and >= in Python
df_pyspark_nonull.filter((df_pyspark_nonull['Salary'] <= 20_000) & 
                         (df_pyspark_nonull['Salary'] >= 18_000)).show()


+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [55]:
# filter with 'not' operation (~). Make sure you put the condition into parentheses
df_pyspark_nonull.filter(~(df_pyspark_nonull['Salary'] <= 20_000)).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
+--------+---+----------+------+



## Group By and Aggregate Functions

In [56]:
# Load the departments dataset; note that this rebinds df_pyspark to a different file
df_pyspark = spark.read.csv('departments.csv',header=True,inferSchema=True)
df_pyspark.show()

+---------+------------+------+
|     Name|  Department|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [57]:
# Check the inferred column types: Salary must be numeric for the aggregations below
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [61]:
# Group by Name and sum each employee's salaries; the largest total in the
# output identifies the employee with the max earnings (rows are not sorted)
df_pyspark.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [65]:
# Group by Department and find which department pays the most in total
df_pyspark.groupBy('Department').sum().show()

+------------+-----------+
|  Department|sum(Salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [66]:
# Group by Department and find the average salary paid by each department
df_pyspark.groupBy('Department').mean().show()

+------------+-----------+
|  Department|avg(Salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [67]:
# Group by Department and find how many people work in each department
# (counts the rows per department)
df_pyspark.groupBy('Department').count().show()

+------------+-----+
|  Department|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [68]:
# find the total expenditure of all departments
# (aggregate over the whole DataFrame, without grouping)
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



## PySpark ML

In [69]:
# dataset to use: the null-free Name/Age/Experience/Salary frame built in the filter section
df_pyspark_nonull.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
+--------+---+----------+------+



In [70]:
# import library
from pyspark.ml.feature import VectorAssembler

# Combine Age and Experience into a single vector column for the model
feature_assembler = VectorAssembler(
    inputCols=['Age', 'Experience'],
    outputCol="Independent Features",
)

In [71]:
# create a new column with independent features to be used for training the model;
# each row's Age and Experience are packed into one vector
output = feature_assembler.transform(df_pyspark_nonull)
output.show()

+--------+---+----------+------+--------------------+
|    Name|Age|Experience|Salary|Independent Features|
+--------+---+----------+------+--------------------+
|   Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhansh| 30|         8| 25000|          [30.0,8.0]|
|   Sunny| 29|         4| 20000|          [29.0,4.0]|
|    Paul| 24|         3| 20000|          [24.0,3.0]|
|  Harsha| 21|         1| 15000|          [21.0,1.0]|
| Shubham| 23|         2| 18000|          [23.0,2.0]|
+--------+---+----------+------+--------------------+



In [73]:
# keep only the feature vector and the label column for training
final_data = output.select('Independent Features', 'Salary')
final_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [74]:
# train test split and regressor fit
from pyspark.ml.regression import LinearRegression

# Fix the seed so the random split, and therefore every number reported
# below, is reproducible when the notebook is re-run on the same data.
train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=42)
regressor = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
# fit() returns the fitted model; the name `regressor` is rebound to it because
# later cells read regressor.coefficients and regressor.intercept.
regressor = regressor.fit(train_data)

24/03/17 12:43:20 WARN Instrumentation: [7be332f4] regParam is zero, which might cause numerical instability and overfitting.
24/03/17 12:43:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/03/17 12:43:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [76]:
# coefficients of the fitted model, one per entry of the feature vector (Age, Experience)
regressor.coefficients

DenseVector([-176.3341, 1320.1856])

In [77]:
# intercept of the fitted model
regressor.intercept

19812.064965197405

In [79]:
# evaluate on the test set; the result is a summary object that holds both the
# predictions and the error metrics (it is not the predictions themselves)
preds = regressor.evaluate(test_data)

In [80]:
# show the predictions next to the actual Salary values of the test rows
preds.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [21.0,1.0]| 15000|17429.234338747105|
|         [31.0,10.0]| 30000| 27547.56380510443|
+--------------------+------+------------------+



In [81]:
# see scores: mean absolute error and mean squared error on the test set
preds.meanAbsoluteError, preds.meanSquaredError

(2440.8352668213374, 5957811.381290973)

24/03/17 18:40:01 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 911209 ms exceeds timeout 120000 ms
24/03/17 18:40:01 WARN SparkContext: Killing executors is not supported by current scheduler.
24/03/17 18:40:06 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$