## DataFrames et Spark SQL




In [1]:
# create entry points to spark
try:
    sc.stop()
except:
    pass

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("Spark SQL") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

sc = spark.sparkContext
sc

### Generate your own DataFrame
Instead of accessing the file system, let's create a DataFrame by generating the data.  In this case, we'll first create the `stringRDD` RDD and then convert it into a DataFrame when we're reading `stringJSONRDD` using `spark.read.json`.

In [2]:
# Generate our own JSON data

stringJSONRDD = sc.parallelize(("""
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""",
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [3]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [4]:
swimmersJSON

DataFrame[age: bigint, eyeColor: string, id: string, name: string]

In [5]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



DataFrame.createOrReplaceTempView(name: str) → None

Creates or replaces a local temporary view with this DataFrame.

=> The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.

In [6]:
# Create temporary table

swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [7]:
# DataFrame API
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [8]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

#### Inferring the Schema Using Reflection
Note that Apache Spark is inferring the schema using reflection; i.e. it automaticlaly determines the schema of the data based on reviewing the JSON data.

In [9]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Notice that Spark was able to determine infer the schema (when reviewing the schema using `.printSchema`).

But what if we want to programmatically specify the schema?

#### Programmatically Specifying the Schema
In this case, let's specify the schema for a `CSV` text file.

In [10]:
from pyspark.sql.types import *

# Generate our own CSV data
#   This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# The schema is encoded in a string, using StructType we define the schema using various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [11]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [12]:
# SQL Query
spark.sql("select * from swimmers").collect()

[Row(id=123, name='Katie', age=19, eyeColor='brown'),
 Row(id=234, name='Michael', age=22, eyeColor='green'),
 Row(id=345, name='Simone', age=23, eyeColor='blue')]

As you can see from above, we can programmatically apply the `schema` instead of allowing the Spark engine to infer the schema via reflection.

Additional Resources include:
* [PySpark API Reference](https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html)
* [Spark SQL, DataFrames, and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema): This is in reference to Programmatically Specifying the Schema using a `CSV` file.

#### SparkSession

Notice that we're no longer using `sqlContext.read...` but instead `spark.read...`.  This is because as part of Spark 2.0, `HiveContext`, `SQLContext`, `StreamingContext`, `SparkContext` have been merged together into the Spark Session `spark`.
* Entry point for reading data
* Working with metadata
* Configuration
* Cluster resource management

For more information, please refer to [How to use SparkSession in Apache Spark](https://sparkbyexamples.com/spark/sparksession-explained-with-examples/) .

### Querying with the DataFrame API
With DataFrames, you can start writing your queries using the DataFrame API

In [13]:
# Show the values
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [14]:
# Using Databricks `display` command to view the data easier
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyeColor: string]

In [15]:
# Get count of rows
swimmers.count()

3

In [16]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [17]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



### Querying with SQL
With DataFrames, you can start writing your queries using `Spark SQL` - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).

In [18]:
# Execute SQL Query and return the data
spark.sql("select * from swimmers").show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



Let's get the row count:

In [19]:
# Get count of rows in SQL
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()


In [None]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

In [None]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

## Application:

Query flight departure delays by State and City by joining the departure delay and join to the airport codes (to identify state and city).

* On-Time Performance Datasets

The source `airports` dataset can be found at [OpenFlights Airport, airline and route data](https://openflights.org/data.php).

The `flights`, also known as the `departuredelays`, dataset can be found at [Airline On-Time Performance and Causes of Flight Delays: On_Time Data](https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays-on-time-data)

1- Read into spark DataFrames the datasets departuredelays.csv and airport-codes.txt.

2- display dataframe with .show(), .cache() , print the data schema

3- Create a local temporary view with these dataframes.

4- answer the queries below:

* Query Sum of Flight Delays by City and Origin Code (for Washington State)
* Query Sum of Flight Delays by State (for the US)
* Add 2 more analysis axes of your choice

In [17]:
flightPerf=spark.read.csv("departuredelays.csv",header=True)
airports=spark.read.csv("airport-codes.txt", sep='\t',header=True)

In [24]:
flightPerf.show(2)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
+--------+-----+--------+------+-----------+
only showing top 2 rows



In [25]:
airports.show(2)

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
+----------+-----+-------+----+
only showing top 2 rows



In [47]:
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [49]:
flightPerf.createOrReplaceTempView("toto")

In [53]:
spark.sql("select * from toto").take(2)

[Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL'),
 Row(date='01020600', delay='-8', distance='369', origin='ABE', destination='DTW')]


    Query Sum of Flight Delays by City and Origin Code (for Washington State)
    Query Sum of Flight Delays by State (for the US)
    Add 2 more analysis axes of your choice


In [58]:
flightPerf.createOrReplaceTempView('flight')
airports.createOrReplaceTempView('airport')

In [106]:
# Query Sum of Flight Delays by City and Origin Code (for Washington State)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from flight f join airport a on a.IATA = f.origin where a.City = 'Washington DC' group by a.City, f.origin").show()

+-------------+------+--------+
|         City|origin|  Delays|
+-------------+------+--------+
|Washington DC|   DCA|137086.0|
|Washington DC|   IAD|260151.0|
+-------------+------+--------+



In [124]:
(flightPerf.select('delay', 'origin').join(airports.select('IATA','City'),flightPerf.origin==airports.IATA)
                                      .groupby(['City','origin']).agg({'delay':'sum'})
                                      .filter(airports.City=='Washington DC').show())

+-------------+------+----------+
|         City|origin|sum(delay)|
+-------------+------+----------+
|Washington DC|   DCA|  137086.0|
|Washington DC|   IAD|  260151.0|
+-------------+------+----------+



In [108]:
#Query Sum of Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from flight f join airport a on a.IATA = f.origin where a.Country = 'USA' group by a.State").show()

+-----+---------+
|State|   Delays|
+-----+---------+
|   SC|  80666.0|
|   AZ| 401793.0|
|   LA| 199136.0|
|   MN| 256811.0|
|   NJ| 452791.0|
|   OR| 109333.0|
|   VA|  98016.0|
| NULL| 397237.0|
|   RI|  30760.0|
|   WY|  15365.0|
|   KY|  61156.0|
|   NH|  20474.0|
|   MI| 366486.0|
|   NV| 474208.0|
|   WI| 152311.0|
|   ID|  22932.0|
|   CA|1891919.0|
|   CT|  54662.0|
|   NE|  59376.0|
|   MT|  19271.0|
+-----+---------+
only showing top 20 rows



In [130]:
(flightPerf.select('delay', 'origin')
                .join(airports.select('IATA','City','State'),flightPerf.origin==airports.IATA)
                .groupby('State').agg({'delay':'sum'})
                .show())

+-----+----------+
|State|sum(delay)|
+-----+----------+
|   SC|   80666.0|
|   AZ|  401793.0|
|   LA|  199136.0|
|   MN|  256811.0|
|   NJ|  452791.0|
|   OR|  109333.0|
|   VA|   98016.0|
| NULL|  397237.0|
|   RI|   30760.0|
|   WY|   15365.0|
|   KY|   61156.0|
|   NH|   20474.0|
|   MI|  366486.0|
|   NV|  474208.0|
|   WI|  152311.0|
|   ID|   22932.0|
|   CA| 1891919.0|
|   CT|   54662.0|
|   NE|   59376.0|
|   MT|   19271.0|
+-----+----------+
only showing top 20 rows



In [109]:
#Query farest destination from where City is Washington
spark.sql("select a.City, f.destination, max(f.delay) as Delays from flight f join airport a on a.IATA = f.origin where a.City = 'Washington DC' group by a.City, f.destination").show()

+-------------+-----------+------+
|         City|destination|Delays|
+-------------+-----------+------+
|Washington DC|        ABQ|     9|
|Washington DC|        ALB|    95|
|Washington DC|        ATL|    98|
|Washington DC|        AUS|     9|
|Washington DC|        BDL|    98|
|Washington DC|        BNA|    99|
|Washington DC|        BOS|    99|
|Washington DC|        BTR|    61|
|Washington DC|        BTV|    97|
|Washington DC|        BUF|     9|
|Washington DC|        CAE|    97|
|Washington DC|        CHS|    95|
|Washington DC|        CLE|    97|
|Washington DC|        CLT|    99|
|Washington DC|        CMH|    99|
|Washington DC|        COS|    83|
|Washington DC|        CRW|     8|
|Washington DC|        CVG|     9|
|Washington DC|        DAY|    96|
|Washington DC|        DCA|     0|
+-------------+-----------+------+
only showing top 20 rows



In [133]:
#one solution is merging everythin on one table, and filter on it
from pyspark.sql.functions import col

df1 = airports.select('IATA','City','State','Country')
df2 = flightPerf.select('origin','destination','delay')

# First join: Merge df1 with df2 on the origin key
merged_df = df2.join(df1, df2.origin == df1.IATA, how="left")

# Rename the columns from the first join with suffix _origin
merged_df = merged_df.withColumnRenamed("City", "origin_city") \
                     .withColumnRenamed("State", "origin_state") \
                     .withColumnRenamed("Country", "origin_country")

# Drop the IATA column after the first join
merged_df = merged_df.drop("IATA")

# Second join: Merge the result with df1 again on the destination key
merged_df = merged_df.join(df1, merged_df.destination == df1.IATA, how="left")

# Rename the columns from the second join with suffix _destination
merged_df = merged_df.withColumnRenamed("City", "destination_city") \
                     .withColumnRenamed("State", "destination_state") \
                     .withColumnRenamed("Country", "destination_country")

# Drop the IATA column after the second join
merged_df = merged_df.drop("IATA")

# Show the final result
merged_df.show()


+------+-----------+-----+-----------+------------+--------------+----------------+-----------------+-------------------+
|origin|destination|delay|origin_city|origin_state|origin_country|destination_city|destination_state|destination_country|
+------+-----------+-----+-----------+------------+--------------+----------------+-----------------+-------------------+
|   ABE|        ATL|    6|  Allentown|          PA|           USA|         Atlanta|               GA|                USA|
|   ABE|        DTW|   -8|  Allentown|          PA|           USA|         Detroit|               MI|                USA|
|   ABE|        ATL|   -2|  Allentown|          PA|           USA|         Atlanta|               GA|                USA|
|   ABE|        ATL|   -4|  Allentown|          PA|           USA|         Atlanta|               GA|                USA|
|   ABE|        ATL|   -4|  Allentown|          PA|           USA|         Atlanta|               GA|                USA|
|   ABE|        ATL|    