In [1]:
#setting up environement
import os
os.environ["SPARK_HOME"] = "/usr/share/spark-3.1.1-bin-hadoop3.2"

In [2]:
#Importing All required packages
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.sql import functions as sf
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType,DateType,FloatType,TimestampType

In [3]:
#defining sparksession
spark = SparkSession.builder.appName('SparkSQL_UseCase').master('local[*]').getOrCreate()

In [4]:
#Specifying File Schemas (i.e. Mentioning column name, datatype and Null value status of each column):-

#1 Aisles Schema:-
aisles_schema= StructType([StructField('aisle_id',IntegerType(),False),
                          StructField('aisle',StringType(),True)])

#2 Departments_schema:-
department_schema=StructType([StructField('department_id',IntegerType(),False),
                                StructField('department',StringType(),True)])
#3 order_schema:-
orders_schema=StructType([StructField('order_id',IntegerType(),False),
                              StructField('user_id',IntegerType(),True),
                              StructField('eval_set',StringType(),True),
                              StructField('order_number',IntegerType(),True),
                              StructField('order_dow',IntegerType(),True),
                              StructField('order_hour_of_day',IntegerType(),True),
                              StructField('days_since_prior_order',IntegerType(),True)])

#4 prior_order_schema and train_order_schema:-
prior_order_schema=StructType([StructField('order_id',IntegerType(),True),
                              StructField('product_id',IntegerType(),True),
                              StructField('add_to_cart_order',IntegerType(),True),
                              StructField('reordered',IntegerType(),True)])
#5 Products_schema:-
products_schema=StructType([StructField('product_id',IntegerType(),False),
                              StructField('product_name',StringType(),True),
                              StructField('aisle_id',StringType(),True),
                              StructField('department_id',StringType(),True)])

In [5]:
# A) Extracting Data:-
#defining file path from where to read the files and output path
#Note:- The data has been copied to local and then given the local path here as I was facing issues with Insofe cluster for my IP.
dataset_path='/home/fai10101/Project/Data_sets/'
output_path=dataset_path+"/output/"

#reading files as dataframes:-

#aisles
aisles_df = spark.read\
        .schema(aisles_schema)\
        .option("delimeter",",").option("header","True")\
        .csv(dataset_path+'aisles.csv')

#departments:-
department_df = spark.read\
                .schema(department_schema)\
                .option("header","True")\
                .csv(dataset_path+'departments.csv')
#orders:-
orders_df = spark.read\
                .schema(orders_schema)\
                .option("header","True")\
                .csv(dataset_path+'orders.csv')

#prior_order:-
prior_order_df = spark.read\
                .schema(prior_order_schema)\
                .option("header","True")\
                .csv(dataset_path+'prior_order.csv')

#products:- reading products file as rdd as it has some noises later on it has been converted to data frame after removing noises. 
#All other files have been read as csv
products_rdd = spark.sparkContext\
                .textFile(dataset_path+'products.csv')

#train_order:-
train_order_df= spark.read\
                .schema(prior_order_schema)\
                .option("header","True")\
                .csv(dataset_path+'train_order.csv')

In [6]:
#removing noises from products data:- removing unwanted characters from records like:- '\' , '"' , ',' etc
#after removing noises we will convert it to dataframe
def remove_noise(row):
    if '"' in row:
        first=row.index('"')
        last=row.index('"',first+1)
        part_a=row[0:first]
        part_b=row[first:last+1].replace(", "," - ").replace('"','')
        part_c=row[last+1:]
        row=(part_a+part_b+part_c).replace('\"',"").split(",")
        return [int(row[0]),row[1],row[2],row[3]]
    else:
        row = row.replace('\"',"").split(",")
        return [int(row[0]),row[1],row[2],row[3]]

header=products_rdd.first()
products_rdd_mo=products_rdd.filter(lambda x : x!=header).map(lambda x : remove_noise(x))
products_df=products_rdd_mo.toDF(products_schema) # product dataframe creation from product rdd after removing noises.

In [7]:
#showing columns of all data frames:
print('Showing columns of all data frames:')
print('\naisles :\n',aisles_df.columns)
print('\nproducts :\n',products_df.columns)
print('\ndepartments :\n',department_df.columns)
print('\norders :\n',orders_df.columns)
print('\nprior order :\n',prior_order_df.columns)
print('\ntrain order :\n',train_order_df.columns)

Showing columns of all data frames:

aisles :
 ['aisle_id', 'aisle']

products :
 ['product_id', 'product_name', 'aisle_id', 'department_id']

departments :
 ['department_id', 'department']

