# New York Taxi ETL

##### Import Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, DoubleType, IntegerType
from pyspark.sql.functions import lit
from pyspark.sql import functions as F

### Construct Spark Session

In [None]:
# Build (or reuse) the SparkSession, pulling in the hadoop-azure package for Blob Storage access
spark = (
    SparkSession.builder
    .master('local')
    .appName('app')
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1')
    .getOrCreate()
)
# Register the storage-account access key with the session.
# NOTE(review): the literal "{key}" looks like a redacted placeholder - supply the real key before running.
spark.conf.set("fs.azure.account.key.springboardstorage.blob.core.windows.net","{key}")

In [None]:
# Mount the Azure Blob container on DBFS so it can be reached under /mnt/taxi_etl.
# Mounting an already-mounted path raises, so check the existing mounts first
# instead of catching every exception: a genuine failure (bad key, wrong
# container name, ...) now surfaces instead of being reported as 'Already mounted'.
mount_point = "/mnt/taxi_etl"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print('Already mounted')
else:
    dbutils.fs.mount(
        source = "wasbs://springboardcontainer@springboardstorage.blob.core.windows.net",
        mount_point = mount_point,
        extra_configs = {"fs.azure.account.key.springboardstorage.blob.core.windows.net": "{key}"})

In [None]:
# List the contents of the mount point /mnt/taxi_etl (the SpringBoard container)
dbutils.fs.ls("/mnt/taxi_etl")

#### Grab Taxi Data

##### Find Dates

In [None]:
# Example URLs
# https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet

import pandas as pd
import requests

def get_dates(start='2011-01-01', end='2022-12-01'):
    """Return the months between *start* and *end* as 'YYYY-MM' strings.

    Args:
        start: First date of the range (anything ``pd.date_range`` accepts).
            Defaults to January 2011.
        end: Last date of the range, inclusive. Defaults to December 2022.

    Returns:
        A list with one 'YYYY-MM' string per month start inside the range,
        matching the date part of the trip-data file names.
    """
    # freq='MS' yields one timestamp per month start.
    return pd.date_range(start, end, freq='MS').strftime("%Y-%m").to_list()


yellow_dates = get_dates()



##### Functions To Download and Store Data

In [None]:
class ScrapeNyTaxi:
    '''
    Functions to loop thru select dates from get_dates() and download each parquet file in that range
    Yellow taxi data: 2011-2022
    '''
    @staticmethod
    def grab_yellow(dates=None):
        '''
        Download yellow cab parquet files

        dates: iterable of 'YYYY-MM' strings to download; defaults to the
               module-level yellow_dates list.

        Each month is fetched independently: a failure (HTTP error status,
        network problem, write error) is printed and the loop moves on to
        the next month. Months that answer with an HTTP error status are
        skipped, so no error page is saved under a .parquet name.
        '''
        if dates is None:
            dates = yellow_dates

        for date in dates:
            url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{date}.parquet'
            target = f'/dbfs/mnt/taxi_etl/trip_data/yellow_{date}.parquet'

            try:
                # (connect, read) timeouts so a stalled connection cannot hang the loop
                response = requests.get(url, allow_redirects=True, timeout=(10, 300))
                # Turn 4xx/5xx answers into exceptions before anything is written
                response.raise_for_status()
                # Context manager guarantees the file is flushed and closed
                with open(target, 'wb') as out_file:
                    out_file.write(response.content)

            except Exception as e:
                # Best-effort download: report and continue with the next month
                print(f'Exception: {e}')
                continue


In [None]:
# Download the yellow-taxi parquet file for every month in yellow_dates
ScrapeNyTaxi.grab_yellow()

##### Remove Empty Files

In [None]:
import os
path = '/dbfs/mnt/taxi_etl/trip_data/'
# Keep only regular files from the trip-data folder (directories are skipped)
onlyfiles = list(filter(lambda name: os.path.isfile(os.path.join(path, name)), os.listdir(path)))
# Total number of files
print(f'Total: {len(onlyfiles)} files')

In [None]:
# Several files are empty, as there was no data to pull from the web.
# Anything smaller than 250 bytes is treated as having no data and is deleted.
for file in onlyfiles:
    if os.path.getsize(path+file) >= 250:
        continue
    print(f'{file} has no data')
    os.remove(path+file)

In [None]:
# Re-list the regular files under path (same filter as the earlier listing)
onlyfiles = list(filter(lambda name: os.path.isfile(os.path.join(path, name)), os.listdir(path)))
# Total number of files
print(f'Total: {len(onlyfiles)} files')

##### Print Size of Files

