In [None]:
# Install the notebook-scoped Python dependencies (EMR notebook SparkContext API)
for packageName in ("boto3", "pandas", "requests", "s3fs"):
    sc.install_pypi_package(packageName)

In [None]:
import boto3
from datetime import datetime
from datetime import timedelta
import pandas as pd
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import functions as f, types as t
from pathlib import Path
import requests
import s3fs
import subprocess
import timeit
from urllib.parse import urlparse

# Remove truncation of columns and column values in pandas output by default.
# Fully-qualified option names are used on purpose: a short pattern such as
# 'max_columns' is ambiguous once pandas has more than one matching option
# (e.g. 'styler.render.max_columns' in pandas >= 1.4) and set_option then
# raises OptionError.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)

# Monkey-patch a DataFrame.transform method for Spark 2.4, which lacks one.
def transform(self, f, *args, **kwargs):
    """Return ``f(self, *args, **kwargs)`` so transformations can be chained.

    Extra positional/keyword arguments are forwarded to ``f``, matching the
    built-in ``DataFrame.transform`` of newer PySpark releases, so chained
    code written against either version behaves the same.
    """
    return f(self, *args, **kwargs)
# Attach the patch to the DataFrame class itself so every DataFrame instance
# gets .transform(); this assignment overrides any existing DataFrame.transform.
DataFrame.transform = transform

# Override the timeit template to return the command's
# return value in addition to the time
# Reference: https://stackoverflow.com/questions/24812253/how-can-i-capture-return-value-with-python-timeit-module
# NOTE: this patches the timeit module for the whole notebook session, so every
# timeit.timeit(...) call (including the one in timer_method) returns an
# (elapsed_seconds, value) tuple instead of a float.
# - `retval` holds the value from the last loop iteration. With number=0 the
#   loop never runs and `retval` would be unbound.
# - {stmt} is spliced in after `retval = `, so the timed statement must be a
#   single expression.
# - This relies on timeit.template, an undocumented internal of the timeit module.
timeit.template = """
def inner(_it, _timer{init}):
    {setup}
    _t0 = _timer()
    for _i in _it:
        retval = {stmt}
    _t1 = _timer()
    return _t1 - _t0, retval
"""

