<a href="https://colab.research.google.com/github/madhubanidas/Madhu/blob/main/PYSPARK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark



# **Starting a Spark Session**

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Build the SparkSession used by every later cell; getOrCreate() reuses an already active session.
spark = SparkSession.builder \
    .appName("Practice") \
    .getOrCreate()

In [6]:
# Display the SparkSession object created in the previous cell.
spark

# Pandas

In [46]:
import pandas as pd

# pandas takes the column names from the first line of the file; the frame is the cell output.
pd.read_csv('test1.csv', sep=',')


Unnamed: 0,Name,Age,Experience
0,Krish,31,7
1,Madhu,23,2
2,Jyoti,27,5


In [45]:
# Check the type of the object pandas returns (a pandas DataFrame), for contrast with PySpark below.
# Note: this re-reads the CSV from disk just to inspect the type.
type(pd.read_csv('test1.csv'))

# **Reading the Dataset**

In [35]:
# Naive read without header/inferSchema options: Spark names the columns _c0, _c1, ...
# and reads every value, including the header line, as a string.
df_pyspark = spark.read.csv(path='test1.csv')

In [36]:
# The repr only lists the schema: auto-generated column names, all typed as string.
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [37]:
# Without a header option the original header line appears as the first data row.
df_pyspark.show()

+-----+---+----------+
|  _c0|_c1|       _c2|
+-----+---+----------+
| Name|Age|Experience|
|Krish| 31|         7|
|Madhu| 23|         2|
|Jyoti| 27|         5|
+-----+---+----------+



In [47]:
# header='true' takes the column names from the first line of the file.
# inferSchema='true' makes Spark scan the data and assign real types (e.g. integer).
# Without it, every column is read as a string.
df_pyspark = (spark.read
              .option('header', 'true')
              .option('inferSchema', 'true')
              .csv('test1.csv'))

In [48]:
# Now the first line is used as column names instead of appearing as a data row.
df_pyspark.show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Krish| 31|         7|
|Madhu| 23|         2|
|Jyoti| 27|         5|
+-----+---+----------+



In [51]:
## Alternative, more concise method: pass header/inferSchema directly as keyword arguments to csv()
(spark.read
    .csv('test1.csv', inferSchema=True, header=True)
    .show())

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Krish| 31|         7|
|Madhu| 23|         2|
|Jyoti| 27|         5|
+-----+---+----------+



# PySpark **DataFrame**

In [49]:
# Check the Python type of the PySpark DataFrame (compare with the pandas type checked earlier).
type(df_pyspark)

# **Checking the Data Types of the Columns (Schema)**

In [50]:
# Print column names, types and nullability; with inferSchema, Age and Experience are integers.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



# **Selecting Columns and Indexing**

In [52]:
# List of column names, in order.
df_pyspark.columns

['Name', 'Age', 'Experience']

In [42]:
# First 2 rows as a list of Row objects.
# NOTE(review): the saved output below shows Age/Experience as strings. It appears to come from an
# earlier run (In [42]) before df_pyspark was re-read with inferSchema (In [47]); re-run to refresh.
df_pyspark.head(2)

[Row(Name='Krish', Age='31', Experience='7'),
 Row(Name='Madhu', Age='23', Experience='2')]

In [43]:
# Last 2 rows as a list of Row objects.
# NOTE(review): like head(2) above, the saved output predates the inferSchema read (In [43] < In [47]).
df_pyspark.tail(2)

[Row(Name='Madhu', Age='23', Experience='2'),
 Row(Name='Jyoti', Age='27', Experience='5')]

In [53]:
# select() returns a new DataFrame; without an action like .show() only its schema repr is displayed.
df_pyspark.select('Name')

DataFrame[Name: string]

In [54]:
# Display the values of the selected column.
df_pyspark.select('Name').show()

+-----+
| Name|
+-----+
|Krish|
|Madhu|
|Jyoti|
+-----+



In [55]:
# Selecting a column still yields a DataFrame, not a pandas-style Series.
type(df_pyspark.select('Name'))

In [57]:
# select() also accepts a list of column names.
df_pyspark.select(['Name','Experience'])

DataFrame[Name: string, Experience: int]

In [58]:
# Display the two selected columns.
df_pyspark.select(['Name','Experience']).show()

