In [0]:
%sql CREATE  TABLE  IF  NOT  EXISTS  DEA_DB.fact_jayant_kaushik 
(orderNumber int,
orderDate date,
customerNumber int,
customerName string,
customerCity string,
customerState string,
employeeNumber int,
employeeName string,
employeeDesignation string,
productId string,
productName string,
productVendor string,
priceEach decimal(10,2),
quantityOrdered int,
loadTimestamp timestamp,
mhfUser string
)
USING delta


In [0]:
orders = spark.read.format('csv').options(sep="|", header='true', inferSchema='true').load('/jayant_kaushik/orders.csv')
customers = spark.read.format('csv').options(sep="|", header='true', inferSchema='true').load('/jayant_kaushik/customers.csv')
employees = spark.read.format('csv').options(sep="|", header='true', inferSchema='true').load('/jayant_kaushik/employees.csv')
orderdetails = spark.read.format('csv').options(sep="|", header='true', inferSchema='true').load('/jayant_kaushik/orderdetails.csv')
products = spark.read.format('csv').options(sep="|", header='true', inferSchema='true').load('/jayant_kaushik/products.csv')


In [0]:
#display (orders.head(2))
#display (customers)
#display (employees.head(2))
#display (orderdetails.head(2))
#display (products.head(2))

In [0]:
"""
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")
employees.createOrReplaceTempView("employees")
orderdetails.createOrReplaceTempView("orderdetails")
products.createOrReplaceTempView("products")
"""

In [0]:
joined_df2 = orders.join(customers, orders.customerNumber == customers.customerNumber, 'left' ).drop(orders.customerNumber)
joined_df2 = employees.join(joined_df2, employees.employeeNumber == joined_df2.salesRepEmployeeNumber, 'left')
joined_df2 = orderdetails.join(joined_df2, orderdetails.orderNumber == joined_df2.orderNumber, 'left').drop(orderdetails.orderNumber)
joined_df2 = products.join(joined_df2, products.productCode == joined_df2.productCode, 'left').drop(products.productCode)


In [0]:
final_df = joined_df2.select(['orderNumber','orderDate','customerNumber','customerName','city','state','employeeNumber','firstName', 'lastName', 'jobTitle','productCode','productName','productVendor','priceEach','quantityOrdered'])

In [0]:
from pyspark.sql import functions as sf
final_df = final_df.withColumn('employeeName', 

                    sf.concat(sf.col('firstName'),sf.lit(' '), sf.col('lastName'))).drop('firstName','lastName')


In [0]:
final_df = final_df.withColumnRenamed('productCode', 'productId') \
  .withColumnRenamed('jobTitle', 'employeeDesignation') \
  .withColumnRenamed('city', 'customerCity') \
  .withColumnRenamed('state', 'customerState')

In [0]:
from pyspark.sql.functions import lit, current_timestamp
final_df = final_df.withColumn('mhfUser', lit('jayant_kaushik'))
final_df = final_df.withColumn("loadTimestamp",current_timestamp())


In [0]:
final_df = final_df.na.fill({"customerState": "Unknown"})

In [0]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
final_df = final_df.withColumn("orderDate",final_df.orderDate.cast('date'))

In [0]:
final_df.createOrReplaceTempView("final_df_sql")

In [0]:
%sql
MERGE INTO DEA_DB.fact_jayant_kaushik as fact
USING final_df_sql as final
ON fact.orderNumber = final.orderNumber
and fact.productId = final.productId
WHEN MATCHED THEN
  UPDATE SET 
  fact.orderNumber = final.orderNumber,
fact.orderDate = final.orderDate,
fact.customerNumber = final.customerNumber,
fact.customerName = final.customerName,
fact.customerCity = final.customerCity,
fact.customerState = final.customerState,
fact.employeeNumber = final.employeeNumber,
fact.employeeName = final.employeeName,
fact.employeeDesignation = final.employeeDesignation,
fact.productId = final.productId,
fact.productName = final.productName,
fact.productVendor = final.productVendor,
fact.priceEach = final.priceEach,
fact.quantityOrdered = final.quantityOrdered,
fact.loadTimestamp = final.loadTimestamp,
fact.mhfUser = final.mhfUser

WHEN NOT MATCHED
  THEN INSERT *

In [0]:
%sql
Select * from DEA_DB.fact_jayant_kaushik
--DELETE FROM DEA_DB.fact_jayant_kaushik WHERE mhfUser = 'jayant_kaushik';

