# Analyzing Wikipedia Clickstream Data

### Import Libraries

In [1]:
from pyspark.sql import SparkSession

ModuleNotFoundError: No module named 'pyspark'

## Introduction to Clickstream Data

In [None]:
# Start a SparkSession for this notebook, or reuse one that is already running
spark = SparkSession.builder.getOrCreate()

In [None]:
# Sample clickstream rows: [source page, target page, link category, count]
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500],
]

# Distribute the sample rows as an RDD through the session's SparkContext
clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)

In [None]:
# Convert the sample RDD into a DataFrame with named columns. The count column is
# named `click_count` (not `link_count`) so the sample uses the same column name as
# the queries that run against the full `clickstream` DataFrame.
clickstream_sample_df = clickstream_counts_rdd.toDF(
    ["source_page", "target_page", "link_category", "click_count"]
)

# Display up to 5 rows without truncating long cell values
clickstream_sample_df.show(5, truncate=False)

## Inspecting Clickstream Data

In [None]:
# Load the tab-delimited data at the path `clickstream` (relative to the working
# directory) into a DataFrame named `clickstream`, reading column names from the
# header line and letting Spark infer the column types
clickstream = (
    spark.read
    .options(header=True, delimiter='\t', inferSchema=True)
    .csv("clickstream")
)

# Show the first 5 rows without truncating long values
clickstream.show(5, truncate=False)

In [None]:
# Print the schema of the `clickstream` DataFrame so the column names and the
# types produced by the read above can be checked
clickstream.printSchema()

In [None]:
# Drop the `language_code` column; the result is assigned back to `clickstream`
clickstream = clickstream.drop("language_code")

# Display the first 5 rows (untruncated) and the schema after the drop
clickstream.show(5, truncate=False)
clickstream.printSchema()

In [None]:
# Give the page columns descriptive names:
#   `referrer` -> `source_page`, `resource` -> `target_page`
clickstream = (
    clickstream
    .withColumnRenamed("referrer", "source_page")
    .withColumnRenamed("resource", "target_page")
)

# Show the first 5 rows (untruncated) and the schema after the rename
clickstream.show(5, truncate=False)
clickstream.printSchema()

## Querying Clickstream Data

In [None]:
# Register the DataFrame as a temporary view named `clickstream` so that it can be
# queried by name with `spark.sql()`
clickstream.createOrReplaceTempView("clickstream")

In [None]:
# Using DataFrame methods: keep rows whose target page is the Hanging Gardens
# article and list up to 10 of them, highest click counts first
(
    clickstream
    .filter(clickstream["target_page"] == 'Hanging_Gardens_of_Babylon')
    .orderBy('click_count', ascending=False)
    .show(10, truncate=False)
)

In [None]:
# The same filter-and-sort expressed as SQL against the `clickstream` temporary view
hanging_gardens_query = """
    SELECT *
    FROM clickstream
    WHERE target_page = 'Hanging_Gardens_of_Babylon'
    ORDER BY click_count DESC
    """

# Run the query and show up to 10 rows without truncating long values
spark.sql(hanging_gardens_query).show(10, truncate=False)

In [None]:
# Total clicks per link category using DataFrame methods. The column is passed to
# `sum()` explicitly: with no arguments it sums every numeric column, so the result
# would depend on which columns schema inference happened to type as numeric. Naming
# `click_count` also matches the `SUM(click_count)` used in the SQL version.
(
    clickstream
    .groupBy('link_category')
    .sum('click_count')
    .show(truncate=False)
)

In [None]:
# The same per-category total expressed as SQL against the `clickstream` temporary view
clicks_by_category_query = """
    SELECT link_category, SUM(click_count) FROM clickstream
    GROUP BY link_category
    """

# Run the query and show the result without truncating long values
spark.sql(clicks_by_category_query).show(truncate=False)

## Saving Results to Disk

In [None]:
# Build `internal_clickstream`: the rows whose `link_category` is 'link', reduced to
# the source page, target page and click count columns.
#
# The filter runs BEFORE the select on purpose. `link_category` is not one of the
# selected columns, so filtering after the projection would reference a column that
# is no longer part of the DataFrame being filtered.
internal_clickstream = (
    clickstream
    .filter(clickstream.link_category == 'link')
    .select(["source_page", "target_page", "click_count"])
)

# Display the first rows of the result without truncating long values
internal_clickstream.show(truncate=False)

In [None]:
# Write `internal_clickstream` as CSV files under `article_links_csv` (a path relative
# to the working directory), replacing any existing output at that path
internal_clickstream.write.mode("overwrite").csv('article_links_csv')

In [None]:
# Write `internal_clickstream` as parquet files under `article_links_parquet` (a path
# relative to the working directory), replacing any existing output at that path
internal_clickstream.write.mode("overwrite").parquet('article_links_parquet')

In [None]:
# Stop the notebook's `SparkSession` (and with it the underlying `SparkContext`).
# DataFrames created from this session cannot be evaluated after this point.
spark.stop()

In [None]:
# The SparkSession and SparkContext are stopped, so evaluating the DataFrame fails.
# The error is caught and printed so that the failure is demonstrated without
# aborting a "Restart & Run All" of the notebook.
try:
    clickstream.show()
except Exception as error:
    # NOTE(review): the concrete exception class presumably depends on the PySpark
    # version, hence the broad catch — confirm and narrow it if possible.
    print(f"{type(error).__name__}: {error}")