+-----+----------+
| Name|Experience|
+-----+----------+
|Krish|         7|
|Madhu|         2|
|Jyoti|         5|
+-----+----------+



In [59]:
# (column name, type) pairs, a compact alternative to printSchema().
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

# **Summary Statistics with describe() (similar to pandas)**

In [60]:
# describe() returns a summary DataFrame (all columns typed as string); .show() is needed to see values.
df_pyspark.describe()

DataFrame[summary: string, Name: string, Age: string, Experience: string]

In [62]:
# count/mean/stddev/min/max per column; mean and stddev are NULL for the string column Name.
df_pyspark.describe().show()

+-------+-----+----+-----------------+
|summary| Name| Age|       Experience|
+-------+-----+----+-----------------+
|  count|    3|   3|                3|
|   mean| NULL|27.0|4.666666666666667|
| stddev| NULL| 4.0|2.516611478423583|
|    min|Jyoti|  23|                2|
|    max|Madhu|  31|                7|
+-------+-----+----+-----------------+



# **Adding Columns**

In [65]:
# withColumn returns a new DataFrame; reassigning replaces df_pyspark so later cells see the new column.
df_pyspark = df_pyspark.withColumn('Experience after 2 year', df_pyspark.Experience + 2)

In [67]:
# Display the DataFrame including the derived column.
df_pyspark.show()

+-----+---+----------+-----------------------+
| Name|Age|Experience|Experience after 2 year|
+-----+---+----------+-----------------------+
|Krish| 31|         7|                      9|
|Madhu| 23|         2|                      4|
|Jyoti| 27|         5|                      7|
+-----+---+----------+-----------------------+



# **Dropping Columns**

In [71]:
# Drop the derived column again and reassign, restoring Name/Age/Experience.
df_pyspark=df_pyspark.drop('Experience after 2 year')

In [72]:
# Confirm the column is gone.
df_pyspark.show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Krish| 31|         7|
|Madhu| 23|         2|
|Jyoti| 27|         5|
+-----+---+----------+



# **Rename Columns**

In [73]:
# withColumnRenamed returns a new DataFrame; df_pyspark is not reassigned here, so it keeps 'Name'.
df_pyspark.withColumnRenamed(existing='Name', new='New Name').show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|   Krish| 31|         7|
|   Madhu| 23|         2|
|   Jyoti| 27|         5|
+--------+---+----------+



# **Dropping Rows**

In [74]:
# Load a second dataset that contains missing values, used for the null-handling examples.
df_pyspark2=spark.read.csv('test2.csv',header=True,inferSchema=True)

In [75]:
# Nulls are displayed as NULL.
df_pyspark2.show()

+-----+----+----------+-------+
| Name| Age|Experience| Salary|
+-----+----+----------+-------+
|Krish|  31|         7|2000000|
|Madhu|  23|         2| 900000|
|Jyoti|  27|         5|1500000|
|Harsh|  26|         4| 900000|
|Rathi|  33|         9|2100000|
| Mala|NULL|      NULL|5000000|
| NULL|  34|        10|2500000|
| NULL|  36|      NULL|   NULL|
+-----+----+----------+-------+



In [78]:
# Default na.drop(): drops every ROW that contains at least one null value (how='any').
df_pyspark2.na.drop().show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



# **Parameters of na.drop(): how, thresh and subset**

In [79]:
## how: 'any' (the default) drops a row if ANY of its values is null;
## 'all' drops a row only if ALL of its values are null.
## No row here is entirely null, so nothing is dropped.
df_pyspark2.na.drop(how='all').show()

+-----+----+----------+-------+
| Name| Age|Experience| Salary|
+-----+----+----------+-------+
|Krish|  31|         7|2000000|
|Madhu|  23|         2| 900000|
|Jyoti|  27|         5|1500000|
|Harsh|  26|         4| 900000|
|Rathi|  33|         9|2100000|
| Mala|NULL|      NULL|5000000|
| NULL|  34|        10|2500000|
| NULL|  36|      NULL|   NULL|
+-----+----+----------+-------+



In [84]:
# how='any' is the default, so this is identical to na.drop(): rows with any null are dropped.
df_pyspark2.na.drop(how='any').show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



