In [1]:
sc

In [3]:
from pyspark.sql.types import StringType, StructField, StructType, TimestampType, ArrayType, IntegerType, LongType

event_schema = StructType([
    StructField("timestamp", StringType()),
    StructField("visitorid", IntegerType()),
    StructField("event", StringType()),
    StructField("itemid", IntegerType()),
    StructField("transactionid", IntegerType())
])

In [4]:
fp = "gs://dataproc-81e79564-8ae2-4a68-9b71-fbaaa2a28716-asia-south1/spark_data/events.csv"
df = spark.read.csv(fp, header = True, schema = event_schema)

In [5]:
df.show()

+-------------+---------+---------+------+-------------+
|    timestamp|visitorid|    event|itemid|transactionid|
+-------------+---------+---------+------+-------------+
|1433221332117|   257597|     view|355908|         null|
|1433224214164|   992329|     view|248676|         null|
|1433221999827|   111016|     view|318965|         null|
|1433221955914|   483717|     view|253185|         null|
|1433221337106|   951259|     view|367447|         null|
|1433224086234|   972639|     view| 22556|         null|
|1433221923240|   810725|     view|443030|         null|
|1433223291897|   794181|     view|439202|         null|
|1433220899221|   824915|     view|428805|         null|
|1433221204592|   339335|     view| 82389|         null|
|1433222162373|   176446|     view| 10572|         null|
|1433221701252|   929206|     view|410676|         null|
|1433224229496|    15795|     view| 44872|         null|
|1433223697356|   598426|     view|156489|         null|
|1433224078165|   223343|     v

In [6]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)



In [7]:
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [8]:
format = "yyyy-MM-dd HH:mm:ss"
df = df.withColumn('timestamp2', f.from_unixtime(df.timestamp/1000).cast(TimestampType()))


In [9]:
df.show()

+-------------+---------+---------+------+-------------+-------------------+
|    timestamp|visitorid|    event|itemid|transactionid|         timestamp2|
+-------------+---------+---------+------+-------------+-------------------+
|1433221332117|   257597|     view|355908|         null|2015-06-02 05:02:12|
|1433224214164|   992329|     view|248676|         null|2015-06-02 05:50:14|
|1433221999827|   111016|     view|318965|         null|2015-06-02 05:13:19|
|1433221955914|   483717|     view|253185|         null|2015-06-02 05:12:35|
|1433221337106|   951259|     view|367447|         null|2015-06-02 05:02:17|
|1433224086234|   972639|     view| 22556|         null|2015-06-02 05:48:06|
|1433221923240|   810725|     view|443030|         null|2015-06-02 05:12:03|
|1433223291897|   794181|     view|439202|         null|2015-06-02 05:34:51|
|1433220899221|   824915|     view|428805|         null|2015-06-02 04:54:59|
|1433221204592|   339335|     view| 82389|         null|2015-06-02 05:00:04|

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
df = df.drop('timestamp')

In [12]:
df = df.withColumnRenamed('timestamp2', 'timestamp')
df.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   257597|     view|355908|         null|2015-06-02 05:02:12|
|   992329|     view|248676|         null|2015-06-02 05:50:14|
|   111016|     view|318965|         null|2015-06-02 05:13:19|
|   483717|     view|253185|         null|2015-06-02 05:12:35|
|   951259|     view|367447|         null|2015-06-02 05:02:17|
|   972639|     view| 22556|         null|2015-06-02 05:48:06|
|   810725|     view|443030|         null|2015-06-02 05:12:03|
|   794181|     view|439202|         null|2015-06-02 05:34:51|
|   824915|     view|428805|         null|2015-06-02 04:54:59|
|   339335|     view| 82389|         null|2015-06-02 05:00:04|
|   176446|     view| 10572|         null|2015-06-02 05:16:02|
|   929206|     view|410676|         null|2015-06-02 05:08:21|
|    15795|     view| 44872|         null|2015-06-02 05

In [13]:
df.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   257597|     view|355908|         null|2015-06-02 05:02:12|
|   992329|     view|248676|         null|2015-06-02 05:50:14|
|   111016|     view|318965|         null|2015-06-02 05:13:19|
|   483717|     view|253185|         null|2015-06-02 05:12:35|
|   951259|     view|367447|         null|2015-06-02 05:02:17|
|   972639|     view| 22556|         null|2015-06-02 05:48:06|
|   810725|     view|443030|         null|2015-06-02 05:12:03|
|   794181|     view|439202|         null|2015-06-02 05:34:51|
|   824915|     view|428805|         null|2015-06-02 04:54:59|
|   339335|     view| 82389|         null|2015-06-02 05:00:04|
|   176446|     view| 10572|         null|2015-06-02 05:16:02|
|   929206|     view|410676|         null|2015-06-02 05:08:21|
|    15795|     view| 44872|         null|2015-06-02 05

In [14]:
df.printSchema()

root
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [15]:
dfs = df.sample(fraction= 0.005)

