In [1]:
from pyspark.sql import functions as F

from IPython.display import Markdown, display
def printmd(string, color=None):
    """Display ``string`` as a bold, title-cased Markdown line.

    Underscores are replaced with spaces before title-casing, so
    ``'event_type'`` is shown as **Event Type**.

    Args:
        string: Text to render.
        color: Optional CSS colour (e.g. ``'red'`` or ``'#1f77b4'``). When
            given, the bold text is wrapped in an inline ``<span>`` carrying
            that colour; when omitted, plain bold Markdown is emitted.
    """
    text = '**' + string.replace('_', ' ').title() + '**'
    if color is not None:
        # `color` used to be accepted but silently ignored.
        text = "<span style='color:{}'>{}</span>".format(color, text)
    display(Markdown(text))
    
import pandas as pd

# Lift pandas' row cap so a displayed frame is never truncated with "...".
# Anything large must therefore be sliced (e.g. .head()) before display.
pd.options.display.max_rows = None

import matplotlib.pyplot as plt
import seaborn as sns
# Apply seaborn's "whitegrid" style and "pastel" colour palette to the plotting defaults.
sns.set(style="whitegrid", palette="pastel")

from functools import reduce

In [2]:
# Echo the Spark context. `sc` is never assigned in this notebook, so it is
# presumably injected by the PySpark kernel/shell -- TODO confirm.
sc

In [2]:
# Load the raw event log as a Spark DataFrame.
raw_events_path = "./dataset/dataset.parquet"
df = spark.read.parquet(raw_events_path)

                                                                                

In [3]:
# Expose the event DataFrame to Spark SQL under the name `cus_behaviors`,
# which every SQL query below selects from.
df.createOrReplaceTempView("cus_behaviors")

In [4]:
# Peek at two rows of the registered view to check its columns.
sql_script = """
    SELECT *
    FROM cus_behaviors
    LIMIT 2
"""
preview = spark.sql(sql_script)
preview.toPandas()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-17 08:43:00,view,2501799,2053013564003713919,appliances.kitchen.oven,elenberg,46.31,563237118,4368d099-6d19-47cb-95f8-1a235bc2d701
1,2019-11-17 08:43:00,view,6400335,2053013554121933129,computers.components.cpu,intel,435.28,551129779,4db2c365-ee85-4437-87ad-23d70bfc01a9


In [10]:
# Number of events of each type per session, one row per session and one
# column per event type.
#
# The SQL step counts events per (session, event_type); the pivot then spreads
# event types into columns. The query used to end with an ORDER BY, but any
# ordering is discarded by the groupBy/pivot that follows, so the sort was
# removed. The former DataFrame-level `.alias('count')` was also dropped: it
# names the DataFrame, not a column, and had no effect on the result.
sql_script = """
    SELECT
        user_session,
        event_type,
        COUNT(event_type) AS count
    FROM cus_behaviors
    GROUP BY user_session, event_type
"""
temp = (
    spark.sql(sql_script)
    .groupBy('user_session')
    .pivot('event_type')
    .sum('count')
    .fillna(0)  # a session with no events of a given type gets 0, not null
)
# Write a single parquet part so pandas can read the result back from disk.
temp.coalesce(1).write.mode('overwrite').save('./temp/num_action_by_session.parquet', format='parquet')

num_action_by_session = pd.read_parquet('./temp/num_action_by_session.parquet', engine='pyarrow')
print(num_action_by_session.shape)
display(num_action_by_session.head(20))

(13776051, 4)


Unnamed: 0,user_session,cart,purchase,view
0,000506dd-c8f9-46e2-895c-70a78cde8e17,1,0,14
1,001be2de-46a6-429d-b568-fd52d8af3f7d,1,1,1
2,003a912b-50c7-4114-bbdd-b7f078283268,0,0,3
3,0043bdfa-2cb9-488f-98fd-b7cf00622a43,0,0,2
4,005072f4-6030-4acd-bc83-6749f47e7c63,0,0,2
5,006909c3-9450-407a-9007-c87c643336f2,0,0,3
6,007d9125-7fa0-43b2-ad0a-f127bce1c753,1,0,1
7,00804bcc-d570-4b52-85ef-6ebf1a364092,0,0,1
8,008a70ba-f41c-4766-ac68-0291f25eee65,0,0,1
9,0093b5fd-88ac-4224-9b80-b30b97e7affc,0,0,14


