In [1]:
import pyspark
sc = pyspark.SparkContext(appName = "MyAPP")
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

In [2]:
df = spark.read.csv('2019-Oct.csv', header=True, inferSchema=True).limit(100000)

In [3]:
#drop rows with null value on category_code and brand
df = df.where("category_code is not null and brand is not null")

In [4]:
#event_type count during each user_session 
dactivity = df.groupBy('user_session').count()

In [5]:
#select rows of event_type = 'cart' or event_type = 'purchase'
#drop duplicates rows based on ['event_type', 'product_id','price', 'user_id','user_session']
df = df.select('*').where("event_type = 'cart' or event_type = 'purchase'").dropDuplicates(subset = ['event_type', 'product_id','price', 'user_id','user_session'])

In [6]:
#add new column is_purchased if event_type = purchase then 1 else 0
df = df.withColumn('is_purchased', when(col('event_type') == 'purchase', 1).otherwise(0))

In [7]:
#chop the dataset into 2 sets by event_type
da = df.select('*').where("event_type = 'cart'")
db = df.select('*').where("event_type = 'purchase'")

In [8]:
#left join rows of cart with rows of purchase, update the is_purchased of cart to is_purchased of corresponding purchase with key ['user_session', 'product_id']
#only consider goods put into cart and be purchased afterward
import pyspark.sql.functions as f

dr=da.alias('a').join(
    db.alias('b'), ['user_session', 'product_id'], how='left'
).select('user_session', 'product_id', 'a.event_time', 'a.event_type', 'a.product_id', 'a.category_id', 'a.category_code', 'a.brand', 'a.price', 'a.user_id',
    f.coalesce('b.is_purchased', 'a.is_purchased').alias('is_purchased')
)

In [9]:
#join the activity_count
dr=dr.join(dactivity, 'user_session') \
  .select('*') \
  .withColumnRenamed('count', 'activity_count')

In [10]:
#split the category_code by .
dr = dr.select('*', split('category_code',"\\.")[0], split('category_code',"\\.")[1], split('category_code',"\\.")[2])\
  .withColumnRenamed('split(category_code, \., -1)[0]', 'category_code_level1') \
  .withColumnRenamed('split(category_code, \., -1)[1]', 'category_code_level2') \
  .withColumnRenamed('split(category_code, \., -1)[2]', 'category_code_level4')

In [11]:
#add new field week_day and category_code_level3 to category_code_level2 if category_code_level4 is null
dr = dr.select('*', dayofweek(col('event_time')).alias('week_day'), f.coalesce('category_code_level2', 'category_code_level4').alias('category_code_level3'))

In [12]:
#tranform the string fields 'brand', 'category_code_level1', 'category_code_level2','category_code_level3' with StringIndexer for ML
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=['brand', 'category_code_level1', 'category_code_level2','category_code_level3'], outputCols=['brandIndex', 'category_code_level1_index', 'category_code_level2_index','category_code_level3_index'])
dindexed = indexer.fit(dr).transform(dr)
dindexed.show()

+--------------------+----------+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+------------+--------------+--------------------+--------------------+--------------------+--------+--------------------+----------+--------------------------+--------------------------+--------------------------+
|        user_session|product_id|          event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|is_purchased|activity_count|category_code_level1|category_code_level2|category_code_level4|week_day|category_code_level3|brandIndex|category_code_level1_index|category_code_level2_index|category_code_level3_index|
+--------------------+----------+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+------------+--------------+--------------------+--------------------+--------------------+--------+--------------------+----------+-----------

In [13]:
#finally select the result for training
dresult = dindexed.select('event_time','event_type','product_id','category_id','category_code','brandIndex',
                        'price','user_id','user_session','is_purchased','week_day',
                        'category_code_level1_index','category_code_level1_index','category_code_level3_index', 'activity_count') \
                 .withColumnRenamed('brandIndex', 'brand') \
                 .withColumnRenamed('category_code_level1_index', 'category_code_level1') \
                 .withColumnRenamed('category_code_level2_index', 'category_code_level2') \
                 .withColumnRenamed('category_code_level3_index', 'category_code_level3')

In [14]:
dresult.show()

+--------------------+----------+----------+-------------------+--------------------+-----+-------+---------+--------------------+------------+--------+--------------------+--------------------+--------------------+--------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand|  price|  user_id|        user_session|is_purchased|week_day|category_code_level1|category_code_level1|category_code_level3|activity_count|
+--------------------+----------+----------+-------------------+--------------------+-----+-------+---------+--------------------+------------+--------+--------------------+--------------------+--------------------+--------------+
|2019-10-01 03:49:...|      cart|   3601489|2053013563810775923|appliances.kitche...|  0.0| 476.18|550367893|010f6005-cb1c-4a7...|           0|       3|                 1.0|                 1.0|                 5.0|             8|
|2019-10-01 04:11:...|      cart|   1004870|2053013555631882655|electronics.

In [15]:
dresult.groupBy('is_purchased').count().show()

+------------+-----+
|is_purchased|count|
+------------+-----+
|           1|  432|
|           0|  337|
+------------+-----+

