**Unit 4 Deliverable 1**
# Big Data Wrangling with Google nGrams (Step 4)
**Author:** Raghad Ibrahim  
**Date:** April 30, 2024  

> ## Table of Contents:
>> [Initialization](#1)
>>
>> [Installing and Importing Necessary Packages](#2)
>>
>> [A First Look At The Data](#3)
>>
>> [Spark SQL](#4)
>>
>> [Writing to HDFS](#5)  

## Initialization <a class="anchor" id="1"></a>

First, I will initialize the `spark` application. Make sure you have the `PySpark` environment selected; otherwise, the Spark session needs to be initialized manually.

Once the session is initialized, we can proceed.

In [2]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1714468804888_0003,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f55959ef650>

In [3]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1714468804888_0004,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.




## Installing and Importing Necessary Packages <a class="anchor" id="2"></a>

We can also check which Python packages are available and add new ones if needed. First, let's list the installed packages with `sc.list_packages()` to make sure that both `pandas` and `matplotlib` are available.
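Outside of EMR notebooks, where `sc.list_packages()` is not available, a similar check can be done in plain Python. The sketch below uses the standard library's `importlib.util.find_spec` to report which modules in a list cannot be imported. The helper name `missing_modules` is hypothetical, it only inspects the interpreter it runs in, and it expects module names, which can differ from PyPI package names (e.g. `dateutil` for `python-dateutil`).

```python
import importlib.util

def missing_modules(names):
    """Return the names from `names` that cannot be imported in this interpreter.

    Hypothetical helper; expects top-level module names (not PyPI package names).
    """
    # find_spec returns None when no importable top-level module has that name
    return [name for name in names if importlib.util.find_spec(name) is None]

print(missing_modules(["pandas", "matplotlib"]))
```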

In [4]:
sc.list_packages()

Package                    Version
-------------------------- ----------
aws-cfn-bootstrap          2.0
beautifulsoup4             4.9.3
boto                       2.49.0
click                      8.1.3
docutils                   0.14
jmespath                   1.0.1
joblib                     1.2.0
lockfile                   0.11.0
lxml                       4.9.2
mysqlclient                1.4.2
nltk                       3.8
nose                       1.3.4
numpy                      1.20.0
pip                        20.2.2
py-dateutil                2.2
pystache                   0.5.4
python-daemon              2.2.3
python37-sagemaker-pyspark 1.4.2
pytz                       2022.7
PyYAML                     5.4.1
regex                      2021.11.10
setuptools                 28.8.0
simplejson                 3.2.0
six                        1.13.0
tqdm                       4.64.1
wheel                      0.29.0
windmill                   1.6


Neither `pandas` nor `matplotlib` appears in the list, so we install them as shown below:

In [5]:
# install data science & plotting packages

sc.install_pypi_package("pandas==1.0.5")
sc.install_pypi_package("matplotlib==3.1.1")

Collecting pandas==1.0.5
  Downloading pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl (10.1 MB)
Collecting python-dateutil>=2.6.1
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.9.0.post0

Collecting matplotlib==3.1.1
  Downloading matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl (13.1 MB)
Collecting cycler>=0.10
  Downloading cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Downloading pyparsing-3.1.2-py3-none-any.whl (103 kB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.4.5-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.1 MB)
Collecting typing-extensions; python_version < "3.8"
  Downloading typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Installing collected packages: cycler, pyparsing, typing-extensions, kiwisolver, matplotlib
Successfully installed cycler-0.11.0 kiwisolver-1.4.5 matpl

Now that the packages are installed, we can import them into the notebook as usual.

In [7]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

## A First Look At The Data  <a class="anchor" id="3"></a>

Spark can read data from local files, databases, large datasets on HDFS, or even S3 buckets. In this case, we read the data from HDFS, where it is stored.

In [23]:
# read data as dataframe

df = spark.read.csv('/user/hadoop/eng_1M_1gram', header=True)

We can print out the schema and data types in human-readable format using `printSchema()`:

In [14]:
# print out schema and data types

df.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)
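All five columns come back as `string` because `spark.read.csv` does not infer column types unless asked to. The plain-Python sketch below mimics that behaviour with the standard `csv` module on two rows in the same layout as the `show(10)` output that follows: the header row supplies the column names, but every value stays text until it is converted explicitly. The conversion step is an illustration of what we might want, not something this notebook does.

```python
import csv
import io

# Two rows in the nGram layout (header + data), taken from the df.show(10) output
sample = """token,year,frequency,pages,books
inGermany,1927,2,2,2
inGermany,1938,5,5,5
"""

# DictReader uses the header for the keys, but every value is still a str,
# just like the all-string schema Spark reports without type inference
rows = list(csv.DictReader(io.StringIO(sample)))
print({k: type(v).__name__ for k, v in rows[0].items()})

# Converting the numeric columns explicitly (the analogue of casting in Spark)
INT_COLUMNS = ("year", "frequency", "pages", "books")
typed = [{k: int(v) if k in INT_COLUMNS else v for k, v in row.items()} for row in rows]
print(typed[1])
```

In Spark itself, the usual options are `spark.read.csv(path, header=True, inferSchema=True)`, which costs an extra pass over the data, or casting individual columns afterwards with `Column.cast("int")`.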

Now let's take a first look at the data, or more specifically, at its top $10$ rows.

In [17]:
# show top 10 rows

df.show(10)

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
|inGermany|1935|        1|    1|    1|
|inGermany|1938|        5|    5|    5|
|inGermany|1939|        1|    1|    1|
|inGermany|1940|        1|    1|    1|
|inGermany|1942|        2|    2|    2|
+---------+----+---------+-----+-----+
only showing top 10 rows

It is apparent that the dataset has $5$ columns. To check the number of rows in this dataset, we can use `df.count()`:

In [None]:
df.count()  # output lost: this cell was accidentally re-run after the session ended

**This dataset has $261{,}823{,}225$ rows!**

## Spark SQL  <a class="anchor" id="4"></a>

Now we will create a new DataFrame from a query using Spark SQL, filtering to include only the rows where the token is "data". 

Using SQL requires the `createOrReplaceTempView` method, which registers the DataFrame as a temporary view in the Spark session. We can then query that view with SQL, using the view name as the table name:

In [24]:
df.createOrReplaceTempView("temp_df")

To display the first $5$ rows of the data, we can simply type out an SQL command like this:

In [25]:
# query to display top 5 rows

spark.sql("SELECT * FROM temp_df limit 5").show()

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
+---------+----+---------+-----+-----+

We can also filter for the rows where the token is the word "data", i.e. `token == 'data'`:

In [26]:
# query to filter the rows where token == data

spark.sql("SELECT * FROM temp_df WHERE token == 'data';").show(10)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
| data|1627|        1|    1|    1|
| data|1631|       22|   18|    1|
| data|1637|        1|    1|    1|
| data|1638|        2|    2|    1|
| data|1640|        1|    1|    1|
| data|1642|        1|    1|    1|
| data|1644|        4|    4|    1|
| data|1647|        1|    1|    1|
+-----+----+---------+-----+-----+
only showing top 10 rows
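The two queries above refer to the DataFrame only through the view name `temp_df`. As a loose analogy that runs without a cluster, the sketch below uses Python's built-in `sqlite3` module: a few rows copied from the outputs above are loaded into an in-memory table named `temp_df`, and the same kind of `LIMIT` and `WHERE` queries are issued against that name. Unlike a Spark temporary view, which only names an existing DataFrame, SQLite actually stores the rows, and it follows SQLite's SQL dialect rather than Spark SQL's.

```python
import sqlite3

# A few rows in the nGram layout: (token, year, frequency, pages, books)
rows = [
    ("inGermany", 1927, 2, 2, 2),
    ("data", 1584, 16, 14, 1),
    ("data", 1614, 3, 2, 1),
]

conn = sqlite3.connect(":memory:")
# Unlike a Spark temp view, this really copies the rows into a table named temp_df
conn.execute(
    "CREATE TABLE temp_df (token TEXT, year INTEGER, frequency INTEGER, pages INTEGER, books INTEGER)"
)
conn.executemany("INSERT INTO temp_df VALUES (?, ?, ?, ?, ?)", rows)

# Queries refer to the data by name, as spark.sql does with the view name
first_two = conn.execute("SELECT * FROM temp_df LIMIT 2").fetchall()
data_rows = conn.execute("SELECT * FROM temp_df WHERE token = 'data'").fetchall()
print(first_two)
print(data_rows)
conn.close()
```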

For summary statistics of the filtered rows, we can chain `.describe().show()` onto the query:

In [27]:
# describing the rows where token is equal to data

spark.sql("SELECT * FROM temp_df WHERE token == 'data';").describe().show()

+-------+-----+------------------+-----------------+------------------+------------------+
|summary|token|              year|        frequency|             pages|             books|
+-------+-----+------------------+-----------------+------------------+------------------+
|  count|  316|               316|              316|               316|               316|
|   mean| null|1847.5696202531647|38555.99367088608|21711.041139240508| 1493.110759493671|
| stddev| null| 96.87438222401165| 69212.3664179185| 34901.79774004759|1560.0408024002788|
|    min| data|              1584|                1|                 1|                 1|
|    max| data|              2008|            98764|             99110|               955|
+-------+-----+------------------+-----------------+------------------+------------------+
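The rows of this summary are the count of non-null values, the mean, the sample standard deviation, the minimum and the maximum. For intuition, the sketch below computes the same five statistics for a list of integers with Python's `statistics` module. The helper name `summarize` is hypothetical, and the input is only the ten `frequency` values from the `show(10)` output of the filter query, not the whole column, so the numbers will not match the table.

```python
import statistics

def summarize(values):
    """count, mean, sample stddev, min and max of the non-None values (hypothetical helper)."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation (divides by n - 1)
        "min": min(present),
        "max": max(present),
    }

# frequency of "data" in the first ten years shown by .show(10)
freq = [16, 3, 1, 22, 1, 2, 1, 1, 4, 1]
print(summarize(freq))
```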

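We have $316$ rows, one for each year in which the word "data" appears in the corpus. The `min` and `max` of the `year` column are $1584$ and $2008$ respectively, so the earliest occurrence of "data" in this dataset is in books published in $1584$. The other `min` and `max` values need care, though: since every column was read as a `string`, they are compared as text rather than as numbers. This is harmless for `year`, where every value has four digits, but it means $98{,}764$ is only the largest `frequency` *as a string*, not necessarily the highest number of occurrences in a single year. The `books` column makes the problem obvious: its reported `max` of $955$ is smaller than its mean of about $1{,}493$. The numeric columns need to be cast to integers before their true minimum and maximum can be read off.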
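Because all columns were read as `string` (see the schema printed earlier), `describe()` compares `min` and `max` as text. The effect is easy to reproduce in plain Python: text is compared character by character, so "98764" sorts after "100000" because "9" comes after "1". The values below are illustrative.

```python
# Numbers stored as text compare character by character (lexicographically)
as_text = ["955", "1493", "100000", "98764"]
print(max(as_text))  # largest as a string
print(min(as_text))  # smallest as a string

# Converting to int first restores numeric ordering
as_int = [int(v) for v in as_text]
print(max(as_int), min(as_int))
```

In Spark, casting before summarizing, e.g. `df.withColumn("frequency", df["frequency"].cast("int"))`, gives numeric `min` and `max`.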

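Now that we have filtered the rows where the token is "data", let's save the result into a new DataFrame so that we can reuse it without retyping the query. (Spark evaluates lazily, so the filter itself is re-run each time the DataFrame is used unless it is cached.) When we write it out to HDFS, we pass `header=True` so that the column names are written as the first line of the output files.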

In [29]:
# saving in a dataframe

filtered_data=spark.sql("SELECT * FROM temp_df WHERE token == 'data';")

## Writing to HDFS <a class="anchor" id="5"></a>

Finally, we write the filtered data back to a directory in HDFS using `DataFrame.write.csv()`.
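`write.csv()` produces a directory of part files (one per partition) rather than a single CSV, and with `header=True` each part starts with the column names. By default Spark also refuses to write into a directory that already exists unless a save mode such as `mode="overwrite"` is given. The plain-Python sketch below writes one such header-first file with the `csv` module; the `part-00000.csv` name is illustrative only, since Spark generates its own part-file names.

```python
import csv
import os
import tempfile

rows = [
    {"token": "data", "year": 1584, "frequency": 16, "pages": 14, "books": 1},
    {"token": "data", "year": 1614, "frequency": 3, "pages": 2, "books": 1},
]

out_dir = tempfile.mkdtemp()
# Illustrative part-file name; Spark chooses its own names inside the output directory
path = os.path.join(out_dir, "part-00000.csv")

with open(path, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["token", "year", "frequency", "pages", "books"])
    writer.writeheader()  # analogue of header=True: column names on the first line
    writer.writerows(rows)

with open(path) as f:
    lines = f.read().splitlines()
print(lines)
```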

In [30]:
# writing to HDFS using `write.csv()`

filtered_data.write.csv('/user/hadoop/filtered_data', header=True)

<hr>

Refer to the report for the next steps.