In [72]:
import pyspark
import pandas as pd

In [73]:
pd.read_csv(filepath_or_buffer='Untitled.csv')  ## Load the same CSV with pandas, for comparison with the Spark reads below

Unnamed: 0,Name,Age
0,Subham,34
1,Soham,53
2,Swarnali,32
3,Tanvir,23


In [74]:
from pyspark.sql import SparkSession

In [75]:
## CREATING SPARK SESSION (getOrCreate reuses an already-running session if there is one)
spark = (
    SparkSession.builder
    .appName('Trial_Session')
    .getOrCreate()
)

In [76]:
spark  ## Display the active SparkSession

In [77]:
## READING FILE THROUGH PYSPARK with no header option: the header line is read as a data row
## and the columns get default names (_c0, _c1, ...)
df_pyspark = spark.read.format('csv').load('Untitled.csv')
df_pyspark

DataFrame[_c0: string, _c1: string]

In [78]:
type(df_pyspark)  ## Confirm this is a Spark DataFrame (pyspark.sql.dataframe.DataFrame), not a pandas one

pyspark.sql.dataframe.DataFrame

In [79]:
df_pyspark.take(5)  ## First 5 rows as a list of Row objects; note the header line appears as a data row

[Row(_c0='Name', _c1='Age'),
 Row(_c0='Subham', _c1='34'),
 Row(_c0='Soham', _c1='53'),
 Row(_c0='Swarnali', _c1='32'),
 Row(_c0='Tanvir', _c1='23')]

In [80]:
df_pyspark.printSchema()  ## Without header/inferSchema: default names _c0, _c1 and every column is a nullable string

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [81]:
df_pyspark.show(n=20)  ## Print up to the first 20 rows of the DataFrame as a text table

+--------+---+
|     _c0|_c1|
+--------+---+
|    Name|Age|
|  Subham| 34|
|   Soham| 53|
|Swarnali| 32|
|  Tanvir| 23|
+--------+---+



In [82]:
df_pyspark2 = spark.read.csv("Untitled.csv", header='true')  ## Use the first line as column names instead of a data row

In [83]:
type(df_pyspark2)  ## Still a pyspark.sql DataFrame

pyspark.sql.dataframe.DataFrame

In [84]:
df_pyspark2.show()  ## Show the header-aware DataFrame built in the previous cells instead of reading the CSV a second time

+--------+---+
|    Name|Age|
+--------+---+
|  Subham| 34|
|   Soham| 53|
|Swarnali| 32|
|  Tanvir| 23|
+--------+---+



### --------------------------------------------------------------------------------------

In [85]:
## header: first line holds the column names; inferSchema: scan the data to detect column types
## (so Age/Experience become integers instead of strings)
df_s = spark.read.options(header='true', inferSchema=True).csv("Untitled2.csv")
df_s.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|  Subham| 34|        23|
|   Soham| 53|         6|
|Swarnali| 32|         4|
|  Tanvir| 23|         3|
+--------+---+----------+



In [86]:
## CHECK THE SCHEMA/ DATA TYPE
## With inferSchema=True, Age and Experience are detected as integers instead of strings

df_s.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [87]:
## Same read as above, written with explicit per-option calls on the generic reader
df_s = spark.read.format('csv').option('header', True).option('inferSchema', True).load('Untitled2.csv')

In [88]:
df_s  ## The repr lists only column names and types, not the rows

DataFrame[Name: string, Age: int, Experience: int]

In [89]:
type(df_s)  ## Confirm the reader returned a pyspark.sql DataFrame

pyspark.sql.dataframe.DataFrame

In [90]:
df_s.columns ## Get column names as a plain Python list of strings

['Name', 'Age', 'Experience']

In [91]:
df_s.show() ## Display all columns (up to the first 20 rows)

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|  Subham| 34|        23|
|   Soham| 53|         6|
|Swarnali| 32|         4|
|  Tanvir| 23|         3|
+--------+---+----------+



