# Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import warnings
warnings.simplefilter('ignore')

# Spark Session Init

In [2]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

23/12/14 11:29:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Data Ingestion

In [4]:
path = "dataset1.csv"
pages_df = spark.read.option("header",True).csv(path)
pages_df.show(5)

+-------+----------+----------+------+
|   user|session_id| timestamp|  page|
+-------+----------+----------+------+
|U770487|   S126225|1665870305| Page4|
|U770487|   S126225|1665870538| Page3|
|U770487|   S126225|1665870647| Page9|
|U770487|   S126225|1665870741|Page10|
|U770487|   S126225|1665871178| Page1|
+-------+----------+----------+------+
only showing top 5 rows



In [5]:
path = "dataset2.csv"
trans_df = spark.read.option("header",True).csv(path)
trans_df.show(5)

+-------+----------+----------+-----------+
|   user|session_id| timestamp|transaction|
+-------+----------+----------+-----------+
|U770487|   S126225|1665871214|          A|
|U770487|   S198246|1648324960|          B|
|U770487|   S717889|1650322553|          A|
|U770487|   S456778|1650312309|          B|
|U770487|   S476417|1669426490|          A|
+-------+----------+----------+-----------+
only showing top 5 rows



## Initial checks for count of rows and nulls

In [7]:
pages_df.count()

50000

In [8]:
pages_df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in pages_df.columns]).show()

[Stage 8:>                                                          (0 + 1) / 1]

+----+----------+---------+----+
|user|session_id|timestamp|page|
+----+----------+---------+----+
|   0|         0|        0|   0|
+----+----------+---------+----+



                                                                                

In [9]:
trans_df.count()

8997

In [10]:
trans_df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in trans_df.columns]).show()

+----+----------+---------+-----------+
|user|session_id|timestamp|transaction|
+----+----------+---------+-----------+
|   0|         0|        0|          0|
+----+----------+---------+-----------+



# Which are the most popular pages of the e-shop?


In [32]:
popular_df = pages_df.groupBy(['page']) \
  .agg(F.count("*").alias("max_hits")) \
  .sort(F.desc("max_hits"))
popular_df.show()

+------+--------+
|  page|max_hits|
+------+--------+
| Page8|    5071|
| Page2|    5050|
| Page7|    5045|
| Page5|    5021|
| Page4|    5001|
| Page6|    4995|
| Page3|    4991|
| Page1|    4981|
|Page10|    4961|
| Page9|    4884|
+------+--------+



                                                                                

# How many users visit these pages?

In [34]:
users_df = pages_df.groupBy(['page']) \
  .agg(F.countDistinct("user").alias("users")) \
  .sort(F.desc("users"))
users_df.show()



+------+-----+
|  page|users|
+------+-----+
|Page10|  569|
| Page2|  566|
| Page7|  566|
| Page9|  564|
| Page6|  562|
| Page5|  561|
| Page4|  561|
| Page3|  560|
| Page8|  560|
| Page1|  555|
+------+-----+



                                                                                

# How many transactions are performed on each page?

At this step ww need to left join pages dataset with transactions on user and session id and bring transaction times

In [58]:
joined_df = pages_df.join(trans_df,['user','session_id'],'left').select(pages_df['*'],trans_df['transaction'],trans_df['timestamp'].alias('trans_time'))
joined_df.show()

+-------+----------+----------+------+-----------+----------+
|   user|session_id| timestamp|  page|transaction|trans_time|
+-------+----------+----------+------+-----------+----------+
|U770487|   S126225|1665870305| Page4|          A|1665871214|
|U770487|   S126225|1665870538| Page3|          A|1665871214|
|U770487|   S126225|1665870647| Page9|          A|1665871214|
|U770487|   S126225|1665870741|Page10|          A|1665871214|
|U770487|   S126225|1665871178| Page1|          A|1665871214|
|U770487|   S198246|1648324273| Page9|          B|1648324960|
|U770487|   S198246|1648324305| Page9|          B|1648324960|
|U770487|   S198246|1648324513| Page9|          B|1648324960|
|U770487|   S198246|1648324947| Page4|          B|1648324960|
|U770487|   S717889|1650322531| Page3|          A|1650322553|
|U770487|   S456778|1650311815| Page4|          B|1650312309|
|U770487|   S456778|1650312164| Page2|          B|1650312309|
|U770487|   S456778|1650312263| Page7|          B|1650312309|
|U770487

Checking rows with no transactions

In [59]:
joined_df.filter(F.col('transaction').isNull()).count()

262