In [82]:
## thresh: keep only rows with at least `thresh` non-null values.
## When thresh is given it overrides `how`, so `how` is omitted here.
df_pyspark2.na.drop(thresh=2).show()
## At least 2 non-null values must be present (only the last row, with 1 non-null value, is dropped).

+-----+----+----------+-------+
| Name| Age|Experience| Salary|
+-----+----+----------+-------+
|Krish|  31|         7|2000000|
|Madhu|  23|         2| 900000|
|Jyoti|  27|         5|1500000|
|Harsh|  26|         4| 900000|
|Rathi|  33|         9|2100000|
| Mala|NULL|      NULL|5000000|
| NULL|  34|        10|2500000|
+-----+----+----------+-------+



In [85]:
# Stricter threshold: rows need at least 3 non-null values, so Mala's row (only 2) is dropped too.
df_pyspark2.na.drop(thresh=3).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
| NULL| 34|        10|2500000|
+-----+---+----------+-------+



In [86]:
## subset: only the listed columns are checked for nulls
df_pyspark2.na.drop(subset=['Experience']).show()
## Only rows whose Experience is null are dropped; nulls in other columns (e.g. Name) are kept.

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
| NULL| 34|        10|2500000|
+-----+---+----------+-------+



# **Filling Missing Values**

In [96]:
# A string fill value only replaces nulls in string columns (here Name); numeric nulls stay NULL.
df_pyspark2.na.fill('Missing Values').show()

+--------------+----+----------+-------+
|          Name| Age|Experience| Salary|
+--------------+----+----------+-------+
|         Krish|  31|         7|2000000|
|         Madhu|  23|         2| 900000|
|         Jyoti|  27|         5|1500000|
|         Harsh|  26|         4| 900000|
|         Rathi|  33|         9|2100000|
|          Mala|NULL|      NULL|5000000|
|Missing Values|  34|        10|2500000|
|Missing Values|  36|      NULL|   NULL|
+--------------+----+----------+-------+



In [94]:
# A numeric fill value applies to numeric columns; the subset argument limits it to Experience only,
# so the null Age and Salary values are left unchanged.
df_pyspark2.na.fill(1,'Experience').show()

+-----+----+----------+-------+
| Name| Age|Experience| Salary|
+-----+----+----------+-------+
|Krish|  31|         7|2000000|
|Madhu|  23|         2| 900000|
|Jyoti|  27|         5|1500000|
|Harsh|  26|         4| 900000|
|Rathi|  33|         9|2100000|
| Mala|NULL|         1|5000000|
| NULL|  34|        10|2500000|
| NULL|  36|         1|   NULL|
+-----+----+----------+-------+



In [98]:
# Fill nulls in both Experience and Age with 30.
# The name is spelled 'Age' to match the schema; 'age' only worked because Spark resolves
# column names case-insensitively by default.
df_pyspark2.na.fill(30, ['Experience', 'Age']).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
| Mala| 30|        30|5000000|
| NULL| 34|        10|2500000|
| NULL| 36|        30|   NULL|
+-----+---+----------+-------+



# **Handling Missing Values by Mean, Median and Mode**

In [108]:
from pyspark.ml.feature import Imputer

# Numeric columns with missing values; each gets a companion '<col>_imputed' output column.
# Deriving outputCols from the same list keeps the two lists from drifting apart.
impute_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=["{}_imputed".format(c) for c in impute_cols],
    strategy="mean",  # replace nulls with the column mean of the non-null values
)

In [109]:
### Add the imputed columns to the DataFrame: fit() computes each column's mean, transform() fills the nulls.
### NOTE: the input columns are integers, so the imputed values are shown as integers
### (e.g. the Experience mean 37/6, about 6.17, appears as 6).
imputer.fit(df_pyspark2).transform(df_pyspark2).show()

