# Codecademy: Practice with PySpark SQL

### Introduction

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .config('spark.driver.memory', '1g')\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

In [10]:
# The variable `spark` is a `pyspark.sql.session.SparkSession`
type(spark)

pyspark.sql.session.SparkSession

In [11]:
# The underlying `SparkContext` can be accessed from the `SparkSession`
type(spark.sparkContext)

pyspark.context.SparkContext

In [12]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

### Creating Spark DataFrames

In [14]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

sample_page_views  = spark.sparkContext.parallelize([
    ["en", "Statue_of_Liberty", "2022-01-01", 263],
    ["en", "Replicas_of_the_Statue_of_Liberty", "2022-01-01", 11],
    ["en", "Statue_of_Lucille_Ball" ,"2022-01-01", 6],
    ["en", "Statue_of_Liberty_National_Monument", "2022-01-01", 4],
    ["en", "Statue_of_Liberty_play"  ,"2022-01-01", 3],  
])

1. Create a DataFrame from `sample_page_views`.

In [16]:
## YOUR SOLUTION HERE ##
sample_page_views_df = sample_page_views.toDF(["language_code", "title", "date", "count"])

# show first 5 rows
sample_page_views_df.show(5, truncate = False)

+-------------+-----------------------------------+----------+-----+
|language_code|title                              |date      |count|
+-------------+-----------------------------------+----------+-----+
|en           |Statue_of_Liberty                  |2022-01-01|263  |
|en           |Replicas_of_the_Statue_of_Liberty  |2022-01-01|11   |
|en           |Statue_of_Lucille_Ball             |2022-01-01|6    |
|en           |Statue_of_Liberty_National_Monument|2022-01-01|4    |
|en           |Statue_of_Liberty_play             |2022-01-01|3    |
+-------------+-----------------------------------+----------+-----+



2. Access the RDD underlying `sample_page_views_df`.

In [17]:
## YOUR SOLUTION HERE ##
sample_page_views_rdd_restored = sample_page_views_df.rdd

# show restored RDD
sample_page_views_rdd_restored.collect()

[Row(language_code='en', title='Statue_of_Liberty', date='2022-01-01', count=263),
 Row(language_code='en', title='Replicas_of_the_Statue_of_Liberty', date='2022-01-01', count=11),
 Row(language_code='en', title='Statue_of_Lucille_Ball', date='2022-01-01', count=6),
 Row(language_code='en', title='Statue_of_Liberty_National_Monument', date='2022-01-01', count=4),
 Row(language_code='en', title='Statue_of_Liberty_play', date='2022-01-01', count=3)]

### Spark DataFrames from External Sources

In [18]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

1. Read in the CSV file without any specific read options specified and show the top 10 rows.

In [19]:
## YOUR SOLUTION HERE ##
wiki_uniq_df = spark\
    .read\
    .csv('wiki_uniq_march_2022.csv')

# show the first 10 rows
wiki_uniq_df.show(10, truncate = False)

+------------------+-------------------+-----------------+-------------------+
|_c0               |_c1                |_c2              |_c3                |
+------------------+-------------------+-----------------+-------------------+
|domain            |uniq_human_visitors|uniq_bot_visitors|total_visitor_count|
|en.m.wikipedia.org|33261399           |8400247          |41661646           |
|en.wikipedia.org  |17009339           |4851741          |21861080           |
|es.m.wikipedia.org|5668575            |1977289          |7645864            |
|ru.m.wikipedia.org|5816762            |1367179          |7183941            |
|ja.m.wikipedia.org|5396108            |1325212          |6721320            |
|de.m.wikipedia.org|4439596            |853251           |5292847            |
|fr.m.wikipedia.org|3798528            |904567           |4703095            |
|ru.wikipedia.org  |2852296            |687501           |3539797            |
|es.wikipedia.org  |2460489            |962516      

2. Read in the CSV file with an option to treat the first row as a header and show the top 10 rows.

In [21]:
## YOUR SOLUTION HERE ##
wiki_uniq_w_header_df = spark.read\
    .option('header', True)\
    .csv('wiki_uniq_march_2022.csv')

# show the first 10 rows
wiki_uniq_w_header_df.show(10, truncate = False)

