In [1]:
import findspark
findspark.init()  # locate SPARK_HOME and put pyspark on sys.path before importing it

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

# Build the session first so its appName is the one actually used: creating a
# raw SparkContext beforehand makes every later SparkSession.builder.appName(...)
# a no-op, because getOrCreate() simply reuses the running context.
spark = (
    SparkSession.builder
    .appName("Clickstream")
    .getOrCreate()
)
# Kept for code that expects the low-level SparkContext under the name `sc`.
sc = spark.sparkContext


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-10-27 15:05:14,457 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
# getOrCreate() hands back the already-running session/context if one exists,
# in which case this appName is not applied.
spark = (
    SparkSession.builder
    .appName("Clickstream")
    .getOrCreate()
)

# Tab-separated file with a header row. Without a schema every column is read
# as a string, so the id and time columns are cast to int here.
clickstream_df = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("hdfs:/data/clickstream.csv")
    .withColumn("user_id", F.col("user_id").cast("int"))
    .withColumn("session_id", F.col("session_id").cast("int"))
    .withColumn("timestamp", F.col("timestamp").cast("int"))
)

                                                                                

In [3]:
# 1 when event_type contains the substring "error", otherwise 0. A null
# event_type makes contains() null, which when() treats as false -> 0.
clickstream_df = clickstream_df.withColumn(
    "is_error",
    F.when(F.col("event_type").contains("error"), F.lit(1)).otherwise(F.lit(0)),
)

In [4]:
# Per-session window in time order. With orderBy and no explicit frame, Spark
# uses a frame from the session start up to the current row's timestamp
# (rows with an equal timestamp included), so the sum is a running count of
# error events seen so far in the session.
window = (
    Window
    .partitionBy("user_id", "session_id")
    .orderBy(F.col("timestamp"))
)
clickstream_df = clickstream_df.withColumn(
    "error_flag", F.sum(F.col("is_error")).over(window)
)

In [5]:
# Timestamp of the session's first error event, or null if the session has none.
# when() treats a bare Python string as a literal value, so the timestamp column
# must be wrapped in F.col() to get its value rather than the text "timestamp".
# The window is partitioned but deliberately not ordered, so min() spans the
# whole session instead of being a running min up to the current row.
clickstream_df = clickstream_df.withColumn(
    "time_error",
    F.min(F.when(F.col("is_error") == 1, F.col("timestamp"))).over(
        Window.partitionBy("user_id", "session_id")
    ),
)

In [6]:
# Keep only rows with no error at or before them in the session, or rows whose
# timestamp precedes the session's recorded error time.
cleaned_df = clickstream_df.where(
    (F.col("error_flag") == 0)
    | (F.col("time_error") > F.col("timestamp"))
)

In [7]:
# Only page-view events are used to build routes below.
df = cleaned_df.where(F.col("event_type") == F.lit("page"))

In [8]:
# One route per (user, session): the session's pages joined with "-" in time
# order. collect_list gives no ordering guarantee after the groupBy shuffle,
# so (timestamp, event_page) pairs are collected and sorted (by timestamp, then
# page name on ties) and the page names are extracted from the sorted array.
routes_df = (
    df.groupBy("user_id", "session_id")
    .agg(
        F.sort_array(
            F.collect_list(F.struct("timestamp", "event_page"))
        ).alias("visits")
    )
    .withColumn("route", F.concat_ws("-", F.col("visits.event_page")))
    .select("route")
)


In [9]:
def remove_duplicates(route, sep='-'):
    """Collapse consecutive repeated pages in a route string.

    ``"a-a-b-b-a"`` becomes ``"a-b-a"``: only adjacent repeats are merged, so a
    page revisited later in the route is kept.

    Args:
        route: pages joined by ``sep``. ``None`` (how PySpark passes a null
            column value to a UDF) is returned unchanged instead of raising.
        sep: separator between pages; defaults to ``'-'``, matching the
            ``concat_ws("-", ...)`` call that builds the routes.

    Returns:
        The route with consecutive duplicates removed, or ``None``.
    """
    # Local import keeps the function self-contained when shipped to Spark workers.
    from itertools import groupby

    if route is None:
        return None
    # groupby() groups adjacent equal items, so taking each key drops repeats.
    return sep.join(page for page, _ in groupby(route.split(sep)))

In [10]:
# Spark wrapper around remove_duplicates producing a string column.
remove_duplicates_udf = F.udf(remove_duplicates, returnType=StringType())

In [12]:
# Replace each route with its consecutive-duplicate-free form; only the
# route column is kept.
cleaned_routes_df = routes_df.select(
    remove_duplicates_udf(F.col("route")).alias("route")
)

In [13]:
# Number of sessions per distinct route, most frequent first.
route_counts_df = (
    cleaned_routes_df
    .groupBy("route")
    .agg(F.count("*").alias("count"))
    # The secondary key on route makes the ranking deterministic when counts
    # tie, so a later limit() cut-off is reproducible across runs.
    .orderBy(F.desc("count"), F.asc("route"))
)

In [14]:
# The 30 most frequent routes, in the order produced by route_counts_df.
top_routes_df = route_counts_df.limit(num=30)


In [None]:
# show() defaults to 20 rows and truncates strings to 20 characters; print all
# 30 routes with their full text.
top_routes_df.show(n=30, truncate=False)



In [17]:
# Run the Spark job before opening the file: open(..., 'w') truncates
# immediately, so a failing collect() would otherwise leave an empty file.
top_routes = top_routes_df.collect()

# One "route<TAB>count" line per route, in ranking order; explicit UTF-8 so
# the output does not depend on the platform's default encoding.
with open('top_30.txt', 'w', encoding='utf-8') as out_file:
    out_file.writelines(f"{row['route']}\t{row['count']}\n" for row in top_routes)