## Wikipedia Clickstream Analysis with PySpark

### Introduction

Understanding user behavior on the web is crucial for uncovering how information flows and how people discover content. In this project, I analyzed Wikipedia's open-source "Clickstream" dataset, which captures user navigation paths by recording clicks between linked Wikipedia articles. This dataset also includes entries for users arriving from outside sources, such as search engines or external websites, providing a comprehensive view of content discovery patterns. 

Using PySpark SQL, I processed and explored this large-scale data to see how users navigate Wikipedia, with a focus on clickstreams leading to the **London** page. The project highlights user behavior and shows how big data tools can turn raw navigation logs into concrete findings.

### Objectives
- **Data Cleaning:** Preprocess and filter raw clickstream data to remove inconsistencies.
- **Data Analysis:** Leverage PySpark SQL to analyze navigation patterns between Wikipedia articles.
- **Visualization:** Present the data using a Sankey diagram to illustrate how users arrive at specific Wikipedia pages.

### Key Insights

One highlight of the analysis was examining the click paths leading to the Wikipedia page for **London**:
- **37,113** users accessed the page through an **external search engine**.
- **4,600** users arrived from the **United Kingdom** Wikipedia article.
- **2,909** clicks originated from Wikipedia's **Main Page**.
- **2,261** users came from the **England** Wikipedia page.

For the London page, external search engines account for far more arrivals than any single internal link: roughly eight times as many as the next-largest source, the United Kingdom article. Internal links still contribute a meaningful share of content discovery.
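
To make the comparison concrete, here is a small sketch that turns the four counts listed above into shares. It only covers these four sources, so the percentages are relative to the listed total and not to all traffic reaching the London page; the dictionary keys are illustrative labels, not dataset identifiers.

```python
# Click counts for the London page, copied from the list above.
london_sources = {
    "external search engine": 37113,
    "United Kingdom article": 4600,
    "Main Page": 2909,
    "England article": 2261,
}

# Total over these four sources only; the full dataset has more referrers.
listed_total = sum(london_sources.values())

# Share of the listed clicks contributed by each source.
shares = {name: count / listed_total for name, count in london_sources.items()}

for name, share in shares.items():
    print(f"{name:<24} {share:6.1%}")
```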

### Tools and Technologies Used
- **PySpark SQL:** For scalable data processing and querying.
- **Jupyter Notebook:** To interactively clean, analyze, and visualize data.
- **Sankey Diagrams:** To map out user click paths visually and intuitively.
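
The plotting code is not shown in this notebook, so the following is only a sketch of how the London click paths could be prepared for a Sankey diagram. It assumes a plotting library that takes node labels plus parallel `source`/`target`/`value` lists (Plotly's `graph_objects.Sankey` uses this layout); the variable names are hypothetical.

```python
# Source -> target click counts taken from the Key Insights list above.
flows = [
    ("external search engine", "London", 37113),
    ("United Kingdom", "London", 4600),
    ("Main Page", "London", 2909),
    ("England", "London", 2261),
]

# Sankey tools typically want integer node ids, so assign one per unique
# label in first-seen order.
labels = []
for source, target, _ in flows:
    for label in (source, target):
        if label not in labels:
            labels.append(label)
index = {label: i for i, label in enumerate(labels)}

# Parallel lists: link k goes from source[k] to target[k] with weight value[k].
sankey_links = {
    "source": [index[s] for s, _, _ in flows],
    "target": [index[t] for _, t, _ in flows],
    "value": [count for _, _, count in flows],
}

print(labels)
print(sankey_links)
```

With Plotly installed, `labels` would go into the `node` argument and `sankey_links` into the `link` argument of the Sankey trace.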



### Project Status

This project is currently a work in progress, with continuous improvements planned. Over the coming days, I will be adding new analyses, refining visualizations, and incorporating additional insights to further enhance the depth and clarity of the findings.

---


### Import Libraries

In [3]:
from pyspark.sql import SparkSession

## Introduction to Clickstream Data

Create a new `SparkSession` and assign it to a variable named `spark`.

In [6]:
# Create a new SparkSession
spark = SparkSession.builder.getOrCreate()

Create an RDD from a list of sample clickstream counts and save it as `clickstream_counts_rdd`.

In [8]:
# Create an RDD from a list of sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)

Using the RDD from the previous step, create a DataFrame named `clickstream_sample_df`.

In [20]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd.toDF(["Source Page", "Target Page", "Link Category", "Link Count"])

# Display the DataFrame to the notebook
clickstream_sample_df.show(5, truncate=False)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (LAPTOP-9IF7K86H executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more
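
The trace above ends in `Python worker failed to connect back`, caused by a socket accept timeout. On a local setup, one common cause is that Spark launches a different Python interpreter than the one running the notebook kernel. That diagnosis is an assumption here and has not been verified on this machine. A possible workaround is to point Spark at the current interpreter before the `SparkSession` is created:

```python
import os
import sys

# Assumption: the worker fails because Spark starts a different Python than
# the notebook kernel. Point both driver and workers at this interpreter.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# These must be set before the SparkSession is created; if a session already
# exists, stop it and restart the kernel first.
print(os.environ["PYSPARK_PYTHON"])
```

If the error persists after a kernel restart, the cause is likely elsewhere (for example the Java installation or local firewall rules), and this sketch will not help.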


## Inspecting Clickstream Data

Read the files in `./cleaned/clickstream/` into a new Spark DataFrame named `clickstream` and display the first few rows of the DataFrame in the notebook.

In [None]:
# Read the target directory (`./cleaned/clickstream/`) into a DataFrame (`clickstream`)
clickstream = spark.read \
    .option('header', True) \
    .option('delimiter', '\t') \
    .option('inferSchema', True) \
    .csv("./cleaned/clickstream/")

# Display the DataFrame to the notebook
clickstream.show(5, truncate=False)

Print the schema of the DataFrame in the notebook.

In [None]:
# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()


Drop the `language_code` column from the DataFrame and display the new schema in the notebook.

In [None]:
# Drop target columns
clickstream = clickstream.drop("language_code")

# Display the first few rows of the DataFrame and the new schema in the notebook
clickstream.show(5, truncate=False)
clickstream.printSchema()

Rename `referrer` and `resource` to `source_page` and `target_page`, respectively.

In [None]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream.withColumnRenamed("referrer", "source_page").withColumnRenamed("resource", "target_page")
  
# Display the first few rows of the DataFrame and the new schema in the notebook
clickstream.show(5, truncate=False)
clickstream.printSchema()

## Querying Clickstream Data

Add the `clickstream` DataFrame as a temporary view named `clickstream` to make the data queryable with `sparkSession.sql()`.

In [None]:
# Create a temporary view in the metadata for this `SparkSession` to make the data
# queryable with `sparkSession.sql()`
clickstream.createOrReplaceTempView("clickstream")

### Task 9

Filter the dataset to entries with `Hanging_Gardens_of_Babylon` as the `target_page` and order the result by `click_count` using PySpark DataFrame methods.

In [None]:
# Filter and sort the DataFrame using PySpark DataFrame methods
clickstream\
    .filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
    .orderBy('click_count', ascending=False)\
    .show(10, truncate=False)

### Task 10

Perform the same analysis as the previous exercise using a SQL query. 

In [None]:
# Filter and sort the DataFrame using SQL
spark.sql(
    """
    SELECT *
    FROM clickstream
    WHERE target_page = 'Hanging_Gardens_of_Babylon'
    ORDER BY click_count DESC
    """
).show(10, truncate=False)

### Task 11

Calculate the sum of `click_count` grouped by `link_category` using PySpark DataFrame methods.

In [None]:
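# Aggregate the DataFrame using PySpark DataFrame methods.
# Naming the column makes the intent explicit; a bare `.sum()` would sum
# every numeric column in the DataFrame.
clickstream\
    .groupBy('link_category')\
    .sum('click_count')\
    .show(truncate=False)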

### Task 12

Perform the same analysis as the previous exercise using a SQL query.

In [None]:
# Aggregate the DataFrame using SQL
spark.sql(
    """
    SELECT link_category, SUM(click_count) FROM clickstream
    GROUP BY link_category
    """
).show(truncate=False)
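
Because the same `GROUP BY` query is plain SQL, its logic can be checked without a running Spark session. The sketch below uses Python's built-in `sqlite3` module as a stand-in for the `clickstream` temporary view and loads only the four sample rows defined earlier in the notebook. It illustrates the aggregation, not Spark itself; the `total_clicks` alias and the `ORDER BY` clause are additions for readability.

```python
import sqlite3

# Sample rows from the notebook's `sample_clickstream_counts` list.
rows = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
]

# In-memory SQLite table standing in for the `clickstream` temporary view.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clickstream "
    "(source_page TEXT, target_page TEXT, link_category TEXT, click_count INTEGER)"
)
conn.executemany("INSERT INTO clickstream VALUES (?, ?, ?, ?)", rows)

# Same aggregation as the Spark SQL cell, with an alias for the summed column.
totals = conn.execute(
    """
    SELECT link_category, SUM(click_count) AS total_clicks
    FROM clickstream
    GROUP BY link_category
    ORDER BY total_clicks DESC
    """
).fetchall()
conn.close()

print(totals)
```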

## Task Group 4 - Saving Results to Disk

### Task 13

Let's create a new DataFrame named `internal_clickstream` that only contains article pairs where `link_category` is `link`. Use `filter()` to select rows matching a specific condition and `select()` to choose which columns to return from the query.

In [None]:
# Create a new DataFrame (named `internal_clickstream`) using `filter` to select rows
# matching a specific condition and `select` to choose which columns to return.
# Filter first, while `link_category` is still part of the DataFrame.
internal_clickstream = clickstream\
    .filter(clickstream.link_category == 'link')\
    .select(["source_page", "target_page", "click_count"])

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(truncate=False)

### Task 14

Using `DataFrame.write.csv()`, save the `internal_clickstream` DataFrame as CSV files in a directory called `./results/article_links_csv/`.

In [None]:
# Save the `internal_clickstream` DataFrame to a series of CSV files in `./results/article_links_csv/`
# with `DataFrame.write.csv()`
internal_clickstream\
    .write\
    .csv('./results/article_links_csv/', mode="overwrite")

### Task 15

Using `DataFrame.write.parquet()`, save the `internal_clickstream` DataFrame as Parquet files in a directory called `./results/article_links_parquet/`.

In [None]:
# Save the `internal_clickstream` DataFrame to a series of parquet files in `./results/article_links_parquet/`
# with `DataFrame.write.parquet()`

internal_clickstream\
    .write\
    .parquet('./results/article_links_parquet/', mode="overwrite")

### Task 16

Close the `SparkSession` and underlying `SparkContext`. What happens if we call `clickstream.show()` after closing the `SparkSession`?

In [None]:
# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()

### Conclusion

This project provided a hands-on opportunity to work with large datasets and apply big data techniques using PySpark. Analyzing Wikipedia's clickstream data offered valuable insights into how users discover content — whether through internal navigation or external search engines. The use of visualizations, such as Sankey diagrams, helped make these patterns more tangible and easier to interpret.

Beyond the technical skills gained, this project reinforced the importance of data-driven storytelling — transforming raw data into clear, actionable insights. It stands as a strong addition to my data science portfolio, demonstrating my ability to process big data, extract meaningful patterns, and present findings effectively.