+------------------+-------------------+-----------------+-------------------+
|domain            |uniq_human_visitors|uniq_bot_visitors|total_visitor_count|
+------------------+-------------------+-----------------+-------------------+
|en.m.wikipedia.org|33261399           |8400247          |41661646           |
|en.wikipedia.org  |17009339           |4851741          |21861080           |
|es.m.wikipedia.org|5668575            |1977289          |7645864            |
|ru.m.wikipedia.org|5816762            |1367179          |7183941            |
|ja.m.wikipedia.org|5396108            |1325212          |6721320            |
|de.m.wikipedia.org|4439596            |853251           |5292847            |
|fr.m.wikipedia.org|3798528            |904567           |4703095            |
|ru.wikipedia.org  |2852296            |687501           |3539797            |
|es.wikipedia.org  |2460489            |962516           |3423005            |
|it.m.wikipedia.org|2806943            |566876      

3. Check the data types of `wiki_uniq_w_header_df`.

In [22]:
# show the data types
wiki_uniq_w_header_df.dtypes

[('domain', 'string'),
 ('uniq_human_visitors', 'string'),
 ('uniq_bot_visitors', 'string'),
 ('total_visitor_count', 'string')]

4. Read in the CSV file with an option to treat the first row as a header and infer the schema. Then check the data types of `wiki_uniq_w_schema_df`.

In [23]:
## YOUR SOLUTION HERE ##
wiki_uniq_w_schema_df = spark.read\
    .option('header', True)\
    .option('inferSchema', True)\
    .csv('wiki_uniq_march_2022.csv')

# show the data types
wiki_uniq_w_schema_df.dtypes

[('domain', 'string'),
 ('uniq_human_visitors', 'int'),
 ('uniq_bot_visitors', 'int'),
 ('total_visitor_count', 'int')]

### Inspecting and Cleaning Data With PySpark

In [24]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("learning_spark_sql") \
    .getOrCreate()

# read in the Wikipedia unique visitors dataset
uniq_views_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022.csv")

1. Print the DataFrame schema for `uniq_views_df`.

In [25]:
## YOUR SOLUTION HERE ##
uniq_views_df.printSchema()

root
 |-- domain: string (nullable = true)
 |-- uniq_human_visitors: integer (nullable = true)
 |-- uniq_bot_visitors: integer (nullable = true)
 |-- total_visitor_count: integer (nullable = true)



2. Show a description of the data for `uniq_views_df`.

In [29]:
## YOUR SOLUTION HERE ##
uniq_views_df_desc = uniq_views_df.describe()

# show summary
uniq_views_df_desc.show()

+-------+----------------+-------------------+-----------------+-------------------+
|summary|          domain|uniq_human_visitors|uniq_bot_visitors|total_visitor_count|
+-------+----------------+-------------------+-----------------+-------------------+
|  count|             760|                760|              760|                760|
|   mean|            NULL|  155413.0394736842| 51431.0552631579| 206844.09473684212|
| stddev|            NULL| 1435327.5409314982| 376318.441663093| 1809320.9789242456|
|    min|aa.wikibooks.org|                  0|              170|               1005|
|    max|zu.wikipedia.org|           33261399|          8400247|           41661646|
+-------+----------------+-------------------+-----------------+-------------------+



3. Drop the columns `total_visitor_count` and `uniq_bot_visitors`.

In [30]:
## YOUR SOLUTION HERE ##
uniq_counts_human_df = uniq_views_df.drop('total_visitor_count', 'uniq_bot_visitors')

# show the first 5 rows
uniq_counts_human_df.show(5)

+------------------+-------------------+
|            domain|uniq_human_visitors|
+------------------+-------------------+
|en.m.wikipedia.org|           33261399|
|  en.wikipedia.org|           17009339|
|es.m.wikipedia.org|            5668575|
|ru.m.wikipedia.org|            5816762|
|ja.m.wikipedia.org|            5396108|
+------------------+-------------------+
only showing top 5 rows



4. Rename `uniq_human_visitors` to `unique_site_visitors`.

In [31]:
## YOUR SOLUTION HERE ##
uniq_counts_final_df = uniq_counts_human_df.withColumnRenamed('uniq_human_visitors', 'unique_site_visitors')

# show the first 5 rows
uniq_counts_final_df.show(5)

+------------------+--------------------+
|            domain|unique_site_visitors|
+------------------+--------------------+
|en.m.wikipedia.org|            33261399|
|  en.wikipedia.org|            17009339|
|es.m.wikipedia.org|             5668575|
|ru.m.wikipedia.org|             5816762|
|ja.m.wikipedia.org|             5396108|
+------------------+--------------------+
only showing top 5 rows



