In [0]:
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

READING ORDER CSV FILES

In [0]:

orders_path = 'dbfs:/FileStore/tables/bronze/orders.csv'

orders_schema = StructType([
    StructField('ORDER_ID', IntegerType(), False),
    StructField('ORDER_DATETIME', StringType(), False),
    StructField('CUSTOMER_ID', IntegerType(), False),
    StructField('ORDER_STATUS', StringType(), False),
    StructField('STORE_ID', IntegerType(), False)
])

orders = spark.read.csv(path=orders_path, header=True, schema=orders_schema)


In [0]:
orders.dtypes

[('ORDER_ID', 'int'),
 ('ORDER_DATETIME', 'string'),
 ('CUSTOMER_ID', 'int'),
 ('ORDER_STATUS', 'string'),
 ('STORE_ID', 'int')]

In [0]:
display(orders)

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,06-JAN-22 09.35.42.00,355,COMPLETE,1
448,06-JAN-22 10.23.14.00,155,COMPLETE,1
449,06-JAN-22 01.21.54.00,242,COMPLETE,1
450,06-JAN-22 05.57.04.00,49,COMPLETE,1
451,06-JAN-22 10.39.07.00,204,COMPLETE,1
452,07-JAN-22 01.11.46.00,216,COMPLETE,1
453,07-JAN-22 06.53.06.00,4,COMPLETE,4
454,07-JAN-22 03.55.15.00,388,COMPLETE,1
455,07-JAN-22 06.38.38.00,291,COMPLETE,1
456,08-JAN-22 12.52.12.00,272,COMPLETE,1


READING ORDER_ITEMS CSV FILE

In [0]:
order_items_path = 'dbfs:/FileStore/tables/bronze/order_items.csv'

order_items_schema = StructType([
    StructField('ORDER_ID', IntegerType(), False),
    StructField('LINE_ITEM_ID', IntegerType(), False),
    StructField('PRODUCT_ID', IntegerType(), False),
    StructField('UNIT_PRICE', DoubleType(), False),
    StructField('QUANTITY', IntegerType(), False)

])

order_items = spark.read.csv(path=order_items_path, header=True, schema=order_items_schema)


In [0]:
order_items.dtypes

[('ORDER_ID', 'int'),
 ('LINE_ITEM_ID', 'int'),
 ('PRODUCT_ID', 'int'),
 ('UNIT_PRICE', 'double'),
 ('QUANTITY', 'int')]

In [0]:
display(order_items)

ORDER_ID,LINE_ITEM_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,1,26,48.75,1
334,2,46,39.16,4
334,3,12,10.48,4
335,1,32,5.65,2
336,1,2,29.55,5
336,2,20,28.21,5
337,1,32,5.65,4
337,2,29,24.71,4
337,3,45,31.68,3
338,1,35,7.18,2


READING CUSTOMERS CSV FILE

In [0]:
customers_path = 'dbfs:/FileStore/tables/bronze/customers.csv'

customers_schema = StructType([
    StructField('CUSTOMER_ID', StringType(), False),
    StructField('FULL_NAME', StringType(), False),
    StructField('EMAIL_ADDRESS', StringType(), False)
])

customers = spark.read.csv(path=customers_path, header=True, schema=customers_schema)

In [0]:
customers.dtypes

[('CUSTOMER_ID', 'string'),
 ('FULL_NAME', 'string'),
 ('EMAIL_ADDRESS', 'string')]

In [0]:
display(customers)

CUSTOMER_ID,FULL_NAME,EMAIL_ADDRESS
286,Wilfred Welch,wilfred.welch@internalmail
287,Kristina Nunez,kristina.nunez@internalmail
288,Mable Ballard,mable.ballard@internalmail
289,Diane Wilkerson,diane.wilkerson@internalmail
290,Sheryl Banks,sheryl.banks@internalmail
291,Opal Cruz,opal.cruz@internalmail
292,Dale Hughes,dale.hughes@internalmail
293,Diana Fowler,diana.fowler@internalmail
294,Travis Schwartz,travis.schwartz@internalmail
295,Anthony Boone,anthony.boone@internalmail