In [92]:
df_s.select(df_s["Name"]).show()  ## Get a single column, selected via a Column object

+--------+
|    Name|
+--------+
|  Subham|
|   Soham|
|Swarnali|
|  Tanvir|
+--------+



In [93]:
df_s.select(["Name", "Experience"]).show()  ## Get several columns at once (select also accepts a list of names)

+--------+----------+
|    Name|Experience|
+--------+----------+
|  Subham|        23|
|   Soham|         6|
|Swarnali|         4|
|  Tanvir|         3|
+--------+----------+



In [94]:
df_s.dtypes ## CHECKING DATA TYPES: list of (column name, type string) pairs

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [95]:
df_s.describe().show()   ## Summary statistics (count, mean, stddev, min, max), like pandas describe(); mean/stddev are NULL for the string column Name

+-------+------+-----------------+----------------+
|summary|  Name|              Age|      Experience|
+-------+------+-----------------+----------------+
|  count|     4|                4|               4|
|   mean|  NULL|             35.5|             9.0|
| stddev|  NULL|12.60952021291849|9.41629792788369|
|    min| Soham|               23|               3|
|    max|Tanvir|               53|              23|
+-------+------+-----------------+----------------+



In [96]:
### Adding Columns

## withColumn returns a new DataFrame with the derived column appended; df_s itself is unchanged
df_s_new = df_s.withColumn('Experience After Two Years', df_s.Experience + 2)

In [97]:
df_s_new.show()  ## The new column holds Experience + 2


+--------+---+----------+--------------------------+
|    Name|Age|Experience|Experience After Two Years|
+--------+---+----------+--------------------------+
|  Subham| 34|        23|                        25|
|   Soham| 53|         6|                         8|
|Swarnali| 32|         4|                         6|
|  Tanvir| 23|         3|                         5|
+--------+---+----------+--------------------------+



In [98]:
### Dropping Columns

## drop returns a new DataFrame without the given column; df_s_new keeps it
df_s_new2 = df_s_new.drop(df_s_new['Experience After Two Years'])
df_s_new2.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|  Subham| 34|        23|
|   Soham| 53|         6|
|Swarnali| 32|         4|
|  Tanvir| 23|         3|
+--------+---+----------+



In [99]:
### Renaming Columns
## withColumnRenamed returns a new DataFrame; here it is only displayed, not assigned
df_s_new2.withColumnRenamed('Name','New Name').show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|  Subham| 34|        23|
|   Soham| 53|         6|
|Swarnali| 32|         4|
|  Tanvir| 23|         3|
+--------+---+----------+



### --------------------------------------------------------------------------------------

### Column and Row Operations (Adding/Dropping)

In [100]:
## Load the dataset containing nulls used by the missing-value examples below
## NOTE(review): this rebinds df_s_new, previously the DataFrame with the extra Experience column
df_s_new = spark.read.options(header=True, inferSchema=True).csv("Untitled3.csv")
df_s_new.show()

+--------+----+----------+-------+
|    Name| Age|Experience| Salary|
+--------+----+----------+-------+
|  Subham|  34|        23|  50000|
|   Soham|  53|         6|   3323|
|Swarnali|  32|         4|  21421|
|  Tanvir|  23|         3|  23134|
|     Sam|  13|         2|  43432|
|  Ronnie|  43|        33| 654345|
|     Ram|NULL|      NULL| 245243|
|    NULL|  87|        45|1000000|
|    NULL|  56|      NULL|   NULL|
+--------+----+----------+-------+



In [101]:
type(df_s_new)  ## Confirm the reader returned a pyspark.sql DataFrame

pyspark.sql.dataframe.DataFrame

In [102]:
### Drop the Rows which have Null Values

