In [1]:
### Import Libraries

from pyspark.sql import SparkSession

ModuleNotFoundError: No module named 'pyspark'

In [None]:
### Create a new SparkSession and assign it to a variable named spark

spark = SparkSession.builder.getOrCreate()

In [None]:
### Create an RDD from a list of sample clickstream counts and save it as clickstream_counts_rdd.

# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)

In [None]:
### Using the RDD from the previous step, create a DataFrame named clickstream_sample_df

# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd.toDF(['source_page', 'target_page', 'link_category','link_count']) 

# Display the DataFrame to the notebook
clickstream_sample_df.show(5)

In [None]:
### Read the files in ./cleaned/clickstream/ into a new Spark DataFrame named clickstream
### and display the first few rows of the DataFrame in the notebook

# Read the target directory (`./cleaned/clickstream/`) into a DataFrame (`clickstream`)
clickstream = spark.read \
    .option('header', True) \
    .option('delimiter', '\t') \
    .option('inferSchema', True) \
    .csv("./cleaned/clickstream/")

# Display the DataFrame to the notebook
clickstream.show(5, truncate=False)

In [None]:
### Print the schema of the DataFrame in the notebook.

# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()

In [None]:
### Drop the language_code column from the DataFrame and display the new schema in the notebook

# Drop target columns
clickstream = clickstream.drop('language_code')

# Display the first few rows of the DataFrame
clickstream.show(5)
# Display the new schema in the notebook
clickstream.printSchema()

In [None]:
### Rename `referrer` and `resource` to `source_page` and `target_page`, respectively,

# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream\
.withColumnRenamed('referrer', 'source_page')\
.withColumnRenamed('resource','target_page')
  
# Display the first few rows of the DataFrame
clickstream.show(5, truncate=False)
# Display the new schema in the notebook
clickstream.printSchema()

In [None]:
### Add the clickstream DataFrame as a temporary view named clickstream
### to make the data queryable with sparkSession.sql()

# Create a temporary view in the metadata for this `SparkSession` 
clickstream.createOrReplaceTempView('clickstream')

In [None]:
### Filter the dataset to entries with Hanging_Gardens_of_Babylon
### as the target_page and order the result by click_count using
### PySpark DataFrame methods.

# Filter and sort the DataFrame using PySpark DataFrame methods
clickstream\
.filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
.sort('click_count', ascending=False)\
.show(10, truncate=False)

In [None]:
### Perform the same analysis as the previous exercise using a SQL query.

# Filter and sort the DataFrame using SQL
spark.sql(
"""
SELECT * 
FROM clickstream
WHERE target_page == 'Hanging_Gardens_of_Babylon'
ORDER BY click_count DESC
"""
).show(10, truncate=False)

In [None]:
### Calculate the sum of click_count grouped by link_category using PySpark DataFrame methods.

# Aggregate the DataFrame using SQL
spark.sql(
"""
SELECT link_category, sum(click_count)
FROM clickstream
GROUP BY link_category
""").show(10, truncate=False)

In [None]:
### Let's create a new DataFrame named internal_clickstream that only contains article pairs where link_category is link.

# Create a new DataFrame (named `internal_clickstream`) using `filter` to select rows to 
# a specific condition and `select` to choose which columns to return from the query.
internal_clickstream = clickstream\
    .select(["source_page", "target_page", "click_count"])\
    .filter(clickstream.link_category == 'link')

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(truncate=False)

In [None]:
### Save the internal_clickstream DataFrame as CSV files in a directory called ./results/article_to_article_csv/

# Save the `internal_clickstream` DataFrame to a series of CSV files in `./results/article_links_csv/`
# with `DataFrame.write.csv()`
internal_clickstream\
    .write\
    .csv('./results/article_links_csv/', mode="overwrite")

In [None]:
### Save the internal_clickstream DataFrame as parquet files in a directory called ./results/article_to_article_pq/

# Save the `internal_clickstream` DataFrame to a series of parquet files in `./results/article_links_parquet/`
# with `DataFrame.write.parquet()`

internal_clickstream\
    .write\
    .parquet('./results/article_links_parquet/', mode="overwrite")

In [None]:
### Close the SparkSession and underlying sparkContext

# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()