### Installing PySpark

In [1]:
!pip install pyspark




[notice] A new release of pip is available: 23.0.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


### Run the cell below only if PySpark operations fail with seemingly random errors (it deletes temporary files)

In [2]:
# Clear stale Spark scratch directories left in the system temp folder by a
# previous (possibly crashed) session. Run this BEFORE creating the SparkSession
# (or after spark.stop()); otherwise it may delete directories a live session uses.
# Only Spark's own directories are removed, not the whole temp folder, which also
# holds files belonging to other programs.
import shutil
import tempfile
from pathlib import Path

# Portable replacement for a hard-coded per-user path
# (on Windows this is typically C:/Users/<user>/AppData/Local/Temp).
tmp_root = Path(tempfile.gettempdir())
# NOTE(review): spark-* / blockmgr-* are the names Spark uses under its default
# local dir (java.io.tmpdir); adjust if spark.local.dir points elsewhere.
for pattern in ("spark-*", "blockmgr-*"):
    for stale_dir in tmp_root.glob(pattern):
        if stale_dir.is_dir():
            shutil.rmtree(stale_dir, ignore_errors=True)

### Importing the library and loading a CSV file

In [3]:
from pyspark.sql import SparkSession

# Entry point to Spark: returns the already-running session if there is one,
# otherwise builds a new session named 'Practise'.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)

In [4]:
# Load the semicolon-separated username file with a header row.
# inferSchema=True makes Spark scan the values and pick a type per column
# (e.g. integer for Identifier) instead of reading every column as a string;
# this costs an extra pass over the data.
# The same read can be written with explicit options (equivalent, not executed):
#   spark.read.option('header','true').option('delimiter',';').csv("username.csv", inferSchema=True)
df = spark.read.csv("username.csv", header=True, sep=";", inferSchema=True)

In [5]:
# Display the DataFrame as a text table. show() prints at most 20 rows and
# truncates long cell values by default; both defaults are spelled out here.
df.show(n=20, truncate=True)

+---------+-----------+----------+---------+
| Username| Identifier|First name|Last name|
+---------+-----------+----------+---------+
| booker12|       9012|    Rachel|   Booker|
|   grey07|       2070|     Laura|     Grey|
|johnson81|       4081|     Craig|  Johnson|
|jenkins46|       9346|      Mary|  Jenkins|
|  smith79|       5079|     Jamie|    Smith|
+---------+-----------+----------+---------+



In [6]:
# Print the column names, their (inferred) data types and nullability as a tree.
# Note: the second column is named " Identifier" with a leading space, as read
# from the CSV header; it must be referenced verbatim in later cells.
df.printSchema()

root
 |-- Username: string (nullable = true)
 |--  Identifier: integer (nullable = true)
 |-- First name: string (nullable = true)
 |-- Last name: string (nullable = true)



In [7]:
# Fetch the first row as a pyspark Row object (first() is equivalent to head()).
df.first()

Row(Username='booker12',  Identifier=9012, First name='Rachel', Last name='Booker')

In [8]:
# Project a single column (referenced as a Column object) and print its values.
df.select(df["Username"]).show()

+---------+
| Username|
+---------+
| booker12|
|   grey07|
|johnson81|
|jenkins46|
|  smith79|
+---------+



In [9]:
# Project several columns at once and print their values.
# The leading space in " Identifier" is part of the column name from the CSV header.
df.select("Username", " Identifier").show()

+---------+-----------+
| Username| Identifier|
+---------+-----------+
| booker12|       9012|
|   grey07|       2070|
|johnson81|       4081|
|jenkins46|       9346|
|  smith79|       5079|
+---------+-----------+



In [10]:
# Summary statistics per column: count, mean, stddev, min and max.
# mean/stddev are NULL for string columns, and min/max of strings are lexicographic.
df.describe().show()

+-------+--------+------------------+----------+---------+
|summary|Username|        Identifier|First name|Last name|
+-------+--------+------------------+----------+---------+
|  count|       5|                 5|         5|        5|
|   mean|    NULL|            5917.6|      NULL|     NULL|
| stddev|    NULL|3170.5525228262663|      NULL|     NULL|
|    min|booker12|              2070|     Craig|   Booker|
|    max| smith79|              9346|    Rachel|    Smith|
+-------+--------+------------------+----------+---------+