### Querying PySpark DataFrames

In [34]:
from pyspark.sql import SparkSession

# Create a New SparkSession
spark = SparkSession \
    .builder \
    .appName("learning_spark_sql") \
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022_w_site_type.csv")

1. Filter the DataFrame to sites with `language_code` is `"ar"`.

In [35]:
## YOUR SOLUTION HERE ##
ar_site_visitors = wiki_uniq_df\
    .filter(wiki_uniq_df.language_code == 'ar')

# show the DataFrame
ar_site_visitors.show()

+--------------------+-------------------+-----------------+-------------------+-------------+-----------+
|              domain|uniq_human_visitors|uniq_bot_visitors|total_visitor_count|language_code|  site_type|
+--------------------+-------------------+-----------------+-------------------+-------------+-----------+
|  ar.m.wikipedia.org|            1644253|           750620|            2394873|           ar|  wikipedia|
|    ar.wikipedia.org|             212695|            97700|             310395|           ar|  wikipedia|
| ar.m.wikisource.org|              56124|            52885|             109009|           ar| wikisource|
|   ar.wikisource.org|               2134|             4355|               6489|           ar| wikisource|
|  ar.m.wikiquote.org|                776|             3511|               4287|           ar|  wikiquote|
|   ar.wiktionary.org|                262|             2335|               2597|           ar| wiktionary|
| ar.m.wiktionary.org|               

2. Filter the DataFrame to sites with `language_code` is `"ar"` and keep only the columns `domain` and `uniq_human_visitors`. 

In [36]:
## YOUR SOLUTION HERE ##
ar_visitors_slim = wiki_uniq_df\
    .select(['domain', 'uniq_human_visitors'])\
    .filter(wiki_uniq_df.language_code == 'ar')

# show the DataFrame
ar_visitors_slim.show()

+--------------------+-------------------+
|              domain|uniq_human_visitors|
+--------------------+-------------------+
|  ar.m.wikipedia.org|            1644253|
|    ar.wikipedia.org|             212695|
| ar.m.wikisource.org|              56124|
|   ar.wikisource.org|               2134|
|  ar.m.wikiquote.org|                776|
|   ar.wiktionary.org|                262|
| ar.m.wiktionary.org|                448|
|ar.m.wikiversity.org|                389|
|  ar.m.wikibooks.org|                378|
+--------------------+-------------------+



3. Calculate the sum of all `uniq_human_visitors` grouped by `site_type` and ordered from highest to lowest page views.

In [37]:
## YOUR SOLUTION HERE ##
top_visitors_site_type = wiki_uniq_df\
    .select(['site_type', 'uniq_human_visitors'])\
    .groupBy('site_type')\
    .sum()\
    .orderBy('sum(uniq_human_visitors)', ascending = False)

# show the DataFrame
top_visitors_site_type.show()

+-----------+------------------------+
|  site_type|sum(uniq_human_visitors)|
+-----------+------------------------+
|  wikipedia|               116527479|
| wiktionary|                  892193|
|  wikimedia|                  312995|
| wikisource|                  172179|
|   wikidata|                   69744|
|  wikibooks|                   54680|
|  wikiquote|                   38048|
| wikivoyage|                   14648|
|       wiki|                   13067|
|wikiversity|                   12548|
|   wikinews|                    5578|
|   wikitech|                     751|
+-----------+------------------------+



### Querying PySpark with SQL

In [38]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession \
    .builder \
    .appName("learning_spark_sql") \
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022_w_site_type.csv")

# Create a temporary view with the DataFrame
wiki_uniq_df\
    .createOrReplaceTempView('uniq_visitors_march')

1. Filter the DataFrame to sites where `language_code` is `"ar"`.

In [39]:
## YOUR SOLUTION HERE ##
ar_site_visitors_qry = """
    SELECT * 
    FROM uniq_visitors_march
    WHERE language_code = 'ar';
"""
    
# show the DataFrame
spark.sql(ar_site_visitors_qry)\
    .show(truncate = False)

