In [None]:
# Install Hadoop and Spark on top of it
!apt-get install openjdk-8-jdk-headless -qq > /dev/null ### Installing Java
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz #### Installing Spark and Hadoop on the worker node
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark #### Creating a Sparksession

In [None]:
# Upload test.csv (customer purchase behaviour) to Google Colab before running this cell.
# The first line of the file is used as the header; no schema inference is requested.
df = spark.read.option("header", True).csv('test.csv')

In [None]:
# List the column names of the purchase dataset
df.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

In [None]:
# Preview the first 15 rows of the purchase dataset
df.show(n=15)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000004| P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|              null|
|1000009| P00113442|     M|26-35|        17|            C|                         0|             0|                 3|                 5|              null|
|1000010| P00288442|     F|36-45|         1|            B|                        4+|             1|                 5|                14|              null|
|1000010| P00145342|     F|36-45|         1|        

In [None]:
# Count the total number of rows in the purchase dataset
df.count()

233599

In [None]:
# Determine the total Purchase per City Category.
# Step 1: keep only the city category and the three product-category columns.
df1 = df.select(
    'City_Category',
    'Product_Category_1',
    'Product_Category_2',
    'Product_Category_3',
)

In [None]:
# Preview the selected city-category and product-category columns
df1.show()

+-------------+------------------+------------------+------------------+
|City_Category|Product_Category_1|Product_Category_2|Product_Category_3|
+-------------+------------------+------------------+------------------+
|            B|                 1|                11|              null|
|            C|                 3|                 5|              null|
|            B|                 5|                14|              null|
|            B|                 4|                 9|              null|
|            C|                 4|                 5|                12|
|            C|                 2|                 3|                15|
|            C|                 1|                11|                15|
|            C|                 2|                 4|                 9|
|            A|                10|                13|                16|
|            A|                 5|                14|              null|
|            B|                 1|                 

In [None]:
from pyspark.sql.functions import col

# Row-wise sum of the three product-category columns.
# The sum is null whenever any of the three inputs is null.
total_expr = col("Product_Category_1") + col("Product_Category_2") + col("Product_Category_3")
df2 = df1.withColumn("Total", total_expr)
df2.show()

+-------------+------------------+------------------+------------------+-----+
|City_Category|Product_Category_1|Product_Category_2|Product_Category_3|Total|
+-------------+------------------+------------------+------------------+-----+
|            B|                 1|                11|              null| null|
|            C|                 3|                 5|              null| null|
|            B|                 5|                14|              null| null|
|            B|                 4|                 9|              null| null|
|            C|                 4|                 5|                12| 21.0|
|            C|                 2|                 3|                15| 20.0|
|            C|                 1|                11|                15| 27.0|
|            C|                 2|                 4|                 9| 15.0|
|            A|                10|                13|                16| 39.0|
|            A|                 5|                14

In [None]:
import pyspark.sql.functions as fn

# Mean of every column except the city label, collected into a plain {column: mean} dict.
value_columns = [name for name in df2.columns if name != 'City_Category']
mean_exprs = [fn.mean(name).alias(name) for name in value_columns]
means = df2.agg(*mean_exprs).toPandas().to_dict('records')[0]
means

{'Product_Category_1': 5.276542279718663,
 'Product_Category_2': 9.849586059346997,
 'Product_Category_3': 12.669453946534905,
 'Total': 22.27692610892915}

In [None]:
# Replace nulls in each column with that column's mean computed in `means`
df3 = df2.na.fill(means)

In [None]:
# Preview the data after the nulls have been filled with the column means
df3.show()

+-------------+------------------+------------------+------------------+-----------------+
|City_Category|Product_Category_1|Product_Category_2|Product_Category_3|            Total|
+-------------+------------------+------------------+------------------+-----------------+
|            B|                 1|                11|12.669453946534905|22.27692610892915|
|            C|                 3|                 5|12.669453946534905|22.27692610892915|
|            B|                 5|                14|12.669453946534905|22.27692610892915|
|            B|                 4|                 9|12.669453946534905|22.27692610892915|
|            C|                 4|                 5|                12|             21.0|
|            C|                 2|                 3|                15|             20.0|
|            C|                 1|                11|                15|             27.0|
|            C|                 2|                 4|                 9|             15.0|

In [None]:
# Sum the mean-imputed Total column within each city category
(
    df3
    .groupBy('City_Category')
    .sum('Total')
)

City_Category,sum(Total)
B,2194470.855920172
C,1613257.8087763384
A,1396138.9974246807


In [None]:
# Remove rows which have missing values.
# First, check how many columns the dataset has: a row with no missing values
# has exactly this many non-null entries.
len(df.columns)