In [11]:
# Add a derived column computed from an existing one (Identifier + 5).
# withColumn returns a new DataFrame, so the result is reassigned to df.
df = df.withColumn(colName='Updated Identifier', col=df[' Identifier'] + 5)
df.show()

+---------+-----------+----------+---------+------------------+
| Username| Identifier|First name|Last name|Updated Identifier|
+---------+-----------+----------+---------+------------------+
| booker12|       9012|    Rachel|   Booker|              9017|
|   grey07|       2070|     Laura|     Grey|              2075|
|johnson81|       4081|     Craig|  Johnson|              4086|
|jenkins46|       9346|      Mary|  Jenkins|              9351|
|  smith79|       5079|     Jamie|    Smith|              5084|
+---------+-----------+----------+---------+------------------+



In [12]:
# Remove a column. drop() returns a new DataFrame, so it is reassigned to df.
columns_to_drop = ["Updated Identifier"]
df = df.drop(*columns_to_drop)
df.show()

+---------+-----------+----------+---------+
| Username| Identifier|First name|Last name|
+---------+-----------+----------+---------+
| booker12|       9012|    Rachel|   Booker|
|   grey07|       2070|     Laura|     Grey|
|johnson81|       4081|     Craig|  Johnson|
|jenkins46|       9346|      Mary|  Jenkins|
|  smith79|       5079|     Jamie|    Smith|
+---------+-----------+----------+---------+



In [13]:
# Rename a column. The result is only displayed, not assigned, so df itself
# still has the "Username" column afterwards.
df.withColumnRenamed(existing="Username", new="New Name").show()

+---------+-----------+----------+---------+
| New Name| Identifier|First name|Last name|
+---------+-----------+----------+---------+
| booker12|       9012|    Rachel|   Booker|
|   grey07|       2070|     Laura|     Grey|
|johnson81|       4081|     Craig|  Johnson|
|jenkins46|       9346|      Mary|  Jenkins|
|  smith79|       5079|     Jamie|    Smith|
+---------+-----------+----------+---------+



### Handling Missing Values
- Dropping columns
- Dropping rows
- Various parameters of the drop functionality
- Imputing missing values with the mean, median, or mode

In [14]:
# Load a second, comma-separated dataset with a header row and inferred column
# types. This replaces the username DataFrame previously held in df.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('biostats.csv')
)

In [15]:
# dropna() with the default how="any" removes every row containing at least one null.
df.dropna(how="any").show()

+----+---+---+-----------+------------+
|Name|Sex|Age|Height (in)|Weight (lbs)|
+----+---+---+-----------+------------+
|Alex|  M| 41|         74|         170|
|Bert|  M| 42|         68|         166|
|Carl|  M| 32|         70|         155|
|Dave|  M| 39|         72|         167|
|Elly|  F| 30|         66|         124|
|Fran|  F| 33|         66|         115|
|Gwen|  F| 26|         64|         121|
|Hank|  M| 30|         71|         158|
|Ivan|  M| 53|         72|         175|
|Kate|  F| 47|         69|         139|
|Luke|  M| 34|         72|         163|
|Myra|  F| 23|         62|          98|
|Neil|  M| 36|         75|         160|
|Page|  F| 31|         67|         135|
|Ruth|  F| 28|         65|         131|
+----+---+---+-----------+------------+



In [16]:
# how="all" drops only the ROWS in which every value is null. Rows with just some
# nulls (e.g. Jake's missing Sex, Omar's missing height/weight) are kept.
df.na.drop(how="all").show()

+----+----+---+-----------+------------+
|Name| Sex|Age|Height (in)|Weight (lbs)|
+----+----+---+-----------+------------+
|Alex|   M| 41|         74|         170|
|Bert|   M| 42|         68|         166|
|Carl|   M| 32|         70|         155|
|Dave|   M| 39|         72|         167|
|Elly|   F| 30|         66|         124|
|Fran|   F| 33|         66|         115|
|Gwen|   F| 26|         64|         121|
|Hank|   M| 30|         71|         158|
|Ivan|   M| 53|         72|         175|
|Jake|NULL| 32|         69|         143|
|Kate|   F| 47|         69|         139|
|Luke|   M| 34|         72|         163|
|Myra|   F| 23|         62|          98|
|Neil|   M| 36|         75|         160|
|Omar|   M| 38|       NULL|        NULL|
|Page|   F| 31|         67|         135|
|Ruth|   F| 28|         65|         131|
+----+----+---+-----------+------------+