In [16]:
dfs.show()

+---------+-----+------+-------------+-------------------+
|visitorid|event|itemid|transactionid|          timestamp|
+---------+-----+------+-------------+-------------------+
|    11598| view| 62549|         null|2015-06-01 21:13:16|
|   209390| view| 67487|         null|2015-06-01 21:36:20|
|  1296338| view|147825|         null|2015-06-01 16:00:23|
|   234299| view|369179|         null|2015-06-01 17:58:21|
|   412017| view|340093|         null|2015-06-01 17:53:56|
|   146980| view|413990|         null|2015-06-01 17:46:41|
|   393609| view|147662|         null|2015-06-01 17:24:28|
|   465556| view|272897|         null|2015-06-01 23:49:07|
|   661192| view|145995|         null|2015-06-01 23:19:52|
|  1278847| view| 60621|         null|2015-06-02 01:50:17|
|   717876| view|189270|         null|2015-06-02 01:56:46|
|   519385| view|106848|         null|2015-06-02 02:12:17|
|  1294301| view|370653|         null|2015-06-01 19:54:57|
|   759071| view|138115|         null|2015-06-01 19:52:5

In [17]:
from pyspark.sql.functions import count
dfsgb = dfs.groupBy(["visitorid", "itemid"]).agg(count("timestamp").alias("interactions"))

In [18]:
dfsgb.show()

+---------+------+------------+
|visitorid|itemid|interactions|
+---------+------+------------+
|   387301|260707|           1|
|   130549|367059|           1|
|   329988|280685|           1|
|   754613|283218|           1|
|   447572|293231|           1|
|   947917|256432|           1|
|   651022|407952|           1|
|   948732| 19423|           1|
|   684514|148446|           1|
|   644514| 50910|           1|
|   735167|429094|           1|
|  1243742| 62734|           1|
|   246596|161571|           1|
|   953029|  8233|           1|
|  1072182|351535|           1|
|   907226|202355|           1|
|   437368| 68415|           1|
|   535621| 71530|           1|
|   186000|240170|           1|
|   530559|337671|           1|
+---------+------+------------+
only showing top 20 rows



In [19]:
from pyspark.sql.functions import max, min
dfsgb.agg(max("interactions")).show()

+-----------------+
|max(interactions)|
+-----------------+
|                3|
+-----------------+



In [20]:
dfsui = dfs.groupby("visitorid").agg(count("timestamp").alias("total_user_interaction_count"))

In [21]:
dfsui.agg(max("total_user_interaction_count")).show()

+---------------------------------+
|max(total_user_interaction_count)|
+---------------------------------+
|                               41|
+---------------------------------+



In [22]:
from pyspark.sql.functions import col, countDistinct
dfs.agg(countDistinct("visitorid")).show()

+-------------------------+
|count(DISTINCT visitorid)|
+-------------------------+
|                    13113|
+-------------------------+



In [23]:
dfs.agg(countDistinct("itemid")).show()

+----------------------+
|count(DISTINCT itemid)|
+----------------------+
|                 11269|
+----------------------+



In [24]:
type(dfsgb)

pyspark.sql.dataframe.DataFrame

In [25]:
from pyspark.sql.functions import *

df1 = dfsgb.alias('df1')
df2 = dfsui.alias('df2')

In [26]:
dfr = df1.join(df2, df1.visitorid == df2.visitorid).select('df1.*', 'df2.total_user_interaction_count')

In [27]:
dfr = dfr.withColumn('rui', dfr.interactions/dfr.total_user_interaction_count)

In [28]:
dfr.agg(min("rui")).show()

+--------------------+
|            min(rui)|
+--------------------+
|0.024390243902439025|
+--------------------+



In [29]:
from pyspark.sql.functions import mean
dfr.agg(mean("rui")).show()

+------------------+
|          avg(rui)|
+------------------+
|0.9502173913043478|
+------------------+



In [30]:
dfr.approxQuantile("rui", [0.1, 0.2, 0.3, 0.4, 0.5], 0.25)

[0.024390243902439025, 0.024390243902439025, 1.0, 1.0, 1.0]

In [31]:
dfs.groupBy("event").count().show()

+-----------+-----+
|      event|count|
+-----------+-----+
|transaction|  123|
|  addtocart|  334|
|       view|13387|
+-----------+-----+



In [32]:
def assign_ratings(e):
    
    rating = 0
    if e == "view":
        rating = 2
    else:
        rating = 5
    
    return rating

In [33]:
from pyspark.sql import Row
dfs.select("event").rdd.map(lambda x : assign_ratings(x[0])).map(lambda x: Row(x)).toDF(["ratings"]).show()

+-------+
|ratings|
+-------+
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
|      2|
+-------+
only showing top 20 rows



In [34]:
from pyspark.sql.functions import col, when
dfs = dfs.withColumn("ratings", when(col("event")=='view', 2).otherwise(5))

In [35]:
dfs.rdd.countApproxDistinct()

13620