In [8]:
# First and last event timestamp per session, plus the elapsed time between
# them. Casting a timestamp to long yields whole seconds, so `duration` is the
# session length in seconds.
sql_script = """
    SELECT 
        user_session,
        MIN(event_time) AS event_time_first,
        MAX(event_time) AS event_time_last
    FROM cus_behaviors
    GROUP BY user_session
    ;
"""
session_bounds = spark.sql(sql_script)
last_seen = F.col('event_time_last').cast('long')
first_seen = F.col('event_time_first').cast('long')
temp = session_bounds.withColumn('duration', last_seen - first_seen)

# Single parquet part on disk, then back into pandas for inspection.
single_part = temp.coalesce(1)
single_part.write.mode('overwrite').save('./temp/duration.parquet', format='parquet')

duration = pd.read_parquet('./temp/duration.parquet', engine='pyarrow')
print(duration.shape)
display(duration.head(20))

(13776051, 4)


Unnamed: 0,user_session,event_time_first,event_time_last,duration
0,eddf5cd0-c926-4920-a5b4-b2a65b7779b7,2019-11-17 01:29:09,2019-11-17 01:43:58,889
1,5c1e2dcb-f090-4014-b5f2-77db180404e5,2019-11-17 01:33:04,2019-11-17 02:21:00,2876
2,50125c8f-d912-46ec-9a17-f83024bd0973,2019-11-17 01:39:32,2019-11-17 01:44:22,290
3,330ca4bd-02d9-4f70-b956-2a2e225fef6a,2019-11-17 01:31:36,2019-11-17 01:49:27,1071
4,bee47dc9-2b95-4949-873d-22b810b1c5bd,2019-11-17 01:39:11,2019-11-17 01:43:07,236
5,5c2101d8-ffb0-4fe5-95d1-e8fcd9052967,2019-11-17 01:32:33,2019-11-17 01:56:41,1448
6,7d923a03-3b0d-46e1-87b8-413fbf1eafa4,2019-11-17 01:43:10,2019-11-17 01:43:46,36
7,b3cbb161-0e85-4d87-9f91-9d1764bc44f2,2019-11-17 01:30:15,2019-11-17 02:06:37,2182
8,a86b63f0-5a8b-4e25-97d5-f0f65a1235b4,2019-11-17 01:18:08,2019-11-17 01:43:37,1529
9,5e1230a5-56e1-45cd-94b1-f5cc2f19600d,2019-11-17 01:43:25,2019-11-17 01:53:18,593


In [12]:
# How many different categories each session touched, across all event types.
sql_script = """
    SELECT
        user_session,
        COUNT(DISTINCT category_id) AS num_distinct_category
    FROM cus_behaviors
    GROUP BY user_session
"""

category_counts = spark.sql(sql_script)
temp = category_counts.fillna(0)
writer = temp.coalesce(1).write.mode('overwrite')
writer.save('./temp/distinct_category.parquet', format='parquet')

distinct_category = pd.read_parquet('./temp/distinct_category.parquet', engine='pyarrow')
print(distinct_category.shape)
# Show the sessions that browsed the widest range of categories first.
widest_sessions = distinct_category.sort_values('num_distinct_category', ascending=False)
display(widest_sessions.head(20))

(13776051, 2)


Unnamed: 0,user_session,num_distinct_category
98078,afb5848d-98e5-43e8-bd9d-c0ad5ef9d8bf,99
1393986,4921edd9-3cbb-43a4-bb0d-d12912b74f98,89
6940625,fc749a4e-c432-4dae-a0a1-04de89f1e4ea,89
2812281,d6433d7b-3846-456a-88de-748c3fac2675,87
9701391,b556f0c7-3a23-44f5-9f34-e713fefa9686,86
4171151,d99d91bf-40f8-4e29-9593-54b4a1826542,81
2803136,88206fc3-b5ea-4e3b-be68-67edfbf7009b,80
6902679,af4ad1c0-a131-4fab-94f2-5d6d647adcb7,69
12416116,5759a13c-5b35-4e79-ab26-5453f422be31,68
11155378,9ac8fcf2-973b-41ca-a052-f670260c77c7,63


