### Pyspark advantages
- Run workloads 100x faster.
- Ease of use; write in java, scala, python, R
- Generality; Combine SQL, streaming and complex analytics.
- Runs everywhere; Hadoop, Kubernetes, standalone or cloud

### Creating venv and activate.
- Run env.sh passing in 1 param which is name of virtual env.
- In vscode bottom right select python version and set to virtual env.
- Run pip installs.

In [3]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

### Basic Pyspark operations

In [4]:
# Initialise pyspark sessions
spark = SparkSession.builder.appName("MySparkSession").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/07 21:04:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Open csv as pyspark dataframe
df_pyspark = spark.read.csv("test1.csv")
df_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [4]:
# pyspark does not recognise the header row to column names, use read options
df_pyspark = spark.read.option("header", "true").csv("test1.csv")
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [5]:
# Pyspark has it's own df object type
print(type(df_pyspark))

# Show schema of df
df_pyspark.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



### Data Frame Operations

In [6]:
# In above print Schema all columns were noted as string, below will infer schema types and header in csv option
df_pyspark = spark.read.csv("test1.csv", header = True, inferSchema = True)
df_pyspark.show()
df_pyspark.printSchema()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [7]:
# Get column names
print(df_pyspark.columns)

# Get top 3 records in list format.
print(df_pyspark.head(3))

['Name', 'age', 'Experience', 'Salary']
[Row(Name='Krish', age=31, Experience=10, Salary=30000), Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000), Row(Name='Sunny', age=29, Experience=4, Salary=20000)]


In [8]:
# Check data types
print(df_pyspark.dtypes)
df_pyspark.describe().show()

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]


23/11/06 18:07:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  NULL|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  NULL| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



In [9]:
# Selecting a particular column
df_pyspark.select("Name").show()

# Selecting multiple column names
df_pyspark.select(["Name", "Experience"]).show()

+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
+---------+

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [10]:
# Adding new columns
df_pyspark = df_pyspark.withColumn("Experience After 2 Years", df_pyspark["Experience"] + 2)
df_pyspark.show()

# Drop columns
df_pyspark = df_pyspark.drop("Experience After 2 Years")
df_pyspark.show()

# Rename columns
df_pyspark.withColumnRenamed("Name", "Full Name").show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience After 2 Years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

+---------+---+----------+------+
|Full Name|age|Experience|Salary|


### Handling missing values