In [None]:
# Sum the on-disk size of every file whose name starts with 'yellow'
yellow_bytes = sum(
    os.path.getsize(path+file)
    for file in onlyfiles
    if file.startswith('yellow')
)

# Reported in units of 10^9 bytes
print(f'yellow taxi data totals {yellow_bytes/1000000000} gigs')

#### Explore Taxi Data

##### Schemas

In [None]:
directory = '/dbfs/mnt/taxi_etl/trip_data/'

# Every entry in the trip-data folder whose name begins with 'yellow'
yellow_taxi = [file for file in os.listdir(directory) if file.startswith('yellow')]

# One list of file names per taxi group (only the yellow group here)
all_data = [yellow_taxi]

In [None]:
def get_schema(data):
    """Group the given parquet files by their column/dtype signature.

    Each file named in *data* is loaded with Spark and its ``df.dtypes`` list
    is stringified. Files sharing the same signature are collected together;
    the grouping is written as JSON and also shown.

    Args:
        data: list of parquet file names located in the module-level
            ``directory``. The prefix of the first name up to the first
            underscore (e.g. 'yellow') names the output.

    Returns:
        None. Output goes to
        '/dbfs/mnt/taxi_etl/trip_data/<prefix>_schema.json' and to stdout.
    """
    if not data:
        # Nothing to inspect; data[0] below would otherwise raise IndexError.
        print('No files supplied; nothing to inspect')
        return

    # (file name, stringified dtypes) pairs, one per input file.
    # directory[5:] drops the first five characters of `directory`
    # (presumably the '/dbfs' prefix, which Spark paths do not use).
    file_schemas = []
    for file_name in data:
        df = spark.read.option('inferSchema','true').format('parquet').load(directory[5:]+file_name)
        file_schemas.append((file_name, str(df.dtypes)))

    df_schema = StructType([
        StructField("File",StringType(),True),
        StructField("Schema",StringType(),True),
    ])

    df = spark.createDataFrame(file_schemas,schema= df_schema)
    df = df.groupBy("Schema").agg(F.collect_list('File'))

    # File names look like 'yellow_2011-01.parquet': the group name is the
    # part before the first underscore. (Splitting on a space, as before,
    # never split anything and put a whole file name into the output path.)
    name = data[0].split('_')[0]

    # NOTE(review): this Spark write uses a '/dbfs/...' path while the reads
    # above strip that prefix - confirm the intended output location.
    # NOTE(review): the default save mode fails if the path already exists.
    df.write.json(f'/dbfs/mnt/taxi_etl/trip_data/{name}_schema.json')
    df.show()

In [None]:
# Run schema finder for lists of data files
# (all_data holds one list of file names per taxi group; get_schema runs once per list)
for data in all_data:
    get_schema(data)

#### Normalize Schemas

##### Define Schemas

In [None]:
# Define Schemas for yellow taxi data

# Target schema for the normalized yellow-taxi data.
# Column types mirror the casts applied in make_yellow(): passenger_count and
# airport_fee are DOUBLE there, so they are DoubleType here as well. With
# mismatched types the union against the empty seed DataFrame would widen the
# freshly cast columns (e.g. double + string -> string).
yellow_schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('pickup_datetime', TimestampType(), True),
    StructField('dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', LongType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('airport_fee', DoubleType(), True),
    StructField('taxi_type', StringType(), True),
    ])

##### Cast Each Group's Schema and Union to Itself

In [None]:
# IF SCHEMAS ARE DIFFERENT WITHIN GROUPS (YELLOW, GREEN, HIGH-VOLUME)
# FUNCTION TO NORMALIZE SCHEMA

