Url: https://www.youtube.com/watch?v=_C8kWso4ne4

### 1. Installing PySpark

In [1]:
!pip install pyspark



In [28]:
import pyspark

In [27]:
import pandas as pd
# For comparison: pandas reads the same file into a pandas DataFrame
type(pd.read_csv("test1.csv"))

pandas.core.frame.DataFrame

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark=SparkSession.builder.appName("Practise").getOrCreate()

23/03/19 06:35:25 WARN Utils: Your hostname, MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.110 instead (on interface en0)
23/03/19 06:35:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/19 06:35:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

### 2. Reading from CSV files
1. PySpark DataFrame
1. Reading the dataset
1. Checking the datatypes of the columns (schema)
1. Selecting columns and indexing
1. Checking the describe option (similar to pandas)
1. Adding columns
1. Dropping columns
1. Renaming columns

In [21]:
## Read the dataset: header uses the first row as column names, inferSchema detects column types
df_pyspark = spark.read.option("header", "true").csv("test1.csv", inferSchema=True)
df_pyspark.show()

+-------+----+-----------+
|   Name| Age| Experience|
+-------+----+-----------+
|Sukumar|31.0|        6.0|
|Sitansu|28.0|        5.0|
|Sushree|28.0|        6.0|
+-------+----+-----------+



In [22]:
## Checking the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |--  Age: double (nullable = true)
 |--  Experience: double (nullable = true)
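The leading spaces in ` Age` and ` Experience` in the schema above most likely come from a space after each comma in the CSV header line. This is an assumption, because test1.csv itself is not shown. The plain-Python sketch below runs the standard csv module on a hypothetical header to show the effect, and shows that stripping the names fixes it. In PySpark the equivalent cleanup would be `df_pyspark.toDF(*[c.strip() for c in df_pyspark.columns])`.

```python
import csv
import io

# Hypothetical test1.csv contents: note the space after each comma in the header
raw = "Name, Age, Experience\nSukumar,31,6\nSitansu,28,5\nSushree,28,6\n"

reader = csv.reader(io.StringIO(raw))
header = next(reader)
print(header)  # ['Name', ' Age', ' Experience']

# Stripping whitespace gives clean column names
clean_header = [name.strip() for name in header]
print(clean_header)  # ['Name', 'Age', 'Experience']
```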



In [52]:
## Another way of reading a CSV file: pass the options as keyword arguments
df_pyspark = spark.read.csv("test1.csv",header=True, inferSchema=True)
df_pyspark.show()
df_pyspark.printSchema()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Sukumar|31.0|       6.0|
|Sitansu|28.0|       5.0|
|Sushree|28.0|       6.0|
+-------+----+----------+

root
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Experience: double (nullable = true)



In [53]:
# Listing columns
df_pyspark.columns

['Name', 'Age', 'Experience']

In [28]:
df_pyspark.head(3)

[Row(Name='Sukumar',  Age=31.0,  Experience=6.0),
 Row(Name='Sitansu',  Age=28.0,  Experience=5.0),
 Row(Name='Sushree',  Age=28.0,  Experience=6.0)]

In pandas, head(n) returns a DataFrame.
In PySpark, head(n) returns a Python list of Row objects; use show() for a tabular display.

In [29]:
df_pyspark.show()

+-------+----+-----------+
|   Name| Age| Experience|
+-------+----+-----------+
|Sukumar|31.0|        6.0|
|Sitansu|28.0|        5.0|
|Sushree|28.0|        6.0|
+-------+----+-----------+



In [36]:
# Selecting a column
df_pyspark.select("Name").show()

+-------+
|   Name|
+-------+
|Sukumar|
|Sitansu|
|Sushree|
+-------+



In [54]:
# Selecting 2 columns
df_pyspark.select(["Name", "Experience"]).show()

+-------+----------+
|   Name|Experience|
+-------+----------+
|Sukumar|       6.0|
|Sitansu|       5.0|
|Sushree|       6.0|
+-------+----------+



In [55]:
# Checking datatypes
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'double'), ('Experience', 'double')]

In [56]:
# Using describe function
df_pyspark.describe().show()


+-------+-------+------------------+------------------+
|summary|   Name|               Age|        Experience|
+-------+-------+------------------+------------------+
|  count|      3|                 3|                 3|
|   mean|   null|              29.0| 5.666666666666667|
| stddev|   null|1.7320508075688772|0.5773502691896258|
|    min|Sitansu|              28.0|               5.0|
|    max|Sushree|              31.0|               6.0|
+-------+-------+------------------+------------------+
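The describe() numbers can be checked by hand. The sketch below recomputes them with Python's statistics module from the three rows shown above. The stddev values match statistics.stdev, which suggests describe() reports the sample standard deviation (dividing by n - 1) rather than the population one.

```python
import statistics

# Age and Experience values from test1.csv, as printed by show() above
columns = {"Age": [31.0, 28.0, 28.0], "Experience": [6.0, 5.0, 6.0]}

# Recompute the count / mean / stddev / min / max rows of describe()
summary = {
    name: {
        "count": len(vals),
        "mean": statistics.mean(vals),
        "stddev": statistics.stdev(vals),  # sample standard deviation (n - 1)
        "min": min(vals),
        "max": max(vals),
    }
    for name, vals in columns.items()
}
print(summary["Age"])
print(summary["Experience"])
```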




In [59]:
# Adding columns
df_pyspark = df_pyspark.withColumn("Experience after 2 years", df_pyspark["Experience"]+2)

In [60]:
df_pyspark.show()

+-------+----+----------+------------------------+
|   Name| Age|Experience|Experience after 2 years|
+-------+----+----------+------------------------+
|Sukumar|31.0|       6.0|                     8.0|
|Sitansu|28.0|       5.0|                     7.0|
|Sushree|28.0|       6.0|                     8.0|
+-------+----+----------+------------------------+



In [62]:
# Dropping columns
df_pyspark = df_pyspark.drop("Experience after 2 years")
df_pyspark.show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Sukumar|31.0|       6.0|
|Sitansu|28.0|       5.0|
|Sushree|28.0|       6.0|
+-------+----+----------+



In [63]:
# Renaming columns (returns a new DataFrame; df_pyspark is unchanged because the result is not assigned)
df_pyspark.withColumnRenamed("Name", "New Name").show()

+--------+----+----------+
|New Name| Age|Experience|
+--------+----+----------+
| Sukumar|31.0|       6.0|
| Sitansu|28.0|       5.0|
| Sushree|28.0|       6.0|
+--------+----+----------+



### 3. Handling Missing values
1. Dropping columns
1. Dropping rows
1. Various parameters of the dropping functions (how, thresh, subset)
1. Filling missing values (with a constant or with the mean)

In [5]:
df_pyspark = spark.read.csv("test2.csv", header = True, inferSchema=True)
df_pyspark.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3|  2000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [7]:
# Dropping a single column
df_pyspark.drop("Age").show()

+---------+----------+------+
|     Name|Experience|Salary|
+---------+----------+------+
|    Krish|        10| 30000|
|Sudhanshu|         8| 25000|
|    Sunny|         4| 20000|
|     Paul|         3|  2000|
|   Mahesh|      null| 40000|
|     null|        10| 38000|
|     null|      null|  null|
+---------+----------+------+



In [9]:
# Dropping more than 1 column
df_pyspark.drop("Age", "Experience").show()

+---------+------+
|     Name|Salary|
+---------+------+
|    Krish| 30000|
|Sudhanshu| 25000|
|    Sunny| 20000|
|     Paul|  2000|
|   Mahesh| 40000|
|     null| 38000|
|     null|  null|
+---------+------+



In [10]:
# Dropping rows by index
## This does not work: a PySpark DataFrame has no row index, and drop() only removes columns
## https://spark.apache.org/docs/3.3.2/api/python/reference/pyspark.sql/dataframe.html
df_pyspark.drop(index=[0,1]).show()

TypeError: drop() got an unexpected keyword argument 'index'

In [12]:
df_pyspark.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3|  2000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [11]:
# Dropping rows that contain at least one null value
df_pyspark.na.drop().show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3|  2000|
+---------+---+----------+------+



In [13]:
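# how="any" (the default) drops a row if any value is null; how="all" drops it only if every value is null
df_pyspark.na.drop(how="any").show()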

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3|  2000|
+---------+---+----------+------+



In [14]:
# Dropping rows - using the thresh parameter
df_pyspark.na.drop(thresh=2).show()
# Keeps only rows with at least 2 non-null values; when thresh is given it overrides how

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3|  2000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [15]:
## Using the subset parameter: only nulls in the listed columns (here Age) are considered
df_pyspark.na.drop(how="any", subset=["Age"]).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3|  2000|
|     null| 34|        10| 38000|
|     null| 36|      null|  null|
+---------+---+----------+------+
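The three drop parameters interact in a simple way. The plain-Python model below is an illustration rather than Spark's implementation, and it represents nulls as None (Spark's na.drop also treats NaN as missing, which is not modelled here). It reproduces the row counts of the outputs above.

```python
# Rows of test2.csv as shown above; None stands in for null
rows = [
    {"Name": "Krish", "Age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "Sudhanshu", "Age": 30, "Experience": 8, "Salary": 25000},
    {"Name": "Sunny", "Age": 29, "Experience": 4, "Salary": 20000},
    {"Name": "Paul", "Age": 24, "Experience": 3, "Salary": 2000},
    {"Name": "Mahesh", "Age": None, "Experience": None, "Salary": 40000},
    {"Name": None, "Age": 34, "Experience": 10, "Salary": 38000},
    {"Name": None, "Age": 36, "Experience": None, "Salary": None},
]

def na_drop(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of df.na.drop(how, thresh, subset)."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh      # thresh overrides how
        elif how == "any":
            keep = non_null == len(cols)   # drop if any value is null
        elif how == "all":
            keep = non_null > 0            # drop only if every value is null
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept

print(len(na_drop(rows)))                  # 4
print(len(na_drop(rows, thresh=2)))        # 6
print(len(na_drop(rows, subset=["Age"])))  # 6
print(len(na_drop(rows, how="all")))       # 7
```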



In [21]:
## Filling missing values: a string value only fills nulls in string columns (here Name)
df_pyspark.na.fill("Missing", subset=None).show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3|  2000|
|   Mahesh|null|      null| 40000|
|  Missing|  34|        10| 38000|
|  Missing|  36|      null|  null|
+---------+----+----------+------+



In [22]:
df_pyspark.na.fill({"Name": "unknown", "Age" : 0 }).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3|  2000|
|   Mahesh|  0|      null| 40000|
|  unknown| 34|        10| 38000|
|  unknown| 36|      null|  null|
+---------+---+----------+------+
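The two fill calls behave differently. A scalar such as "Missing" only replaces nulls in columns of a matching type, so Age, Experience and Salary keep their nulls. A dictionary maps each named column to its own replacement value. The plain-Python model below illustrates both behaviours; it is a simplification, not Spark's implementation.

```python
# Rows of test2.csv as shown above; None stands in for null
rows = [
    {"Name": "Krish", "Age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "Sudhanshu", "Age": 30, "Experience": 8, "Salary": 25000},
    {"Name": "Sunny", "Age": 29, "Experience": 4, "Salary": 20000},
    {"Name": "Paul", "Age": 24, "Experience": 3, "Salary": 2000},
    {"Name": "Mahesh", "Age": None, "Experience": None, "Salary": 40000},
    {"Name": None, "Age": 34, "Experience": 10, "Salary": 38000},
    {"Name": None, "Age": 36, "Experience": None, "Salary": None},
]
# Column types as inferred by inferSchema=True
schema = {"Name": str, "Age": int, "Experience": int, "Salary": int}

def type_matches(col_type, value):
    """A string fills string columns; a number fills numeric columns."""
    if isinstance(value, str):
        return col_type is str
    return col_type in (int, float)

def na_fill(rows, value, subset=None):
    """Plain-Python model of df.na.fill(value, subset)."""
    if isinstance(value, dict):
        targets = value  # per-column values; subset is ignored
    else:
        cols = subset if subset is not None else list(schema)
        targets = {c: value for c in cols if type_matches(schema[c], value)}
    filled = []
    for row in rows:
        new_row = dict(row)
        for col, fill_value in targets.items():
            if new_row[col] is None:
                new_row[col] = fill_value
        filled.append(new_row)
    return filled

by_string = na_fill(rows, "Missing")
by_dict = na_fill(rows, {"Name": "unknown", "Age": 0})
print(by_string[6])  # {'Name': 'Missing', 'Age': 36, 'Experience': None, 'Salary': None}
print(by_dict[4])    # {'Name': 'Mahesh', 'Age': 0, 'Experience': None, 'Salary': 40000}
```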



In [23]:
# Fill missing values with mean or median
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=["Age", "Experience", "Salary"],
    outputCols=["{}_imputed".format(c) for c in ["Age", "Experience", "Salary"]]
).setStrategy("mean")

In [25]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3|  2000|         24|                 3|          2000|
|   Mahesh|null|      null| 40000|         30|                 7|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 7|         25833|
+---------+----+----------+------+-----------+------------------+--------------+
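The imputed values can be reproduced by hand: each null is replaced by the mean of the non-null values in its column. Age has a mean of 184 / 6 ≈ 30.67 but the table shows 30, and Salary has a mean of 155000 / 6 ≈ 25833.33 but the table shows 25833. This suggests that the fill value is cast back to the integer column type (truncating) rather than rounded. The plain-Python sketch below works under that assumption.

```python
# Age, Experience and Salary columns of test2.csv; None stands in for null
columns = {
    "Age": [31, 30, 29, 24, None, 34, 36],
    "Experience": [10, 8, 4, 3, None, 10, None],
    "Salary": [30000, 25000, 20000, 2000, 40000, 38000, None],
}

def impute_mean(values):
    """Replace None with the mean of the non-null values, truncated to int."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    fill = int(mean)  # integer columns: the fractional part is dropped
    return [fill if v is None else v for v in values]

imputed = {name: impute_mean(vals) for name, vals in columns.items()}
print(imputed["Age"][4], imputed["Experience"][4], imputed["Salary"][6])  # 30 7 25833
```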




### 4. Filter Operation
- Filter operation
- Combining conditions with &, | and ==
- Negating a condition with ~

In [4]:
df_pyspark = spark.read.csv("test3.csv", header=True, inferSchema=True)
df_pyspark.show()


+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3|  2000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [5]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [6]:
### People whose salary is less than or equal to 20000
df_pyspark.filter(df_pyspark["Salary"]<=20000).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3|  2000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [7]:
### Same filter, written as a SQL expression string
df_pyspark.filter("Salary <= 20000").show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3|  2000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [10]:
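### People whose salary is between 20000 and 30000 (inclusive)
df_pyspark.filter((df_pyspark["Salary"]>=20000) &
                  (df_pyspark["Salary"]<=30000)).show()
# A chained comparison such as 20000 <= df_pyspark["Salary"] <= 30000 raises an error on Columns;
# df_pyspark["Salary"].between(20000, 30000) is an inclusive alternative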

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
+---------+---+----------+------+
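A chained comparison fails because Python rewrites `a <= b <= c` as `(a <= b) and (b <= c)`. The `and` needs a plain True/False from the first comparison, and a PySpark Column refuses to be converted to bool. For this reason, combined conditions are written with `&` (and `|`, `~`). Each condition also needs its own parentheses, because `&` binds more tightly than `<=`. The toy class below is a hypothetical stand-in for a column expression, not PySpark's Column, and shows both behaviours.

```python
class Expr:
    """Toy stand-in for a lazy column expression (hypothetical, not PySpark's Column)."""
    def __init__(self, text):
        self.text = text
    def __ge__(self, other):
        return Expr(f"({self.text} >= {other})")
    def __le__(self, other):
        return Expr(f"({self.text} <= {other})")
    def __and__(self, other):
        return Expr(f"({self.text} AND {other.text})")
    def __bool__(self):
        # PySpark's Column refuses truth-value conversion in a similar way
        raise ValueError("Cannot convert expression into bool; use '&' for 'and'")

salary = Expr("Salary")

# & builds one combined expression without ever calling bool()
combined = (salary >= 20000) & (salary <= 30000)
print(combined.text)  # ((Salary >= 20000) AND (Salary <= 30000))

# The chained form desugars to (20000 <= salary) and (salary <= 30000)
chained_failed = False
try:
    20000 <= salary <= 30000
except ValueError as err:
    chained_failed = True
    print("chained comparison failed:", err)
```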



In [11]:
df_pyspark.filter("Salary <= 20000").select(["Name", "Age"]).show()

+-------+---+
|   Name|Age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [12]:
# Negating a condition with ~ (NOT)
df_pyspark.filter(~(df_pyspark["Salary"]<=20000)).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



### 5. GroupBy and Aggregate Functions

In [13]:
df_pyspark = spark.read.csv("test4.csv", header=True,inferSchema=True)
df_pyspark.show()
df_pyspark.printSchema()

+---------+------------+------+
|     Name|  Department|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [18]:
# sum() adds up every numeric column per group; Salary is the only numeric column here
df_pyspark.groupBy("Name").sum().show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [20]:
df_pyspark.groupBy("Name").agg({"Salary":"avg"}).show()

+---------+------------------+
|     Name|       avg(Salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+
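groupBy followed by an aggregation first collects the rows for each key and then reduces each group to a single value. The plain-Python sketch below reproduces the sums and averages shown above. Spark does not guarantee any particular order for the groups in its output, which is why the names appear unsorted.

```python
from collections import defaultdict

# (Name, Department, Salary) rows of test4.csv as shown above
rows = [
    ("Krish", "Data Science", 10000),
    ("Krish", "IOT", 5000),
    ("Mahesh", "Big Data", 4000),
    ("Krish", "Big Data", 4000),
    ("Mahesh", "Data Science", 3000),
    ("Sudhanshu", "Data Science", 20000),
    ("Sudhanshu", "IOT", 10000),
    ("Sudhanshu", "Big Data", 5000),
    ("Sunny", "Data Science", 10000),
    ("Sunny", "Big Data", 2000),
]

# Group salaries by name (the groupBy step)
groups = defaultdict(list)
for name, _department, salary in rows:
    groups[name].append(salary)

# Reduce each group (the sum / avg step)
sums = {name: sum(s) for name, s in groups.items()}
avgs = {name: sum(s) / len(s) for name, s in groups.items()}
print(sums)
print(avgs)
```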



### 6. Examples of PySpark ML

In [23]:
training = spark.read.csv("test3.csv", header=True, inferSchema=True)

In [24]:
training.printSchema()
training.show()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3|  2000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [25]:
training.columns

['Name', 'Age', 'Experience', 'Salary']

In [29]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=["Age", "Experience"], outputCol = "Independent Features")

In [30]:
output = featureassembler.transform(training)

In [31]:
output.show()

+---------+---+----------+------+--------------------+
|     Name|Age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3|  2000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+
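VectorAssembler combines the listed input columns, in the given order, into a single vector column of doubles. Spark ML estimators such as LinearRegression expect their features in one column of this kind. The plain-Python picture below shows what each row ends up with, using lists in place of Spark's vectors, and then pairs features with the label the way finalized_data does.

```python
# Rows of test3.csv as shown above
rows = [
    {"Name": "Krish", "Age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "Sudhanshu", "Age": 30, "Experience": 8, "Salary": 25000},
    {"Name": "Sunny", "Age": 29, "Experience": 4, "Salary": 20000},
    {"Name": "Paul", "Age": 24, "Experience": 3, "Salary": 2000},
    {"Name": "Harsha", "Age": 21, "Experience": 1, "Salary": 15000},
    {"Name": "Shubham", "Age": 23, "Experience": 2, "Salary": 18000},
]
input_cols = ["Age", "Experience"]

# One feature vector per row: the input columns, in the given order, as floats
for row in rows:
    row["Independent Features"] = [float(row[c]) for c in input_cols]

# The model is then trained on (features, label) pairs
finalized = [(row["Independent Features"], row["Salary"]) for row in rows]
print(finalized[0])  # ([31.0, 10.0], 30000)
```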



In [32]:
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent Features']

In [33]:
finalized_data = output.select("Independent Features", "Salary")

In [34]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]|  2000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [35]:
from pyspark.ml.regression import LinearRegression
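## Train/test split (random, so the split differs between runs)
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol = "Independent Features", labelCol = "Salary")
## fit() returns a trained LinearRegressionModel, stored back under the name regressor
regressor = regressor.fit(train_data)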

23/03/19 07:18:25 WARN Instrumentation: [ade4e84f] regParam is zero, which might cause numerical instability and overfitting.
23/03/19 07:18:26 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/19 07:18:26 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/03/19 07:18:26 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
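randomSplit assigns rows to splits at random in proportion to the weights, so with only six rows the sizes are not fixed at 4.5 and 1.5. In this run the test set ended up with a single row, as the prediction output below shows. Passing a seed, e.g. `randomSplit([0.75, 0.25], seed=42)`, makes the split repeatable. The sketch below is a simplified plain-Python model that assigns each row independently. Spark's actual sampling works per partition, so this is only an approximation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(6))  # six hypothetical row ids
train, test = random_split(rows, [0.75, 0.25], seed=42)
print(len(train), len(test))
```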


In [36]:
## Coefficients
regressor.coefficients

DenseVector([1433.441, 966.9896])

In [38]:
## Intercept(s)
regressor.intercept

-25498.026551849594

In [39]:
### Prediction
pred_results = regressor.evaluate(test_data)

In [40]:
pred_results.predictions.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [21.0,1.0]| 15000|5571.223537854112|
+--------------------+------+-----------------+
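A fitted linear regression predicts intercept + sum(coefficient × feature). Plugging the displayed coefficients and intercept into that formula for the single test row [21.0, 1.0] reproduces the prediction, up to the rounding of the printed coefficients. The prediction is far from the true salary of 15000, which is unsurprising for a model trained on five rows without regularization (see the regParam warning above).

```python
coefficients = [1433.441, 966.9896]  # regressor.coefficients, as displayed
intercept = -25498.026551849594      # regressor.intercept
features = [21.0, 1.0]               # Age, Experience of the test row

# Linear model: intercept plus the dot product of coefficients and features
prediction = intercept + sum(c * x for c, x in zip(coefficients, features))
print(round(prediction, 2))
```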