READING PRODUCT CSV FILE

In [0]:
products_path = 'dbfs:/FileStore/tables/bronze/products.csv'

products_schema = StructType([
    StructField('PRODUCT_ID', IntegerType(), False),
    StructField('PRODUCT_NAME', StringType(), False),
    StructField('UNIT_PRICE', DoubleType(), False)
])

products = spark.read.csv(path=products_path, header=True, schema=products_schema)

In [0]:
products.dtypes

[('PRODUCT_ID', 'int'), ('PRODUCT_NAME', 'string'), ('UNIT_PRICE', 'double')]

In [0]:
display(products)

PRODUCT_ID,PRODUCT_NAME,UNIT_PRICE
16,Women's Socks (Grey),39.89
17,Women's Sweater (Brown),24.46
18,Women's Jacket (Black),14.34
19,Men's Coat (Red),28.21
20,Girl's Shorts (Green),38.34
21,Girl's Pyjamas (White),39.78
22,Men's Shorts (Black),10.33
23,Men's Pyjamas (Blue),48.39
24,Boy's Sweater (Red),9.8
25,Girl's Jeans (Grey),48.75


READING STORE CSV FILE

In [0]:
store_path = 'dbfs:/FileStore/tables/bronze/stores.csv'

store_schema = StructType([
    StructField('STORE_ID', IntegerType(), False),
    StructField('STORE_NAME', StringType(), False),
    StructField('WEB_ADDRESS', StringType(), True),
    StructField('LATITUDE', DoubleType(), False),
    StructField('LONGITUDE', DoubleType(), False)
])

store = spark.read.csv(path=store_path, header=True, schema=store_schema)

In [0]:
store.dtypes

[('STORE_ID', 'int'),
 ('STORE_NAME', 'string'),
 ('WEB_ADDRESS', 'string'),
 ('LATITUDE', 'double'),
 ('LONGITUDE', 'double')]

In [0]:
display(store)

STORE_ID,STORE_NAME,WEB_ADDRESS,LATITUDE,LONGITUDE
1,Online,https://www.example.com,,
2,San Francisco,,37.529395,-122.267237
3,Seattle,,47.6053,-122.33221
4,New York City,,40.745216,-73.980518
5,Chicago,,41.878751,-87.636675
6,London,,51.519281,-0.087296
7,Bucharest,,44.43225,26.10626
8,Berlin,,52.5161,13.3873
9,Utrecht,,52.103263,5.061644
10,Madrid,,40.4929,-3.8737


ORDER TABLE

In [0]:
from pyspark.sql.functions import *

In [0]:
display(orders)

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,06-JAN-22 09.35.42.00,355,COMPLETE,1
448,06-JAN-22 10.23.14.00,155,COMPLETE,1
449,06-JAN-22 01.21.54.00,242,COMPLETE,1
450,06-JAN-22 05.57.04.00,49,COMPLETE,1
451,06-JAN-22 10.39.07.00,204,COMPLETE,1
452,07-JAN-22 01.11.46.00,216,COMPLETE,1
453,07-JAN-22 06.53.06.00,4,COMPLETE,4
454,07-JAN-22 03.55.15.00,388,COMPLETE,1
455,07-JAN-22 06.38.38.00,291,COMPLETE,1
456,08-JAN-22 12.52.12.00,272,COMPLETE,1


In [0]:
orders_df = orders.select('ORDER_ID', \
    to_timestamp(orders['ORDER_DATETIME'], "dd-MMM-yy kk.mm.ss.SS").alias('ORDER_TIMESTAMP'), 'CUSTOMER_ID','ORDER_STATUS','STORE_ID')

In [0]:
orders_df.dtypes

[('ORDER_ID', 'int'),
 ('ORDER_TIMESTAMP', 'timestamp'),
 ('CUSTOMER_ID', 'int'),
 ('ORDER_STATUS', 'string'),
 ('STORE_ID', 'int')]

In [0]:
orders_df = orders_df.drop('ORDER_DATETIME')