+-----+----+----------+-------+-----------+------------------+--------------+
| Name| Age|Experience| Salary|Age_imputed|Experience_imputed|Salary_imputed|
+-----+----+----------+-------+-----------+------------------+--------------+
|Krish|  31|         7|2000000|         31|                 7|       2000000|
|Madhu|  23|         2| 900000|         23|                 2|        900000|
|Jyoti|  27|         5|1500000|         27|                 5|       1500000|
|Harsh|  26|         4| 900000|         26|                 4|        900000|
|Rathi|  33|         9|2100000|         33|                 9|       2100000|
| Mala|NULL|      NULL|5000000|         30|                 6|       5000000|
| NULL|  34|        10|2500000|         34|                10|       2500000|
| NULL|  36|      NULL|   NULL|         36|                 6|       2128571|
+-----+----+----------+-------+-----------+------------------+--------------+



In [111]:

# Same columns as above, now filled with the column median.
# Spark computes the median approximately (see Imputer's relativeError parameter).
# Imputer also accepts strategy="mode" in newer Spark versions.
# NOTE(review): confirm both against the installed Spark version.
impute_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=["{}_imputed".format(c) for c in impute_cols],
    strategy="median",
)

In [112]:
### Add the median-imputed columns; e.g. the Experience nulls become 5 (middle of 2, 4, 5, 7, 9, 10 as reported).
imputer.fit(df_pyspark2).transform(df_pyspark2).show()

+-----+----+----------+-------+-----------+------------------+--------------+
| Name| Age|Experience| Salary|Age_imputed|Experience_imputed|Salary_imputed|
+-----+----+----------+-------+-----------+------------------+--------------+
|Krish|  31|         7|2000000|         31|                 7|       2000000|
|Madhu|  23|         2| 900000|         23|                 2|        900000|
|Jyoti|  27|         5|1500000|         27|                 5|       1500000|
|Harsh|  26|         4| 900000|         26|                 4|        900000|
|Rathi|  33|         9|2100000|         33|                 9|       2100000|
| Mala|NULL|      NULL|5000000|         31|                 5|       5000000|
| NULL|  34|        10|2500000|         34|                10|       2500000|
| NULL|  36|      NULL|   NULL|         36|                 5|       2000000|
+-----+----+----------+-------+-----------+------------------+--------------+



# **Filter Operation**

In [115]:
# Load a complete (no missing values) dataset for the filter examples.
df_pyspark3=spark.read.csv('test3.csv',header=True,inferSchema=True)

In [116]:
# Display the filter dataset.
df_pyspark3.show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



In [119]:
## People with a salary less than or equal to 1500000 (condition written as a SQL expression string)
df_pyspark3.filter('Salary<=1500000').show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
+-----+---+----------+-------+



In [121]:
## Filter first, then keep only Name and Age of the matching people
(df_pyspark3
    .filter('Salary<=1500000')
    .select(['Name', 'Age'])
    .show())

+-----+---+
| Name|Age|
+-----+---+
|Madhu| 23|
|Jyoti| 27|
|Harsh| 26|
+-----+---+



In [123]:
# The same kind of filter written as a Column expression instead of a SQL string.
df_pyspark3.filter(df_pyspark3['Salary']>1400000).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Jyoti| 27|         5|1500000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



# **Combining Conditions: & (and), | (or), == (equals)**

In [124]:
# & means AND. Each condition needs its own parentheses because Python's & binds tighter than > and <.
df_pyspark3.filter(
    (df_pyspark3['Salary'] > 1400000)
    & (df_pyspark3['Salary'] < 2000000)
).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Jyoti| 27|         5|1500000|
+-----+---+----------+-------+



In [131]:
# | means OR: keep rows with a very high or a low salary
df_pyspark3.filter(
    (df_pyspark3['Salary'] >= 2100000)
    | (df_pyspark3['Salary'] < 1000000)
).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Madhu| 23|         2| 900000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



In [136]:
# == tests equality on a Column; combined with | (OR)
df_pyspark3.filter(
    (df_pyspark3['Salary'] == 1500000)
    | (df_pyspark3['Salary'] < 1000000)
).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
+-----+---+----------+-------+



# **Negating Conditions: ~ (not)**

In [132]:
# ~ negates a condition: keep rows that are NOT (salary >= 2100000 or salary < 1000000)
df_pyspark3.filter(
    ~(
        (df_pyspark3['Salary'] >= 2100000)
        | (df_pyspark3['Salary'] < 1000000)
    )
).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Jyoti| 27|         5|1500000|
+-----+---+----------+-------+