11

In [None]:
# Keep only the rows in which every column is populated, and preview them
df.dropna(how="any").show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000011| P00053842|     F|26-35|         1|            C|                         1|             0|                 4|                 5|                12|
|1000013| P00350442|     M|46-50|         1|            C|                         3|             1|                 2|                 3|                15|
|1000013| P00155442|     M|46-50|         1|            C|                         3|             1|                 1|                11|                15|
|1000013|  P0094542|     M|46-50|         1|        

In [None]:
# Determine the number of users who are in the 46-50 age range.
age_46_50_rows = df.filter(col('Age') == '46-50')

# Preview the matching rows.
age_46_50_rows.show(truncate=False)

# The same User_ID appears on many rows, so count distinct User_IDs rather than rows.
age_46_50_rows.select('User_ID').distinct().count()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|Age  |Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000004|P00128942 |M     |46-50|7         |B            |2                         |1             |1                 |11                |null              |
|1000013|P00350442 |M     |46-50|1         |C            |3                         |1             |2                 |3                 |15                |
|1000013|P00155442 |M     |46-50|1         |C            |3                         |1             |1                 |11                |15                |
|1000013|P0094542  |M     |46-50|1         |C       

In [None]:
import pyspark.sql.functions as fn

# Compare the number of non-null User_ID values with the number of distinct User_IDs
user_id_stats = [
    fn.count('User_ID').alias('count'),
    fn.countDistinct('User_ID').alias('distinct'),
]
df.agg(*user_id_stats).show()

+------+--------+
| count|distinct|
+------+--------+
|233599|    5891|
+------+--------+



In [None]:
# Upload daily_weather.csv before running this cell.
# The first line is the header and column types are inferred from the data.
df_weather = spark.read.options(header=True, inferSchema=True).csv("daily_weather.csv")

In [None]:
# Number of duplicate rows = all rows minus distinct rows
total_row_count = df_weather.count()
distinct_row_count = df_weather.distinct().count()

number_of_duplicate_rows = total_row_count - distinct_row_count
number_of_duplicate_rows

0

In [None]:
# Fraction (0 to 1) of missing values in each weather column
import pyspark.sql.functions as fn

# count(c) counts only non-null values of c, count('*') counts all rows.
# The column names must come from df_weather itself, not from df (the purchase
# dataset), because the aggregation runs on df_weather.
df_weather.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_weather.columns
]).show()

+--------------+------------------------+--------------------+------------------------------+--------------------------+------------------------------+--------------------------+-----------------------------+-------------------------+-----------------------------+-----------------------------+
|number_missing|air_pressure_9am_missing|air_temp_9am_missing|avg_wind_direction_9am_missing|avg_wind_speed_9am_missing|max_wind_direction_9am_missing|max_wind_speed_9am_missing|rain_accumulation_9am_missing|rain_duration_9am_missing|relative_humidity_9am_missing|relative_humidity_3pm_missing|
+--------------+------------------------+--------------------+------------------------------+--------------------------+------------------------------+--------------------------+-----------------------------+-------------------------+-----------------------------+-----------------------------+
|           0.0|    0.002739726027397249|0.004566210045662156|          0.003652968036529...|      0.00273972602739

In [None]:
# Calculate the number of rows which contain at least one null value:
# all rows minus the rows that survive dropping any row with a null.
rows_total = df_weather.count()
rows_without_nulls = df_weather.dropna(how="any").count()
rows_total - rows_without_nulls

31

In [None]:
# Drop every row that contains a null, compute each column's mean from the
# remaining complete rows, and use those means to fill the nulls in df_weather.
from pyspark.sql.functions import avg

df_remove_all = df_weather.dropna(how="any")

# Compute all column means in one aggregation (a single Spark job) instead of
# launching a separate aggregation job for every column inside the loop.
mean_row = df_remove_all.agg(*[avg(name).alias(name) for name in df_weather.columns]).first()

imputeDF = df_weather
for c in imputeDF.columns:
    meanvalue = mean_row[c]
    print(c, meanvalue)
    # Fill nulls in column c only, leaving the other columns untouched.
    imputeDF = imputeDF.na.fill(meanvalue, [c])

number 545.0018796992481
air_pressure_9am 918.9031798641051
air_temp_9am 65.02260949558733
avg_wind_direction_9am 142.30675564934037
avg_wind_speed_9am 5.48579305071369
max_wind_direction_9am 148.48042413321315
max_wind_speed_9am 6.999713658875691
rain_accumulation_9am 0.18202347650615522
rain_duration_9am 266.3936973996037
relative_humidity_9am 34.07743985327709
relative_humidity_3pm 35.14838093290533
