In [0]:
oct_events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True
)

nov_events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    header=True,
    inferSchema=True
)

## **Sample Dataset for practising**

In [0]:
# 1. Define the raw data as a list of tuples
Salesdata = [
    ("Prod001", 10, 300, "Virginia"),
    ("Prod002", 20, 500, "Virginia"),
    ("Prod003", 30, 460, "Virginia"),
    ("Prod023", 30, 460, "Virginia"),
    ("Prod004", 40, 987, "Virginia"),
    ("Prod005", 40, 987, "Virginia"),
    ("Prod001", 10, 1300, "Georgia"),
    ("Prod002", 20, 550, "Georgia"),
    ("Prod003", 30, 480, "Georgia"),
    ("Prod004", 40, 240, "Georgia"),
    ("Prod001", 10, 1100, "New York"),
    ("Prod002", 20, 530, "New York")
]

# 2. Define the schema string
SalesdataColumns = "product string, quantity int, salesamount int, state string"

# 3. Create the DataFrame
salesdf = spark.createDataFrame(data=Salesdata, schema=SalesdataColumns)

# 4. Display the results
salesdf.display()

product,quantity,salesamount,state
Prod001,10,300,Virginia
Prod002,20,500,Virginia
Prod003,30,460,Virginia
Prod023,30,460,Virginia
Prod004,40,987,Virginia
Prod005,40,987,Virginia
Prod001,10,1300,Georgia
Prod002,20,550,Georgia
Prod003,30,480,Georgia
Prod004,40,240,Georgia


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

window_criteria=Window.partitionBy("state").orderBy(desc("salesamount"))
final_result = salesdf.withColumn("rank",rank().over(window_criteria))
display(final_result)

product,quantity,salesamount,state,rank
Prod001,10,1300,Georgia,1
Prod002,20,550,Georgia,2
Prod003,30,480,Georgia,3
Prod004,40,240,Georgia,4
Prod001,10,1100,New York,1
Prod002,20,530,New York,2
Prod004,40,987,Virginia,1
Prod005,40,987,Virginia,1
Prod002,20,500,Virginia,3
Prod003,30,460,Virginia,4


In [0]:
final_result = salesdf.withColumn("rank_dense",dense_rank().over(window_criteria))
display(final_result)

product,quantity,salesamount,state,rank_dense
Prod001,10,1300,Georgia,1
Prod002,20,550,Georgia,2
Prod003,30,480,Georgia,3
Prod004,40,240,Georgia,4
Prod001,10,1100,New York,1
Prod002,20,530,New York,2
Prod004,40,987,Virginia,1
Prod005,40,987,Virginia,1
Prod002,20,500,Virginia,2
Prod003,30,460,Virginia,3


In [0]:
final_result = salesdf.withColumn("row_num",row_number().over(window_criteria))
display(final_result)

product,quantity,salesamount,state,row_num
Prod001,10,1300,Georgia,1
Prod002,20,550,Georgia,2
Prod003,30,480,Georgia,3
Prod004,40,240,Georgia,4
Prod001,10,1100,New York,1
Prod002,20,530,New York,2
Prod004,40,987,Virginia,1
Prod005,40,987,Virginia,2
Prod002,20,500,Virginia,3
Prod003,30,460,Virginia,4


In [0]:
final_result = salesdf.withColumn("previous_Value",lag("salesamount",1).over(window_criteria)) \
                      .withColumn("next_Value",lead("salesamount",1).over(window_criteria))
display(final_result)

product,quantity,salesamount,state,previous_Value,next_Value
Prod001,10,1300,Georgia,,550.0
Prod002,20,550,Georgia,1300.0,480.0
Prod003,30,480,Georgia,550.0,240.0
Prod004,40,240,Georgia,480.0,
Prod001,10,1100,New York,,530.0
Prod002,20,530,New York,1100.0,
Prod004,40,987,Virginia,,987.0
Prod005,40,987,Virginia,987.0,500.0
Prod002,20,500,Virginia,987.0,460.0
Prod003,30,460,Virginia,500.0,460.0


