In [12]:
sc

In [13]:
from pyspark.sql.types import StringType, StructField, StructType, TimestampType, ArrayType, IntegerType, LongType

event_schema = StructType([
    StructField("timestamp", StringType()),
    StructField("visitorid", IntegerType()),
    StructField("event", StringType()),
    StructField("itemid", IntegerType()),
    StructField("transactionid", IntegerType())
])

In [14]:
fp = "/home/admin123/recsys/retail_rocket/events.csv"
df = spark.read.csv(fp, header = True, schema = event_schema)

In [15]:
df.show()

+-------------+---------+---------+------+-------------+
|    timestamp|visitorid|    event|itemid|transactionid|
+-------------+---------+---------+------+-------------+
|1433221332117|   257597|     view|355908|         null|
|1433224214164|   992329|     view|248676|         null|
|1433221999827|   111016|     view|318965|         null|
|1433221955914|   483717|     view|253185|         null|
|1433221337106|   951259|     view|367447|         null|
|1433224086234|   972639|     view| 22556|         null|
|1433221923240|   810725|     view|443030|         null|
|1433223291897|   794181|     view|439202|         null|
|1433220899221|   824915|     view|428805|         null|
|1433221204592|   339335|     view| 82389|         null|
|1433222162373|   176446|     view| 10572|         null|
|1433221701252|   929206|     view|410676|         null|
|1433224229496|    15795|     view| 44872|         null|
|1433223697356|   598426|     view|156489|         null|
|1433224078165|   223343|     v

In [16]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)



In [17]:
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [18]:
format = "yyyy-MM-dd HH:mm:ss"
df = df.withColumn('timestamp2', f.from_unixtime(df.timestamp/1000).cast(TimestampType()))


In [19]:
df.show()

+-------------+---------+---------+------+-------------+-------------------+
|    timestamp|visitorid|    event|itemid|transactionid|         timestamp2|
+-------------+---------+---------+------+-------------+-------------------+
|1433221332117|   257597|     view|355908|         null|2015-06-02 10:32:12|
|1433224214164|   992329|     view|248676|         null|2015-06-02 11:20:14|
|1433221999827|   111016|     view|318965|         null|2015-06-02 10:43:19|
|1433221955914|   483717|     view|253185|         null|2015-06-02 10:42:35|
|1433221337106|   951259|     view|367447|         null|2015-06-02 10:32:17|
|1433224086234|   972639|     view| 22556|         null|2015-06-02 11:18:06|
|1433221923240|   810725|     view|443030|         null|2015-06-02 10:42:03|
|1433223291897|   794181|     view|439202|         null|2015-06-02 11:04:51|
|1433220899221|   824915|     view|428805|         null|2015-06-02 10:24:59|
|1433221204592|   339335|     view| 82389|         null|2015-06-02 10:30:04|

In [20]:
type(df)

pyspark.sql.dataframe.DataFrame

In [21]:
df = df.drop('timestamp')

In [22]:
df = df.withColumnRenamed('timestamp2', 'timestamp')
df.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   257597|     view|355908|         null|2015-06-02 10:32:12|
|   992329|     view|248676|         null|2015-06-02 11:20:14|
|   111016|     view|318965|         null|2015-06-02 10:43:19|
|   483717|     view|253185|         null|2015-06-02 10:42:35|
|   951259|     view|367447|         null|2015-06-02 10:32:17|
|   972639|     view| 22556|         null|2015-06-02 11:18:06|
|   810725|     view|443030|         null|2015-06-02 10:42:03|
|   794181|     view|439202|         null|2015-06-02 11:04:51|
|   824915|     view|428805|         null|2015-06-02 10:24:59|
|   339335|     view| 82389|         null|2015-06-02 10:30:04|
|   176446|     view| 10572|         null|2015-06-02 10:46:02|
|   929206|     view|410676|         null|2015-06-02 10:38:21|
|    15795|     view| 44872|         null|2015-06-02 11

In [23]:
df.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   257597|     view|355908|         null|2015-06-02 10:32:12|
|   992329|     view|248676|         null|2015-06-02 11:20:14|
|   111016|     view|318965|         null|2015-06-02 10:43:19|
|   483717|     view|253185|         null|2015-06-02 10:42:35|
|   951259|     view|367447|         null|2015-06-02 10:32:17|
|   972639|     view| 22556|         null|2015-06-02 11:18:06|
|   810725|     view|443030|         null|2015-06-02 10:42:03|
|   794181|     view|439202|         null|2015-06-02 11:04:51|
|   824915|     view|428805|         null|2015-06-02 10:24:59|
|   339335|     view| 82389|         null|2015-06-02 10:30:04|
|   176446|     view| 10572|         null|2015-06-02 10:46:02|
|   929206|     view|410676|         null|2015-06-02 10:38:21|
|    15795|     view| 44872|         null|2015-06-02 11

In [24]:
df.printSchema()

root
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [25]:
dfs = df.sample(fraction= 0.005)

