# Big Data Wrangling With Google Books Ngrams

We use PySpark to read the Google Books Ngrams data and run a few queries on it. The data folder was copied from the S3 bucket directly into a directory on the Hadoop Distributed File System (HDFS) named `/user/hadoop/eng_1M_1gram`.

In [1]:
#starting the spark application
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 0 | application_1675629431941_0003 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fd402472550>

## Loading the Dataset from HDFS

In [2]:
#reading the CSV into a DataFrame (the path below points at the S3 bucket)
df= spark.read.csv('s3://brainstation-dsft/eng_1M_1gram.csv', header=True)

## Understanding the data 

In [3]:
#understanding the data schema
df.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)
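
Every column is reported as `string` because `spark.read.csv` was called without a schema. A minimal sketch of supplying one is shown here. The column types are an assumption based on the sample rows, and `read_ngrams` is a hypothetical helper name, not part of the original notebook.

```python
# Assumed column types, written as a DDL string that spark.read.csv accepts
# in place of a StructType. The types are a guess from the sample rows.
NGRAM_SCHEMA = "token STRING, year INT, frequency BIGINT, pages BIGINT, books BIGINT"


def read_ngrams(spark, path):
    """Read the 1-gram CSV with an explicit schema instead of all-string columns.

    `spark` is the notebook's SparkSession; `path` is wherever the CSV lives.
    """
    return spark.read.csv(path, header=True, schema=NGRAM_SCHEMA)
```

In the notebook this could be called as `read_ngrams(spark, 's3://brainstation-dsft/eng_1M_1gram.csv').printSchema()`, which should then report `integer` and `long` columns. Under Spark's default permissive mode, values that do not parse as numbers are read as null.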

In [5]:
#The columns we have
df.columns

['token', 'year', 'frequency', 'pages', 'books']

In [6]:
#Showing the first 10 rows
df.show(10)

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
|inGermany|1935|        1|    1|    1|
|inGermany|1938|        5|    5|    5|
|inGermany|1939|        1|    1|    1|
|inGermany|1940|        1|    1|    1|
|inGermany|1942|        2|    2|    2|
+---------+----+---------+-----+-----+
only showing top 10 rows

### Data dictionary

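`token`: the 1-gram (word) being counted, a string <br>
`year`: the year of the record, numeric <br>
`frequency`: the number of times the token occurred in that year, numeric <br>
`pages`: the number of pages the token appeared on in that year, numeric <br>
`books`: the number of books the token appeared in that year, numeric <br>

All five columns are read as strings, as the schema above shows, even though four of them hold numeric values.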

In [7]:
#Number of rows
df.count()

261823225

In [8]:
print(f"The Google Ngram dataset read from the S3 bucket contains {df.count()} records")

The Google Ngram dataset read from the S3 bucket contains 261823225 records

In [9]:
#understanding the frequency column
df.describe('frequency').show()

+-------+-----------------+
|summary|        frequency|
+-------+-----------------+
|  count|        261822832|
|   mean|396.6344411924442|
| stddev|47883.07879567001|
|    min|                1|
|    max|        frequency|
+-------+-----------------+
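
The `max` of `frequency` in this summary is the literal text `frequency`. Since the column is a string, `min` and `max` compare text, and a row holding the header text sorts above every digit string. The count is also 393 lower than the 261823225 rows counted earlier, so some rows have no frequency value. Below is a minimal sketch of the effect in plain Python, followed by a hypothetical helper (`numeric_frequency` is not part of the original notebook) that keeps only all-digit values and casts them.

```python
# Plain-Python illustration: string comparison puts letters after digits,
# so a stray header value wins over any numeric string.
as_strings = ["1", "98764", "frequency"]
print(max(as_strings))  # frequency


def numeric_frequency(df):
    """Keep rows whose frequency is all digits and cast it to a 64-bit integer.

    `df` is assumed to be the DataFrame loaded above, with string columns.
    """
    return (
        df.where("frequency RLIKE '^[0-9]+$'")
          .selectExpr("CAST(frequency AS BIGINT) AS frequency")
    )
```

With this helper, `numeric_frequency(df).describe('frequency').show()` would summarise the column numerically.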

## Filter the rows where the token is "data" using Spark SQL and describe the result

In [10]:
#register the DataFrame as a temporary view so it can be queried with Spark SQL
df.createOrReplaceTempView("gn_gram")
spark.sql("SELECT * FROM gn_gram limit 5").show()

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
+---------+----+---------+-----+-----+

In [11]:
#query to filter the rows where token is data
spark.sql("SELECT * FROM gn_gram WHERE token = 'data'").show(10)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
| data|1627|        1|    1|    1|
| data|1631|       22|   18|    1|
| data|1637|        1|    1|    1|
| data|1638|        2|    2|    1|
| data|1640|        1|    1|    1|
| data|1642|        1|    1|    1|
| data|1644|        4|    4|    1|
| data|1647|        1|    1|    1|
+-----+----+---------+-----+-----+
only showing top 10 rows
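
The same filter can also be written with the DataFrame API instead of a SQL string. This is a minimal sketch, and `rows_for_token` is a hypothetical helper name that does not appear in the original notebook.

```python
def rows_for_token(df, token):
    """Return the rows of `df` whose `token` column equals `token`.

    Equivalent in intent to: SELECT * FROM gn_gram WHERE token = '<token>'
    """
    return df.where(df["token"] == token)
```

In the notebook, `rows_for_token(df, "data").show(10)` should print the same ten rows as the SQL query.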

In [12]:
#describing the rows where token is equal to data
spark.sql("SELECT * FROM gn_gram WHERE token = 'data'").describe().show()

+-------+-----+------------------+-----------------+------------------+------------------+
|summary|token|              year|        frequency|             pages|             books|
+-------+-----+------------------+-----------------+------------------+------------------+
|  count|  316|               316|              316|               316|               316|
|   mean| null|1847.5696202531647|38555.99367088608|21711.041139240508| 1493.110759493671|
| stddev| null| 96.87438222401165| 69212.3664179185| 34901.79774004759|1560.0408024002788|
|    min| data|              1584|                1|                 1|                 1|
|    max| data|              2008|            98764|             99110|               955|
+-------+-----+------------------+-----------------+------------------+------------------+

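A total of 316 rows meet the filtering criterion (token equal to `data`).
Describing the filtered rows gives the count, mean, standard deviation, minimum and maximum of each column. Because the columns are stored as strings, `min` and `max` are compared as text rather than as numbers, which is why `books` reports a maximum of 955 although its mean is about 1493.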
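
Since the schema reports every column as a string, the `min` and `max` rows of this summary compare text, not numbers. The sketch below reproduces that with the frequency values from the ten `data` rows shown earlier, then gives a hypothetical query (`CAST_QUERY` and `describe_data_rows` are illustrative names, and the target types are assumptions) that casts the columns before describing them.

```python
# Frequency values from the ten "data" rows shown above, first as strings
# (how the notebook currently holds them), then as integers.
sample = ["16", "3", "1", "22", "1", "2", "1", "1", "4", "1"]
print(max(sample))                  # 4   (string comparison)
print(max(int(v) for v in sample))  # 22  (numeric comparison)

# Hypothetical query that casts the columns before they are described.
CAST_QUERY = """
SELECT token,
       CAST(year AS INT)         AS year,
       CAST(frequency AS BIGINT) AS frequency,
       CAST(pages AS BIGINT)     AS pages,
       CAST(books AS BIGINT)     AS books
FROM gn_gram
WHERE token = 'data'
"""


def describe_data_rows(spark):
    """Run the cast query against the gn_gram view and return its summary."""
    return spark.sql(CAST_QUERY).describe()
```

Calling `describe_data_rows(spark).show()` in the notebook would then give numeric minimum and maximum values.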

Now that we have filtered the rows where the token equals `data`, let us save the result in a new DataFrame so that we don't have to repeat the query.

In [13]:
#saving in a dataframe
df_new = spark.sql("SELECT * FROM gn_gram WHERE token = 'data'")
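
Assigning the query to `df_new` stores the query plan, not the rows. Spark evaluates lazily, so each later action runs the filter again unless the result is cached. A minimal sketch of caching it, using a hypothetical helper named `materialize`:

```python
def materialize(df):
    """Mark a DataFrame for caching and run one action so the cache is filled."""
    cached = df.cache()  # lazy: only marks the DataFrame as cacheable
    cached.count()       # an action, which triggers the computation
    return cached
```

In the notebook this would be used as `df_new = materialize(df_new)`. Whether caching pays off depends on how often `df_new` is reused afterwards.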

## Write the filtered data back to a directory in the HDFS

In [20]:
#write to HDFS with write.csv, keeping the header row
df_new.write.csv('hdfs:///user/hadoop/spark_output', header=True)

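The path uses the `hdfs:///` prefix (three slashes) so that it is a fully qualified HDFS path. With only two slashes, as in `hdfs://user/hadoop/spark_output`, the first component (`user`) is read as the host name instead of as part of the path.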
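
The difference between two and three slashes can be seen with any generic URI parser. The sketch below uses Python's `urllib.parse` purely as an illustration; Hadoop has its own path handling, which is assumed here to split the host and the path in the same way.

```python
from urllib.parse import urlparse

two_slashes = urlparse("hdfs://user/hadoop/spark_output")
three_slashes = urlparse("hdfs:///user/hadoop/spark_output")

# With two slashes, "user" is taken as the host and drops out of the path.
print(two_slashes.netloc, two_slashes.path)            # user /hadoop/spark_output

# With three slashes, the host is empty and the full path is kept.
print(repr(three_slashes.netloc), three_slashes.path)  # '' /user/hadoop/spark_output
```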

The `write.csv()` call successfully saved the filtered data to HDFS in the `spark_output` directory.
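
Spark writes the result as a directory of part files, not as a single CSV file, and with the default save mode the write fails if the directory already exists. A minimal sketch of writing and then reading the output back as a check is given below. `write_and_check` is a hypothetical helper, and `mode="overwrite"` is an assumption that the original cell does not make.

```python
def write_and_check(spark, df, path):
    """Write `df` as CSV with a header, then read it back and count the rows."""
    # mode="overwrite" replaces an existing output directory; it is an
    # assumption here, since the original cell uses the default mode.
    df.write.csv(path, header=True, mode="overwrite")
    return spark.read.csv(path, header=True).count()
```

In the notebook, `write_and_check(spark, df_new, 'hdfs:///user/hadoop/spark_output')` should return 316, the row count reported for the filtered data.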

In [18]:
# df.write.option("header",True).csv("hdfs://user/hadoop/spark_output")