def make_yellow():
    """Load every yellow parquet file, cast it to one schema and union the results.

    For each 'yellow*.parquet' file in the module-level ``directory`` the
    function tags the rows with taxi_type='yellow', renames the tpep_*
    timestamp columns, casts every column to a fixed type and unions the
    result onto an accumulator that starts as an empty DataFrame built from
    ``yellow_schema``.

    Returns:
        The unioned Spark DataFrame. Its schema is also printed.
    """
    emptyRDD = spark.sparkContext.emptyRDD()
    yellow_df = spark.createDataFrame(emptyRDD,schema=yellow_schema)

    # Only real parquet files: other entries starting with 'yellow'
    # (e.g. a schema JSON output) would make the parquet reader fail.
    yellow_list = [
        file for file in os.listdir(directory)
        if file.startswith('yellow') and file.endswith('.parquet')
    ]

    # One cast expression per output column, in output order.
    cast_exprs = [
        'BIGINT(VendorID)', 'TIMESTAMP(pickup_datetime)', 'TIMESTAMP(dropoff_datetime)',
        'DOUBLE(passenger_count)', 'DOUBLE(trip_distance)', 'BIGINT(RatecodeID)',
        'STRING(store_and_fwd_flag)', 'BIGINT(PULocationID)', 'BIGINT(DOLocationID)',
        'BIGINT(payment_type)', 'DOUBLE(fare_amount)', 'DOUBLE(extra)', 'DOUBLE(mta_tax)',
        'DOUBLE(tip_amount)', 'DOUBLE(tolls_amount)', 'DOUBLE(improvement_surcharge)',
        'DOUBLE(total_amount)', 'DOUBLE(congestion_surcharge)', 'DOUBLE(airport_fee)',
        'STRING(taxi_type)',
    ]

    for file in yellow_list:
        # directory[5:] drops the first five characters of `directory`
        # (presumably the '/dbfs' prefix, which Spark paths do not use).
        df_yellow = spark.read.option('inferSchema','true').parquet(f'{directory[5:]}{file}')
        df_yellow = df_yellow.withColumn('taxi_type',lit('yellow'))
        df_yellow = df_yellow.withColumnRenamed('tpep_pickup_datetime','pickup_datetime')\
            .withColumnRenamed('tpep_dropoff_datetime','dropoff_datetime')

        # Same SQL cast expressions as before, applied directly to the
        # DataFrame so no session-wide temp view has to be registered.
        df_yellow = df_yellow.selectExpr(*cast_exprs)

        # union() matches columns by position, hence the fixed order above.
        yellow_df = df_yellow.union(yellow_df)
        print(f'{file} analyzed')

    yellow_df.printSchema()

    return yellow_df

In [None]:
def write_dfs():
    """Read all yellow parquet files with yellow_schema and preview them.

    Despite the name, nothing is written here: the function only loads the
    'yellow*.parquet' files and calls show() on the result.
    """
    source_glob = directory[5:]+'yellow*.parquet'
    reader = spark.read.schema(yellow_schema).format('parquet')
    yellow_df = reader.load(source_glob)
    yellow_df.show()
    # IF SCHEMA ISSUES
    # yellow_df = make_yellow()

write_dfs()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2011-01-01 00:10:00|  2011-01-01 00:12:00|              4|          0.0|         1|              null|         145|         145|           1|        2.9|  0.5|    0.5|      0.2

#### Grab New York Weather Data

##### Import Packages

In [None]:
%pip install selenium

In [None]:
import datetime
from bs4 import BeautifulSoup as BS
from selenium import webdriver
import pandas as pd
import time

##### Install Chrome and ChromeDriver

In [None]:
# Write the Chrome/ChromeDriver install script to DBFS (third argument True = overwrite).
# - The shebang must be the very first bytes of the file, so the literal
#   starts right after the opening quotes.
# - The notebook magic '%sh' is not a shell command and is no longer part of the script.
# - The "already installed" check looks at /tmp/chrome/$VERSION, the directory
#   the script actually installs into.
dbutils.fs.mkdirs("dbfs:/databricks/scripts/")
dbutils.fs.put("/databricks/scripts/selenium-install.sh","""#!/bin/bash
LAST_VERSION="https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2FLAST_CHANGE?alt=media"
VERSION=$(curl -s -S $LAST_VERSION)
if [ -d "/tmp/chrome/$VERSION" ] ; then
  echo "version already installed"
  exit
fi

rm -rf /tmp/chrome/$VERSION
mkdir -p /tmp/chrome/$VERSION

URL="https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2F$VERSION%2Fchrome-linux.zip?alt=media"
ZIP="${VERSION}-chrome-linux.zip"

curl -# $URL > /tmp/chrome/$ZIP
unzip /tmp/chrome/$ZIP -d /tmp/chrome/$VERSION

URL="https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2F$VERSION%2Fchromedriver_linux64.zip?alt=media"
ZIP="${VERSION}-chromedriver_linux64.zip"

curl -# $URL > /tmp/chrome/$ZIP
unzip /tmp/chrome/$ZIP -d /tmp/chrome/$VERSION

mkdir -p /tmp/chrome/chrome-user-data-dir

rm -f /tmp/chrome/latest
ln -s /tmp/chrome/$VERSION /tmp/chrome/latest

# to avoid errors about missing libraries
sudo apt-get update
sudo apt-get install -y libgbm-dev
""", True)
display(dbutils.fs.ls("dbfs:/databricks/scripts/"))

Wrote 1045 bytes.


path,name,size,modificationTime
dbfs:/databricks/scripts/selenium-install.sh,selenium-install.sh,1045,1683585238000


