# Landing Zone

Set up the file locations for the customer and order data.


### Read Files

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, LongType, DateType, IntegerType
import pyspark.sql.functions as F

# Switch Spark's date/time parsing to the LEGACY (pre-Spark-3.0) parser.
# This affects the date parsing done later in the notebook: F.to_date(..., "MM/dd/yyyy")
# on the customer data and the DateType column in the orders CSV schema.
# NOTE(review): presumably needed because some source dates are rejected by the stricter
# default parser — confirm against the raw files.
# `spark` is not created in the visible cells; it is expected to be provided by the runtime.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")


Check that both source files have been delivered to the expected location.

In [0]:
%fs ls /FileStore/tables/customers.json

path,name,size,modificationTime
dbfs:/FileStore/tables/customers.json,customers.json,15815082,1704085320000


In [0]:
%fs ls /FileStore/tables/orders.csv

path,name,size,modificationTime
dbfs:/FileStore/tables/orders.csv,orders.csv,32138462,1704085327000



# Staging Zone

Set up the staging database used to ingest the source files into tables.

In [0]:
%sql 

-- Recreate the staging database from scratch.
-- CASCADE also drops every table already in staging_assignment, so each run starts empty.

DROP DATABASE IF EXISTS staging_assignment CASCADE;

CREATE DATABASE staging_assignment;

In [0]:
%sql

-- Make staging_assignment the current database so unqualified table names resolve to it

USE staging_assignment;

In [0]:
%sql

-- Optional cleanup: drop only the two staging tables (not the database).
-- These statements are active, not commented out. Run directly after the database has
-- been re-created they are a no-op (IF EXISTS); they only matter when re-running the
-- load cells on their own.

DROP TABLE IF EXISTS customers;
DROP TABLE IF EXISTS orders;


## Customer Data

In [0]:
# Load the customer JSON, normalise the column names, then fix the column types

# Source column name -> staging column name
customer_column_names = {
    "Customer Since": "Customer_Since",
    "E Mail": "E_Mail",
    "Place Name": "Place_Name",
    "age": "Age",
    "cust_id": "Cust_Id",
    "full_name": "Full_Name",
}

customers_df = spark.read.json("/FileStore/tables/customers.json")
for source_name, staging_name in customer_column_names.items():
    customers_df = customers_df.withColumnRenamed(source_name, staging_name)

# Parse the MM/dd/yyyy text into a date and store the customer id and age as integers
customers_df = (
    customers_df
    .withColumn("Customer_Since", F.to_date("Customer_Since", "MM/dd/yyyy"))
    .withColumn("Cust_Id", F.col("Cust_Id").cast(IntegerType()))
    .withColumn("Age", F.col("Age").cast(IntegerType()))
)

In [0]:
# Confirm the renamed columns and that Customer_Since / Cust_Id / Age have the intended types
customers_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Customer_Since: date (nullable = true)
 |-- E_Mail: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Place_Name: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: long (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Cust_Id: integer (nullable = true)
 |-- Full_Name: string (nullable = true)



In [0]:
# Preview the first three customer rows (head(3) collects them to the driver as Row objects)
display(customers_df.head(3))

City,County,Customer_Since,E_Mail,Gender,Place_Name,Region,State,Zip,Age,Cust_Id,Full_Name
Vinson,Harmon,2006-08-22,jani.titus@gmail.com,F,Vinson,South,OK,73571,43,60124,"Titus, Jani"
Graham,Bradford,1981-02-04,lee.eaker@gmail.com,M,Graham,South,FL,32042,28,42485,"Eaker, Lee"
Grand Forks,Grand Forks,2010-06-27,jason.simoneau@gmail.com,M,Grand Forks,Midwest,ND,58201,65,53620,"Simoneau, Jason"


In [0]:
# Save customer data to the staging database as a Delta table partitioned by Region.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the write
# raises an error when staging_assignment.customers already exists.

customers_df.write.format("delta").mode("overwrite").partitionBy("Region").saveAsTable("staging_assignment.customers")


## Order Data

In [0]:
# Explicit schema for the orders CSV, built from (column name, Spark type) pairs.
# Every column is declared nullable.

orders_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("Order_Id", StringType()),
        ("Order_Date", DateType()),
        ("Status", StringType()),
        ("Item_Id", DoubleType()),
        ("Qty_Ordered", DoubleType()),
        ("Price", DoubleType()),
        ("Value", DoubleType()),
        ("Discount_Amount", DoubleType()),
        ("Total", DoubleType()),
        ("Category", StringType()),
        ("Payment_Method", StringType()),
        ("Cust_Id", DoubleType()),
        ("Year", IntegerType()),
        ("Month", StringType()),
    ]
])

