# 4. Read Data in HDFS Using PySpark

With this notebook created in AWS on our BigDataCluster, I will now use PySpark to read the data I copied into HDFS in the previous step (found in PDF Document).

In [1]:
# To start Spark application
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 1 | application_1607300637474_0002 | pyspark | idle | | | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fe3fba7cf50>

In [2]:
# Read the CSV into a dataframe; header=True takes the column names from the first row
df = spark.read.csv('s3://brainstation-dsft/eng_1M_1gram.csv', header=True)

### 1. Show Schema of Table in PySpark

The `inferSchema` parameter tells Spark to infer the data type of each column. Since we did not set it, it keeps its default (None, which is treated as False), so all columns are read in as strings.

In [3]:
# To print out the schema and data types
df.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)

We can see that we have a total of five columns, all of string type:

- token
- year
- frequency
- pages
- books

We can also get a list of these columns using `.columns`.

In [4]:
# To show list of columns
df.columns

['token', 'year', 'frequency', 'pages', 'books']

To get a better understanding of our data, let's take a closer look at it using the `.head()` method.

In [5]:
# To show the first few lines of our data
df.head(5)

[Row(token='inGermany', year='1927', frequency='2', pages='2', books='2'), Row(token='inGermany', year='1929', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1930', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1933', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1934', frequency='1', pages='1', books='1')]

We can show this more clearly by using `.show()` instead, which prints the rows as a formatted table.

In [6]:
# To show data formatted tabularly
df.show(10)

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
|inGermany|1935|        1|    1|    1|
|inGermany|1938|        5|    5|    5|
|inGermany|1939|        1|    1|    1|
|inGermany|1940|        1|    1|    1|
|inGermany|1942|        2|    2|    2|
+---------+----+---------+-----+-----+
only showing top 10 rows

### 2. Display Total Number of Rows of Data

Taking a further look, we can also get the total count of rows for the whole dataframe using `.count()`.

In [7]:
# To show total count of rows in dataframe
df.count()

261823225

We can see that we have a total of 261,823,225 rows, a staggering amount of data.

### 3. Create New DataFrame from a Query using Spark SQL

To create a new dataframe, we first register our current dataframe as a temporary view. We can then query that view with `spark.sql`, keeping only the rows where `token` is "data".

In [8]:
# Register the view
df.createOrReplaceTempView('bigdatagoogle')

# Create new dataframe
sql_df = spark.sql('SELECT * FROM bigdatagoogle WHERE token = "data"')

# To show data formatted tabularly
sql_df.show(10)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
| data|1584|       16|   14|    1|
| data|1614|        3|    2|    1|
| data|1627|        1|    1|    1|
| data|1631|       22|   18|    1|
| data|1637|        1|    1|    1|
| data|1638|        2|    2|    1|
| data|1640|        1|    1|    1|
| data|1642|        1|    1|    1|
| data|1644|        4|    4|    1|
| data|1647|        1|    1|    1|
+-----+----+---------+-----+-----+
only showing top 10 rows

### 4. Get Count of Number of Rows

To get the total number of rows in our new dataframe, we can run `.count()` as before, this time on `sql_df`.

In [9]:
# To show total count of rows in new dataframe
sql_df.count()

316

As we can see, our newly created `sql_df` dataframe has a total of 316 rows. This means that out of the previous 261,823,225 rows, only 316 have a token exactly equal to "data".

### 5. Write Filtered Data Back to Directory in Hadoop Filesystem from Spark

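The code below writes the data from our new dataframe, as CSV with a header row, to the `/user/hadoop/newdatatoken` directory in HDFS. In the run captured here, the write raised an `AnalysisException` because that directory already existed; by default Spark refuses to write to an existing output path.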

In [13]:
# Writing data to directory
sql_df.write.option("header", "true").csv('/user/hadoop/newdatatoken')

'path hdfs://ip-172-31-7-243.ca-central-1.compute.internal:8020/user/hadoop/newdatatoken already exists.;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 932, in csv
    self._jwrite.csv(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'path hdfs://ip-172-31-7-243.ca-central-1.compute.internal:8020/user/hadoop/newdatatoken already exists.;'