In [None]:
%sh
/dbfs/databricks/scripts/selenium-install.sh

In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
# ChromeDriver service and headless-Chrome options shared by render_page().
# Binary locations point at the /tmp/chrome/latest symlink.
s = Service('/tmp/chrome/latest/chromedriver_linux64/chromedriver')
options = webdriver.ChromeOptions()
options.binary_location = "/tmp/chrome/latest/chrome-linux/chrome"
options.add_argument('headless')
options.add_argument('--disable-infobars')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--no-sandbox')
options.add_argument('--remote-debugging-port=9222')
options.add_argument('--homedir=/tmp/chrome/chrome-user-data-dir')
options.add_argument('--user-data-dir=/tmp/chrome/chrome-user-data-dir')
# Download directory now matches the user-data dir used above
# (it previously read '...chrome-user-data-di', missing the final 'r').
prefs = {"download.default_directory":"/tmp/chrome/chrome-user-data-dir",
         "download.prompt_for_download":False
}
options.add_experimental_option("prefs",prefs)
# driver = webdriver.Chrome(service=s, options=options)

##### Load Wunderground Data

In [None]:
# Find range of dates matching with Taxi Trip data
# Earliest = 01/01/2010
# Latest = 01/01/2023
def get_dates():
    """Return every day from 2010-01-01 through 2023-01-01 as 'YYYY-MM-DD' strings."""
    first_day = datetime.date(2010, 1, 1)
    last_day = datetime.date(2023, 1, 1)
    # +1 so the last day itself is included
    day_count = (last_day - first_day).days + 1
    return [str(first_day + datetime.timedelta(days=offset)) for offset in range(day_count)]


In [None]:
# function to load wunderground data (without this it has no records to show)
def render_page(url):
    """Open *url* in headless Chrome and return the rendered page source.

    A fresh driver is started for every call, using the module-level
    service ``s`` and ``options``. The fixed 3-second pause gives the page's
    scripts time to populate it before the HTML is captured.

    Args:
        url: address of the page to load.

    Returns:
        The page source (HTML string) after rendering.
    """
    driver = webdriver.Chrome(service=s, options=options)
    try:
        driver.get(url)
        time.sleep(3)
        return driver.page_source
    finally:
        # Always shut the browser down, even when loading fails, so Chrome
        # processes do not pile up over thousands of pages.
        driver.quit()

In [None]:
def list_transpose(data_list):
    """Strip unit markers and non-breaking spaces from every string in a list of lists.

    Despite the name, nothing is transposed: the nested shape is kept and each
    item has the tokens below removed, one after another in this order.
    """
    # Order matters: later tokens only see what earlier removals left behind.
    unwanted = ('%', u'\xa0', '°F', '°in', '°%', '°mph', '°')

    def scrub(text):
        for token in unwanted:
            text = text.replace(token, '')
        return text

    return [[scrub(item) for item in lst] for lst in data_list]

In [None]:
def set_schema(df):
    """Convert the scraped weather columns from text to their proper types.

    The frame is modified in place and also returned.

    Args:
        df: pandas DataFrame with the columns Temperature, Dew_Point,
            Humidity, Wind_Speed, Wind_Gust, Pressure, Precipitation,
            datetime, Wind and Condition.

    Returns:
        The same DataFrame with numeric, datetime and string columns set.

    Raises:
        ValueError: if a numeric or datetime value cannot be parsed.
    """
    # To numeric (integer or float, whichever the values require)
    numeric_cols = ["Temperature","Dew_Point", "Humidity","Wind_Speed","Wind_Gust",'Pressure','Precipitation']
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric)
    # To DateTime (parsed value by value)
    df['datetime'] = df['datetime'].apply(pd.to_datetime)
    # To String - Series.map replaces the deprecated DataFrame.applymap
    df[['Wind','Condition']] = df[['Wind','Condition']].apply(lambda col: col.map(str))
    return df