In [135]:
# ~ flips the comparison; for non-null salaries this keeps Salary > 1500000
df_pyspark3.filter(
    ~(df_pyspark3['Salary'] <= 1500000)
).show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



# **PySpark GROUP BY and Aggregate Functions**

In [11]:
# Load the employee/department/salary dataset for the groupBy examples.
df_pyspark4= spark.read.csv('test4.csv',header=True,inferSchema=True)

In [12]:
# A person can appear in several departments (one row per person/department pair).
df_pyspark4.show()

+-----+--------------+-------+
| Name|    Department| Salary|
+-----+--------------+-------+
|Krish|  Data Analyst|2000000|
|Madhu|  Data Analyst|1200000|
|Madhu|           SQL| 900000|
|Jyoti|Cyber Security|1500000|
|Jyoti|    Data Entry| 700000|
|Jyoti|Senior Analyst|1700000|
|Krish|Cyber Security|2300000|
|Harsh|  Data Analyst| 900000|
|Harsh|Senior Analyst|2100000|
+-----+--------------+-------+



In [13]:
# Salary is the only numeric column.
df_pyspark4.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



## GROUP BY

In [14]:
# groupBy alone returns a GroupedData object; an aggregation is needed to get a DataFrame back.
df_pyspark4.groupBy('Name')

GroupedData[grouping expressions: [Name], value: [Name: string, Department: string ... 1 more field], type: GroupBy]

In [16]:
# Total salary per person. The column is named explicitly: a bare .sum() would sum every numeric column.
df_pyspark4.groupBy('Name').sum('Salary').show()

+-----+-----------+
| Name|sum(Salary)|
+-----+-----------+
|Madhu|    2100000|
|Jyoti|    3900000|
|Krish|    4300000|
|Harsh|    3000000|
+-----+-----------+



In [22]:
# Highest salary per person (column named explicitly so extra numeric columns are not aggregated)
df_pyspark4.groupBy('Name').max('Salary').show()

+-----+-----------+
| Name|max(Salary)|
+-----+-----------+
|Madhu|    1200000|
|Jyoti|    1700000|
|Krish|    2300000|
|Harsh|    2100000|
+-----+-----------+



In [23]:
# Lowest salary per person (column named explicitly so extra numeric columns are not aggregated)
df_pyspark4.groupBy('Name').min('Salary').show()

+-----+-----------+
| Name|min(Salary)|
+-----+-----------+
|Madhu|     900000|
|Jyoti|     700000|
|Krish|    2000000|
|Harsh|     900000|
+-----+-----------+



In [17]:
# Total salary paid per department (compare the totals to see which department pays the most)
df_pyspark4.groupBy('Department').sum('Salary').show()

+--------------+-----------+
|    Department|sum(Salary)|
+--------------+-----------+
|Senior Analyst|    3800000|
|    Data Entry|     700000|
|  Data Analyst|    4100000|
|           SQL|     900000|
|Cyber Security|    3800000|
+--------------+-----------+



In [18]:
# Mean salary per department (output column is named avg(Salary))
df_pyspark4.groupBy('Department').mean('Salary').show()

+--------------+------------------+
|    Department|       avg(Salary)|
+--------------+------------------+
|Senior Analyst|         1900000.0|
|    Data Entry|          700000.0|
|  Data Analyst|1366666.6666666667|
|           SQL|          900000.0|
|Cyber Security|         1900000.0|
+--------------+------------------+



In [19]:
# How many employees work in each department (number of rows per department)
df_pyspark4.groupBy('Department').count().show()

+--------------+-----+
|    Department|count|
+--------------+-----+
|Senior Analyst|    2|
|    Data Entry|    1|
|  Data Analyst|    3|
|           SQL|    1|
|Cyber Security|    2|
+--------------+-----+



In [26]:
# Average salary per department: avg() is an alias of mean(), so this matches the mean() cell above
df_pyspark4.groupBy('Department').avg('Salary').show()

+--------------+------------------+
|    Department|       avg(Salary)|
+--------------+------------------+
|Senior Analyst|         1900000.0|
|    Data Entry|          700000.0|
|  Data Analyst|1366666.6666666667|
|           SQL|          900000.0|
|Cyber Security|         1900000.0|
+--------------+------------------+



## Aggregate