+--------------------+-------------------+-----------------+-------------------+-------------+-----------+
|domain              |uniq_human_visitors|uniq_bot_visitors|total_visitor_count|language_code|site_type  |
+--------------------+-------------------+-----------------+-------------------+-------------+-----------+
|ar.m.wikipedia.org  |1644253            |750620           |2394873            |ar           |wikipedia  |
|ar.wikipedia.org    |212695             |97700            |310395             |ar           |wikipedia  |
|ar.m.wikisource.org |56124              |52885            |109009             |ar           |wikisource |
|ar.wikisource.org   |2134               |4355             |6489               |ar           |wikisource |
|ar.m.wikiquote.org  |776                |3511             |4287               |ar           |wikiquote  |
|ar.wiktionary.org   |262                |2335             |2597               |ar           |wiktionary |
|ar.m.wiktionary.org |448            

2. Filter the DataFrame to sites with `language_code` is `"ar"` and keep only the columns `domain` and `uniq_human_visitors`.

In [40]:
## YOUR SOLUTION HERE ##
ar_site_visitors_slim_qry = """
    SELECT domain, uniq_human_visitors
    FROM uniq_visitors_march
    WHERE language_code = 'ar';
"""

# show the DataFrame
spark.sql(ar_site_visitors_slim_qry)\
    .show(truncate = False)

+--------------------+-------------------+
|domain              |uniq_human_visitors|
+--------------------+-------------------+
|ar.m.wikipedia.org  |1644253            |
|ar.wikipedia.org    |212695             |
|ar.m.wikisource.org |56124              |
|ar.wikisource.org   |2134               |
|ar.m.wikiquote.org  |776                |
|ar.wiktionary.org   |262                |
|ar.m.wiktionary.org |448                |
|ar.m.wikiversity.org|389                |
|ar.m.wikibooks.org  |378                |
+--------------------+-------------------+



3. Calculate the sum of all `uniq_human_visitors` grouped by `site_type` and ordered from highest to lowest `uniq_human_visitors`.

In [41]:
## YOUR SOLUTION HERE ##
site_top_type_qry = """
    SELECT site_type, SUM(uniq_human_visitors)
    FROM uniq_visitors_march
    GROUP BY site_type
    ORDER BY SUM(uniq_human_visitors) DESC;
"""

# show the DataFrame
spark.sql(site_top_type_qry)\
    .show(truncate = False)

+-----------+------------------------+
|site_type  |sum(uniq_human_visitors)|
+-----------+------------------------+
|wikipedia  |116527479               |
|wiktionary |892193                  |
|wikimedia  |312995                  |
|wikisource |172179                  |
|wikidata   |69744                   |
|wikibooks  |54680                   |
|wikiquote  |38048                   |
|wikivoyage |14648                   |
|wiki       |13067                   |
|wikiversity|12548                   |
|wikinews   |5578                    |
|wikitech   |751                     |
+-----------+------------------------+



### Saving PySpark DataFrames

In [42]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022.csv")

1. Run the code to create a new DataFrame with only `domain` and `uniq_human_visitors`.

In [43]:
# select only domain and uniq_human visitors
uniq_human_visitors_df = wiki_uniq_df\
    .select('domain', 'uniq_human_visitors')

# show the new DataFrame
uniq_human_visitors_df.show()

+------------------+-------------------+
|            domain|uniq_human_visitors|
+------------------+-------------------+
|en.m.wikipedia.org|           33261399|
|  en.wikipedia.org|           17009339|
|es.m.wikipedia.org|            5668575|
|ru.m.wikipedia.org|            5816762|
|ja.m.wikipedia.org|            5396108|
|de.m.wikipedia.org|            4439596|
|fr.m.wikipedia.org|            3798528|
|  ru.wikipedia.org|            2852296|
|  es.wikipedia.org|            2460489|
|it.m.wikipedia.org|            2806943|
|  de.wikipedia.org|            2252670|
|  ja.wikipedia.org|            2128471|
|  fr.wikipedia.org|            1839196|
|zh.m.wikipedia.org|            2123391|
|ar.m.wikipedia.org|            1644253|
|pt.m.wikipedia.org|            1471752|
|pl.m.wikipedia.org|            1410339|
|fa.m.wikipedia.org|            1194940|
|  zh.wikipedia.org|            1088755|
|tr.m.wikipedia.org|             908573|
+------------------+-------------------+
only showing top

2. Save the new DataFrame as CSV files.

In [46]:
## YOUR SOLUTION HERE ##
uniq_human_visitors_df.write.csv('./results/csv/uniq_human_visitors/', mode = 'overwrite')

3. Save the new DataFrame as Parquet files.

In [48]:
## YOUR SOLUTION HERE ##
uniq_human_visitors_df.write.parquet('./results/pq/uniq_human_visitors/', mode = 'overwrite')