In [3]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import StorageLevel

from pyspark.sql import Row
from pyspark.sql.functions import *

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

import findspark

In [12]:
findspark.init('../../../../../spark')

In [13]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Pyspark") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("INFO")

In [14]:
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [15]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [16]:
# Create temporary table
swimmersJSON.createOrReplaceTempView('swimmersJSON')

In [17]:
# DataFrame API
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [18]:
#SQL Query
spark.sql('select * from swimmersJSON').collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

<b>Inferring the Schema Using Reflection</b>


Note that Apache Spark is inferring the schema using reflection; i.e. it automaticlaly determines the schema of the data based on reviewing the JSON data.

In [19]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Notice that Spark was able to determine infer the schema (when reviewing the schema using .printSchema).

But what if we want to programmatically specify the schema?

<b> Programmatically Specifying the Schema</b>

In this case, let's specify the schema for a CSV text file.



In [20]:
from pyspark.sql.types import *

In [21]:
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

In [22]:
schemaString = 'id name age eyeColor'

schema = StructType([
    StructField('id', LongType(), True),
    StructField('name', StringType(), True),
    StructField('age', LongType(), True),
    StructField('eyeColor', StringType(), True)
])

In [23]:
# Apply the schema to the RDD and Create DataFrame

swimmers = spark.createDataFrame(stringCSVRDD, schema)

In [24]:
swimmers.createOrReplaceTempView('swimmers')

In [25]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [26]:
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [27]:
type(swimmersJSON)

pyspark.sql.dataframe.DataFrame

In [28]:
type(swimmers)

pyspark.sql.dataframe.DataFrame

## Querying with SQL
With DataFrames, you can start writing your queries using Spark SQL - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).

In [29]:
# Executure SQL Query and return the data
spark.sql('select * from swimmers').show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [30]:
# Get count of rows in SQL
spark.sql('select count(1) from swimmers').show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



Note, you can make use of %sql within the notebook cells of a Databricks notebook.



In [31]:
spark.sql('select id ,age from swimmers where age ==22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [32]:
# Query id and age for swimmers with age=22 via DataFrame API
swimmers.select('id','age').filter('age ==22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [33]:
# Query id and age for swimmers with age = 22 via
# DataFrame API in another way

swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [34]:
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [35]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql('select name, eyeColor from swimmers where eyeColor like "b%"' ).show()


+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [36]:
swimmers.select('name','eyeColor').filter('eyeColor like "b%"').show()


+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



### Querying with the DataFrame API

With DataFrames, you can start writing your queries using the DataFrame API



In [37]:
# Show the values
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [38]:
swimmers.count()

3

In [39]:
# Get the id, age where age == 22
swimmers.select('id','age').filter('age = 22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [40]:
spark.sql('select id, age from swimmers where age = 22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



## On-Time Flight Performance

Query flight departure delays by State and City by joining the departure delay and join to the airport codes (to identify state and city).



### DataFrame Queries 
Let's run a flight performance using DataFrames; let's first build the DataFrames from the source datasets.



In [47]:
os.listdir('../../../../apache_zeppelin_notebook/data/')

['.DS_Store', 'Chapter6', 'Chapter5', 'Chapter2', 'Chapter3', 'Chapter4']

In [48]:
# Set File Paths
path = '../../../../apache_zeppelin_notebook/data/Chapter3/'
flightPerFilePath = path+'departuredelays.csv'
airportsFilePath  = path+'airport-codes-na.txt'

# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView('airports')

# Obtain Departure Delays dataset
flightPref = spark.read.csv(flightPerFilePath, header='true')
flightPref.createOrReplaceTempView('FlightPerformance')

# Cache the Departure Delays dataset
flightPref.cache()


DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [49]:
airports.show(3)

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
+----------+-----+-------+----+
only showing top 3 rows



In [50]:
flightPref.show(3)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 3 rows



In [51]:
# Query Sum of Flight Delays by City and Origin Code
# (for Washington State)
spark.sql('''
          select a.City, f.origin, sum(f.delay) as Delays
          from FlightPerformance f
          join airports a on a.IATA = f.origin
          where a.State = 'WA'
          group by a.City, f.origin
          order by sum(f.delay) desc
          ''').show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [52]:
# Query Sum of Flight Delays by State ( for the US)
spark.sql('''select a.State, sum(f.delay) as Delays
             from FlightPerformance f
             join airports a on a.IATA = f.origin
             where a.Country = "USA"
             group by State 
             order by sum(f.delay) 
             ''' ).show()

+-----+-------+
|State| Delays|
+-----+-------+
|   AK| 5384.0|
|   WV| 8408.0|
|   VT|14755.0|
|   ME|15214.0|
|   WY|15365.0|
|   MT|19271.0|
|   NH|20474.0|
|   ID|22932.0|
|   KS|23752.0|
|   ND|27402.0|
|   SD|28790.0|
|   RI|30760.0|
|   MS|33827.0|
|   HI|36825.0|
|   CT|54662.0|
|   NE|59376.0|
|   KY|61156.0|
|   NM|64422.0|
|   IA|65128.0|
|   AR|69453.0|
+-----+-------+
only showing top 20 rows