## dropna() is the same operation as df.na.drop(); with no arguments any null in a row removes it
df_s_new.dropna().show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|  Subham| 34|        23| 50000|
|   Soham| 53|         6|  3323|
|Swarnali| 32|         4| 21421|
|  Tanvir| 23|         3| 23134|
|     Sam| 13|         2| 43432|
|  Ronnie| 43|        33|654345|
+--------+---+----------+------+



In [103]:
### Drop the Rows which have Null Values

df_s_new.dropna(how='any').show()  ## how='any': drop every row that contains at least one null (the default)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|  Subham| 34|        23| 50000|
|   Soham| 53|         6|  3323|
|Swarnali| 32|         4| 21421|
|  Tanvir| 23|         3| 23134|
|     Sam| 13|         2| 43432|
|  Ronnie| 43|        33|654345|
+--------+---+----------+------+



In [104]:
### Threshold

## Keep only rows with at least 2 non-null values. When thresh is given it overrides `how`,
## so passing how="any" alongside it has no effect.
df_s_new.dropna(thresh=2).show()

+--------+----+----------+-------+
|    Name| Age|Experience| Salary|
+--------+----+----------+-------+
|  Subham|  34|        23|  50000|
|   Soham|  53|         6|   3323|
|Swarnali|  32|         4|  21421|
|  Tanvir|  23|         3|  23134|
|     Sam|  13|         2|  43432|
|  Ronnie|  43|        33| 654345|
|     Ram|NULL|      NULL| 245243|
|    NULL|  87|        45|1000000|
+--------+----+----------+-------+



In [105]:
### Subset

## Only nulls in the listed column(s) trigger removal; nulls elsewhere (e.g. Name) are kept
df_s_new.dropna(how="any", subset="Experience").show()

+--------+---+----------+-------+
|    Name|Age|Experience| Salary|
+--------+---+----------+-------+
|  Subham| 34|        23|  50000|
|   Soham| 53|         6|   3323|
|Swarnali| 32|         4|  21421|
|  Tanvir| 23|         3|  23134|
|     Sam| 13|         2|  43432|
|  Ronnie| 43|        33| 654345|
|    NULL| 87|        45|1000000|
+--------+---+----------+-------+



In [106]:
### Fill the missing values

## A string fill value only replaces nulls in string-typed columns (here Name); numeric nulls stay NULL.
## To restrict filling to specific columns pass subset, e.g. df_s_new.fillna('Missing Values', subset=['Name']).
df_s_new.fillna('Missing Values').show()

+--------------+----+----------+-------+
|          Name| Age|Experience| Salary|
+--------------+----+----------+-------+
|        Subham|  34|        23|  50000|
|         Soham|  53|         6|   3323|
|      Swarnali|  32|         4|  21421|
|        Tanvir|  23|         3|  23134|
|           Sam|  13|         2|  43432|
|        Ronnie|  43|        33| 654345|
|           Ram|NULL|      NULL| 245243|
|Missing Values|  87|        45|1000000|
|Missing Values|  56|      NULL|   NULL|
+--------------+----+----------+-------+



In [107]:
df_s_new.show()  ## Unchanged: the na.drop / na.fill calls above returned new DataFrames and were not assigned

+--------+----+----------+-------+
|    Name| Age|Experience| Salary|
+--------+----+----------+-------+
|  Subham|  34|        23|  50000|
|   Soham|  53|         6|   3323|
|Swarnali|  32|         4|  21421|
|  Tanvir|  23|         3|  23134|
|     Sam|  13|         2|  43432|
|  Ronnie|  43|        33| 654345|
|     Ram|NULL|      NULL| 245243|
|    NULL|  87|        45|1000000|
|    NULL|  56|      NULL|   NULL|
+--------+----+----------+-------+



### MISSING VALUE TREATMENT

In [116]:
## IMPUTER FUNCTION / FILLING THE MISSING VALUES by Mean/Median

from pyspark.ml.feature import Imputer