In [0]:
window_criteria_2 = Window.partitionBy("state")
final_result = (
    salesdf
    .withColumn("total_sales", sum("salesamount").over(window_criteria_2))
    .withColumn("avg_sales", round(avg("salesamount").over(window_criteria_2), 2))
    .withColumn("minimum_sales", min("salesamount").over(window_criteria_2))
    .withColumn("maximum_sales", max("salesamount").over(window_criteria_2))
)
display(final_result.orderBy(desc("total_sales")))

product,quantity,salesamount,state,total_sales,avg_sales,minimum_sales,maximum_sales
Prod001,10,300,Virginia,3694,615.67,300,987
Prod002,20,500,Virginia,3694,615.67,300,987
Prod003,30,460,Virginia,3694,615.67,300,987
Prod023,30,460,Virginia,3694,615.67,300,987
Prod004,40,987,Virginia,3694,615.67,300,987
Prod005,40,987,Virginia,3694,615.67,300,987
Prod001,10,1300,Georgia,2570,642.5,240,1300
Prod002,20,550,Georgia,2570,642.5,240,1300
Prod003,30,480,Georgia,2570,642.5,240,1300
Prod004,40,240,Georgia,2570,642.5,240,1300


In [0]:

display(oct_events.limit(10))

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
2019-10-01T00:00:05.000Z,view,1480613,2053013561092866779,computers.desktop,pulser,908.62,512742880,0d0d91c2-c9c2-4e81-90a5-86594dec0db9
2019-10-01T00:00:08.000Z,view,17300353,2053013553853497655,,creed,380.96,555447699,4fe811e9-91de-46da-90c3-bbd87ed3a65d
2019-10-01T00:00:08.000Z,view,31500053,2053013558031024687,,luminarc,41.16,550978835,6280d577-25c8-4147-99a7-abc6048498d6
2019-10-01T00:00:10.000Z,view,28719074,2053013565480109009,apparel.shoes.keds,baden,102.71,520571932,ac1cd4e5-a3ce-4224-a2d7-ff660a105880
2019-10-01T00:00:11.000Z,view,1004545,2053013555631882655,electronics.smartphone,huawei,566.01,537918940,406c46ed-90a4-4787-a43b-59a410c1a5fb


In [0]:
display(nov_events.limit(10))

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-11-01T00:00:00.000Z,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
2019-11-01T00:00:00.000Z,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2019-11-01T00:00:01.000Z,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
2019-11-01T00:00:01.000Z,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
2019-11-01T00:00:01.000Z,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2
2019-11-01T00:00:01.000Z,view,1306894,2053013558920217191,computers.notebook,hp,360.09,520772685,816a59f3-f5ae-4ccd-9b23-82aa8c23d33c
2019-11-01T00:00:01.000Z,view,1306421,2053013558920217191,computers.notebook,hp,514.56,514028527,df8184cc-3694-4549-8c8c-6b5171877376
2019-11-01T00:00:02.000Z,view,15900065,2053013558190408249,,rondell,30.86,518574284,5e6ef132-4d7c-4730-8c7f-85aa4082588f
2019-11-01T00:00:02.000Z,view,12708937,2053013553559896355,,michelin,72.72,532364121,0a899268-31eb-46de-898d-09b2da950b24
2019-11-01T00:00:02.000Z,view,1004258,2053013555631882655,electronics.smartphone,apple,732.07,532647354,d2d3d2c6-631d-489e-9fb5-06f340b85be0


In [0]:
append_events = oct_events.unionByName(nov_events)
display(append_events.limit(10))

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
2019-10-01T00:00:05.000Z,view,1480613,2053013561092866779,computers.desktop,pulser,908.62,512742880,0d0d91c2-c9c2-4e81-90a5-86594dec0db9
2019-10-01T00:00:08.000Z,view,17300353,2053013553853497655,,creed,380.96,555447699,4fe811e9-91de-46da-90c3-bbd87ed3a65d
2019-10-01T00:00:08.000Z,view,31500053,2053013558031024687,,luminarc,41.16,550978835,6280d577-25c8-4147-99a7-abc6048498d6
2019-10-01T00:00:10.000Z,view,28719074,2053013565480109009,apparel.shoes.keds,baden,102.71,520571932,ac1cd4e5-a3ce-4224-a2d7-ff660a105880
2019-10-01T00:00:11.000Z,view,1004545,2053013555631882655,electronics.smartphone,huawei,566.01,537918940,406c46ed-90a4-4787-a43b-59a410c1a5fb


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

