In [0]:
# create sales dataframe
file_location = "/FileStore/tables/sales.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
sales_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

sales_df.printSchema()

root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- DEALSIZE: string (nullable = true)
 |-- CUSTOMERID: integer (nullable = true)



In [0]:
# create customer dimension

file_location = "/FileStore/tables/customers.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
customers_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(customers_df.take(10))

CUSTOMERID,CUSTOMERNAME,EMAIL,CITY,COUNTRY,TERRITORY,CONTACTFIRSTNAME,CONTACTLASTNAME
1,Land of Toys Inc.,gladys.rim@rim.org,NYC,USA,,James,Butt
2,Reims Collectables,yuki_whobrey@aol.com,Reims,France,EMEA,Josephine,Darakjy
3,Lyon Souveniers,fletcher.flosi@yahoo.com,Paris,France,EMEA,Art,Venere
4,Toys4GrownUps.com,bette_nicka@cox.net,Pasadena,USA,,Lenna,Paprocki
5,Corporate Gift Ideas Co.,vinouye@aol.com,San Francisco,USA,,Donette,Foller
6,Technics Stores Inc.,gladys.rim@rim.org,Burlingame,USA,,Simona,Morasca
7,Daedalus Designs Imports,yuki_whobrey@aol.com,Lille,France,EMEA,Mitsue,Tollner
8,Herkku Gifts,fletcher.flosi@yahoo.com,Bergen,Norway,EMEA,Leota,Dilliard
9,Mini Wheels Co.,bette_nicka@cox.net,San Francisco,USA,,Sage,Wieser
10,Auto Canal Petit,vinouye@aol.com,Paris,France,EMEA,Kris,Marrier


In [0]:
from pyspark.sql.functions import col

for column in sales_df.columns:
  sales_df = sales_df.withColumn(column.lower(),col(column))

for column in customers_df.columns:
  customers_df = customers_df.withColumn(column.lower(),col(column))
  

display(sales_df.take(10))

ordernumber,quantityordered,priceeach,orderlinenumber,sales,orderdate,status,qtr_id,month_id,year_id,productline,msrp,productcode,dealsize,customerid
10107,30,95.7,2,2871.0,2003-02-24 00:00:00,Shipped,1,2,2003,Motorcycles,95,S10_1678,Small,1
10121,34,81.35,5,2765.9,2003-05-07 00:00:00,Shipped,2,5,2003,Motorcycles,95,S10_1678,Small,2
10134,41,94.74,2,3884.34,2003-07-01 00:00:00,Shipped,3,7,2003,Motorcycles,95,S10_1678,Medium,3
10145,45,83.26,6,3746.7,2003-08-25 00:00:00,Shipped,3,8,2003,Motorcycles,95,S10_1678,Medium,4
10159,49,100.0,14,5205.27,2003-10-10 00:00:00,Shipped,4,10,2003,Motorcycles,95,S10_1678,Medium,5
10168,36,96.66,1,3479.76,2003-10-28 00:00:00,Shipped,4,10,2003,Motorcycles,95,S10_1678,Medium,6
10180,29,86.13,9,2497.77,2003-11-11 00:00:00,Shipped,4,11,2003,Motorcycles,95,S10_1678,Small,7
10188,48,100.0,1,5512.32,2003-11-18 00:00:00,Shipped,4,11,2003,Motorcycles,95,S10_1678,Medium,8
10201,22,98.57,2,2168.54,2003-12-01 00:00:00,Shipped,4,12,2003,Motorcycles,95,S10_1678,Small,9
10211,41,100.0,14,4708.44,2004-01-15 00:00:00,Shipped,1,1,2004,Motorcycles,95,S10_1678,Medium,10


In [0]:
display(customers_df.take(10))

customerid,customername,email,city,country,territory,contactfirstname,contactlastname
1,Land of Toys Inc.,gladys.rim@rim.org,NYC,USA,,James,Butt
2,Reims Collectables,yuki_whobrey@aol.com,Reims,France,EMEA,Josephine,Darakjy
3,Lyon Souveniers,fletcher.flosi@yahoo.com,Paris,France,EMEA,Art,Venere
4,Toys4GrownUps.com,bette_nicka@cox.net,Pasadena,USA,,Lenna,Paprocki
5,Corporate Gift Ideas Co.,vinouye@aol.com,San Francisco,USA,,Donette,Foller
6,Technics Stores Inc.,gladys.rim@rim.org,Burlingame,USA,,Simona,Morasca
7,Daedalus Designs Imports,yuki_whobrey@aol.com,Lille,France,EMEA,Mitsue,Tollner
8,Herkku Gifts,fletcher.flosi@yahoo.com,Bergen,Norway,EMEA,Leota,Dilliard
9,Mini Wheels Co.,bette_nicka@cox.net,San Francisco,USA,,Sage,Wieser
10,Auto Canal Petit,vinouye@aol.com,Paris,France,EMEA,Kris,Marrier


In [0]:
display(sales_df.dtypes)

_1,_2
ordernumber,int
quantityordered,int
priceeach,double
orderlinenumber,int
sales,double
orderdate,string
status,string
qtr_id,int
month_id,int
year_id,int


In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

### Parser orderdate column to timestamp column

In [0]:
# Create a view or table
from pyspark.sql.functions import from_unixtime,unix_timestamp,split
sales_df = sales_df.withColumn('orderdate',split(sales_df.orderdate.cast('string'),' ').getItem(0))
sales_df = sales_df.withColumn('orderdate',unix_timestamp(sales_df.orderdate,'MM/dd/yyyy'))
sales_df = sales_df.withColumn('orderdate',from_unixtime(sales_df.orderdate))
display(sales_df)

