In [174]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, countDistinct, to_date, first, last, lead, lag
from pyspark.sql.types import StringType, StructField, IntegerType, StructType, TimestampType

# Initialize Spark session

In [175]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("User Click Data ETL") \
    .getOrCreate()

#creating file path

In [176]:
file_path="user_click_data1.json"

In [177]:
schema_df = StructType([
    StructField("browser", StringType(), True),
    StructField("city", StringType(), True),
    StructField("click_event_id", IntegerType(), False),
    StructField("country", StringType(), True),
    StructField("device", StringType(), True),
    StructField("ip_address", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("url", StringType(), True),
    StructField("user_id", StringType(), False)
])

# reading the file path if exists are not

In [178]:
try:
    user_df = spark.read.json(file_path, schema=schema_df)
except Exception as e:
    print(e)

# Correct data types

In [179]:
user_df = user_df.withColumn("event_date", to_date(col("timestamp")))
user_df = user_df.withColumn("user_id", col("user_id").cast(IntegerType()))

In [180]:
from pyspark.sql.window import Window
spec =  Window.partitionBy("country", "event_date", "user_id").orderBy("timestamp")

In [181]:
user_df = user_df.withColumn("last_click", lead(col("timestamp")).over(spec))
user_df = user_df.withColumn("last_1", last(col("timestamp")).over(spec))

# represents a random time spent in minutes

In [182]:
user_df = user_df.withColumn("time_spent", (col("last_click").cast("long") - col("timestamp").cast("long"))/60)
user_df = user_df.withColumn("time_1", (col("last_1").cast("long") - col("timestamp").cast("long"))/60)

user_df.write.csv("test.csv", mode="overwrite", header=True)

# Group by URL, country, and date and aggregate

In [183]:
agg_df = user_df.groupBy("url", "country", "event_date").agg(
    avg("time_spent").alias("average_minutes_spent"), avg("time_1").alias("average_1")
    countDistinct("user_id").alias("unique_users_count"),
    count("click_event_id").alias("click_count")
)


In [184]:
agg_df.write.csv("final_result.cvs", mode="overwrite")

# Show the result

In [185]:
agg_df[agg_df["url"] == "www.globalmart.com/product-id-23"].show()

+--------------------+-------+----------+---------------------+------------------+-----------+
|                 url|country|event_date|average_minutes_spent|unique_users_count|click_count|
+--------------------+-------+----------+---------------------+------------------+-----------+
|www.globalmart.co...|England|2023-05-20|                 NULL|                 6|          6|
|www.globalmart.co...|  Italy|2023-05-20|    6.933333333333334|                 4|          5|
|www.globalmart.co...|    USA|2023-05-20|   17.483333333333334|                 5|          6|
|www.globalmart.co...|  India|2023-05-20|                 NULL|                 4|          4|
|www.globalmart.co...|  Spain|2023-05-20|                 NULL|                 1|          1|
+--------------------+-------+----------+---------------------+------------------+-----------+