display("oct_events total rows : ",oct_events.count())
display("nov_events total rows : ",nov_events.count())

'oct_events total rows : '

42448764

'nov_events total rows : '

67501979

In [0]:
display("appnd_tables rows : ",append_events.count())

'appnd_tables rows : '

109950743

In [0]:
event_type_sales_million = (
    append_events
    .filter(col("event_type") == "purchase")
    .groupBy("event_type")
    .agg(
        round(sum("price") / 1_000_000, 2).alias("total_sales_mn"),
        round(count("*") / 1_000_000, 2).alias("total_orders_mn")
    )
)
display(event_type_sales_million)

event_type,total_sales_mn,total_orders_mn
purchase,505.15,1.66


In [0]:
brand_sales = (
    append_events
    .filter(
        (col("event_type") == "purchase") &
        (col("brand").isNotNull())
    )
    .groupBy("brand")
    .agg(
        round(sum("price") / 1_000_000, 2).alias("total_sales_mn"),
        count("*").alias("total_orders")
    )
)

In [0]:
display(brand_sales.orderBy(desc("total_sales_mn")))

brand,total_sales_mn,total_orders
apple,238.72,308937
samsung,101.28,372923
xiaomi,20.45,124908
huawei,9.66,47204
lg,8.63,21606
acer,6.92,13284
lucente,6.65,26137
sony,6.34,17038
oppo,5.9,25971
lenovo,4.45,11125