In [61]:
transactions_count_df = joined_df.groupBy(['page']) \
  .agg(F.count("transaction").alias("transactions")) \
  .sort(F.desc("transactions"))
transactions_count_df.show()

+------+------------+
|  page|transactions|
+------+------------+
| Page8|        5039|
| Page2|        5033|
| Page7|        5023|
| Page5|        4998|
| Page4|        4975|
| Page6|        4968|
| Page3|        4967|
| Page1|        4956|
|Page10|        4930|
| Page9|        4849|
+------+------------+



                                                                                

# What is the average time to purchase for a user?

In [67]:
joined_df.printSchema()

root
 |-- user: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- page: string (nullable = true)
 |-- transaction: string (nullable = true)
 |-- trans_time: string (nullable = true)
 |-- purchase_time: double (nullable = true)



We create a  new column to calculate purchase time , by substracting timestamp from trans_time for given combo of user,session and page

In [71]:
joined_df = joined_df.withColumn('purchase_time',(F.col('trans_time') -F.col('timestamp')))
joined_df.show()

+-------+----------+----------+------+-----------+----------+-------------+
|   user|session_id| timestamp|  page|transaction|trans_time|purchase_time|
+-------+----------+----------+------+-----------+----------+-------------+
|U770487|   S126225|1665870305| Page4|          A|1665871214|        909.0|
|U770487|   S126225|1665870538| Page3|          A|1665871214|        676.0|
|U770487|   S126225|1665870647| Page9|          A|1665871214|        567.0|
|U770487|   S126225|1665870741|Page10|          A|1665871214|        473.0|
|U770487|   S126225|1665871178| Page1|          A|1665871214|         36.0|
|U770487|   S198246|1648324273| Page9|          B|1648324960|        687.0|
|U770487|   S198246|1648324305| Page9|          B|1648324960|        655.0|
|U770487|   S198246|1648324513| Page9|          B|1648324960|        447.0|
|U770487|   S198246|1648324947| Page4|          B|1648324960|         13.0|
|U770487|   S717889|1650322531| Page3|          A|1650322553|         22.0|
|U770487|   

At first level of calculation, we group by 'user','session_id','page' and calculatepurchase_time

In [79]:
combo_average_df = joined_df.groupBy(['user','session_id','page']).agg(F.avg('purchase_time').alias("average_combo_purchase_time"))
combo_average_df.show()

+-------+----------+------+---------------------------+
|   user|session_id|  page|average_combo_purchase_time|
+-------+----------+------+---------------------------+
|U521947|   S882169| Page2|                      376.0|
|U787584|   S348235|Page10|                      949.0|
|U197966|   S393308| Page1|                      210.5|
|U722651|   S219490|Page10|         1213.6666666666667|
|U493396|   S356355| Page5|                      780.0|
|U642347|   S904092| Page4|                      831.0|
|U876936|   S297509| Page6|                     1995.0|
|U578073|   S954551| Page2|                      928.0|
|U507777|   S663123| Page6|                      277.0|
|U725510|   S622998| Page3|                       21.0|
|U657487|   S322167| Page3|                      317.0|
|U155663|   S419739| Page7|                      922.0|
|U169268|   S587521| Page9|                      119.0|
|U169268|   S761297| Page9|                      652.0|
|U820420|   S386643|Page10|                     

At second level , we group only by user the previous purchase_time to bring the desired results

In [81]:
user_avg_purc_time_df = combo_average_df.groupBy(['user']).agg(F.avg('average_combo_purchase_time').alias("average_purchase_time")).sort(F.asc("user"))
user_avg_purc_time_df.show()



+-------+---------------------+
|   user|average_purchase_time|
+-------+---------------------+
|U100126|   1375.0208333333335|
|U101223|    748.3083333333333|
|U102215|    763.8333333333334|
|U102528|    804.8846153846154|
|U102561|    887.3663194444445|
|U104917|   1223.4516129032259|
|U107804|     924.452380952381|
|U108680|    838.0573770491803|
|U111288|    906.3151515151516|
|U112017|    942.3488372093024|
|U113847|    871.3261904761906|
|U114317|    810.4204545454545|
|U114356|    820.5915492957746|
|U114895|    949.6336477987422|
|U120390|    951.3958333333333|
|U121567|           821.140625|
|U122786|  -1126934.2703252032|
|U127437|    989.3194444444443|
|U129169|    997.5764367816091|
|U130581|    834.7551440329219|
+-------+---------------------+
only showing top 20 rows



                                                                                

# Write to postgres

In [None]:
user_avg_purc_time_df..write.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/sample_db") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "average_usage") \
    .option("user", "<user>").option("password", "<password>").save()