ordernumber,quantityordered,priceeach,orderlinenumber,sales,orderdate,status,qtr_id,month_id,year_id,productline,msrp,productcode,dealsize,customerid
10107,30,95.7,2,2871.0,2003-02-24 00:00:00,Shipped,1,2,2003,Motorcycles,95,S10_1678,Small,1
10121,34,81.35,5,2765.9,2003-05-07 00:00:00,Shipped,2,5,2003,Motorcycles,95,S10_1678,Small,2
10134,41,94.74,2,3884.34,2003-07-01 00:00:00,Shipped,3,7,2003,Motorcycles,95,S10_1678,Medium,3
10145,45,83.26,6,3746.7,2003-08-25 00:00:00,Shipped,3,8,2003,Motorcycles,95,S10_1678,Medium,4
10159,49,100.0,14,5205.27,2003-10-10 00:00:00,Shipped,4,10,2003,Motorcycles,95,S10_1678,Medium,5
10168,36,96.66,1,3479.76,2003-10-28 00:00:00,Shipped,4,10,2003,Motorcycles,95,S10_1678,Medium,6
10180,29,86.13,9,2497.77,2003-11-11 00:00:00,Shipped,4,11,2003,Motorcycles,95,S10_1678,Small,7
10188,48,100.0,1,5512.32,2003-11-18 00:00:00,Shipped,4,11,2003,Motorcycles,95,S10_1678,Medium,8
10201,22,98.57,2,2168.54,2003-12-01 00:00:00,Shipped,4,12,2003,Motorcycles,95,S10_1678,Small,9
10211,41,100.0,14,4708.44,2004-01-15 00:00:00,Shipped,1,1,2004,Motorcycles,95,S10_1678,Medium,10


### Aggregation using direct method over grouped dataframe

In [0]:
sales_cnt_agg_yr = sales_df.groupby('year_id').count()
display(sales_cnt_agg_yr)

sales_sum_agg_yr = sales_df.groupby('year_id','qtr_id').sum('sales')
display(sales_sum_agg_yr)

year_id,count
2003,1000
2004,1345
2005,478


year_id,qtr_id,sum(sales)
2003,2,562365.22
2003,3,649514.5399999999
2003,1,445094.6900000002
2004,4,2014774.9199999997
2005,1,1071992.3600000003
2004,2,766260.7299999996
2003,4,1860005.0899999987
2004,1,833730.6800000005
2004,3,1109396.2700000005
2005,2,719494.3500000001


### Aggregation by passing aggregate expressions over columns to .agg() method

In [0]:
from pyspark.sql import functions as f

display(sales_df.groupby("year_id","qtr_id","month_id")\
        .agg(f.sum("sales").cast("decimal(10,4)").alias("total_sales"),
             f.avg("sales").cast("decimal(10,4)").alias("average_sales"),
             f.countDistinct("ordernumber").alias("total_orders"))
          )

year_id,qtr_id,month_id,total_sales,average_sales,total_orders
2005,1,3,374262.76,3530.7808,12
2003,1,3,174504.9,3490.098,6
2003,4,12,261876.46,3741.0923,7
2003,1,1,129753.6,3327.0154,5
2003,4,10,568290.97,3596.7783,17
2004,4,12,372802.66,3389.1151,11
2004,2,4,206148.12,3221.0644,10
2004,4,10,552924.25,3477.511,13
2003,2,4,201609.55,3476.0267,7
2003,1,2,140836.19,3435.029,3


### Aggreation by passing dictionary of columns and aggreation exp to .agg() method

In [0]:
display(sales_df.groupby('customerid').agg({'sales':'sum','ordernumber':'min'}).limit(10))

customerid,min(ordernumber),sum(sales)
148,10270,4302.08
463,10278,4667.86
471,10365,2611.8
496,10346,1516.62
833,10125,6483.46
1088,10383,3340.48
1238,10386,1266.1
1342,10425,4325.16
1580,10285,2733.12
1591,10105,6341.21


In [0]:
display(sales_df.summary())

summary,ordernumber,quantityordered,priceeach,orderlinenumber,sales,orderdate,status,qtr_id,month_id,year_id,productline,msrp,productcode,dealsize,customerid
count,2823.0,2823.0,2823.0,2823.0,2823.0,2823,2823,2823.0,2823.0,2823.0,2823,2823.0,2823,2823,2823.0
mean,10258.725115125751,35.09280906836698,83.65854410201929,6.466170740347148,3553.88907190932,,,2.717676230959972,7.092454835281616,2003.8150903294368,,100.71555083244776,,,1412.0
stddev,92.0854775957196,9.74144273706958,20.174276527840536,4.22584096469094,1841.865105740184,,,1.203878088001756,3.656633307661765,0.6996701541300869,,40.18791167720266,,,815.0742297484322
min,10100.0,6.0,26.88,1.0,482.13,2003-01-06 00:00:00,Cancelled,1.0,1.0,2003.0,Classic Cars,33.0,S10_1678,Large,1.0
25%,10180.0,27.0,68.8,3.0,2203.11,,,2.0,4.0,2003.0,,68.0,,,706.0
50%,10262.0,35.0,95.7,6.0,3184.8,,,3.0,8.0,2004.0,,99.0,,,1412.0
75%,10334.0,43.0,100.0,9.0,4508.0,,,4.0,11.0,2004.0,,124.0,,,2118.0
max,10425.0,97.0,100.0,18.0,14082.8,2005-05-31 00:00:00,Shipped,4.0,12.0,2005.0,Vintage Cars,214.0,S72_3212,Small,2823.0