In [11]:
df_pyspark2 = spark.read.csv("test2.csv", header = True, inferSchema= True)
df_pyspark2.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [12]:
# Drop all rows where na values are present. 
df_pyspark2.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [13]:
# Drop has a how arg, default to any, meaning if a row any column values as null/na then drop, or all the entire row has to be null/na
df_pyspark2.na.drop(how="all").show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [14]:
# Drop has a threshold arg, specifies the limit of number of NON-NULL values for a row. i.e. if a row has at least 2 non nulls keep it, otherwise omit.
df_pyspark2.na.drop(thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
+---------+----+----------+------+



In [15]:
# Drop has a subset arg, will drop rows only if the null/na values are in the subset columns. 
df_pyspark2.na.drop(subset = ["Age"]).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
|     NULL| 36|      NULL|  NULL|
+---------+---+----------+------+



In [16]:
# Fill missing values from particular columns with a specific value. Below places missing when na/null appears in experience or age column
df_pyspark2.na.fill("missing", ["Experience", "age"]).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [17]:
# Using imputer to fill missing values with a strategy i.e. mean, median or mode.
from pyspark.ml.feature import Imputer

imp = Imputer(
    inputCols = ["age", "Experience", "Salary"],
    outputCols = ["{}_impd".format(col) for col in ["age", "Experience", "Salary"]]
).setStrategy("mean")

In [18]:
imp.fit(df_pyspark2).transform(df_pyspark2).show()

+---------+----+----------+------+--------+---------------+-----------+
|     Name| age|Experience|Salary|age_impd|Experience_impd|Salary_impd|
+---------+----+----------+------+--------+---------------+-----------+
|    Krish|  31|        10| 30000|      31|             10|      30000|
|Sudhanshu|  30|         8| 25000|      30|              8|      25000|
|    Sunny|  29|         4| 20000|      29|              4|      20000|
|     Paul|  24|         3| 20000|      24|              3|      20000|
|   Harsha|  21|         1| 15000|      21|              1|      15000|
|  Shubham|  23|         2| 18000|      23|              2|      18000|
|   Mahesh|NULL|      NULL| 40000|      28|              5|      40000|
|     NULL|  34|        10| 38000|      34|             10|      38000|
|     NULL|  36|      NULL|  NULL|      36|              5|      25750|
+---------+----+----------+------+--------+---------------+-----------+



### Filter operations

In [4]:
df_pyspark = spark.read.csv("test1.csv", header = True, inferSchema= True)
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [5]:
# Find salaray of people less than or equal to 20000
df_pyspark.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [6]:
# Apply filter and then return only specific columns
df_pyspark.filter("Salary<=20000").select(["Name", "Age"]).show()

+-------+---+
|   Name|Age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [12]:
# Two different conditions
df_pyspark.filter((df_pyspark["Salary"]<=20000) & 
                  (df_pyspark["Salary"] >=16000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [14]:
# Inverse operator
df_pyspark.filter(~(df_pyspark["Salary"]<=20000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



In [16]:
df_pyspark3 = spark.read.csv("test3.csv", header = True, inferSchema = True)
df_pyspark3.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [19]:
# Group by name and return highest salary
df_pyspark3.groupBy("Name").sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [22]:
# Group by department and find which gives max salary
df_pyspark3.groupBy("Departments").sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [23]:
# Group by department and find mean of each.
df_pyspark3.groupBy("Departments").mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [24]:
# Group by department and find number of entries per department
df_pyspark3.groupBy("Departments").count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [25]:
# Directly aggregate salary of all without any group by
df_pyspark3.agg({"Salary" : "sum"}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [26]:
# Maximum salary from all people
df_pyspark3.groupBy("Name").max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



### Examples of pyspark ML

In [5]:
# Supose we want to predict salary based on age and experience
training = spark.read.csv("test1.csv", header = True, inferSchema = True)
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [6]:
training.printSchema()
training.columns

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



['Name', 'age', 'Experience', 'Salary']

In [7]:
# Vector assembler is a transformer that combines a given list of columns into a single vector column. Now the columns are treated as one independent feature.
# In this case since we want to use both age and experience to predict salary we combine them
from pyspark.ml.feature import VectorAssembler
featAssembler = VectorAssembler(inputCols = ["age", "Experience"], outputCol = "Combined")

In [8]:
output = featAssembler.transform(training)
output.show()
output.columns

+---------+---+----------+------+-----------+
|     Name|age|Experience|Salary|   Combined|
+---------+---+----------+------+-----------+
|    Krish| 31|        10| 30000|[31.0,10.0]|
|Sudhanshu| 30|         8| 25000| [30.0,8.0]|
|    Sunny| 29|         4| 20000| [29.0,4.0]|
|     Paul| 24|         3| 20000| [24.0,3.0]|
|   Harsha| 21|         1| 15000| [21.0,1.0]|
|  Shubham| 23|         2| 18000| [23.0,2.0]|
+---------+---+----------+------+-----------+



['Name', 'age', 'Experience', 'Salary', 'Combined']

In [9]:
# Now extract the columns of importance in our case new combined column and salary
finalised = output.select("Combined", "Salary")
finalised.show()

+-----------+------+
|   Combined|Salary|
+-----------+------+
|[31.0,10.0]| 30000|
| [30.0,8.0]| 25000|
| [29.0,4.0]| 20000|
| [24.0,3.0]| 20000|
| [21.0,1.0]| 15000|
| [23.0,2.0]| 18000|
+-----------+------+



In [32]:
from pyspark.ml.regression import LinearRegression
# Create train and test split 75, 25
train, test = finalised.randomSplit([0.75, 0.25])
# Feature col's are the columns that are used to determine the labelCol
regressor = LinearRegression(featuresCol = "Combined", labelCol = "Salary")
regressor = regressor.fit(train)

23/11/07 21:07:42 WARN Instrumentation: [9e4c868d] regParam is zero, which might cause numerical instability and overfitting.


In [33]:
# Coefficients and intercepts
regressor.coefficients

DenseVector([28.4757, 1271.3568])

In [34]:
regressor.intercept

14299.832495812996

In [35]:
# Run prediction on test set
pred = regressor.evaluate(test)

In [36]:
pred.predictions.show()

+-----------+------+------------------+
|   Combined|Salary|        prediction|
+-----------+------+------------------+
|[31.0,10.0]| 30000|27896.147403685147|
+-----------+------+------------------+