In [21]:
# Total of all salaries over the whole DataFrame (agg without groupBy aggregates every row)
df_pyspark4.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|   13300000|
+-----------+



In [25]:
# Numeric aggregates like sum/avg can be requested on a string column, but the names are not
# numbers, so the result is NULL.
df_pyspark4.agg({'Name':'sum'}).show()

+---------+
|sum(Name)|
+---------+
|     NULL|
+---------+



In [29]:
# Average salary across all rows
df_pyspark4.agg({'Salary':'avg'}).show()

+------------------+
|       avg(Salary)|
+------------------+
|1477777.7777777778|
+------------------+



# **PySpark ML**

In [31]:
# Reuse the complete test3.csv dataset (no missing values) as training data for the regression.
training=spark.read.csv('test3.csv',header=True,inferSchema=True)

In [32]:
# Only 5 rows: enough to demonstrate the API, far too few for a meaningful model.
training.show()

+-----+---+----------+-------+
| Name|Age|Experience| Salary|
+-----+---+----------+-------+
|Krish| 31|         7|2000000|
|Madhu| 23|         2| 900000|
|Jyoti| 27|         5|1500000|
|Harsh| 26|         4| 900000|
|Rathi| 33|         9|2100000|
+-----+---+----------+-------+



In [33]:
# Age, Experience and Salary are integers, so they can be used as numeric features/label.
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [34]:
# Column names available for choosing features and label.
training.columns

['Name', 'Age', 'Experience', 'Salary']

## Salary Prediction

In [35]:
from pyspark.ml.feature import VectorAssembler

# Combine the independent variables (Age, Experience) into a single vector column, the input
# format the LinearRegression below expects via featuresCol.
# Salary stays a separate column and is the dependent variable (label).
featureassembler = VectorAssembler(inputCols=['Age', 'Experience'], outputCol='Independent Features')

In [36]:
# Apply the assembler: adds the 'Independent Features' vector column to every row
output=featureassembler.transform(training)

In [37]:
# Each row now carries its [Age, Experience] feature vector.
output.show()

+-----+---+----------+-------+--------------------+
| Name|Age|Experience| Salary|Independent Features|
+-----+---+----------+-------+--------------------+
|Krish| 31|         7|2000000|          [31.0,7.0]|
|Madhu| 23|         2| 900000|          [23.0,2.0]|
|Jyoti| 27|         5|1500000|          [27.0,5.0]|
|Harsh| 26|         4| 900000|          [26.0,4.0]|
|Rathi| 33|         9|2100000|          [33.0,9.0]|
+-----+---+----------+-------+--------------------+



In [39]:
# The original columns plus the new feature-vector column.
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent Features']

In [40]:
# Keep only what the model needs: the feature vector and the label (Salary)
finalized_data=output.select('Independent Features','Salary')

## **Linear Regression**

In [41]:
from pyspark.ml.regression import LinearRegression

# Train/test split. A fixed seed makes the split, and therefore the fitted model, reproducible
# across Restart & Run All.
# NOTE: with only 5 rows, the training set has just a few points for 2 features + intercept.
# The fit can be exact/degenerate (e.g. huge opposite-sign coefficients), so the test metrics
# below are not meaningful.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

# Keep the estimator and the fitted model under separate names; `regressor` is the fitted
# model used by the cells below.
lr = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor = lr.fit(train_data)

In [42]:
# Fitted coefficients, one per feature in the order [Age, Experience]
regressor.coefficients

DenseVector([-2400000.0, 3600000.0])

In [43]:
## Fitted intercept (a single value)
regressor.intercept

48899999.99932456

## **Prediction**

In [44]:
# Evaluate on the held-out rows; the result exposes the predictions and error metrics used below.
pred_results=regressor.evaluate(test_data)

In [46]:
# Actual Salary next to the model's prediction for each test row.
pred_results.predictions.show()

+--------------------+-------+------------------+
|Independent Features| Salary|        prediction|
+--------------------+-------+------------------+
|          [27.0,5.0]|1500000|2099999.9999888614|
|          [31.0,7.0]|2000000|-299999.9999725893|
+--------------------+-------+------------------+



In [47]:
# (mean absolute error, mean squared error) on the test rows.
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(1449999.9999807253, 2824999999930.272)