In [0]:
window_spec = (
    Window
    .partitionBy("brand")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [0]:
running_sales_df = (
    append_events
    .filter(
        (col("event_type") == "purchase") &
        (col("brand").isNotNull())
        )
    .withColumn("running_sales", sum("price").over(window_spec))
)

In [0]:
display(running_sales_df.select("brand", "event_time", "price", "running_sales").limit(30))

brand,event_time,price,running_sales
a-case,2019-10-01T06:12:28.000Z,2.55,2.55
a-case,2019-10-01T06:14:29.000Z,2.55,5.1
a-case,2019-10-02T04:42:39.000Z,4.61,9.71
a-case,2019-10-02T08:25:02.000Z,5.12,14.830000000000002
a-case,2019-10-02T09:07:22.000Z,5.12,19.950000000000003
a-case,2019-10-02T09:16:40.000Z,5.12,25.070000000000004
a-case,2019-10-02T15:33:44.000Z,4.09,29.160000000000004
a-case,2019-10-03T03:16:15.000Z,1.26,30.420000000000005
a-case,2019-10-03T05:33:45.000Z,4.09,34.510000000000005
a-case,2019-10-04T04:28:11.000Z,5.12,39.63


In [0]:
from pyspark.sql import functions as F, types as T

rows_customers = [
    (1, "Asha", "IN", True),
    (2, "Bob", "US", False),
    (3, "Chen", "CN", True),
    (4, "Diana", "US", None),
    (None, "Ghost", "UK", False),      # NULL key to dem
]

rows_orders = [
    (101, 1, 120.0, "IN"),
    (102, 1, 80.0, "IN"),
    (103, 2, 50.0, "US"),
    (104, 5, 30.0, "DE"),             # no matching cus
    (105, 3, 200.0, "CN"),
    (106, None, 15.0, "UK"),          # NULL key won't
    (107, 3, 40.0, "CN"),
    (108, 2, 75.0, "US"),
]

schema_customers = T.StructType([
    T.StructField("customer_id", T.IntegerType(), True),
    T.StructField("name",        T.StringType(),  True),
    T.StructField("country",     T.StringType(),  True),
    T.StructField("vip",         T.BooleanType(), True),
])

schema_orders = T.StructType([
    T.StructField("order_id",    T.IntegerType(), True),
    T.StructField("customer_id", T.IntegerType(), True),
    T.StructField("amount",      T.DoubleType(),  True),
    T.StructField("country",     T.StringType(),  True), 
])

df_customers = spark.createDataFrame(rows_customers, schema_customers)
df_orders    = spark.createDataFrame(rows_orders,    schema_orders)

display(df_customers)
display(df_orders)

customer_id,name,country,vip
1.0,Asha,IN,True
2.0,Bob,US,False
3.0,Chen,CN,True
4.0,Diana,US,
,Ghost,UK,False


order_id,customer_id,amount,country
101,1.0,120.0,IN
102,1.0,80.0,IN
103,2.0,50.0,US
104,5.0,30.0,DE
105,3.0,200.0,CN
106,,15.0,UK
107,3.0,40.0,CN
108,2.0,75.0,US


In [0]:
df_inner =df_orders.join(df_customers,on='customer_id',how='inner')
display(df_inner)

customer_id,order_id,amount,country,name,country.1,vip
1,101,120.0,IN,Asha,IN,True
1,102,80.0,IN,Asha,IN,True
2,103,50.0,US,Bob,US,False
3,105,200.0,CN,Chen,CN,True
3,107,40.0,CN,Chen,CN,True
2,108,75.0,US,Bob,US,False


In [0]:
df_left=df_orders.join(df_customers,on='customer_id',how='left')
display(df_left)

customer_id,order_id,amount,country,name,country.1,vip
1.0,101,120.0,IN,Asha,IN,True
1.0,102,80.0,IN,Asha,IN,True
2.0,103,50.0,US,Bob,US,False
5.0,104,30.0,DE,,,
3.0,105,200.0,CN,Chen,CN,True
,106,15.0,UK,,,
3.0,107,40.0,CN,Chen,CN,True
2.0,108,75.0,US,Bob,US,False


In [0]:
df_full = df_orders.join(df_customers,on='customer_id',how='full')
display(df_full)

customer_id,order_id,amount,country,name,country.1,vip
1.0,101.0,120.0,IN,Asha,IN,True
1.0,102.0,80.0,IN,Asha,IN,True
2.0,103.0,50.0,US,Bob,US,False
5.0,104.0,30.0,DE,,,
3.0,105.0,200.0,CN,Chen,CN,True
,106.0,15.0,UK,,,
3.0,107.0,40.0,CN,Chen,CN,True
2.0,108.0,75.0,US,Bob,US,False
,,,,Ghost,UK,False
4.0,,,,Diana,US,


In [0]:
df_full = df_orders.join(df_customers,on='customer_id',how='left_semi')
display(df_full)

customer_id,order_id,amount,country
1,101,120.0,IN
1,102,80.0,IN
2,103,50.0,US
3,105,200.0,CN
3,107,40.0,CN
2,108,75.0,US


In [0]:
df_full = df_orders.join(df_customers,on='customer_id',how='left_anti')
display(df_full)

customer_id,order_id,amount,country
5.0,104,30.0,DE
,106,15.0,UK


In [0]:
feature_df = oct_events.withColumn(
    "high_price_flag",
    expr("CASE WHEN price > 1000 THEN 1 ELSE 0 END")
)

In [0]:
feature_df = feature_df.withColumn(
    "is_purchase",
    expr("CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END")
    )

In [0]:
display(feature_df.limit(20))

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,high_price_flag,is_purchase
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c,0,0
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc,0,0
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8,0,0
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713,0,0
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d,1,0
2019-10-01T00:00:05.000Z,view,1480613,2053013561092866779,computers.desktop,pulser,908.62,512742880,0d0d91c2-c9c2-4e81-90a5-86594dec0db9,0,0
2019-10-01T00:00:08.000Z,view,17300353,2053013553853497655,,creed,380.96,555447699,4fe811e9-91de-46da-90c3-bbd87ed3a65d,0,0
2019-10-01T00:00:08.000Z,view,31500053,2053013558031024687,,luminarc,41.16,550978835,6280d577-25c8-4147-99a7-abc6048498d6,0,0
2019-10-01T00:00:10.000Z,view,28719074,2053013565480109009,apparel.shoes.keds,baden,102.71,520571932,ac1cd4e5-a3ce-4224-a2d7-ff660a105880,0,0
2019-10-01T00:00:11.000Z,view,1004545,2053013555631882655,electronics.smartphone,huawei,566.01,537918940,406c46ed-90a4-4787-a43b-59a410c1a5fb,0,0
