In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Obtain the notebook-wide SparkSession: reuse the active one if it exists,
# otherwise build a new one with default settings.
spark=SparkSession.builder.getOrCreate()

In [10]:
# Load the raw clickstream events from the JSON file and preview 20 rows.
clickstream_path = "jobathon_click_data.json"
clickstream_df = spark.read.format("json").load(clickstream_path)
clickstream_df.show(20)

+--------------------+--------------------+--------------------+----------+--------------------+
|          browser_id|    client_side_data|     event_date_time|event_type|          session_id|
+--------------------+--------------------+--------------------+----------+--------------------+
|np1VSCs0e0h5utWYw...|[https://www.gosh...|2022-08-02 08:41:...|  pageload|BXaHGXYlHJjVcyx49...|
|ulFDwWtfUxBWu9TKE...|[https://www.gosh...|2022-08-02 05:49:...|  pageload|HSCp4uJWTSUt8whLn...|
|5ApjGIfsqzi66BJeC...|[https://www.gosh...|2022-08-02 04:01:...|  pageload|ks2q6XlyWvlEOuBVP...|
|fmhMaS22p4KeDjH60...|[https://www.gosh...|2022-07-31 03:08:...|  pageload|7yQ7NY80yCBoyvoWf...|
|fmhMaS22p4KeDjH60...|[https://www.gosh...|2022-07-31 03:09:...|  pageload|7yQ7NY80yCBoyvoWf...|
|fmhMaS22p4KeDjH60...|[https://www.gosh...|2022-07-31 03:09:...|  pageload|7yQ7NY80yCBoyvoWf...|
|fmhMaS22p4KeDjH60...|[https://www.gosh...|2022-07-31 03:09:...|  pageload|7yQ7NY80yCBoyvoWf...|
|fmhMaS22p4KeDjH60...|[https:/

In [4]:
# Print the column names, types and nullability of clickstream_df.
clickstream_df.printSchema()

root
 |-- browser_id: string (nullable = true)
 |-- client_side_data: struct (nullable = true)
 |    |-- current_page_url: string (nullable = true)
 |    |-- time_elapsed: string (nullable = true)
 |-- event_date_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- session_id: string (nullable = true)



In [16]:
#parsing struct type and converting time_elapsed to timestamp
# NOTE: the star import is kept because later cells call col/when/lit/row_number/sum
# unqualified. Be aware that it replaces Python builtins such as sum with the
# Spark column functions of the same name.
from pyspark.sql.functions import * 

# This cell rebinds clickstream_df, so flatten only while the nested struct is
# still present. Without the guard, running the cell a second time fails because
# client_side_data no longer exists on the already-flattened frame.
if "client_side_data" in clickstream_df.columns:
    clickstream_df = clickstream_df.select(
        "browser_id",
        # Pull the nested fields of client_side_data up to top-level columns.
        col("client_side_data.current_page_url").alias("current_page_url"),
        # NOTE(review): time_elapsed is a string in the source schema; values that
        # do not match this pattern become null -- confirm the field really holds
        # timestamps rather than a duration.
        to_timestamp("client_side_data.time_elapsed", 'yyyy-MM-dd HH:mm:ss.SSSSSS').alias("time_elapsed"),
        "event_date_time",
        "event_type",
        "session_id",
    )

In [11]:
# Load the login records (CSV with a header row). Every column is read as a
# nullable string by passing an explicit schema.
from pyspark.sql.types import *

login_columns = ["login_date_time", "session_id", "user_id"]
schema2 = StructType(
    [StructField(column_name, StringType(), True) for column_name in login_columns]
)
login_df = spark.read.csv("user_login.csv", header=True, schema=schema2)
login_df.show(10)

+--------------------+--------------------+----------+
|     login_date_time|          session_id|   user_id|
+--------------------+--------------------+----------+
|2022-08-08 06:15:...|tgUTPWVyxNLvAtQwW...|32000003E8|
|2022-08-07 13:43:...|bxXEi5r6TUO5mqeyL...|       55B|
|2022-08-01 12:06:...|0uP51gKf3HO99MbeV...|2400001CD0|
|2022-08-05 07:37:...|J9AjSB2fAbjsZU2wL...|2C00000508|
|2022-08-03 04:08:...|n1AAr54bkvS8sHN8w...|1400001B2D|
|2022-08-07 15:56:...|Wov1oYcoOJ8ftDxyR...| E00000D43|
|2022-08-03 03:34:...|47gJpjnYE3L7zUAz0...| 400000439|
|2022-08-03 08:29:...|c3bMejhZfyWwNEun2...|2000000515|
|2022-08-03 09:15:...|y7GE4B1F6z0zFlePJ...|2200000522|
|2022-08-04 11:45:...|NOJvuWTBuOViOI4yH...|18000017CE|
+--------------------+--------------------+----------+
only showing top 10 rows



In [7]:
# Print the column names, types and nullability of login_df.
login_df.printSchema()

root
 |-- login_date_time: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- user_id: string (nullable = true)



In [17]:
#joining clickstream and login data using session id
# Spark functions are referenced through the F namespace so this cell does not
# depend on an earlier star import having replaced Python's builtin sum.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Left join: every click event is kept; events whose session_id has no login row
# get null login_date_time / user_id.
# NOTE(review): assumes at most one login row per session_id -- duplicates in
# login_df would multiply click rows and inflate the counts below. Confirm.
df1 = (
    clickstream_df
    .join(login_df, "session_id", "left")
    .select(clickstream_df["*"], "login_date_time", "user_id")
)


# Derive click_flag, pageload_flag to calculate click and pageload events, convert event_date_time to date
df2 = df1.select(
    "*",
    F.when(F.col("event_type") == 'click', F.lit(1)).otherwise(F.lit(0)).alias("click_flag"),
    F.when(F.col("event_type") == 'pageload', F.lit(1)).otherwise(F.lit(0)).alias("pageload_flag"),
    F.col("event_date_time").cast("date").alias("date_of_click"),
)


#using window functions to calculate first_url, number_of_clicks and number_of_pageloads
# windowspec orders the events of one (user_id, date_of_click, browser_id) group
# by event_date_time, so rnum == 1 marks the earliest event of the group.
# event_date_time is a string column, so the ordering is lexicographic.
# NOTE(review): rows with identical event_date_time are ordered arbitrarily.
windowspec = Window.partitionBy("user_id", "date_of_click", "browser_id").orderBy("event_date_time")
# windowspec2 has no ordering, so the sums below span the whole group.
windowspec2 = Window.partitionBy("user_id", "date_of_click", "browser_id")


# "*" already carries date_of_click; it is not selected a second time, which
# would leave df3 with two columns of the same name.
df3 = df2.select(
    F.row_number().over(windowspec).alias("rnum"),
    F.sum("click_flag").over(windowspec2).alias("number_of_clicks"),
    F.sum("pageload_flag").over(windowspec2).alias("number_of_pageloads"),
    "*",
    # logged_in is 1 when the session matched a login row (user_id present), else 0.
    F.when(F.col("user_id").isNotNull(), F.lit(1)).otherwise(F.lit(0)).alias("logged_in"),
)


#renaming the columns
# Keep only the first event per group; its page URL becomes first_url.
final_df = df3.where(F.col("rnum") == 1).selectExpr(
    "date_of_click as current_date",
    "browser_id",
    "user_id",
    "logged_in",
    "current_page_url as first_url",
    "number_of_clicks",
    "number_of_pageloads",
)

In [19]:
# Preview the first 20 rows of the per-(date, browser, user) summary.
final_df.show(20)

+------------+--------------------+-------+---------+--------------------+----------------+-------------------+
|current_date|          browser_id|user_id|logged_in|           first_url|number_of_clicks|number_of_pageloads|
+------------+--------------------+-------+---------+--------------------+----------------+-------------------+
|  2022-07-31|1M94SXg2O0y3UGezj...|   null|        0|https://www.gosho...|               0|                  1|
|  2022-07-31|1QV3KizAo7lmgStLZ...|   null|        0|https://www.gosho...|              15|                  0|
|  2022-07-31|1anUiBdg7X8GgTAkb...|   null|        0|https://www.gosho...|               0|                  1|
|  2022-07-31|28OulKrS1uXWi2Opg...|   null|        0|https://www.gosho...|               1|                  0|
|  2022-07-31|2b5MTJKLywDvVsB7H...|   null|        0|https://www.gosho...|               0|                  1|
|  2022-07-31|39bI6swZJjy8x9VyX...|   null|        0|https://www.gosho...|               1|             

In [21]:
# Row count of final_df: one row per (user_id, date_of_click, browser_id) group,
# because only the rnum == 1 row of each group is kept.
final_df.count()

305456