In [0]:
# Load the orders CSV with the explicit schema, then store the identifier columns as integers

orders_df = spark.read.csv("/FileStore/tables/orders.csv", header=True, schema=orders_schema)

# The ids are read as double/string by the schema; cast each one to integer in place
for id_column in ("Cust_Id", "Order_Id", "Item_Id"):
    orders_df = orders_df.withColumn(id_column, F.col(id_column).cast(IntegerType()))

In [0]:
# Confirm that Order_Id / Item_Id / Cust_Id are integers after the casts
orders_df.printSchema()

root
 |-- Order_Id: integer (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Status: string (nullable = true)
 |-- Item_Id: integer (nullable = true)
 |-- Qty_Ordered: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Value: double (nullable = true)
 |-- Discount_Amount: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- Cust_Id: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)



In [0]:
# Preview the first three order rows (head(3) collects them to the driver as Row objects)
display(orders_df.head(3))

Order_Id,Order_Date,Status,Item_Id,Qty_Ordered,Price,Value,Discount_Amount,Total,Category,Payment_Method,Cust_Id,Year,Month
100354678,2020-10-01,received,574772,21.0,89.9,1798.0,0.0,1798.0,Men's Fashion,cod,60124,2020,Oct-2020
100354678,2020-10-01,received,574774,11.0,19.0,190.0,0.0,190.0,Men's Fashion,cod,60124,2020,Oct-2020
100354680,2020-10-01,complete,574777,9.0,149.9,1199.2,0.0,1199.2,Men's Fashion,cod,60124,2020,Oct-2020


In [0]:
# Save orders data to the staging database as a Delta table partitioned by Category.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the write
# raises an error when staging_assignment.orders already exists.

orders_df.write.format("delta").mode("overwrite").partitionBy("Category").saveAsTable("staging_assignment.orders")

In [0]:
%sql

-- Verify that the customers and orders tables now exist in the current database

SHOW Tables;

database,tableName,isTemporary
staging_assignment,customers,False
staging_assignment,orders,False


In [0]:
%sql

-- Spot-check a few customer rows read back from the staging table
-- (no ORDER BY, so which rows are returned is not guaranteed)
select * from staging_assignment.customers limit 5;

City,County,Customer_Since,E_Mail,Gender,Place_Name,Region,State,Zip,Age,Cust_Id,Full_Name
Grand Forks,Grand Forks,2010-06-27,jason.simoneau@gmail.com,M,Grand Forks,Midwest,ND,58201,65,53620,"Simoneau, Jason"
Brownstown,Fayette,2017-03-31,elizbeth.raminez@gmail.com,F,Brownstown,Midwest,IL,62418,71,56449,"Raminez, Elizbeth"
Brighton,Jersey,2016-08-24,rachell.murguia@hotmail.com,F,Brighton,Midwest,IL,62012,66,60130,"Murguia, Rachell"
Snyder,Dodge,2013-01-24,romeo.hopps@outlook.com,M,Snyder,Midwest,NE,68664,38,37011,"Hopps, Romeo"
Saint Louis,St. Louis (city),2012-01-29,darrin.heyer@yahoo.co.in,M,Saint Louis,Midwest,MO,63110,60,60134,"Heyer, Darrin"


In [0]:
%sql

-- Spot-check a few order rows read back from the staging table
-- (no ORDER BY, so which rows are returned is not guaranteed)
select * from staging_assignment.orders limit 5;

Order_Id,Order_Date,Status,Item_Id,Qty_Ordered,Price,Value,Discount_Amount,Total,Category,Payment_Method,Cust_Id,Year,Month
100354677,2020-10-01,canceled,574771,2.0,549.9,549.9,0.0,549.9,Appliances,Payaxis,42485,2020,Oct-2020
100356116,2020-10-08,order_refunded,577467,1.0,549.9,0.0,0.0,0.0,Appliances,Payaxis,42485,2020,Oct-2020
100358724,2020-10-21,order_refunded,581862,1.0,549.9,0.0,0.0,0.0,Appliances,cod,42485,2020,Oct-2020
100403034,2020-12-24,canceled,656937,2.0,254.8,254.8,39.80628,214.99372,Appliances,Easypay,42485,2020,Dec-2020
100403034,2020-12-24,canceled,656938,2.0,315.5,315.5,49.28916999999999,266.21083,Appliances,Easypay,42485,2020,Dec-2020