orderNumber,orderDate,customerNumber,customerName,customerCity,customerState,employeeNumber,employeeName,employeeDesignation,productId,productName,productVendor,priceEach,quantityOrdered,loadTimestamp,mhfUser
10217.0,2004-02-04,166.0,Handji Gifts& Co,Singapore,Unknown,1612.0,Peter Marsh,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,118.66,38.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10305.0,2004-10-13,286.0,Marta's Replicas Co.,Cambridge,MA,1216.0,Steve Patterson,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,112.6,22.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10175.0,2003-11-06,324.0,"Stylish Desk Decors, Co.",London,Unknown,1501.0,Larry Bott,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,102.92,47.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10270.0,2004-07-19,282.0,Souveniers And Things Co.,Chatswood,NSW,1611.0,Andy Fixter,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,107.76,38.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10357.0,2004-12-10,124.0,Mini Gifts Distributors Ltd.,San Rafael,CA,1165.0,Leslie Jennings,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,105.34,28.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10229.0,2004-03-11,124.0,Mini Gifts Distributors Ltd.,San Rafael,CA,1165.0,Leslie Jennings,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,119.87,41.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10245.0,2004-05-04,455.0,Super Scale Inc.,New Haven,CT,1286.0,Foon Yue Tseng,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,111.39,21.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10370.0,2005-01-20,276.0,"Anna's Decorations, Ltd",North Sydney,NSW,1611.0,Andy Fixter,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,105.34,29.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10126.0,2003-05-28,458.0,"Corrida Auto Replicas, Ltd",Madrid,Unknown,1702.0,Martin Gerard,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,102.92,50.0,2021-06-28T11:35:06.793+0000,jayant_kaushik
10164.0,2003-10-21,452.0,Mini Auto Werke,Graz,Unknown,1401.0,Pamela Castillo,Sales Rep,S18_4600,1940s Ford truck,Motor City Art Classics,107.76,45.0,2021-06-28T11:35:06.793+0000,jayant_kaushik


In [0]:
%sql

--drop table DEA_DB.fact_jayant_kaushik
 --(orderNumber,orderDate,customerNumber,customerName,customerCity,customerState,employeeNumber,employeeName,employeeDesignation,productId,productName,productVendor,priceEach,quantityOrdered,loadTimestamp,mhfUser)
 -- VALUES (final.orderNumber,final.orderDate,final.customerNumber,final.customerName,final.customerCity,final.customerState,final.employeeNumber,final.employeeName,final.employeeDesignation,final.productId,final.productName,final.productVendor,final.priceEach,final.quantityOrdered,final.loadTimestamp,final.mhfUser)

In [0]:
%sql
Select f.customerName as Customers, sum(f.priceEach * f.quantityOrdered) as Revenue from DEA_DB.fact_jayant_kaushik as f
where f.orderNumber IS NOT NULL
group by f.customerName
order by Revenue desc
limit 5

Customers,Revenue
Euro+ Shopping Channel,820689.54
Mini Gifts Distributors Ltd.,591827.34
"Australian Collectors, Co.",180585.07
Muscle Machine Inc,177913.95
La Rochelle Gifts,158573.12


In [0]:
%sql
Select productName as Product, sum(quantityOrdered) as Quantity from DEA_DB.fact_jayant_kaushik as f
where f.orderNumber IS NOT NULL
group by f.productName
order by Quantity desc
limit 5

Product,Quantity
1992 Ferrari 360 Spider red,1808
1937 Lincoln Berline,1111
American Airlines: MD-11S,1085
1941 Chevrolet Special Deluxe Cabriolet,1076
1930 Buick Marquette Phaeton,1074


In [0]:
%sql
Select f.customerCity as City, sum(f.priceEach * f.quantityOrdered) as Revenue from DEA_DB.fact_jayant_kaushik as f
where f.orderNumber IS NOT NULL
group by f.customerCity
order by Revenue desc
limit 5

City,Revenue
Madrid,979880.77
San Rafael,591827.34
NYC,497941.5
Auckland,292082.87
Singapore,263997.78


In [0]:
%sql
Select (YEAR(f.orderDate)) as Year, sum(f.priceEach * f.quantityOrdered) as Revenue from DEA_DB.fact_jayant_kaushik as f
where f.orderNumber IS NOT NULL
group by YEAR(f.orderDate)
order by Year
limit 5

Year,Revenue
2003,3317348.39
2004,4515905.51
2005,1770936.71