In [26]:
dfs.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   157954|     view|143486|         null|2015-06-02 02:42:05|
|   183568|     view|335913|         null|2015-06-02 09:25:01|
|    63121|     view|310791|         null|2015-06-01 20:56:49|
|  1181810|     view|444365|         null|2015-06-01 21:07:09|
|   164708|     view| 23828|         null|2015-06-01 23:47:12|
|   449199|     view|298868|         null|2015-06-01 23:12:07|
|  1382480|     view|390591|         null|2015-06-01 22:51:33|
|  1250313|     view|148545|         null|2015-06-01 22:57:14|
|   761891|     view|239052|         null|2015-06-02 04:59:00|
|   131869|addtocart|183411|         null|2015-06-02 01:42:29|
|  1086719|     view|440248|         null|2015-06-01 15:12:39|
|   400325|     view|393134|         null|2015-06-01 18:02:07|
|   563755|     view|389814|         null|2015-06-01 13

In [27]:
from pyspark.sql.functions import count
dfsgb = dfs.groupBy(["visitorid", "itemid"]).agg(count("timestamp").alias("interactions"))

In [28]:
dfsgb.show()

+---------+------+------------+
|visitorid|itemid|interactions|
+---------+------+------------+
|  1361418|450774|           1|
|  1394773|248023|           1|
|   205215| 93101|           1|
|   804658|136329|           1|
|   503857|202891|           1|
|   678548|167269|           1|
|   199911|350475|           1|
|   388205|384528|           1|
|  1255217|421862|           1|
|   542391|451676|           1|
|   848570|434782|           1|
|   377124|349254|           1|
|  1345555|394351|           1|
|   757936|247427|           1|
|   355503|321861|           1|
|   695015|461199|           1|
|   103899|231009|           1|
|   982515|350183|           1|
|   775034|143063|           1|
|   339108|411817|           1|
+---------+------+------------+
only showing top 20 rows



In [29]:
from pyspark.sql.functions import max, min
dfsgb.agg(max("interactions")).show()

+-----------------+
|max(interactions)|
+-----------------+
|                4|
+-----------------+



In [30]:
dfsui = dfs.groupby("visitorid").agg(count("timestamp").alias("total_user_interaction_count"))

In [31]:
dfsui.agg(max("total_user_interaction_count")).show()

+---------------------------------+
|max(total_user_interaction_count)|
+---------------------------------+
|                               32|
+---------------------------------+



In [32]:
from pyspark.sql.functions import col, countDistinct
dfs.agg(countDistinct("visitorid")).show()

+-------------------------+
|count(DISTINCT visitorid)|
+-------------------------+
|                    13117|
+-------------------------+



In [33]:
dfs.agg(countDistinct("itemid")).show()

+----------------------+
|count(DISTINCT itemid)|
+----------------------+
|                 11334|
+----------------------+



In [34]:
type(dfsgb)

pyspark.sql.dataframe.DataFrame

In [35]:
from pyspark.sql.functions import *

df1 = dfsgb.alias('df1')
df2 = dfsui.alias('df2')

In [36]:
dfr = df1.join(df2, df1.visitorid == df2.visitorid).select('df1.*', 'df2.total_user_interaction_count')

In [37]:
dfr = dfr.withColumn('rui', dfr.interactions/dfr.total_user_interaction_count)

In [38]:
dfr.agg(min("rui")).show()

+--------+
|min(rui)|
+--------+
| 0.03125|
+--------+



In [39]:
from pyspark.sql.functions import mean
dfr.agg(mean("rui")).show()

+------------------+
|          avg(rui)|
+------------------+
|0.9537555442448921|
+------------------+



In [40]:
dfr.approxQuantile("rui", [0.1, 0.2, 0.3, 0.4, 0.5], 0.25)

[0.03125, 0.03125, 1.0, 1.0, 1.0]

In [41]:
dfs.groupBy("event").count().show()

+-----------+-----+
|      event|count|
+-----------+-----+
|transaction|   98|
|  addtocart|  333|
|       view|13376|
+-----------+-----+



In [94]:
def assign_ratings(e):
    
    rating = 0
    if e == "view":
        rating = 2
    else:
        rating = 5
    
    return rating

In [97]:
from pyspark.sql import Row
dfs.select("event").rdd.map(lambda x : assign_ratings(x[0])).map(lambda x: Row(x)).toDF(["ratings"]).show()

+-------+
|ratings|
+-------+
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      5|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      5|
+-------+
only showing top 20 rows



In [98]:
from pyspark.sql.functions import col, when
dfs = dfs.withColumn("ratings", when(col("event")=='view', 2).otherwise(5))

In [99]:
dfs.rdd.countApproxDistinct()

+---------+---------+------+-------------+-------------------+-------+
|visitorid|    event|itemid|transactionid|          timestamp|ratings|
+---------+---------+------+-------------+-------------------+-------+
|   157954|     view|143486|         null|2015-06-02 02:42:05|      2|
|   183568|     view|335913|         null|2015-06-02 09:25:01|      2|
|    63121|     view|310791|         null|2015-06-01 20:56:49|      2|
|  1181810|     view|444365|         null|2015-06-01 21:07:09|      2|
|   164708|     view| 23828|         null|2015-06-01 23:47:12|      2|
|   449199|     view|298868|         null|2015-06-01 23:12:07|      2|
|  1382480|     view|390591|         null|2015-06-01 22:51:33|      2|
|  1250313|     view|148545|         null|2015-06-01 22:57:14|      2|
|   761891|     view|239052|         null|2015-06-02 04:59:00|      2|
|   131869|addtocart|183411|         null|2015-06-02 01:42:29|      5|
|  1086719|     view|440248|         null|2015-06-01 15:12:39|      2|
|   40