# Chapter 5

## Structured Operations

Using the Spark Structured API to perform operations on the data

Let's start off by reading some data from `GCS`.

In [4]:
%%bash
# List the objects under the flight-data JSON prefix of the bucket.
gsutil ls gs://reddys-data-for-experimenting/flight-data/json/

gs://reddys-data-for-experimenting/flight-data/json/2010-summary.json
gs://reddys-data-for-experimenting/flight-data/json/2011-summary.json
gs://reddys-data-for-experimenting/flight-data/json/2012-summary.json
gs://reddys-data-for-experimenting/flight-data/json/2013-summary.json
gs://reddys-data-for-experimenting/flight-data/json/2014-summary.json
gs://reddys-data-for-experimenting/flight-data/json/2015-summary.json


### Inferring the schema automatically

In [16]:
# Load the 2015 summary file without supplying a schema.
df = (
    spark.read
    .format("json")
    .load("gs://reddys-data-for-experimenting/flight-data/json/2015-summary.json")
)

In [17]:
# Print the column names, types and nullability of the frame loaded above.
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [18]:
# Peek at the first five rows of df.
df.head(5)

[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Croatia', count=1),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Ireland', count=344),
 Row(DEST_COUNTRY_NAME=u'Egypt', ORIGIN_COUNTRY_NAME=u'United States', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'India', count=62)]

### Manually adding the schema

This is the way to go for production workloads

In [19]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Explicit schema for the flight-summary file: two string columns plus a
# long "count" column that also carries a metadata entry.
manualSchema = StructType(
    [
        StructField(name="DEST_COUNTRY_NAME", dataType=StringType(), nullable=True),
        StructField(name="ORIGIN_COUNTRY_NAME", dataType=StringType(), nullable=True),
        StructField(
            name="count",
            dataType=LongType(),
            nullable=False,
            metadata={"PII": "true"},
        ),
    ]
)

# Load the same 2015 file, this time with the schema supplied up front.
# NOTE(review): "count" is declared nullable=False here, yet the recorded
# printSchema() output for manualSchemaDf reports nullable = true -- confirm
# whether the JSON reader is expected to honour that flag.
manualSchemaDf = (
    spark.read.format("json")
    .schema(manualSchema)
    .load("gs://reddys-data-for-experimenting/flight-data/json/2015-summary.json")
)

In [20]:
# Print the schema of the frame that was loaded with manualSchema.
manualSchemaDf.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [21]:
# Peek at the first five rows of manualSchemaDf.
manualSchemaDf.head(5)

[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Croatia', count=1),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Ireland', count=344),
 Row(DEST_COUNTRY_NAME=u'Egypt', ORIGIN_COUNTRY_NAME=u'United States', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'India', count=62)]

In [22]:
# Column names of manualSchemaDf, as a plain Python list.
manualSchemaDf.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [25]:
# Fetch only the first row of manualSchemaDf (a single Row, not a list).
manualSchemaDf.first()

Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15)

In [32]:
from pyspark.sql import Row

# Build a standalone Row from positional values of mixed types ...
oneRow = Row("Hello", 1, True, 34.78)
# ... and read a value back by position (index 3 is the fourth value).
oneRow[3]

34.78

In [33]:
# Project just the two country-name columns and print the first five rows.
(
    manualSchemaDf
    .select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME")
    .show(n=5)
)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
|            Egypt|      United States|
|    United States|              India|
+-----------------+-------------------+
only showing top 5 rows



In [35]:
# Two whole-frame aggregates written as SQL expressions: the number of
# distinct destination countries and the mean of the "count" column.
manualSchemaDf.selectExpr(
    "count(distinct(DEST_COUNTRY_NAME)) as NO_OF_DESTINATIONS", 
    "avg(count) as AVG_NO_OF_TRIPS"
).show(2)

+------------------+---------------+
|NO_OF_DESTINATIONS|AVG_NO_OF_TRIPS|
+------------------+---------------+
|               132|    1770.765625|
+------------------+---------------+



### Creating a dataframe from rows

In [37]:
from pyspark.sql import Row

# Four hand-written rows that differ only in the trailing count value.
# A generator expression is used (rather than a list comprehension) so no
# loop variable is left behind in the notebook namespace.
# NOTE(review): the value labelled "Orgin Country 1" is the first positional
# value and therefore ends up in DEST_COUNTRY_NAME, while "Destination
# Country 1" ends up in ORIGIN_COUNTRY_NAME -- confirm the intended order
# (and the spelling of "Orgin").
rows = list(
    Row("Orgin Country 1", "Destination Country 1", trip_count)
    for trip_count in (1, 5, 2, 9)
)

# Same three columns as the flight data; here "count" is declared non-nullable.
schema = StructType(
    [
        StructField(name="DEST_COUNTRY_NAME", dataType=StringType(), nullable=True),
        StructField(name="ORIGIN_COUNTRY_NAME", dataType=StringType(), nullable=True),
        StructField(
            name="count",
            dataType=LongType(),
            nullable=False,
            metadata={"PII": "true"},
        ),
    ]
)

# Distribute the local rows as an RDD, then attach the schema to get a DataFrame.
paralleizedRows = spark.sparkContext.parallelize(rows)
newDf = spark.createDataFrame(data=paralleizedRows, schema=schema)

In [38]:
# Display the hand-built frame.
newDf.show()

+-----------------+--------------------+-----+
|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+-----------------+--------------------+-----+
|  Orgin Country 1|Destination Count...|    1|
|  Orgin Country 1|Destination Count...|    5|
|  Orgin Country 1|Destination Count...|    2|
|  Orgin Country 1|Destination Count...|    9|
+-----------------+--------------------+-----+



In [39]:
# Print the schema of the hand-built frame.
newDf.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = false)



### Dataframe Unions

In [40]:
# Row count of the frame read from the 2015 JSON file.
df.count()

256

In [41]:
# Row count of the hand-built frame.
newDf.count()

4

In [45]:
# Append the hand-built rows to the rows read from the JSON file.
unionDf = df.union(newDf)

In [46]:
# Row count after the union; should equal the sum of the two counts above.
unionDf.count()

260

### Sorting

In [50]:
from pyspark.sql.functions import col

# Smallest counts first, with ties ordered by destination name; print five rows.
unionDf.sort(
    col("count").asc(),
    col("DEST_COUNTRY_NAME").asc(),
).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [51]:
from pyspark.sql.functions import asc, desc

# Largest counts first, with ties ordered alphabetically by destination.
# asc()/desc() accept the column name directly, so this cell actually uses
# what it imports and no longer depends on `col` from an earlier cell.
unionDf.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [52]:
# Shut the session down. SparkSession.stop() also stops the SparkContext
# behind it, so a separate sc.stop() is unnecessary -- and `sc` is never
# defined in this notebook, so calling it would fail on kernels that do not
# inject that global.
spark.stop()