In [17]:
# thresh=2 keeps only rows that have at least 2 non-null values.
# When thresh is given it overrides `how`, so passing how="all"/"any" as well
# has no effect.
df.na.drop(thresh=2).show()

+----+----+---+-----------+------------+
|Name| Sex|Age|Height (in)|Weight (lbs)|
+----+----+---+-----------+------------+
|Alex|   M| 41|         74|         170|
|Bert|   M| 42|         68|         166|
|Carl|   M| 32|         70|         155|
|Dave|   M| 39|         72|         167|
|Elly|   F| 30|         66|         124|
|Fran|   F| 33|         66|         115|
|Gwen|   F| 26|         64|         121|
|Hank|   M| 30|         71|         158|
|Ivan|   M| 53|         72|         175|
|Jake|NULL| 32|         69|         143|
|Kate|   F| 47|         69|         139|
|Luke|   M| 34|         72|         163|
|Myra|   F| 23|         62|          98|
|Neil|   M| 36|         75|         160|
|Omar|   M| 38|       NULL|        NULL|
|Page|   F| 31|         67|         135|
|Ruth|   F| 28|         65|         131|
+----+----+---+-----------+------------+



In [18]:
# Replace nulls with a constant. fill() only touches columns whose type matches
# the value: a string fills string columns only, so numeric nulls stay NULL.
# To fill per column, pass a dict, e.g. df.na.fill({"Sex": "Unknown", "Age": 0}).
df.na.fill("Nothing").show()

+-------+-------+----+-----------+------------+
|   Name|    Sex| Age|Height (in)|Weight (lbs)|
+-------+-------+----+-----------+------------+
|   Alex|      M|  41|         74|         170|
|   Bert|      M|  42|         68|         166|
|   Carl|      M|  32|         70|         155|
|   Dave|      M|  39|         72|         167|
|   Elly|      F|  30|         66|         124|
|   Fran|      F|  33|         66|         115|
|   Gwen|      F|  26|         64|         121|
|   Hank|      M|  30|         71|         158|
|   Ivan|      M|  53|         72|         175|
|   Jake|Nothing|  32|         69|         143|
|   Kate|      F|  47|         69|         139|
|   Luke|      M|  34|         72|         163|
|   Myra|      F|  23|         62|          98|
|   Neil|      M|  36|         75|         160|
|   Omar|      M|  38|       NULL|        NULL|
|   Page|      F|  31|         67|         135|
|Nothing|Nothing|NULL|       NULL|        NULL|
|   Ruth|      F|  28|         65|      

In [19]:
# Converting columns to a specific data type.
# Cast both measurement columns to float. NOTE(review): the types inferred from the
# CSV are not shown here; presumably the cast guarantees numeric input for the
# Imputer and VectorAssembler steps below.
from pyspark.sql.types import FloatType

for measure_col in ("Height (in)", "Weight (lbs)"):
    df = df.withColumn(measure_col, df[measure_col].cast(FloatType()))

In [20]:
# Impute nulls with a per-column statistic instead of a fixed value.
# Results go into new "<column>_imputed" columns; the originals are left unchanged.
# Imputer supports strategy="mean" and "median" ("mode" in Spark 3.1+).
from pyspark.ml.feature import Imputer

impute_cols = ["Age", "Height (in)", "Weight (lbs)"]
imputer = Imputer(
    strategy="mean",
    inputCols=impute_cols,
    outputCols=[f"{col_name}_imputed" for col_name in impute_cols],
)

In [21]:
# fit() computes each input column's mean (ignoring missing values). transform()
# then fills the nulls in the *_imputed columns. Both steps use the same df here.
imputer_model = imputer.fit(df)
imputer_model.transform(df).show()

+----+----+----+-----------+------------+-----------+-------------------+--------------------+
|Name| Sex| Age|Height (in)|Weight (lbs)|Age_imputed|Height (in)_imputed|Weight (lbs)_imputed|
+----+----+----+-----------+------------+-----------+-------------------+--------------------+
|Alex|   M|  41|       74.0|       170.0|         41|               74.0|               170.0|
|Bert|   M|  42|       68.0|       166.0|         42|               68.0|               166.0|
|Carl|   M|  32|       70.0|       155.0|         32|               70.0|               155.0|
|Dave|   M|  39|       72.0|       167.0|         39|               72.0|               167.0|
|Elly|   F|  30|       66.0|       124.0|         30|               66.0|               124.0|
|Fran|   F|  33|       66.0|       115.0|         33|               66.0|               115.0|
|Gwen|   F|  26|       64.0|       121.0|         26|               64.0|               121.0|
|Hank|   M|  30|       71.0|       158.0|         

