# Big Data Wrangling With Google Books Ngrams

## Table of Contents
- [Big Data Wrangling With Google Books Ngrams](#big-data-wrangling-with-google-books-ngrams)
  - [Introduction](#introduction)
  - [Setup Spark and import any packages needed on the head node.](#setup-spark-and-import-any-packages-needed-on-the-head-node.)
  - [4. Using pyspark, read the data you copied into HDFS in Step 3. Once you have created a pyspark DataFrame, complete the subparts of the question:](#4.-using-pyspark,-read-the-data-you-copied-into-hdfs-in-step-3.-once-you-have-created-a-pyspark-dataframe,-complete-the-subparts-of-the-question:)
  - [4.a. Describe the dataset (examples include size, shape, schema) in pyspark](#4.a.-describe-the-dataset-(examples-include-size,-shape,-schema)-in-pyspark)
  - [4.b Create a new DataFrame from a query using Spark SQL, filtering to include only the rows where the token is "data" and describe the new dataset](#4.b-create-a-new-dataframe-from-a-query-using-spark-sql,-filtering-to-include-only-the-rows-where-the-token-is-"data"-and-describe-the-new-dataset)
    - [Method 1 - Using filter](#method-1---using-filter)
    - [Method 2 - Using Spark SQL](#method-2---using-spark-sql)
  - [4.c. Write the filtered data back to a directory in the HDFS from Spark using df.write.csv(). Be sure to pass the header=True parameter and examine the contents of what you've written.](#4.c.-write-the-filtered-data-back-to-a-directory-in-the-hdfs-from-spark-using-df.write.csv().-be-sure-to-pass-the-header=true-parameter-and-examine-the-contents-of-what-you've-written.)
  - [5. Collect the contents of the directory into a single file on the local drive of the head node using getmerge and move this file to your local machine / laptop.](#5.-collect-the-contents-of-the-directory-into-a-single-file-on-the-local-drive-of-the-head-node-using-getmerge-and-move-this-file-to-your-local-machine-/-laptop.)


## Introduction

This notebook was run on a remote Amazon EMR cluster with Spark and addresses questions 4 and 5 of the deliverable.

Before starting this notebook, we completed the steps for questions 1 to 3:

1. Spin up a new EMR cluster on AWS for using Spark and EMR notebooks.
2. Connect to the head node of the cluster using SSH.
3. Copy the data folder from the S3 bucket directly into a directory on the Hadoop Distributed File System (HDFS) named /user/hadoop/eng_1M_1gram.

## Setup Spark and import any packages needed on the head node.

In [3]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1719338465640_0003,pyspark,idle,Link,Link,,✔

SparkSession available as 'spark'.

In [4]:
spark

<pyspark.sql.session.SparkSession object at 0x7f7cb0654510>

In [5]:
sc.list_packages()

Package                    Version
-------------------------- ----------
aws-cfn-bootstrap          2.0
beautifulsoup4             4.9.3
boto                       2.49.0
click                      8.1.3
docutils                   0.14
jmespath                   1.0.1
joblib                     1.2.0
lockfile                   0.11.0
lxml                       4.9.2
mysqlclient                1.4.2
nltk                       3.8
nose                       1.3.4
numpy                      1.20.0
pip                        20.2.2
py-dateutil                2.2
pystache                   0.5.4
python-daemon              2.2.3
python37-sagemaker-pyspark 1.4.2
pytz                       2022.7
PyYAML                     5.4.1
regex                      2021.11.10
setuptools                 28.8.0
simplejson                 3.2.0
six                        1.13.0
tqdm                       4.64.1
wheel                      0.29.0
windmill                   1.6


In [6]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1719338465640_0003,pyspark,idle,Link,Link,,✔


In [7]:
# install data science & plotting packages for this notebook session
# sc.install_pypi_package is the EMR notebook equivalent of pip install

sc.install_pypi_package("pandas==1.0.5")
sc.install_pypi_package("matplotlib==3.1.1")

Collecting pandas==1.0.5
  Downloading pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl (10.1 MB)
Collecting python-dateutil>=2.6.1
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.9.0.post0

Collecting matplotlib==3.1.1
  Downloading matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl (13.1 MB)
Collecting cycler>=0.10
  Downloading cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Downloading pyparsing-3.1.2-py3-none-any.whl (103 kB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.4.5-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.1 MB)
Collecting typing-extensions; python_version < "3.8"
  Downloading typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Installing collected packages: cycler, pyparsing, typing-extensions, kiwisolver, matplotlib
Successfully installed cycler-0.11.0 kiwisolver-1.4.5 matpl

In [9]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
# only on head node

Let us answer question 4 now.

## 4. Using pyspark, read the data you copied into HDFS in Step 3. Once you have created a pyspark DataFrame, complete the subparts of the question:

In [10]:
df = spark.read.csv('/user/hadoop/eng_1M_1gram',
                    header=True)

In [11]:
df.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)

## 4.a. Describe the dataset (examples include size, shape, schema) in pyspark

In [12]:
df.columns

['token', 'year', 'frequency', 'pages', 'books']

In [13]:
df.head(5)

[Row(token='inGermany', year='1927', frequency='2', pages='2', books='2'), Row(token='inGermany', year='1929', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1930', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1933', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1934', frequency='1', pages='1', books='1')]

In [14]:
df.show(10)

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
|inGermany|1935|        1|    1|    1|
|inGermany|1938|        5|    5|    5|
|inGermany|1939|        1|    1|    1|
|inGermany|1940|        1|    1|    1|
|inGermany|1942|        2|    2|    2|
+---------+----+---------+-----+-----+
only showing top 10 rows

In [16]:
df.count()

261823225

The dataset has 261,823,225 rows (about 262 million) and 5 columns, all of which were read as strings.

## 4.b Create a new DataFrame from a query using Spark SQL, filtering to include only the rows where the token is "data" and describe the new dataset

In [17]:
df.select('token').distinct().count()

3223095

The dataset contains 3,223,095 distinct tokens.

In [18]:
df.filter(df['token'] == "inData").show(2)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
+-----+----+---------+-----+-----+

### Method 1 - Using filter

In [19]:
df.filter(df['token'] == "data").show(2)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
+-----+----+---------+-----+-----+
only showing top 2 rows

In [20]:
df_data = df.filter(df['token'] == "data")

In [21]:
df_data.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)

In [22]:
df_data.columns

['token', 'year', 'frequency', 'pages', 'books']

In [24]:
df_data.show(10)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
| data|1627|        1|    1|    1|
| data|1631|       22|   18|    1|
| data|1637|        1|    1|    1|
| data|1638|        2|    2|    1|
| data|1640|        1|    1|    1|
| data|1642|        1|    1|    1|
| data|1644|        4|    4|    1|
| data|1647|        1|    1|    1|
+-----+----+---------+-----+-----+
only showing top 10 rows

In [25]:
df_data.count()

316

### Method 2 - Using Spark SQL

In [26]:
df.createOrReplaceTempView("ngrams")

In [27]:
spark.sql("SELECT * FROM ngrams WHERE token='data'").show(10)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
| data|1627|        1|    1|    1|
| data|1631|       22|   18|    1|
| data|1637|        1|    1|    1|
| data|1638|        2|    2|    1|
| data|1640|        1|    1|    1|
| data|1642|        1|    1|    1|
| data|1644|        4|    4|    1|
| data|1647|        1|    1|    1|
+-----+----+---------+-----+-----+
only showing top 10 rows

In [28]:
df_sql = spark.sql("SELECT * FROM ngrams WHERE token='data'")

In [29]:
df_sql.count()

316

The filtered dataset has 316 rows, the same count that Method 1 returned.

In [30]:
df_sql.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)

In [31]:
df_sql.columns

['token', 'year', 'frequency', 'pages', 'books']

## 4.c. Write the filtered data back to a directory in the HDFS from Spark using df.write.csv(). Be sure to pass the header=True parameter and examine the contents of what you've written.

In [33]:
# the relative path resolves to the Livy user's HDFS home directory, /user/livy
df_sql.write.csv("ngram_data.csv", header=True)

In [34]:
# double checking that the csv was written properly
df_test = spark.read.csv("/user/livy/ngram_data.csv", header=True)

In [35]:
df_test.show(10)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
| data|1627|        1|    1|    1|
| data|1631|       22|   18|    1|
| data|1637|        1|    1|    1|
| data|1638|        2|    2|    1|
| data|1640|        1|    1|    1|
| data|1642|        1|    1|    1|
| data|1644|        4|    4|    1|
| data|1647|        1|    1|    1|
+-----+----+---------+-----+-----+
only showing top 10 rows

In [36]:
df_test.count()

316

## 5. Collect the contents of the directory into a single file on the local drive of the head node using getmerge and move this file to your local machine / laptop.

I did the following steps to do this. The output written in 4.c was `ngram_data.csv`, which Spark creates as an HDFS directory of part files rather than a single file.

1. `hadoop fs -ls /user/livy` to verify that the output showed up there.
2. `hadoop fs -getmerge /user/livy/ngram_data.csv ngramLocal.csv` to merge the part files into a single CSV on the head node's local drive.
3. `sudo cp ngramLocal.csv /mnt/var/lib/jupyter/home/jovyan/` to copy it to a location I could download it from.
4. Downloaded `ngramLocal.csv` by going to https://localhost:9995, selecting the file and clicking Download.
5. Verified that the file had downloaded onto my laptop.
6. Loaded it into a notebook to make sure it showed up properly.
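
`hadoop fs -getmerge` concatenates the files in the source directory, and because each part file was written with `header=True`, the merged `ngramLocal.csv` can contain the header line more than once (once per part file). A minimal standard-library sketch, assuming a local copy of the `part-*` files (for example fetched with `hadoop fs -get`), of a merge that keeps only the first header and reports how many data rows it wrote; `merge_part_files` is a hypothetical name.

```python
import csv
from pathlib import Path


def merge_part_files(part_dir: str, out_file: str) -> int:
    """Hypothetical helper: concatenate part-* CSVs, keeping a single header.

    Returns the number of data rows written (header excluded).
    """
    header = None
    n_rows = 0
    with open(out_file, "w", newline="") as out:
        writer = csv.writer(out)
        # Merge in file-name order (part-00000, part-00001, ...); hidden .crc files are skipped.
        for part in sorted(Path(part_dir).glob("part-*")):
            with open(part, newline="") as f:
                for i, row in enumerate(csv.reader(f)):
                    if i == 0:
                        # Each part file starts with its own header line; write only the first.
                        if header is None:
                            header = row
                            writer.writerow(row)
                        continue
                    writer.writerow(row)
                    n_rows += 1
    return n_rows
```

For the output in 4.c the returned count should be 316, matching `df_test.count()`.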