# Analyzing Wikipedia Clickstream Data

### Import Libraries

In [1]:
from pyspark.sql import SparkSession

## Introduction to Clickstream Data

In [2]:
# Create a new SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate() 

In [3]:
# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = spark.sparkContext.parallelize( sample_clickstream_counts )

In [4]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd.toDF(["source_page", "target_page", "link_category", "link_count"])

# Display the DataFrame to the notebook
clickstream_sample_df.show()

+--------------------+--------------------+-------------+----------+
|         source_page|         target_page|link_category|link_count|
+--------------------+--------------------+-------------+----------+
|        other-search|Hanging_Gardens_o...|     external|     47000|
|         other-empty|Hanging_Gardens_o...|     external|     34600|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|     14000|
|             Babylon|Hanging_Gardens_o...|         link|      2500|
+--------------------+--------------------+-------------+----------+



## Inspecting Clickstream Data

In [5]:
# Read the target directory (`./cleaned/clickstream/`) into a DataFrame (`clickstream`)
clickstream = spark.read.options(header='True', delimiter='\t', inferSchema = 'True').csv("./cleaned/clickstream/")

# Display the DataFrame to the notebook
clickstream.show()

+-------------------+--------------------+-------------+-------------+-----------+
|           referrer|            resource|link_category|language_code|click_count|
+-------------------+--------------------+-------------+-------------+-----------+
|   Daniel_Day-Lewis|      Phantom_Thread|         link|           en|      43190|
|     other-internal|      Phantom_Thread|     external|           en|      21683|
|        other-empty|      Phantom_Thread|     external|           en|     169532|
|90th_Academy_Awards|      Phantom_Thread|         link|           en|      40449|
|       other-search|      Phantom_Thread|     external|           en|     536940|
|       other-search|Tara_Grinstead_mu...|     external|           en|      30041|
|       other-search|      Yossi_Benayoun|     external|           en|      11045|
|        other-empty|       Parthiv_Patel|     external|           en|      11481|
|       other-search|       Parthiv_Patel|     external|           en|      34953|
|   

In [6]:
# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



In [7]:
# Drop target columns
clickstream = clickstream.drop("language_code")

# Display the first few rows of the DataFrame
clickstream.show(5)

# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|           referrer|      resource|link_category|click_count|
+-------------------+--------------+-------------+-----------+
|   Daniel_Day-Lewis|Phantom_Thread|         link|      43190|
|     other-internal|Phantom_Thread|     external|      21683|
|        other-empty|Phantom_Thread|     external|     169532|
|90th_Academy_Awards|Phantom_Thread|         link|      40449|
|       other-search|Phantom_Thread|     external|     536940|
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



In [8]:
# Rename `referrer` and `resource` to `source_page` and `target_page` 
clickstream = clickstream.withColumnRenamed("referrer","source_page")
clickstream = clickstream.withColumnRenamed("resource","target_page")
  
# Display the first few rows of the DataFrame
clickstream.show(5)

# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|        source_page|   target_page|link_category|click_count|
+-------------------+--------------+-------------+-----------+
|   Daniel_Day-Lewis|Phantom_Thread|         link|      43190|
|     other-internal|Phantom_Thread|     external|      21683|
|        other-empty|Phantom_Thread|     external|     169532|
|90th_Academy_Awards|Phantom_Thread|         link|      40449|
|       other-search|Phantom_Thread|     external|     536940|
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



## Querying Clickstream Data

In [9]:
# Create a temporary view in the metadata for this `SparkSession` 
clickstream.createOrReplaceTempView('clickstream')
print(clickstream.columns)


['source_page', 'target_page', 'link_category', 'click_count']


In [10]:
# Filter and sort the DataFrame using PySpark DataFrame methods
clickstream.filter(clickstream["target_page"] == "Hanging_Gardens_of_Babylon").sort("click_count").show()

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|        other-search|Hanging_Gardens_o...|     external|      47088|
+--------------------+--------------------+-------------+-----------+



In [11]:
# Filter and sort the DataFrame using SQL
query = """ SELECT * FROM clickstream WHERE target_page = 'Hanging_Gardens_of_Babylon' ORDER BY click_count """
spark.sql(query).show()

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|        other-search|Hanging_Gardens_o...|     external|      47088|
+--------------------+--------------------+-------------+-----------+



In [12]:
# Aggregate the DataFrame using PySpark DataFrame Methods 
clickstream.groupBy(clickstream.link_category).sum().show()

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|         link|        97805811|
|        other|         9338172|
|     external|      3248677856|
+-------------+----------------+



In [13]:
# Aggregate the DataFrame using SQL
query = """
SELECT link_category, SUM(click_count) as total_count
FROM clickstream
GROUP BY link_category;
"""
spark.sql(query).show()

+-------------+-----------+
|link_category|total_count|
+-------------+-----------+
|         link|   97805811|
|        other|    9338172|
|     external| 3248677856|
+-------------+-----------+



## Saving Results to Disk

In [18]:
# Create a new DataFrame named `internal_clickstream`
internal_clickstream = clickstream.select("link_category", "source_page", "target_page").filter(clickstream.link_category == 'link')

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show()

+-------------+--------------------+--------------------+
|link_category|         source_page|         target_page|
+-------------+--------------------+--------------------+
|         link|    Daniel_Day-Lewis|      Phantom_Thread|
|         link| 90th_Academy_Awards|      Phantom_Thread|
|         link|              Shinee|Kim_Jong-hyun_(si...|
|         link|      Agnyaathavaasi|        Anu_Emmanuel|
|         link|      Naa_Peru_Surya|        Anu_Emmanuel|
|         link|        Mariah_Carey|         Nick_Cannon|
|         link|               Kesha|Rainbow_(Kesha_al...|
|         link|  David_Attenborough|   John_Attenborough|
|         link|            Boney_M.|       Bobby_Farrell|
|         link|The_End_of_the_F*...|      Jessica_Barden|
|         link|   Quentin_Tarantino|   The_Hateful_Eight|
|         link|Ready_Player_One_...|        Olivia_Cooke|
|         link| Royal_Rumble_(2018)|Kevin_Owens_and_S...|
|         link|     Macaulay_Culkin|         Brenda_Song|
|         link

In [20]:
# Save the `internal_clickstream` DataFrame to a series of CSV files
internal_clickstream.write.csv("./results/article_to_article_csv/")

AnalysisException: path file:/home/ccuser/workspace/pyspark-sql-project/results/article_to_article_csv already exists.

In [21]:
# Save the `internal_clickstream` DataFrame to a series of parquet files
internal_clickstream.write.parquet("./results/article_to_article_pq/")

In [23]:
# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()