### Filter Operations

In [22]:
# Filter with SQL-expression strings (the same syntax as a WHERE clause);
# where() is an alias of filter(). A comparison with NULL is never true, so rows
# with a null Sex (e.g. Jake) never match Sex='M'.
df.filter("Age<=40").show()

df.where("Sex='M'").select("Name", "Age").show()

df.where("Sex='M' and Age<=40").select("Name", "Age").show()

+----+----+---+-----------+------------+
|Name| Sex|Age|Height (in)|Weight (lbs)|
+----+----+---+-----------+------------+
|Carl|   M| 32|       70.0|       155.0|
|Dave|   M| 39|       72.0|       167.0|
|Elly|   F| 30|       66.0|       124.0|
|Fran|   F| 33|       66.0|       115.0|
|Gwen|   F| 26|       64.0|       121.0|
|Hank|   M| 30|       71.0|       158.0|
|Jake|NULL| 32|       69.0|       143.0|
|Luke|   M| 34|       72.0|       163.0|
|Myra|   F| 23|       62.0|        98.0|
|Neil|   M| 36|       75.0|       160.0|
|Omar|   M| 38|       NULL|        NULL|
|Page|   F| 31|       67.0|       135.0|
|Ruth|   F| 28|       65.0|       131.0|
+----+----+---+-----------+------------+

+----+---+
|Name|Age|
+----+---+
|Alex| 41|
|Bert| 42|
|Carl| 32|
|Dave| 39|
|Hank| 30|
|Ivan| 53|
|Luke| 34|
|Neil| 36|
|Omar| 38|
+----+---+

+----+---+
|Name|Age|
+----+---+
|Carl| 32|
|Dave| 39|
|Hank| 30|
|Luke| 34|
|Neil| 36|
|Omar| 38|
+----+---+



In [23]:
# Filter with Column expressions (pandas-like boolean masks); ~ negates a condition.
# Caveat: for a null Sex, both the condition and its negation evaluate to NULL,
# so such rows (e.g. Jake) appear in neither result.
is_female = df["Sex"] == "F"
df.filter(is_female).show()

df.filter(~is_female).show()

+----+---+---+-----------+------------+
|Name|Sex|Age|Height (in)|Weight (lbs)|
+----+---+---+-----------+------------+
|Elly|  F| 30|       66.0|       124.0|
|Fran|  F| 33|       66.0|       115.0|
|Gwen|  F| 26|       64.0|       121.0|
|Kate|  F| 47|       69.0|       139.0|
|Myra|  F| 23|       62.0|        98.0|
|Page|  F| 31|       67.0|       135.0|
|Ruth|  F| 28|       65.0|       131.0|
+----+---+---+-----------+------------+

+----+---+---+-----------+------------+
|Name|Sex|Age|Height (in)|Weight (lbs)|
+----+---+---+-----------+------------+
|Alex|  M| 41|       74.0|       170.0|
|Bert|  M| 42|       68.0|       166.0|
|Carl|  M| 32|       70.0|       155.0|
|Dave|  M| 39|       72.0|       167.0|
|Hank|  M| 30|       71.0|       158.0|
|Ivan|  M| 53|       72.0|       175.0|
|Luke|  M| 34|       72.0|       163.0|
|Neil|  M| 36|       75.0|       160.0|
|Omar|  M| 38|       NULL|        NULL|
+----+---+---+-----------+------------+



### Group By and Aggregate

In [24]:
# Drop every row that contains at least one null, so the aggregations and the
# model below work only on complete records. dropna() is the same as na.drop().
df = df.dropna()

In [25]:
# Average every numeric column within each Sex group (avg() is an alias of mean()).
df.groupBy("Sex").avg().show()

+---+------------------+-----------------+------------------+
|Sex|          avg(Age)| avg(Height (in))| avg(Weight (lbs))|
+---+------------------+-----------------+------------------+
|  F|31.142857142857142|65.57142857142857|123.28571428571429|
|  M|            38.375|            71.75|            164.25|
+---+------------------+-----------------+------------------+



In [26]:
# Maximum of the Age column over the whole DataFrame. This is an ungrouped
# aggregation, equivalent to df.agg({'Age': 'max'}).
df.groupBy().max("Age").show()