In [7]:
# Distinct categories and products added to the cart per session. Only 'cart'
# events are scanned, so sessions without any cart event are absent from the
# result rather than present with zeros.
sql_script = """
    SELECT
        user_session,
        COUNT(DISTINCT category_id) AS num_distinct_cart_cat,
        COUNT(DISTINCT product_id) AS num_distinct_cart_product
    FROM cus_behaviors
    WHERE event_type = 'cart'
    GROUP BY user_session
"""

cart_counts = spark.sql(sql_script)
temp = cart_counts.fillna(0)
(
    temp
    .coalesce(1)
    .write
    .mode('overwrite')
    .save('./temp/num_distinct_cart_cat.parquet', format='parquet')
)

num_distinct_cart_cat = pd.read_parquet('./temp/num_distinct_cart_cat.parquet', engine='pyarrow')
print(num_distinct_cart_cat.shape)
by_cart_breadth = num_distinct_cart_cat.sort_values('num_distinct_cart_cat', ascending=False)
display(by_cart_breadth.head(20))

(1743343, 3)


Unnamed: 0,user_session,num_distinct_cart_cat,num_distinct_cart_product
1546784,f38c9080-eb01-4e42-94de-81201179c15d,16,36
35710,13a0cd04-cbec-4842-866e-629f73fa66c0,15,50
1531630,3d60451e-ef71-4bb7-9258-6194a850dd67,14,17
887494,68b52b9a-97c8-4525-aba2-604ede028da8,13,56
881498,c1ada587-2d56-442f-9779-486c1d390c96,12,17
1109292,1c140b4b-61dc-4c85-8b86-3ed1cc9b29ec,12,20
471596,6941b303-3d18-4e9d-9c2f-9931d2b63bf9,12,12
1288987,c10be987-02cc-48f9-8c14-397378636b39,12,17
867584,912a78ca-392a-40f0-b096-cdd5afc83895,12,12
1074692,7422055c-a866-4aa7-9a57-58431ca270b6,11,13


In [16]:
# Sum of product prices per session, split by event type (one column per
# event type).
#
# The SQL step sums prices per (session, event_type); the pivot then spreads
# event types into columns. The query used to end with an ORDER BY, but any
# ordering is discarded by the groupBy/pivot that follows, so the sort was
# removed. The former DataFrame-level `.alias('potential_revenue')` was also
# dropped: it names the DataFrame, not a column, and had no effect.
sql_script = """
    SELECT
        user_session,
        event_type,
        SUM(price) AS potential_revenue
    FROM cus_behaviors
    GROUP BY user_session, event_type
"""
temp = (
    spark.sql(sql_script)
    .groupBy('user_session')
    .pivot('event_type')
    .sum('potential_revenue')
    .fillna(0)  # a session with no events of a given type gets 0, not null
)
# Write a single parquet part so pandas can read the result back from disk.
temp.coalesce(1).write.mode('overwrite').save('./temp/potential_revenue.parquet', format='parquet')

potential_revenue = pd.read_parquet('./temp/potential_revenue.parquet', engine='pyarrow')
print(potential_revenue.shape)
display(potential_revenue.head(20))

(13776051, 4)


Unnamed: 0,user_session,cart,purchase,view
0,000506dd-c8f9-46e2-895c-70a78cde8e17,486.24,0.0,5589.57
1,001be2de-46a6-429d-b568-fd52d8af3f7d,8.21,8.21,8.21
2,003a912b-50c7-4114-bbdd-b7f078283268,0.0,0.0,509.55
3,0043bdfa-2cb9-488f-98fd-b7cf00622a43,0.0,0.0,673.58
4,005072f4-6030-4acd-bc83-6749f47e7c63,0.0,0.0,1654.43
5,006909c3-9450-407a-9007-c87c643336f2,0.0,0.0,1542.69
6,007d9125-7fa0-43b2-ad0a-f127bce1c753,159.33,0.0,159.33
7,00804bcc-d570-4b52-85ef-6ebf1a364092,0.0,0.0,64.35
8,008a70ba-f41c-4766-ac68-0291f25eee65,0.0,0.0,149.9
9,0093b5fd-88ac-4224-9b80-b30b97e7affc,0.0,0.0,1741.7