def shell_cmd(cmd):
    """Run ``cmd`` through the shell and print its output line by line.

    The output is decoded to text, so each line prints as plain text rather
    than as a ``b'...'`` bytes repr. No spurious empty line is printed for the
    trailing newline.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    # shell=True is kept so callers can use pipes/globs; never pass untrusted input here.
    output = subprocess.check_output(cmd, shell=True, universal_newlines=True)
    for line in output.splitlines():
        print(line)

def timer_method(cmd):
    """Time a single execution of the statement string ``cmd``.

    The statement runs against this module's global namespace, so it can
    reference notebook-level names. When the module-level timeit.template
    override is active, the result is ``(seconds, value)``. With the stock
    template it is just the elapsed seconds.
    """
    namespace = globals()
    return timeit.timeit(stmt=cmd, number=1, globals=namespace)

### Lab 3.1 - Leveraging File Types

Write functions to:
* Write out the taxi-lookup dataset to local storage as CSV, JSON, and Parquet files
* Read the files back and print/return a DataFrame containing the count of zones per borough
* Time how long it takes to write out each file type and to perform the aggregation using each file type

In [None]:
# HDFS locations written by the Section 2 taxi and taxi-lookup ingests;
# adjust if those ingests wrote elsewhere.
_section2Root = "hdfs:///tmp/data/nyc-taxi"
taxiPath = f"{_section2Root}/taxi-data/output/section2/json/"
taxiLookupPath = f"{_section2Root}/zone-lookup/output/section2/json/"

In [None]:
def write_csv(inputDF, path="hdfs:///tmp/data/nyc-taxi/taxi-zone/output/section3/csv"):
    """Write ``inputDF`` as CSV files with a header row, overwriting ``path``.

    Args:
        inputDF: Spark DataFrame to write.
        path: output directory; defaults to the Lab 3.1 CSV location.
    """
    (inputDF
        .write
        .mode("overwrite")
        .option("header", True)
        .csv(path))

def write_json(inputDF, path="hdfs:///tmp/data/nyc-taxi/taxi-zone/output/section3/json"):
    """Write ``inputDF`` as JSON-lines files, overwriting ``path``.

    Args:
        inputDF: Spark DataFrame to write.
        path: output directory; defaults to the Lab 3.1 JSON location.
    """
    (inputDF
        .write
        .mode("overwrite")
        .json(path))

def write_parquet(inputDF, path="hdfs:///tmp/data/nyc-taxi/taxi-zone/output/section3/parquet"):
    """Write ``inputDF`` as parquet files, overwriting ``path``.

    Args:
        inputDF: Spark DataFrame to write.
        path: output directory; defaults to the Lab 3.1 parquet location.
    """
    (inputDF
        .write
        .mode("overwrite")
        .parquet(path))
        
def read_test_csv(path="hdfs:///tmp/data/nyc-taxi/taxi-zone/output/section3/csv"):
    """Read the Lab 3.1 CSV output and count zones per borough.

    The counts are displayed with ``show()`` rather than ``print()``. Printing
    a DataFrame only prints its schema, and because Spark is lazy the
    groupBy/count would never execute. The "read and transform" timing would
    then not include the aggregation at all.

    Args:
        path: CSV directory to read (header row expected); defaults to the
            location written by ``write_csv``.

    Returns:
        DataFrame of (borough, count).
    """
    readDF = (spark
                 .read
                 .option('header', True)
                 .csv(path))
    readDF.show()
    dfCount = readDF.groupBy("borough").count()
    dfCount.show()
    return dfCount

def read_test_json(path="hdfs:///tmp/data/nyc-taxi/taxi-zone/output/section3/json"):
    """Read the Lab 3.1 JSON output and count zones per borough.

    The counts are displayed with ``show()`` rather than ``print()``. Printing
    a DataFrame only prints its schema, and because Spark is lazy the
    groupBy/count would never execute, so it would be missing from the timing.

    Args:
        path: JSON directory to read; defaults to the location written by
            ``write_json``.

    Returns:
        DataFrame of (borough, count).
    """
    readDF = (spark
                 .read
                 .json(path))
    readDF.show()
    dfCount = readDF.groupBy("borough").count()
    dfCount.show()
    return dfCount

def read_test_parquet(path="hdfs:///tmp/data/nyc-taxi/taxi-zone/output/section3/parquet"):
    """Read the Lab 3.1 parquet output and count zones per borough.

    The counts are displayed with ``show()`` rather than ``print()``. Printing
    a DataFrame only prints its schema, and because Spark is lazy the
    groupBy/count would never execute, so it would be missing from the timing.

    NOTE: a later cell (Lab 3.2) redefines ``read_test_parquet(readPath)``.
    Re-running the Lab 3.1 timing cell after that cell uses the later version.

    Args:
        path: parquet directory to read; defaults to the location written by
            ``write_parquet``.

    Returns:
        DataFrame of (borough, count).
    """
    readDF = (spark
                 .read
                 .parquet(path))
    readDF.show()
    dfCount = readDF.groupBy("borough").count()
    dfCount.show()
    return dfCount

In [None]:
# Load the taxi zone lookup, preview it, then time each output format's write
inputDF = spark.read.json(taxiLookupPath)
inputDF.printSchema()
inputDF.show()

for formatLabel, writeCall in (("CSV", "write_csv(inputDF)"),
                               ("JSON", "write_json(inputDF)"),
                               ("Parquet", "write_parquet(inputDF)")):
    print(f'{formatLabel} write time: {timer_method(writeCall)}')

In [None]:
# Time reading each format back and aggregating zones per borough
for formatLabel, readCall in (("CSV", "read_test_csv()"),
                              ("JSON", "read_test_json()"),
                              ("Parquet", "read_test_parquet()")):
    print(f'{formatLabel} read and transform time: {timer_method(readCall)}')

### Lab 3.2 Partitioning

Write functions to:
* Based on the class discussion, update the following functions with the most appropriate partitioning values
* Read the written files and print/return a DataFrame containing the count of taxi rides per pickup_month
* Time how long it takes to write out the data using each partitioning methodology and to perform the aggregation using each strategy

In [None]:
def write_paritioned_parquet(inputDF,
                             path="hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/partitioned"):
    """Write ``inputDF`` as parquet partitioned by ``pickup_month``, overwriting ``path``.

    The DataFrame's existing partitioning is kept, so each input partition
    may write one file into every pickup_month directory it holds rows for.

    Args:
        inputDF: Spark DataFrame with a ``pickup_month`` column.
        path: output directory; defaults to the Lab 3.2 "partitioned" location.
    """
    (inputDF
        .write
        .mode("overwrite")
        .partitionBy('pickup_month')
        .parquet(path))
            
def write_coalesce_parquet(inputDF,
                           path="hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/coalesced",
                           numPartitions=5):
    """Coalesce to ``numPartitions`` partitions, then write parquet partitioned by ``pickup_month``.

    ``coalesce`` merges existing partitions without a full shuffle. Each
    pickup_month directory therefore receives at most ``numPartitions`` files.

    Args:
        inputDF: Spark DataFrame with a ``pickup_month`` column.
        path: output directory; defaults to the Lab 3.2 "coalesced" location.
        numPartitions: number of partitions to coalesce to before writing.
    """
    (inputDF
        .coalesce(numPartitions)
        .write
        .mode("overwrite")
        .partitionBy('pickup_month')
        .parquet(path))
            
def write_repartition_parquet(inputDF,
                              path="hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/repartitioned",
                              numPartitions=5):
    """Repartition to ``numPartitions`` partitions, then write parquet partitioned by ``pickup_month``.

    ``repartition(n)`` shuffles rows evenly across ``n`` partitions, so each
    pickup_month directory will typically receive up to ``n`` files.
    Repartitioning by the ``pickup_month`` column instead would give one file
    per month, if that layout is preferred.

    Args:
        inputDF: Spark DataFrame with a ``pickup_month`` column.
        path: output directory; defaults to the Lab 3.2 "repartitioned" location.
        numPartitions: number of partitions to shuffle into before writing.
    """
    (inputDF
        .repartition(numPartitions)
        .write
        .mode("overwrite")
        .partitionBy('pickup_month')
        .parquet(path))
            
def write_sorted_parquet(inputDF,
                         path="hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/sorted",
                         numPartitions=5):
    """Globally sort, coalesce, then write parquet partitioned by ``pickup_month``.

    Sorting clusters similar values together inside each file. That can help
    parquet's run-length/dictionary encoding compress better.
    TODO (carried over from the original): verify this still matters.

    Args:
        inputDF: Spark DataFrame containing every column in ``sortColumns``.
        path: output directory; defaults to the Lab 3.2 "sorted" location.
        numPartitions: number of partitions to coalesce to after sorting.
    """
    sortColumns = ('pickup_month', 'passenger_count', 'PULocationID', 'DOLocationID',
                   'trip_distance', 'fare_amount', 'tip_amount',
                   'tpep_dropoff_datetime', 'tpep_pickup_datetime')
    (inputDF
        .orderBy(*sortColumns)
        .coalesce(numPartitions)
        .write
        .mode("overwrite")
        .partitionBy('pickup_month')
        .parquet(path))

# def write_parition(inputDF, partition):
#     writeDF = inputDF.filter(f"pickup_month == 'f{partition}'")
#     writeDF.show()
#     (writeDF.orderBy('passenger_count', 'PULocationID', 'DOLocationID', 'trip_distance', 'fare_amount', 'tip_amount', 'tpep_dropoff_datetime', 'tpep_pickup_datetime')
#         .repartition(5)
#         .write
#         .mode("overwrite")
#         .parquet(f"hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/manual/pickup_month={parition}"))

# def write_pre_defined_partitions(inputDF):
#     ##multiparll over list of partitions
#     #need to verify if this is actually faster
#     from multiprocessing import Process
#     partitionList = [x[0] for x in inputDF.select("pickup_month").distinct().collect()]
#     print(parititionList)
#     for partiion in partitionList:
#         writeProcess = Process(target=write_parition, args=(inputDF, partiion))
#         writeProcess.start()
#         writeProcess.join()
                
#     spark.sql("MSCK RPAIR TABLE table_name")
#     spark.sql("REFRESH TABLE table_name")

def read_test_parquet(readPath):
    """Load the parquet data at ``readPath``, ordered by pickup_month then passenger_count.

    Spark evaluates the sort lazily: nothing is sorted until an action runs
    on the returned DataFrame. This definition replaces the zero-argument
    ``read_test_parquet`` from Lab 3.1.
    """
    parquetDF = spark.read.parquet(readPath)
    return parquetDF.sort("pickup_month", "passenger_count")
def agg_test_parquet(baseDF):
    """Count rides per ``pickup_month`` in ``baseDF`` and display the result.

    ``show()`` is used instead of ``print()``. Printing a DataFrame only prints
    its schema, and since Spark evaluates lazily the groupBy/count would never
    execute, leaving nothing for the surrounding timing to measure.

    Returns:
        DataFrame of (pickup_month, count).
    """
    dfCount = baseDF.groupBy("pickup_month").count()
    dfCount.show()
    return dfCount

In [None]:
# Will talk through how to access the spark ui/view progress of the running code while this operation is running
inputDF = spark.read.json(taxiPath)
inputDF = inputDF.withColumn("pickup_month", f.date_format("pickup_datetime", "yyyyMM"))
inputDF.show()

partitionWriteCalls = [
    "write_paritioned_parquet(inputDF)",
    "write_coalesce_parquet(inputDF)",
    "write_repartition_parquet(inputDF)",
    "write_sorted_parquet(inputDF)",
    # "write_pre_defined_partitions(inputDF)",
]
for writeCall in partitionWriteCalls:
    print(timer_method(writeCall))

In [None]:
# One output directory per Lab 3.2 write strategy
# ('manual/' is excluded while write_pre_defined_partitions is disabled)
testPaths = [
    f"hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/{layout}"
    for layout in ("partitioned", "coalesced", "repartitioned", "sorted")
]

for testPath in testPaths:
    readCall = f"read_test_parquet('{testPath}')"
    print(timer_method(readCall))
    baseDF = read_test_parquet(testPath)
    print(timer_method("agg_test_parquet(baseDF)"))

### Schema Management

In [None]:
# Normally you would keep schemas in a repository.
# In Scala you could apply the types to the DataFrame to create a Dataset; that is not possible in PySpark 2.4.
# A schema can be expressed either as a SQL DDL string or as a Spark StructType.
taxiSchema = inputDF.schema
sortedReader = spark.read.schema(taxiSchema)
print(sortedReader.parquet('hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/sorted').count())

In [None]:
# Explicit, nullable taxi schema, declared as (column name, Spark type) pairs
taxiSchema = t.StructType([
    t.StructField(columnName, columnType, True)
    for columnName, columnType in [
        ('DOLocationID', t.LongType()),
        ('PULocationID', t.LongType()),
        ('RatecodeID', t.StringType()),
        ('SR_Flag', t.StringType()),
        ('VendorID', t.StringType()),
        ('congestion_surcharge', t.StringType()),
        ('dispatching_base_num', t.StringType()),
        ('dropoff_datetime', t.StringType()),
        ('extra', t.StringType()),
        ('fare_amount', t.DoubleType()),
        ('hvfhs_license_num', t.StringType()),
        ('improvement_surcharge', t.StringType()),
        ('lpep_dropoff_datetime', t.StringType()),
        ('lpep_pickup_datetime', t.StringType()),
        ('mta_tax', t.StringType()),
        ('passenger_count', t.LongType()),
        ('payment_type', t.StringType()),
        ('pickup_datetime', t.StringType()),
        ('service', t.StringType()),
        ('store_and_fwd_flag', t.StringType()),
        ('tip_amount', t.DoubleType()),
        ('tolls_amount', t.StringType()),
        ('total_amount', t.StringType()),
        ('tpep_dropoff_datetime', t.StringType()),
        ('tpep_pickup_datetime', t.StringType()),
        ('trip_distance', t.DoubleType()),
        # ('trip_type', t.StringType()),
        ('pickup_month', t.StringType()),
    ]
])
sortedTaxiReader = spark.read.schema(taxiSchema)
print(sortedTaxiReader.parquet('hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/sorted').count())

# A DDL string such as "col1 STRING, col2 INT" could be used as the schema instead
spark.createDataFrame(inputDF.rdd, schema=taxiSchema, verifySchema=True).show()

### Lab 3.3 - Case Study 3: Find the average taxi rides per zip code on the 10 worst air quality days of each month

Write functions to: 
* Select the best iteration of the taxi data for this workload
* Ingest the borough zipcode mapping and explode the zip code columns
* Ingest air quality data from: https://www.airnowapi.org/aq/observation/zipCode/historical/?format=application/json&zipCode={zipCode}&date={date}T00-0000&distance=100&API_KEY={apiKey}
    * https://docs.airnowapi.org/HistoricalObservationsByZip/docs
    * Make sure you have created an account at: https://docs.airnowapi.org/account/request/
    * Use the zip codes from the borough zipcode mapping
    * Due to API call limitations only pull 100 days of air quality for the following zipcodes (11212, 10023, 11374, 11414) starting on 2020/06/01
    * #### Be aware that you only have 500 requests per hour to that api endpoint
* Find the average taxi rides per borough on the 10 worst air quality days of each month
* Persist the resulting agg dataframe using .cache()
* Run and time the full ingest

Make sure to write the ingests with an eye on efficiency for this specific workload 

In [None]:
def get_taxi_df():
    """Return timestamped taxi rides joined to the zone lookup for their pickup ``Borough``.

    Reads the sorted Section 3 parquet output and keeps rides whose
    ``pickup_datetime`` and ``dropoff_datetime`` are both non-null. These are
    inner-joined to the Section 2 zone lookup on
    ``PULocationID == LocationID``. A preview is shown before the joined
    DataFrame is returned.
    """
    rides = spark.read.parquet("hdfs:///tmp/data/nyc-taxi/taxi-data/output/section3/sorted")
    # The lookup was written as JSON in Section 2, so reading it as parquet is expected to fail:
    # zones = spark.read.parquet("hdfs:///tmp/data/nyc-taxi/zone-lookup/output/section2/json/")
    zones = spark.read.json("hdfs:///tmp/data/nyc-taxi/zone-lookup/output/section2/json/")

    hasTimestamps = rides.pickup_datetime.isNotNull() & rides.dropoff_datetime.isNotNull()
    timedRides = rides.filter(hasTimestamps)

    zoneBoroughs = zones.select("Borough", "LocationID")
    joined = timedRides.join(zoneBoroughs, timedRides.PULocationID == zones.LocationID)
    joined.show()
    return joined

def get_zip_code_mapping_df():
    """Load the borough/neighborhood zip mapping and explode it to one row per zip code.

    The ``ZIP Codes`` column holds a comma-separated list. Each exploded value
    is whitespace-trimmed: a list written as "10001, 10002" would otherwise
    yield " 10002". That value never matches the plain zip strings used in
    the air-quality join.

    Returns:
        DataFrame with columns Borough, Neighborhood, zip (string).
    """
    zipReadDF = (spark.read
                 .option('header', True)
                 .csv("s3://data-scale-oreilly/data/borough-zip-mapping/ny-zip-codes.csv"))
    # explode cannot be nested inside another expression, so trim in a second step
    returnZipDF = (zipReadDF
                   .select('Borough', 'Neighborhood', f.explode(f.split('ZIP Codes', ',')).alias('zip'))
                   .withColumn('zip', f.trim(f.col('zip'))))
    returnZipDF.show()
    return returnZipDF


def get_air_quality_df(zipDF, zipList=None, startDate="2020-06-01", numDays=100, apiKey=None):
    """Pull daily historical AirNow observations for a set of zip codes.

    One HTTP request is made per (zip code, day). The API allows only about
    500 requests per hour, so keep ``len(zipList) * numDays`` below that.

    All observations are collected in Python and turned into a single
    DataFrame, instead of one ``createDataFrame`` + union per request. This
    also means a day with no observations no longer crashes the ingest:
    ``createDataFrame([])`` cannot infer a schema from an empty list.

    Args:
        zipDF: zip-code mapping DataFrame. Its zips are not used by default
            (see the commented line) to stay under the API rate limit.
        zipList: zip codes to query; defaults to ['11212', '10023'].
        startDate: first day to query, formatted "YYYY-MM-DD".
        numDays: consecutive days to query per zip. The lab asks for 100;
            the previous range(0, 99) fetched only 99.
        apiKey: AirNow API key. Defaults to the AIRNOW_API_KEY environment
            variable, falling back to the key previously hard-coded here.

    Returns:
        DataFrame with the columns of ``airQualitySchema`` plus
        ``categoryNumber`` (``Category.Number``).

    Raises:
        requests.HTTPError: if any API request returns an error status.
    """
    import os

    if zipList is None:
        # zipList = [x[0] for x in zipDF.select('zip').collect()]
        zipList = ['11212', '10023']  # , '11374', '11414']
    if apiKey is None:
        # NOTE(review): a live key should not be committed to source; set
        # AIRNOW_API_KEY and rotate this key.
        apiKey = os.environ.get("AIRNOW_API_KEY", "8DFC7E6B-F641-41D9-95DC-9CF3B90AF038")

    airQualitySchema = t.StructType([
        t.StructField("AQI", t.LongType(), True),
        t.StructField("Category", t.MapType(t.StringType(), t.LongType()), True),
        t.StructField("DateObserved", t.StringType(), True),
        t.StructField("HourObserved", t.LongType(), True),
        t.StructField("Latitude", t.DoubleType(), True),
        t.StructField("LocalTimeZone", t.StringType(), True),
        t.StructField("Longitude", t.DoubleType(), True),
        t.StructField("ParameterName", t.StringType(), True),
        t.StructField("ReportingArea", t.StringType(), True),
        t.StructField("StateCode", t.StringType(), True),
        t.StructField("zip", t.StringType(), True),
        ])

    airDF = spark.createDataFrame([], airQualitySchema)

    firstDay = datetime.strptime(startDate, "%Y-%m-%d").date()
    records = []
    for zipCode in zipList:
        for d in range(numDays):
            endDate = firstDay + timedelta(days=d)
            apiPath = f"https://www.airnowapi.org/aq/observation/zipCode/historical/?format=application/json&zipCode={zipCode}&date={endDate}T00-0000&distance=100&API_KEY={apiKey}"
            response = requests.get(apiPath, timeout=60)
            response.raise_for_status()
            # Tag every observation with the zip it was requested for
            records.extend(dict(obs, zip=zipCode) for obs in response.json())

    if records:
        # Schema is inferred from the dicts (sorted keys, so 'zip' comes last),
        # which lines up positionally with airQualitySchema for unionAll.
        airDF = airDF.unionAll(spark.createDataFrame(records))

    returnAirDF = airDF.withColumn("categoryNumber", f.col("Category.Number"))
    returnAirDF.show()

    return returnAirDF

def calculate_hottest_days(taxiDF, airQualityDF, zipDF):
    """For each zip code and month, keep the 10 days with the worst air quality.

    Steps:
      1. Collapse the air-quality observations to one row per (day, zip),
         averaging the category number. The API can return several rows per
         zip/day (the schema carries ParameterName). Joining the raw rows
         would multiply every taxi ride by that count and inflate
         ``count_rides``. ``avg_cat`` is unchanged by this step.
      2. Attach zip codes to rides via the pickup borough. Each ride is paired
         with every zip in its borough.
      3. Join rides to air quality on (pickup day, zip). Count rides and
         average the category number per (month, day, zip).
      4. Rank days within each (zip, month) by descending avg_cat (a higher
         AirNow category means worse air), breaking ties by descending
         count_rides. Keep ranks 1-10.

    Args:
        taxiDF: rides with ``pickup_datetime`` and ``Borough`` columns.
        airQualityDF: observations with ``DateObserved``, ``zip`` and
            ``categoryNumber`` columns.
        zipDF: mapping with ``Borough`` and ``zip`` columns.

    Returns:
        DataFrame with pickup_month, pickup_day, zip, count_rides, avg_cat, row_num.
    """
    # Window was never imported at file level; import it where it is needed
    from pyspark.sql.window import Window

    joinAirDF = (airQualityDF
                 .withColumn('air_day', f.date_format("dateObserved", "yyyyMMdd"))
                 .withColumnRenamed("zip", "air_zip")
                 .groupBy('air_day', 'air_zip')
                 .agg(f.avg('categoryNumber').alias('categoryNumber')))

    zipDF = zipDF.withColumn('zipBorough', f.col('Borough'))

    taxiZipDF = taxiDF.join(zipDF.select('zip', 'zipBorough'), zipDF.zipBorough == taxiDF.Borough)

    joinTaxiDF = (taxiZipDF
                  .withColumn("pickup_day", f.date_format("pickup_datetime", "yyyyMMdd"))
                  .withColumn("pickup_month", f.date_format("pickup_datetime", "yyyyMM")))

    joinCondition = [joinTaxiDF.pickup_day == joinAirDF.air_day, joinTaxiDF.zip == joinAirDF.air_zip]

    joinedAirDF = joinTaxiDF.join(joinAirDF.select('categoryNumber', 'air_day', 'air_zip'), joinCondition)

    aggDF = (joinedAirDF
             .groupBy('pickup_month', 'pickup_day', 'zip')
             .agg(f.count('pickup_day').alias('count_rides'),
                  f.avg('categoryNumber').alias('avg_cat')))

    # f.desc takes a single column, so each sort key gets its own desc()
    win = Window.partitionBy("zip", "pickup_month").orderBy(f.desc("avg_cat"), f.desc("count_rides"))
    # row_number starts at 1, so ranks 1..10 are the 10 worst days
    aggDF = aggDF.withColumn("row_num", f.row_number().over(win)).where("row_num <= 10")

    aggDF.show()
    return aggDF

In [None]:
def run_case_study():
    """Run the full Lab 3.3 pipeline, cache the aggregate, and publish it.

    The cached result is stored in the notebook-global ``aggedDF`` so later
    cells (e.g. ``write_to_s3``) can persist it without re-running the
    rate-limited air-quality ingest. A plain local variable would be
    discarded when this function returns.

    Returns:
        The cached aggregate DataFrame.
    """
    global aggedDF
    taxiDF = get_taxi_df()
    zipDF = get_zip_code_mapping_df()
    airQualityDF = get_air_quality_df(zipDF)
    airQualityDF.show()
    aggedDF = calculate_hottest_days(taxiDF, airQualityDF, zipDF)
    aggedDF.cache()
    # .cache() is lazy, so count() forces evaluation and materializes the cache
    print(aggedDF.count())
    return aggedDF

In [None]:
caseStudyTiming = timer_method("run_case_study()")
print(caseStudyTiming)

### Lab 3.4 - Write data out to S3 for long-term storage

Write functions to:
* Write the taxi/taxi-lookup data to S3 using the most appropriate storage format/partitioning methodology
* Write the cached DataFrame from the previous use case to S3 using the most appropriate storage format/partitioning methodology


In [None]:
# Normally you want to avoid writing to EBS and then copying to S3.
# Intermediate workloads should be written to local EBS, while finished workloads/long-term storage should be read directly from/written directly to S3.
# Luckily, EMRFS lets us write directly to S3.

def write_to_s3(aggDF=None, basePath="s3://tmp/data/nyc-taxi"):
    """Persist the taxi data, the zone lookup, and the case-study aggregate to S3 as parquet.

    Three datasets are written:
    - Taxi rides: sorted, coalesced to 5 files per write task, partitioned by
      ``pickup_month``.
    - Zone lookup: sorted, written as a single file.
    - Case-study aggregate: sorted, coalesced to 1 partition, partitioned by
      ``pickup_month``.

    Args:
        aggDF: aggregate DataFrame to write. Defaults to the notebook-global
            ``aggedDF`` published by ``run_case_study``. Reusing it avoids
            re-running the rate-limited air-quality ingest.
        basePath: S3 prefix under which the three datasets are written.

    Raises:
        NameError: if ``aggDF`` is not given and no ``aggedDF`` global exists.
    """
    if aggDF is None:
        aggDF = globals().get("aggedDF")
        if aggDF is None:
            raise NameError("aggedDF is not defined: run run_case_study() first or pass aggDF explicitly")

    taxiDF = (spark
              .read
              .json(taxiPath)
              .withColumn("pickup_month", f.date_format("pickup_datetime", "yyyyMM")))
    # The zone lookup has no pickup_datetime column, so no pickup_month is derived
    # for it (doing so raises an AnalysisException).
    taxiLookupDF = (spark
              .read
              .json(taxiLookupPath))

    (taxiDF.orderBy('pickup_month', 'passenger_count', 'PULocationID', 'DOLocationID', 'trip_distance', 'fare_amount', 'tip_amount', 'tpep_dropoff_datetime', 'tpep_pickup_datetime')
        .coalesce(5)
        .write
        .mode("overwrite")
        .partitionBy('pickup_month')
        .parquet(f"{basePath}/taxi-data/final"))

    (taxiLookupDF.orderBy('borough', 'service_zone', 'locationID', 'zone')
        .coalesce(1)
        .write
        .mode("overwrite")
        .parquet(f"{basePath}/taxi-lookup/final"))

    (aggDF.orderBy('pickup_month', 'row_num', 'zip', 'avg_cat', 'count_rides')
        .coalesce(1)
        .write
        .mode("overwrite")
        .partitionBy('pickup_month')
        .parquet(f"{basePath}/taxi-agg/final"))