# Analyzing Wikipedia Clickstream Data

* [Project Page Link](https://www.codecademy.com/courses/big-data-pyspark/projects/analyzing-wikipedia-pyspark)

### Import Libraries

In [38]:
from pyspark.sql import SparkSession

## Task Group 1 - Introduction to Clickstream Data

### Task 1
Create a new `SparkSession` and assign it to a variable named `spark`.

In [39]:
# Create a new SparkSession
spark = SparkSession.builder.appName('LoadCSV').getOrCreate()

# Get SparkContext from SparkSession
sc = spark.sparkContext

### Task 2

Create an RDD from a list of sample clickstream counts and save it as `clickstream_counts_rdd`.

In [40]:
# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = sc.parallelize(sample_clickstream_counts)
clickstream_counts_rdd.collect()

[['other-search', 'Hanging_Gardens_of_Babylon', 'external', 47000],
 ['other-empty', 'Hanging_Gardens_of_Babylon', 'external', 34600],
 ['Wonders_of_the_World', 'Hanging_Gardens_of_Babylon', 'link', 14000],
 ['Babylon', 'Hanging_Gardens_of_Babylon', 'link', 2500]]

### Task 3

Using the RDD from the previous step, create a DataFrame named `clickstream_sample_df`.

In [41]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd.toDF(["source_page", "target_page",  "link_category", "link_count"])

# Display the DataFrame to the notebook
clickstream_sample_df.show(4, truncate=False)

+--------------------+--------------------------+-------------+----------+
|source_page         |target_page               |link_category|link_count|
+--------------------+--------------------------+-------------+----------+
|other-search        |Hanging_Gardens_of_Babylon|external     |47000     |
|other-empty         |Hanging_Gardens_of_Babylon|external     |34600     |
|Wonders_of_the_World|Hanging_Gardens_of_Babylon|link         |14000     |
|Babylon             |Hanging_Gardens_of_Babylon|link         |2500      |
+--------------------+--------------------------+-------------+----------+



## Task Group 2 - Inspecting Clickstream Data

### Task 4

Read the files in `./cleaned/clickstream/` into a new Spark DataFrame named `clickstream` and display the first few rows of the DataFrame in the notebook.

In [54]:
# Read the tab-delimited clickstream file (`wiki_test_file.csv` in this run) into a DataFrame (`clickstream`)
clickstream = spark.read\
    .option('header', True)\
    .option('delimiter', '\t')\
    .option('inferSchema', True)\
    .csv('wiki_test_file.csv')

# Display the DataFrame to the notebook
clickstream.show(15, truncate=False)

+-------------------+--------------------------+-------------+-------------+-----------+
|referrer           |resource                  |link_category|language_code|click_count|
+-------------------+--------------------------+-------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread            |link         |en           |43190      |
|other-internal     |Phantom_Thread            |external     |en           |21683      |
|other-empty        |Phantom_Thread            |external     |en           |169532     |
|90th_Academy_Awards|Phantom_Thread            |link         |en           |40449      |
|other-search       |Phantom_Thread            |external     |en           |536940     |
|other-search       |Tara_Grinstead_murder_case|external     |en           |30041      |
|other-search       |Yossi_Benayoun            |external     |en           |11045      |
|other-empty        |Parthiv_Patel             |external     |en           |11481      |
|other-search       |
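
The three reader options used in the cell above can be illustrated without Spark. The sketch below is a plain-Python analogy using the standard `csv` module, not PySpark's implementation; `sample_tsv` is a hypothetical two-row excerpt built from the rows displayed above. It shows what each option contributes: `header` takes column names from the first line, `delimiter` splits on tabs, and `inferSchema` is what turns `click_count` into an integer (without it, Spark reads every CSV column as a string).

```python
import csv
import io

# Hypothetical excerpt in the same tab-separated layout as the clickstream
# file; the values are copied from the rows displayed above.
sample_tsv = (
    "referrer\tresource\tlink_category\tlanguage_code\tclick_count\n"
    "Daniel_Day-Lewis\tPhantom_Thread\tlink\ten\t43190\n"
    "other-search\tPhantom_Thread\texternal\ten\t536940\n"
)

# header=True: the first line supplies the column names.
# delimiter='\t': fields are separated by tabs, not commas.
reader = csv.DictReader(io.StringIO(sample_tsv), delimiter="\t")

# inferSchema=True: the csv module returns strings only, so the numeric
# column is converted by hand here to mimic the inferred integer type.
rows = [{**row, "click_count": int(row["click_count"])} for row in reader]

print(rows[0]["referrer"], rows[0]["click_count"])
```

Schema inference makes Spark scan the data an extra time, so for large inputs an explicit schema is often preferred; that choice is outside the scope of this task.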

### Task 5

Print the schema of the DataFrame in the notebook.

In [43]:
# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



### Task 6

Drop the `language_code` column from the DataFrame and display the new schema in the notebook.

In [44]:
# Drop target columns
clickstream = clickstream.drop('language_code')

# Display the first few rows of the DataFrame
clickstream.show(5, truncate=False)

# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|referrer           |resource      |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



### Task 7

Rename `referrer` and `resource` to `source_page` and `target_page`, respectively.

In [55]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream\
    .withColumnRenamed('referrer', 'source_page')\
    .withColumnRenamed('resource', 'target_page')
    
# Display the first few rows of the DataFrame
clickstream.show(15, truncate=False)

# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------------------+-------------+-------------+-----------+
|source_page        |target_page               |link_category|language_code|click_count|
+-------------------+--------------------------+-------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread            |link         |en           |43190      |
|other-internal     |Phantom_Thread            |external     |en           |21683      |
|other-empty        |Phantom_Thread            |external     |en           |169532     |
|90th_Academy_Awards|Phantom_Thread            |link         |en           |40449      |
|other-search       |Phantom_Thread            |external     |en           |536940     |
|other-search       |Tara_Grinstead_murder_case|external     |en           |30041      |
|other-search       |Yossi_Benayoun            |external     |en           |11045      |
|other-empty        |Parthiv_Patel             |external     |en           |11481      |
|other-search       |

## Task Group 3 - Querying Clickstream Data

### Task 8

Add the `clickstream` DataFrame as a temporary view named `clickstream` to make the data queryable with `SparkSession.sql()`.

In [46]:
# Create a temporary view in the metadata for this `SparkSession`.
# The view is registered as `data` here, which is the name the SQL queries in Tasks 10 and 12 use.
clickstream.createOrReplaceTempView('data')


### Task 9

Filter the dataset to entries with `Hanging_Gardens_of_Babylon` as the `target_page` and order the result by `click_count` using PySpark DataFrame methods.

In [47]:
# Filter and sort the DataFrame using PySpark DataFrame methods
clickstream\
    .filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
    .orderBy('click_count', ascending=False)\
    .show(10, truncate=False)


+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+



### Task 10

Perform the same analysis as the previous exercise using a SQL query. 

In [48]:
# Filter and sort the DataFrame using SQL
query = """
    SELECT *
    FROM data
    WHERE target_page = 'Hanging_Gardens_of_Babylon'
    ORDER BY click_count DESC
"""

spark.sql(query).show(truncate=False)


+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+
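
Both the DataFrame version and the SQL version express the same two steps: keep the rows whose `target_page` matches, then sort them by `click_count` in descending order. As a cross-check of that logic, here is a minimal plain-Python sketch (not Spark code) over a handful of rows copied from the outputs shown in this notebook; the tuple layout is an assumption made for the illustration.

```python
# Rows copied from outputs shown in this notebook, as
# (source_page, target_page, link_category, click_count) tuples.
rows = [
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14668),
    ("other-search", "Phantom_Thread", "external", 536940),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34619),
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47088),
    ("Seven_Wonders_of_the_Ancient_World", "Hanging_Gardens_of_Babylon", "link", 12296),
]

# WHERE target_page = 'Hanging_Gardens_of_Babylon'
matches = [r for r in rows if r[1] == "Hanging_Gardens_of_Babylon"]

# ORDER BY click_count DESC
ranked = sorted(matches, key=lambda r: r[3], reverse=True)

for source_page, _, link_category, click_count in ranked:
    print(source_page, link_category, click_count)
```

The printed order matches the two tables above: `other-search`, `other-empty`, `Wonders_of_the_World`, `Seven_Wonders_of_the_Ancient_World`.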



### Task 11

Calculate the sum of `click_count` grouped by `link_category` using PySpark DataFrame methods.

In [57]:
# Aggregate the DataFrame using PySpark DataFrame methods
clickstream\
    .select(['link_category', 'click_count'])\
    .groupby('link_category')\
    .sum()\
    .orderBy('sum(click_count)', ascending=False)\
    .show(3, truncate=False)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|external     |3248677856      |
|link         |97805811        |
|other        |9338172         |
+-------------+----------------+



### Task 12

Perform the same analysis as the previous exercise using a SQL query.

In [58]:
# Aggregate the DataFrame using SQL
query = """
    SELECT link_category, SUM(click_count)
    FROM data
    GROUP BY link_category
    ORDER BY SUM(click_count) DESC
"""

# Run query
spark.sql(query).show(3, truncate=False)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|external     |3248677856      |
|link         |97805811        |
|other        |9338172         |
+-------------+----------------+
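
To make the aggregation concrete, the following plain-Python sketch (not Spark code) applies the same group, sum, and sort steps to the four sample rows from Task 2. It also checks one detail of the real result: the `external` total of 3248677856 is larger than the biggest 32-bit signed integer, even though `click_count` is an `integer` column in the schema. Spark returns the sum of an integer column as a 64-bit `bigint`, which is why the total displays correctly.

```python
from collections import defaultdict

# (link_category, link_count) pairs taken from the sample data in Task 2.
sample = [("external", 47000), ("external", 34600), ("link", 14000), ("link", 2500)]

# GROUP BY link_category with SUM(click_count)
totals = defaultdict(int)
for link_category, count in sample:
    totals[link_category] += count

# ORDER BY SUM(click_count) DESC
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranked)  # [('external', 81600), ('link', 16500)]

# The `external` total reported above does not fit in a 32-bit signed integer.
INT32_MAX = 2**31 - 1
print(3248677856 > INT32_MAX)  # True
```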



## Task Group 4 - Saving Results to Disk

### Task 13

Create a new DataFrame named `internal_clickstream` that contains only article pairs where `link_category` is `link`. Use `filter()` to select rows that match a specific condition and `select()` to choose which columns to return from the query.

In [63]:
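# Create a new DataFrame named `internal_clickstream`: filter on `link_category` first,
# while that column is still present, then keep only the columns of interest
internal_clickstream = clickstream\
    .filter(clickstream.link_category == 'link')\
    .select('source_page', 'target_page', 'click_count')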

# Display the first few rows of the DataFrame in the notebook
print(internal_clickstream.take(4))

internal_clickstream.show(truncate=False)

[Row(source_page='Daniel_Day-Lewis', target_page='Phantom_Thread', click_count=43190), Row(source_page='90th_Academy_Awards', target_page='Phantom_Thread', click_count=40449), Row(source_page='Shinee', target_page='Kim_Jong-hyun_(singer)', click_count=24433), Row(source_page='Agnyaathavaasi', target_page='Anu_Emmanuel', click_count=15020)]
+----------------------------+----------------------------+-----------+
|source_page                 |target_page                 |click_count|
+----------------------------+----------------------------+-----------+
|Daniel_Day-Lewis            |Phantom_Thread              |43190      |
|90th_Academy_Awards         |Phantom_Thread              |40449      |
|Shinee                      |Kim_Jong-hyun_(singer)      |24433      |
|Agnyaathavaasi              |Anu_Emmanuel                |15020      |
|Naa_Peru_Surya              |Anu_Emmanuel                |12361      |
|Mariah_Carey                |Nick_Cannon                 |16214      |
|Kesha    

### Task 14

Using `DataFrame.write.csv()`, save the `internal_clickstream` DataFrame as CSV files in a directory called `./results/article_to_article_csv/`.

In [None]:
# Save the `internal_clickstream` DataFrame to a series of CSV files
internal_clickstream.write.csv('./cleaned_wiki_test_file', mode='overwrite')
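
`DataFrame.write.csv()` produces a directory rather than a single file: it contains one `part-*` file per partition plus a `_SUCCESS` marker, and by default the part files carry no header row. The sketch below is one possible way to read such a directory back with only the standard library. The helper name `read_spark_csv_dir`, the `COLUMNS` list, and the stand-in directory it builds are assumptions made so the example runs without Spark; it also assumes Python's default CSV quoting is compatible with the files, which may not hold for values containing escaped quotes.

```python
import csv
import glob
import os
import tempfile

# Column names are not stored in the part files, so they are supplied here.
COLUMNS = ["source_page", "target_page", "click_count"]


def read_spark_csv_dir(directory, columns=COLUMNS):
    """Read every part file in a Spark-style CSV output directory into a list of dicts."""
    rows = []
    # sorted() gives a stable file order; marker files such as _SUCCESS
    # do not match the pattern and are skipped.
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path, newline="", encoding="utf-8") as handle:
            for record in csv.reader(handle):
                row = dict(zip(columns, record))
                row["click_count"] = int(row["click_count"])
                rows.append(row)
    return rows


# Stand-in directory that imitates Spark's output layout.
with tempfile.TemporaryDirectory() as out_dir:
    with open(os.path.join(out_dir, "part-00000.csv"), "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["Daniel_Day-Lewis", "Phantom_Thread", 43190])
    with open(os.path.join(out_dir, "part-00001.csv"), "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["90th_Academy_Awards", "Phantom_Thread", 40449])
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()

    restored = read_spark_csv_dir(out_dir)

print(restored)
```

Within Spark itself, passing the same directory path to `spark.read.csv()` reads all part files at once.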

### Task 15

Using `DataFrame.write.parquet()`, save the `internal_clickstream` DataFrame as parquet files in a directory called `./results/article_to_article_pq/`.

In [None]:
# Save the `internal_clickstream` DataFrame to a series of parquet files
internal_clickstream.write.parquet('./cleaned_wiki_test_file_parquet/', mode='overwrite')


### Task 16

Close the `SparkSession` and underlying `SparkContext`. What happens if we call `clickstream.show()` after closing the `SparkSession`?

In [67]:
# Stop the notebook's `SparkSession`; this also stops the underlying `SparkContext`,
# so the explicit `sc.stop()` call is redundant but harmless
spark.stop()
sc.stop()

In [68]:
# The SparkSession and SparkContext are stopped, so the following line raises an error:
clickstream.show()

Py4JJavaError: An error occurred while calling o316.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1654)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1639)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:102)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:138)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues$(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:346)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:548)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:575)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:498)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