+--------+
|max(Age)|
+--------+
|      53|
+--------+



In [27]:
# Combine the feature columns into a single vector column, which is the input
# format Spark ML estimators such as LinearRegression expect.
# The output column is named "Vector" to match exactly how the later cells refer
# to it, rather than relying on Spark's default case-insensitive column resolution.
# VectorAssembler raises on null inputs by default (handleInvalid="error"), so the
# input DataFrame must not contain nulls in these columns.
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=["Weight (lbs)", "Height (in)"], outputCol="Vector")
output = featureassembler.transform(df)
output.show()

+----+---+---+-----------+------------+------------+
|Name|Sex|Age|Height (in)|Weight (lbs)|      vector|
+----+---+---+-----------+------------+------------+
|Alex|  M| 41|       74.0|       170.0|[170.0,74.0]|
|Bert|  M| 42|       68.0|       166.0|[166.0,68.0]|
|Carl|  M| 32|       70.0|       155.0|[155.0,70.0]|
|Dave|  M| 39|       72.0|       167.0|[167.0,72.0]|
|Elly|  F| 30|       66.0|       124.0|[124.0,66.0]|
|Fran|  F| 33|       66.0|       115.0|[115.0,66.0]|
|Gwen|  F| 26|       64.0|       121.0|[121.0,64.0]|
|Hank|  M| 30|       71.0|       158.0|[158.0,71.0]|
|Ivan|  M| 53|       72.0|       175.0|[175.0,72.0]|
|Kate|  F| 47|       69.0|       139.0|[139.0,69.0]|
|Luke|  M| 34|       72.0|       163.0|[163.0,72.0]|
|Myra|  F| 23|       62.0|        98.0| [98.0,62.0]|
|Neil|  M| 36|       75.0|       160.0|[160.0,75.0]|
|Page|  F| 31|       67.0|       135.0|[135.0,67.0]|
|Ruth|  F| 28|       65.0|       131.0|[131.0,65.0]|
+----+---+---+-----------+------------+-------

In [28]:
# Keep only what the regression needs: the feature vector and the label (Age).
data = output.select("Vector", "Age")
data.show()

+------------+---+
|      Vector|Age|
+------------+---+
|[170.0,74.0]| 41|
|[166.0,68.0]| 42|
|[155.0,70.0]| 32|
|[167.0,72.0]| 39|
|[124.0,66.0]| 30|
|[115.0,66.0]| 33|
|[121.0,64.0]| 26|
|[158.0,71.0]| 30|
|[175.0,72.0]| 53|
|[139.0,69.0]| 47|
|[163.0,72.0]| 34|
| [98.0,62.0]| 23|
|[160.0,75.0]| 36|
|[135.0,67.0]| 31|
|[131.0,65.0]| 28|
+------------+---+



In [31]:
# Split into roughly 75% train / 25% test. randomSplit is random, so a fixed seed
# makes the split, and every downstream metric, reproducible on re-run.
# With only ~15 rows the test set holds just a handful of rows, so the evaluation
# below is very noisy.
train_df, test_df = data.randomSplit([0.75, 0.25], seed=42)

In [32]:
# Fit the linear regression on the TRAINING split; test_df is held out for evaluation.
# Features come from the assembled "Vector" column, and the label to predict is Age.
# Note: `regressor` holds the fitted LinearRegressionModel (not the estimator),
# which is what the evaluate()/predict() calls use.
from pyspark.ml.regression import LinearRegression
regressor = LinearRegression(featuresCol="Vector", labelCol="Age").fit(train_df)

In [33]:
# Score the held-out rows. evaluate() returns a summary whose .predictions
# DataFrame holds the input columns plus a "prediction" column.
results = regressor.evaluate(dataset=test_df)
results.predictions.show()

+------------+---+------------------+
|      Vector|Age|        prediction|
+------------+---+------------------+
| [98.0,62.0]| 23|24.281106476696316|
|[155.0,70.0]| 32|  37.8372949122352|
+------------+---+------------------+



In [34]:
# Mean squared error of the predictions on test_df (lower is better).
# With so few test rows, this number is a very noisy estimate of model quality.
results.meanSquaredError

17.857622848520105

In [40]:
# Predict for a single observation: pull the feature vector out of the first
# test row and pass it directly to the fitted model.
first_features = test_df.select("Vector").first()["Vector"]
regressor.predict(first_features)

24.281106476696316