## One "<col>_imputed" output column per numeric input column; strategy="mean" (use "median" for the median)
imputer = Imputer(
    inputCols=['Age', 'Experience', 'Salary'],
    outputCols=[f"{c}_imputed" for c in ['Age', 'Experience', 'Salary']],
    strategy="mean",
)


In [124]:
(
    imputer
    .fit(df_s_new)        ## compute the mean of each input column from its non-null values
    .transform(df_s_new)  ## append *_imputed columns with nulls replaced by those means
    .show()               ## means are shown in the integer column type (e.g. Age 42.625 -> 42)
)

+--------+----+----------+-------+-----------+------------------+--------------+
|    Name| Age|Experience| Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+-------+-----------+------------------+--------------+
|  Subham|  34|        23|  50000|         34|                23|         50000|
|   Soham|  53|         6|   3323|         53|                 6|          3323|
|Swarnali|  32|         4|  21421|         32|                 4|         21421|
|  Tanvir|  23|         3|  23134|         23|                 3|         23134|
|     Sam|  13|         2|  43432|         13|                 2|         43432|
|  Ronnie|  43|        33| 654345|         43|                33|        654345|
|     Ram|NULL|      NULL| 245243|         42|                16|        245243|
|    NULL|  87|        45|1000000|         87|                45|       1000000|
|    NULL|  56|      NULL|   NULL|         56|                16|        255112|
+--------+----+----------+-------+-----------+------------------+--------------+

### --------------------------------------------------------------------------------------

In [129]:
df_ss = spark.read.format("csv").options(header=True, inferSchema=True).load("Untitled4.csv")  ## Dataset for the filter examples

In [130]:
df_ss.show()  ## Dataset for the filter examples (no nulls)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|  Subham| 34|        23| 50000|
|   Soham| 53|         6|  3323|
|Swarnali| 32|         4| 21421|
|  Tanvir| 23|         3| 23134|
|     Sam| 13|         2| 43432|
|  Ronnie| 43|        33|654345|
|     Ram| 97|         4|245243|
+--------+---+----------+------+



### FILTER OPERATIONS

In [133]:
## Rows where Salary >= 50000 (a SQL expression string is accepted as the filter condition)

df_ss.filter("Salary >=50000").show() ## Equivalent Column form: df_ss.filter(df_ss["Salary"] >= 50000).show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|Subham| 34|        23| 50000|
|Ronnie| 43|        33|654345|
|   Ram| 97|         4|245243|
+------+---+----------+------+



In [134]:
df_ss.where("Salary >=50000").select("Name", "Age").show()  ## Single condition, then keep only the Name and Age columns (where is an alias of filter)

+------+---+
|  Name|Age|
+------+---+
|Subham| 34|
|Ronnie| 43|
|   Ram| 97|
+------+---+



In [135]:
df_ss.where((df_ss.Salary >= 50000) & (df_ss.Age >= 50)).show()  ## Multiple conditions: combine with & / |, each wrapped in parentheses

+----+---+----------+------+
|Name|Age|Experience|Salary|
+----+---+----------+------+
| Ram| 97|         4|245243|
+----+---+----------+------+



In [138]:
df_ss.filter(~(df_ss.Salary >= 50000)).show()  ## INVERSE OPERATION: ~ negates the condition

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Soham| 53|         6|  3323|
|Swarnali| 32|         4| 21421|
|  Tanvir| 23|         3| 23134|
|     Sam| 13|         2| 43432|
+--------+---+----------+------+



### --------------------------------------------------------------------------------------

### GroupBy and Aggregate Functions

In [139]:
df_sss = spark.read.options(header=True, inferSchema=True).csv("test3.csv")  ## Dataset for the groupBy/aggregate examples

In [142]:
df_sss.show()  ## Name/department/salary records; the same person can appear in several departments

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [148]:
## Groupby operation: groupBy defines the groups and an aggregate function (sum, mean, count, max, ...) summarizes each group

