In [14]:
import pandas as pd
import geopandas as gpd
from pyspark.sql import SparkSession, functions as F
import matplotlib.pyplot as plt
import pyspark
import numpy as np
from matplotlib.ticker import FixedLocator

In [15]:
# Build (or reuse) the Spark session that runs every job in this notebook.
# The settings are applied to the builder in the order listed here.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    # Session time zone is fixed to UTC.
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.driver.memory": "4g",
    "spark.executor.memory": "2g",
}

session_builder = SparkSession.builder.appName("curating parquets using sql")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

In [16]:
# Load the raw yellow-taxi trips and add `hourly_time`: the pickup timestamp
# truncated to the start of its hour.
sdf = (
    spark.read
    .parquet("../data/raw/yellow_data/*")
    .withColumn(
        "hourly_time",
        F.date_trunc("hour", F.col("tpep_pickup_datetime")),
    )
)

# Load the curated weather table, taking column names from the CSV header row.
# NOTE(review): no schema / inferSchema is supplied, so the weather columns are
# read as strings and later joins/averages depend on implicit casts — confirm
# this is intended.
weather_sdf = spark.read.csv("../data/curated/weather.csv", header=True)

# Register both DataFrames as temporary views so they can be queried with SQL.
sdf.createOrReplaceTempView("taxi")
weather_sdf.createOrReplaceTempView("weather")

### Creating Parquet for Hourly Demand of Taxi Cabs based on weekday

In [17]:
# Number of trips for every (weekday, hour-of-day) pair, ordered by weekday
# and then by hour.
hourly_demand_wd_query = """
SELECT 
    HOUR(tpep_pickup_datetime) as hour,
    WEEKDAY(CAST(tpep_pickup_datetime AS DATE)) as wd,
    COUNT(*) as demand
FROM 
    taxi
GROUP BY 
    WEEKDAY(CAST(tpep_pickup_datetime AS DATE)),
    HOUR(tpep_pickup_datetime)
ORDER BY
    WEEKDAY(CAST(tpep_pickup_datetime AS DATE)),
    HOUR(tpep_pickup_datetime)             
"""
hourly_demand_wd = spark.sql(hourly_demand_wd_query)

# Reduce to a single partition before writing, replacing any previous output
# at the target path.
(
    hourly_demand_wd
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("../data/curated/hourly_demand_wd")
)

                                                                                

### Creating Parquet for Total Demand based on different Zones

In [18]:
# Total number of trips per pickup location id.
location_demand_query = """
SELECT 
    COUNT(*) as demand,
    pulocationid
FROM 
    taxi
GROUP BY 
    pulocationid
"""
location_demand = spark.sql(location_demand_query)

# Reduce to a single partition before writing, replacing any previous output
# at the target path.
(
    location_demand
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("../data/curated/location_demand")
)

                                                                                

### Creating a DataFrame which combines all the trip data with the hourly weather data

In [19]:
# Attach the weather row for the matching hour to every taxi trip. The LEFT
# JOIN keeps trips whose pickup hour has no weather record; the result is
# ordered by pickup time.
# NOTE(review): this cell only defines the DataFrame (nothing is written), and
# the ORDER BY becomes part of every query built on it — confirm the ordering
# is actually needed.
combined_query = """
SELECT 
    *
FROM 
    taxi t
LEFT JOIN
    weather w
ON 
    t.hourly_time = w.time   
ORDER BY     
    t.tpep_pickup_datetime       
"""
combined = spark.sql(combined_query)

### Creating Parquet for hourly demand per pickup zone with average weather

In [21]:
# Keep the `combined` view registered for any other cell that queries it.
combined.createOrReplaceTempView('combined')

# Hourly demand per pickup zone, together with the average rain, wind, cloud
# cover and temperature recorded for that hour.
#
# The taxi/weather join is spelled out here instead of selecting from the
# `combined` view: that view carries a global ORDER BY over every trip, which
# cannot change the result of a GROUP BY with COUNT/AVG but may still have to
# be executed as a full sort of the trip data before aggregating.
hour_all = spark.sql("""
SELECT
    COUNT(*) AS demand,
    AVG(w.rainmm) AS rain,
    AVG(w.windspeed10mkmh) AS wind,
    AVG(w.cloudcover) AS cloud,
    AVG(w.temperature2mC) AS temp,
    t.hourly_time AS date_time,
    t.pulocationid AS pulocationid
FROM
    taxi t
LEFT JOIN
    weather w
ON
    t.hourly_time = w.time
GROUP BY
    t.pulocationid,
    t.hourly_time
""")

# Write a single parquet file. repartition(1) is used rather than coalesce(1):
# a drastic coalesce also collapses the upstream final aggregation into one
# task, whereas repartition adds a shuffle so the aggregation stays parallel
# and only the (already aggregated) rows are gathered for the write.
(
    hour_all
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet('../data/curated/hour_all')
)

23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/08/19 22:28:45 WARN RowBasedKeyValueBatch: Calling spill() on