In [8]:
# Reload every per-session feature table from its parquet checkpoint so this
# cell can be run without recomputing the Spark aggregations above.
duration = pd.read_parquet('./temp/duration.parquet', engine='pyarrow')
distinct_category = pd.read_parquet('./temp/distinct_category.parquet', engine='pyarrow')
num_distinct_cart_cat = pd.read_parquet('./temp/num_distinct_cart_cat.parquet', engine='pyarrow')
num_action_by_session = pd.read_parquet('./temp/num_action_by_session.parquet', engine='pyarrow')
potential_revenue = pd.read_parquet('./temp/potential_revenue.parquet', engine='pyarrow')

# Both pivoted tables use the same event-type column names, so prefix them
# ('c_' for counts, 'r_' for revenue) to keep them apart after merging. The
# first column is left untouched.
for frame, prefix in ((num_action_by_session, 'c_'), (potential_revenue, 'r_')):
    names = list(frame.columns)
    frame.columns = [names[0]] + [prefix + name for name in names[1:]]


In [9]:
# Combine all per-session feature tables into one wide frame keyed by
# `user_session`.
#
# The frames are listed explicitly instead of being looked up with eval() on
# their variable names: a mistyped name now fails as an ordinary NameError on
# this line, and no string is ever executed as code.
tables = {
    'duration': duration,
    'distinct_category': distinct_category,
    'num_distinct_cart_cat': num_distinct_cart_cat,
    'num_action_by_session': num_action_by_session,
    'potential_revenue': potential_revenue,
}
tab_names = list(tables)
tab_list = list(tables.values())

# Outer join so a session that is missing from one table (e.g. sessions with
# no cart events) is kept, with NaN in that table's columns.
df = reduce(
    lambda left, right: pd.merge(left, right, on='user_session', how='outer'),
    tab_list,
)

In [12]:
# Sanity check: (rows, columns) of the merged per-session frame.
df.shape

(13776051, 12)

In [10]:
# Replace the NaNs introduced by the outer merge with 0 and persist the result.
#
# pandas' fillna() returns a new frame; the original code discarded that
# result (and the result of a following dropna()), so the NaNs were written to
# disk unchanged. Once every missing value has been filled, dropna() has
# nothing left to remove, so it is omitted.
df = df.fillna(0)
print(df.shape)
# index=False: the row index carries no information and must not end up as an
# extra column in the parquet file.
df.to_parquet('./dataset/temp.parquet', engine='pyarrow', index=False)

In [2]:
# Read the merged per-session features back into Spark, then discard the
# index column that pandas may have written alongside the data.
merged_features = spark.read.parquet('./dataset/temp.parquet')
df = merged_features.drop('__index_level_0__')

                                                                                

In [3]:
from pyspark.sql import functions as F
def _mean_price(revenue_col, count_col):
    """Return revenue / count rounded to 2 decimals, or 0.0 when count is not positive.

    The explicit guard avoids relying on division by zero yielding NULL, which
    only holds when ANSI SQL mode is off; with ANSI mode on, the bare division
    raises an error for sessions that have no events of that type.
    """
    count = F.col(count_col)
    mean = F.round(F.col(revenue_col) / count, 2)
    return F.when(count > 0, mean).otherwise(F.lit(0.0))


# Fill missing numeric values once, up front, so every derived column below is
# computed from non-null inputs. This replaces three whole-frame fillna(0)
# calls that were previously chained after each mean column.
df = df.fillna(0)
df = df.withColumn('r_view', F.round(F.col('r_view'), 2))

# Average price per event, for each event type.
df = df.withColumn('mean_cart', _mean_price('r_cart', 'c_cart'))
df = df.withColumn('mean_purchase', _mean_price('r_purchase', 'c_purchase'))
df = df.withColumn('mean_view', _mean_price('r_view', 'c_view'))

# Calendar features taken from the first event of the session.
df = df.withColumn("hour", F.hour(F.col("event_time_first")))
df = df.withColumn("day_name", F.date_format("event_time_first", 'E'))  # abbreviated weekday name

In [4]:
# Persist the final feature table, replacing any earlier run's output.
refined_writer = df.write.mode('overwrite')
refined_writer.parquet('./dataset/refined_feature_dataset.parquet')

                                                                                