df_sss.groupBy('Name').sum().show() ## Total salary per person (sum() aggregates every numeric column; here only salary)

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [150]:
df_sss.groupBy('Departments').sum().show() ## Total salary paid by each department

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [151]:
df_sss.groupBy('Departments').avg().show()  ## Average salary per department (mean() is an alias of avg())

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [152]:
df_sss.groupBy(df_sss.Departments).count().show()  ## Number of rows (person-department records) per department

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [154]:
## Grand total over all rows: DataFrame.agg is shorthand for groupBy().agg().
## 'Salary' resolves to the 'salary' column because Spark column names are case-insensitive by default.
df_sss.groupBy().agg({'Salary': "sum"}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [155]:
df_sss.groupBy(df_sss.Name).max().show()  ## Highest single salary record per person

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



### --------------------------------------------------------------------------------------

### EXAMPLES OF PYSPARK ML

In [158]:
training = spark.read.format('csv').options(header=True, inferSchema=True).load('test1.csv')  ## Training data for the regression example

In [159]:
training.show()  ## Training data: Salary is predicted from age and Experience in the cells below

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [162]:
training.columns  ## Note the lowercase 'age' column name, which the VectorAssembler inputCols must match

['Name', 'age', 'Experience', 'Salary']

In [166]:
from pyspark.ml.feature import VectorAssembler

## Combine the predictor columns into a single vector column, the feature format Spark ML estimators consume
featureassembler = VectorAssembler(
    inputCols=['age', 'Experience'],
    outputCol='Independent Features',
)

In [171]:
## Append the assembled feature vector as a new column 'Independent Features'
output = featureassembler.transform(training)
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [172]:
output.columns  ## Original columns plus the new 'Independent Features' vector column

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

In [173]:
finalized_data = output.select(["Independent Features", "Salary"])  ## Keep only the feature vector and the label

In [193]:
finalized_data.show()  ## Feature vector plus label column, the input passed to LinearRegression

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [194]:
from pyspark.ml.regression import LinearRegression

##train test split
## A fixed seed makes the split (and therefore the fitted model) reproducible on Restart & Run All;
## without it each run assigns rows differently, and with only 6 rows a split can leave the train set nearly empty.
## NOTE: the outputs saved in this notebook came from an unseeded run, so a re-run will show a different split.
train_data, test_data = finalized_data.randomSplit([0.5, 0.5], seed=42)
regressor = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
## `regressor` is rebound from the estimator to the fitted model; later cells read
## .coefficients, .intercept and .evaluate() from it.
regressor = regressor.fit(train_data)

In [195]:
train_data.show()  ## Rows assigned to the training split

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|          [21.0,1.0]| 15000|
|          [24.0,3.0]| 20000|
|          [29.0,4.0]| 20000|
+--------------------+------+



In [196]:
test_data.show()  ## Rows held out for evaluation

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|          [23.0,2.0]| 18000|
|          [30.0,8.0]| 25000|
|         [31.0,10.0]| 30000|
+--------------------+------+



In [197]:
### Coefficients
## One coefficient per feature, in the VectorAssembler inputCols order (age, Experience)
regressor.coefficients

DenseVector([-714.2857, 3571.4286])

In [198]:

### Intercepts
## Constant term of the fitted linear model
regressor.intercept

26428.57142857082

In [199]:
### Prediction
## evaluate() scores test_data with the fitted model; the result holds the predictions and the error metrics read below
pred_results=regressor.evaluate(test_data)

In [200]:
pred_results.predictions.show()  ## Actual Salary alongside the model's prediction for each test row

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [23.0,2.0]| 18000|17142.85714285713|
|          [30.0,8.0]| 25000| 33571.4285714283|
|         [31.0,10.0]| 30000| 39999.9999999996|
+--------------------+------+-----------------+



In [201]:
(pred_results.meanAbsoluteError, pred_results.meanSquaredError)  ## (MAE, MSE) on the test split


(6476.190476190258, 58068027.21088016)