In [0]:
orders_df.display()

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,2022-01-06T09:35:42.000+0000,355,COMPLETE,1
448,2022-01-06T10:23:14.000+0000,155,COMPLETE,1
449,2022-01-06T01:21:54.000+0000,242,COMPLETE,1
450,2022-01-06T05:57:04.000+0000,49,COMPLETE,1
451,2022-01-06T10:39:07.000+0000,204,COMPLETE,1
452,2022-01-07T01:11:46.000+0000,216,COMPLETE,1
453,2022-01-07T06:53:06.000+0000,4,COMPLETE,4
454,2022-01-07T03:55:15.000+0000,388,COMPLETE,1
455,2022-01-07T06:38:38.000+0000,291,COMPLETE,1
456,2022-01-08T12:52:12.000+0000,272,COMPLETE,1


In [0]:
orders_df = orders_df.filter(orders_df['ORDER_STATUS']=='COMPLETE')

In [0]:
store_df = store.withColumnRenamed('STORE_ID', 'STORE_ID_store')

In [0]:
orders_df = orders_df.join(store_df, orders_df.STORE_ID==store_df.STORE_ID_store)

In [0]:
orders_df.display()

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,ORDER_STATUS,STORE_ID,STORE_ID_store,STORE_NAME,WEB_ADDRESS,LATITUDE,LONGITUDE
447,2022-01-06T09:35:42.000+0000,355,COMPLETE,1,1,Online,https://www.example.com,,
448,2022-01-06T10:23:14.000+0000,155,COMPLETE,1,1,Online,https://www.example.com,,
449,2022-01-06T01:21:54.000+0000,242,COMPLETE,1,1,Online,https://www.example.com,,
450,2022-01-06T05:57:04.000+0000,49,COMPLETE,1,1,Online,https://www.example.com,,
451,2022-01-06T10:39:07.000+0000,204,COMPLETE,1,1,Online,https://www.example.com,,
452,2022-01-07T01:11:46.000+0000,216,COMPLETE,1,1,Online,https://www.example.com,,
453,2022-01-07T06:53:06.000+0000,4,COMPLETE,4,4,New York City,,40.745216,-73.980518
454,2022-01-07T03:55:15.000+0000,388,COMPLETE,1,1,Online,https://www.example.com,,
455,2022-01-07T06:38:38.000+0000,291,COMPLETE,1,1,Online,https://www.example.com,,
456,2022-01-08T12:52:12.000+0000,272,COMPLETE,1,1,Online,https://www.example.com,,


In [0]:
orders_df = orders_df.select('ORDER_ID', 'ORDER_TIMESTAMP', 'CUSTOMER_ID', 'STORE_NAME')

In [0]:
display(orders_df)

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,STORE_NAME
447,2022-01-06T09:35:42.000+0000,355,Online
448,2022-01-06T10:23:14.000+0000,155,Online
449,2022-01-06T01:21:54.000+0000,242,Online
450,2022-01-06T05:57:04.000+0000,49,Online
451,2022-01-06T10:39:07.000+0000,204,Online
452,2022-01-07T01:11:46.000+0000,216,Online
453,2022-01-07T06:53:06.000+0000,4,New York City
454,2022-01-07T03:55:15.000+0000,388,Online
455,2022-01-07T06:38:38.000+0000,291,Online
456,2022-01-08T12:52:12.000+0000,272,Online


In [0]:
order_items_df = order_items.drop('LINE_ITEM_ID')
order_items_df.display()

ORDER_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,26,48.75,1
334,46,39.16,4
334,12,10.48,4
335,32,5.65,2
336,2,29.55,5
336,20,28.21,5
337,32,5.65,4
337,29,24.71,4
337,45,31.68,3
338,35,7.18,2


In [0]:
products_df = products
customers_df = customers

SAVING TABLES AS PARQUET FILES

In [0]:
orders_df.write.parquet('dbfs:/FileStore/tables/silver/orders', mode='overwrite')
order_items_df.write.parquet('dbfs:/FileStore/tables/silver/orders_items', mode='overwrite')
products_df.write.parquet('dbfs:/FileStore/tables/silver/products', mode='overwrite')
customers_df.write.parquet('dbfs:/FileStore/tables/silver/customers', mode='overwrite')