In [10]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, countDistinct, to_date
from pyspark.sql.functions import lit
from pyspark.sql.functions import  from_json, from_unixtime, date_format

# Initialize Spark session

In [11]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("User Click Data ETL") \
    .getOrCreate()


#creating file path

In [12]:
file_path="user_click_data1.json"

# reading the file path if exists are not

In [13]:
if os.path.exists(file_path):
    print("File exists")
    user_df = spark.read.json(file_path)
    user_df.show()
else:
    print("File does not exist at the specified path")

File exists
+--------------+------------+--------------+-------+------+---------------+-------------------+--------------------+-------+
|       browser|        city|click_event_id|country|device|     ip_address|          timestamp|                 url|user_id|
+--------------+------------+--------------+-------+------+---------------+-------------------+--------------------+-------+
|Firefox Mobile|      Jaipur|           843|  India|Mobile| 240.246.150.88|2023-05-20 08:45:39|www.globalmart.co...| 525707|
|Firefox Mobile|      Jaipur|           844|  India|Mobile| 240.246.150.88|2023-05-20 08:49:34|www.globalmart.co...| 525707|
|        Safari|Philadelphia|           905|    USA|Mobile| 46.147.149.245|2023-05-20 08:51:44|www.globalmart.co...| 525679|
|        Safari|Philadelphia|           906|    USA|Mobile| 46.147.149.245|2023-05-20 08:56:27|www.globalmart.co...| 525679|
|        Safari|    Zaragoza|             2|  Spain|Mobile|  59.243.217.35|2023-05-20 08:33:31|www.globalmart.co.

# Read the JSON data

In [14]:
user_df = spark.read \
    .json(file_path)

# Correct data types

In [15]:
user1_df = user_df.withColumn("timestamp", col("timestamp").cast("timestamp"))
user_final_df = user1_df.withColumn("event_date", date_format(col("timestamp"), "MM-dd-yyyy"))

# represents a random time spent in minutes

In [16]:
result_df = user_final_df.withColumn("time_spent", col("timestamp").cast("long") / 60)  

# Group by URL, country, and date and aggregate

In [17]:
agg_df = result_df.groupBy("url", "country", "event_date").agg(
    avg("time_spent").alias("average_minutes_spent"),
    countDistinct("user_id").alias("unique_users_count"),
    count("click_event_id").alias("click_count")
)


# Show the result

In [18]:
agg_df.show()

+--------------------+-------+----------+---------------------+------------------+-----------+
|                 url|country|event_date|average_minutes_spent|unique_users_count|click_count|
+--------------------+-------+----------+---------------------+------------------+-----------+
|www.globalmart.co...|  India|05-20-2023| 2.8075883316666666E7|                 2|          2|
|www.globalmart.co...|England|05-20-2023| 2.8075875325000003E7|                 4|          4|
|www.globalmart.co...|    USA|05-20-2023|       2.8075887475E7|                 1|          2|
|www.globalmart.co...|  India|05-20-2023| 2.8075877410000004E7|                 4|          5|
|www.globalmart.co...|England|05-20-2023| 2.8075884174999997E7|                 2|          2|
|www.globalmart.co...|  Spain|05-20-2023| 2.8075890683333334E7|                 1|          1|
|www.globalmart.co...|England|05-20-2023|         2.80758705E7|                 1|          1|
|www.globalmart.co...|    USA|05-20-2023| 2.807588