In [None]:
def scraper(page, dates):
    """Scrape the daily observation table from wunderground for each date.

    For every date the page ``page + date`` is rendered, the cells of the
    observation table are collected ten per row, cleaned with
    list_transpose(), typed with set_schema() and written to
    '/dbfs/mnt/taxi_etl/weather_data/NY_Weather<date>.parquet'.

    Days without an observation table, or without any complete row, are
    reported and skipped so one bad page does not stop the whole run.

    Args:
        page: base URL; the date is appended to it.
        dates: iterable of date strings (as produced by get_dates()).

    Returns:
        None. The result is the set of parquet files written.
    """
    cols = ['Time','Temperature','Dew_Point','Humidity','Wind','Wind_Speed','Wind_Gust','Pressure','Precipitation','Condition','Date']
    cells_per_row = 10  # table columns on the page; 'Date' is appended afterwards

    for d in dates:
        url = str(page) + str(d)
        soup = BS(render_page(url), "html.parser")

        # Both lookups return None when the element is missing; guard them
        # explicitly instead of relying on an AttributeError further down.
        container = soup.find('lib-city-history-observation')
        check = container.find('tbody') if container is not None else None
        if check is None:
            print(f'No observation table found for {d}; skipping')
            continue

        # Flat list of cell texts, trimmed of surrounding (non-breaking) spaces
        data = [
            cell.text.strip(' \xa0')
            for row in check.find_all('tr', class_='ng-star-inserted')
            for cell in row.find_all('td', class_='ng-star-inserted')
        ]

        # Regroup into table rows; drop a trailing partial row, which could
        # not be matched against the column list.
        rows = [data[i:i + cells_per_row] for i in range(0, len(data), cells_per_row)]
        rows = [row for row in rows if len(row) == cells_per_row]
        if not rows:
            print(f'No observations found for {d}; skipping')
            continue

        # Strip of Weird Characters, then tag every row with its date
        rows = list_transpose(rows)
        for row in rows:
            row.append(str(d))

        # One DataFrame per day instead of one single-row frame per observation
        df_daily = pd.DataFrame(rows, columns=cols)
        df_daily['datetime'] = df_daily['Date'] + ' ' + df_daily['Time']
        df_daily = df_daily.drop(['Date','Time'],axis=1)
        # Set Schema
        df_daily = set_schema(df_daily)

        df_daily.to_parquet(f'/dbfs/mnt/taxi_etl/weather_data/NY_Weather{d}.parquet')

In [None]:
# Call Functions
dates = get_dates()
page = 'https://www.wunderground.com/history/daily/us/ny/new-york-city/KLGA/date/'

# scraper() has no return statement, so df is None here; its output is the parquet files it writes
df = scraper(page, dates)
# Wrote 4745 files
# Size of 42M

In [None]:
%sh du -h /dbfs/mnt/taxi_etl/weather_data/

In [None]:
%sh ls /dbfs/mnt/taxi_etl/weather_data/ | wc -l

In [None]:
# Create the combined_df directory under the mounted weather_data folder
dbutils.fs.mkdirs("/mnt/taxi_etl/weather_data/combined_df")

In [None]:
def combine_weather_dfs():
    """Read all daily weather parquet files as one DataFrame and preview it.

    The pandas index column is dropped and the columns are renamed to
    lower-case names carrying their units. The schema and the first rows are
    printed; nothing is returned or written.
    """
    # (old name, new name) pairs, applied in this order
    renames = [
        ("Temperature", "temp(f)"),
        ("Dew_Point", "dew_point(f)"),
        ("Humidity", "humidity(%)"),
        ("Wind", "wind_direction"),
        ("Wind_Speed", "wind_speed(mph)"),
        ("Wind_Gust", "wind_gust(mph)"),
        ("Pressure", "pressure(inHg)"),
        ("Precipitation", "precipitation(in)"),
        ("Condition", "condition"),
    ]

    weather = spark.read.option('inferSchema','true').parquet('/mnt/taxi_etl/weather_data/*.parquet')
    # Index column that pandas stored alongside the data
    weather = weather.drop('__index_level_0__')
    for old_name, new_name in renames:
        weather = weather.withColumnRenamed(old_name, new_name)

    weather.printSchema()
    weather.show()
combine_weather_dfs()
# Total rows = 133652

root
 |-- temp(f): long (nullable = true)
 |-- dew_point(f): long (nullable = true)
 |-- humidity(%): long (nullable = true)
 |-- wind_direction: string (nullable = true)
 |-- wind_speed(mph): long (nullable = true)
 |-- wind_gust(mph): long (nullable = true)
 |-- pressure(inHg): double (nullable = true)
 |-- precipitation(in): double (nullable = true)
 |-- condition: string (nullable = true)
 |-- datetime: timestamp_ntz (nullable = true)

+-------+------------+-----------+--------------+---------------+--------------+--------------+-----------------+------------------+-------------------+
|temp(f)|dew_point(f)|humidity(%)|wind_direction|wind_speed(mph)|wind_gust(mph)|pressure(inHg)|precipitation(in)|         condition|           datetime|
+-------+------------+-----------+--------------+---------------+--------------+--------------+-----------------+------------------+-------------------+
|     46|          43|         89|            NE|             18|             0|         29.63|  