orders :
 ['order_id', 'user_id', 'eval_set', 'order_number', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']

prior order :
 ['order_id', 'product_id', 'add_to_cart_order', 'reordered']

train order :
 ['order_id', 'product_id', 'add_to_cart_order', 'reordered']


In [8]:
#showing Data types of all data frames:

print('\nShowing Data types of all data frames:')
print('Aisles')
aisles_df.printSchema()
print('Department')
department_df.printSchema()
print('Products')
products_df.printSchema()
print('Orders')
orders_df.printSchema()
print('Prior Order')
prior_order_df.printSchema()
print('Train Order')
train_order_df.printSchema()


Showing Data types of all data frames:
Aisles
root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)

Department
root
 |-- department_id: integer (nullable = true)
 |-- department: string (nullable = true)

Products
root
 |-- product_id: integer (nullable = false)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)

Orders
root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: integer (nullable = true)

Prior Order
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)

Train Order
root
 |-- order_id: integer (nullable = true

In [9]:
#B) Transformation:- Data Processing Part

#Creating Tables from dataframes for aggregation purposes:-
aisles_df.createOrReplaceTempView('aisles') # aisles table
department_df.createOrReplaceTempView('department') # department table
orders_df.createOrReplaceTempView('orders') # orders table
prior_order_df.createOrReplaceTempView('prior_order') #prior_order table
products_df.createOrReplaceTempView('products') #products table
train_order_df.createOrReplaceTempView('train_order') #train_order table

In [10]:
#Displaying the records from tables
print('\nDisplaying records from tables')
spark.sql("select * from aisles").show(2)
spark.sql("select * from department").show(2)
spark.sql("select * from orders").show(2)
spark.sql("select * from prior_order").show(2)
spark.sql("select * from products").show(2)
spark.sql("select * from train_order").show(2)


Displaying records from tables
+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
+--------+--------------------+
only showing top 2 rows

+-------------+----------+
|department_id|department|
+-------------+----------+
|            1|    frozen|
|            2|     other|
+-------------+----------+
only showing top 2 rows

+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 1363380|     50|   prior|           1|        3|                9|                  null|
| 3131103|     50|   prior|           2|        6|               12|                  null|
+--------+-------+--------+------------+---------+-----------------+----------------------+
only showing

In [11]:
#aggregating products, prior_order and train_order data first (just to make the process easy abd simple)
aggregated_table_part_1 =spark.sql('''SELECT p.product_id, product_name, aisle_id, department_id, order_id, add_to_cart_order, reordered
                                      FROM products p INNER JOIN train_order to ON to.product_id=p.product_id
                                      UNION ALL
                                      SELECT p.product_id, product_name, aisle_id, department_id, order_id,add_to_cart_order,reordered
                                      FROM products p INNER JOIN prior_order po ON po.product_id=p.product_id''')

#creating table from aggregated_table_part_1 dataframe for further aggregation
aggregated_table_part_1.createOrReplaceTempView("Combined_table")

#aggregating all tables as per the data model
fully_combined_table = spark.sql('''SELECT product_id, product_name, t.aisle_id,aisle, d.department_id, department, o.order_id, user_id, 
                                    add_to_cart_order, reordered,eval_set, order_number, order_dow, order_hour_of_day, days_since_prior_order
                                   FROM Combined_table t 
                                   INNER JOIN orders o ON o.order_id=t.order_id 
                                   INNER JOIN aisles a ON a.aisle_id=t.aisle_id
                                   INNER JOIN department d ON d.department_id=t.department_id''')

                
#C Loading results to destination:- writing back tranformed data to destination (data lake):-

fully_combined_table.coalesce(1).write.option("header",True).csv(output_path)
print("done")

done


In [12]:
#Checking Null Values
print('Null Value count:')
{col : aisles_df.filter(aisles_df[col].isNull()).count() for col in aisles_df.columns}

Null Value count:


{'aisle_id': 0, 'aisle': 0}

In [13]:
{col : department_df.filter(department_df[col].isNull()).count() for col in department_df.columns}

{'department_id': 0, 'department': 0}

In [14]:
{col : products_df.filter(products_df[col].isNull()).count() for col in products_df.columns}

{'product_id': 0, 'product_name': 0, 'aisle_id': 0, 'department_id': 0}

In [15]:
{col : orders_df.filter(orders_df[col].isNull()).count() for col in orders_df.columns}

{'order_id': 0,
 'user_id': 0,
 'eval_set': 0,
 'order_number': 0,
 'order_dow': 0,
 'order_hour_of_day': 0,
 'days_since_prior_order': 82683}

In [16]:
{col : prior_order_df.filter(prior_order_df[col].isNull()).count() for col in prior_order_df.columns}

{'order_id': 0, 'product_id': 0, 'add_to_cart_order': 0, 'reordered': 0}

In [17]:
{col : train_order_df.filter(train_order_df[col].isNull()).count() for col in train_order_df.columns}

{'order_id': 0, 'product_id': 0, 'add_to_cart_order': 0, 'reordered': 0}