# New York Taxi ETL

##### Import Spark

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, DoubleType, IntegerType
from pyspark.sql.functions import lit
from pyspark.sql import functions as F

### Construct Spark Session

In [0]:
#Start SparkSession with azure hadoop package 
spark = SparkSession.builder.master('local').appName('app').config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1').getOrCreate()        
spark.conf.set("fs.azure.account.key.springboardstorage.blob.core.windows.net","aytLE9zNkSMkYuioLbflu5bhemJ6vMZ10hCKEMgSPURwFfZqBJpNOZjEJUsxirAhSOLZReYsuB2u+AStlBBQWw==")

In [0]:
# Create mount mount to connect to azure blob
# ...Use this once or enter into try / expect block
try:
    dbutils.fs.mount(source = "wasbs://springboardcontainer@springboardstorage.blob.core.windows.net",
    mount_point = "/mnt/taxi_etl",
    extra_configs = {"fs.azure.account.key.springboardstorage.blob.core.windows.net": "aytLE9zNkSMkYuioLbflu5bhemJ6vMZ10hCKEMgSPURwFfZqBJpNOZjEJUsxirAhSOLZReYsuB2u+AStlBBQWw=="})
# How to pass in java.lang.IllegalArgumentException?
except Exception as e:
    print('Already mounted')

An error occurred while calling o445.mount.
: java.rmi.RemoteException: java.lang.IllegalArgumentException: requirement failed: Directory already mounted: /mnt/taxi_etl; nested exception is: 
	java.lang.IllegalArgumentException: requirement failed: Directory already mounted: /mnt/taxi_etl
	at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:135)
	at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:69)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.createOrUpdateMount(DBUtilsCore.scala:1010)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.$anonfun$mount$1(DBUtilsCore.scala:1036)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:559)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:654)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:675)
	at com.databricks.logging.UsageLogging.$ano

In [0]:
# View files in SpringBoard Container
dbutils.fs.ls("/mnt/taxi_etl")

#### Grab Taxi Data

##### Find Dates

In [0]:
# Example URLs
# https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet
# https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet
# https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2022-01.parquet
# https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet

import pandas as pd
import requests

def get_dates():
    yellow_dates = pd.date_range('2011-01-01','2022-12-01',freq='MS').strftime("%Y-%m").to_list()
    # green_dates = pd.date_range('2014-01-01','2022-12-01',freq='MS').strftime("%Y-%m").to_list()
    # fh_dates = pd.date_range('2015-01-01','2022-12-01',freq='MS').strftime("%Y-%m").to_list()
    # hv_dates = pd.date_range('2019-02-01','2022-12-01',freq='MS').strftime("%Y-%m").to_list()
    return yellow_dates
    
yellow_dates = get_dates()[0]
# green_dates = get_dates()[1]
# fh_dates = get_dates()[2]
# hv_dates = get_dates()[3]



##### Functions To Download and Store Data

In [0]:
class ScrapeNyTaxi:
    '''
    Functions to loop thru select dates from get_dates() and download each parquet file in that range
    Yellow taxi data: 2011-2022
    Green taxi data: 2013-2022
    For-hire vehicle data: 2015-2022
    High-volume for-hire vehicle data: 2019-2022
    '''
    def grab_yellow():
        '''
        Download yellow cab parquet files 
        '''
        for date in yellow_dates:

            try:
                url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{date}.parquet'
                response = requests.get(url, allow_redirects=True)
                open(f'/dbfs/mnt/taxi_etl/trip_data/yellow_{date}.parquet','wb').write(response.content)

                
            except Exception as e:
                print(f'Exception: {e}')
                continue
    
    '''
    def grab_green():

        # Download yellow cab parquet files 

        for date in green_dates:
            try:
                url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_{date}.parquet'
                response = requests.get(url, allow_redirects=True)
                open(f'/dbfs/mnt/taxi_etl/trip_data/green_{date}.parquet','wb').write(response.content)

            except Exception as e:
                print(f'Exception: {e}')
                continue
            

    def grab_fh():

        # Download for hire vehicle parquet files 

        for date in fh_dates:
            try:
                url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_{date}.parquet'
                response = requests.get(url, allow_redirects=True)
                open(f'/dbfs/mnt/taxi_etl/trip_data/for_hire_{date}.parquet','wb').write(response.content)

            except Exception as e:
                print(f'Exception: {e}')
                continue     

    
    def grab_hv():
        # Download for hire vehicle parquet files 
        for date in hv_dates:
            try:
                url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_{date}.parquet'
                response = requests.get(url, allow_redirects=True)
                open(f'/dbfs/mnt/taxi_etl/trip_data/high_volume_{date}.parquet','wb').write(response.content)

            except Exception as e:
                print(f'Exception: {e}')
                continue     
    '''

In [0]:
ScrapeNyTaxi.grab_yellow()

In [0]:
# ScrapeNyTaxi.grab_green()

In [0]:
# ScrapeNyTaxi.grab_fh()

In [0]:
# ScrapeNyTaxi.grab_hv()

##### Remove Empty files

In [0]:
import os
path = '/dbfs/mnt/taxi_etl/trip_data/'
onlyfiles = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
# Total number of files
print(f'Total: {len(onlyfiles)} files')

Total: 391 files


In [0]:
# Several files are empty, as there was no data to pull from the web
for file in onlyfiles:
    if os.path.getsize(path+file) < 250:
      print(f'{file} has no data')
      os.remove(path+file)

In [0]:
onlyfiles = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
# Total number of files
print(f'Total: {len(onlyfiles)} files')

Total: 391 files


##### Print Size of Files

In [0]:
yellow_bytes = 0
# green_bytes = 0
# fh_bytes = 0
# hv_bytes = 0
for file in onlyfiles:
    if file.startswith('yellow'):
        yellow_bytes += os.path.getsize(path+file)
'''
    if file.startswith('green'):
        green_bytes += os.path.getsize(path+file)
    if file.startswith('for'):
        fh_bytes += os.path.getsize(path+file)      
    if file.startswith('high'):
        hv_bytes += os.path.getsize(path+file)              
'''
print(f'yellow taxi data totals {yellow_bytes/1000000000} gigs')

# print(f'green taxi data totals {green_bytes/1000000000} gigs')
# print(f'for-hire taxi data totals {fh_bytes/1000000000} gigs')
# print(f'high-volume taxi data totals {hv_bytes/1000000000} gigs')

print(f'total data: {(green_bytes+yellow_bytes+fh_bytes+hv_bytes)/1000000000} gigs')

#### Explore Taxi Data

##### Schemas

In [0]:
directory = '/dbfs/mnt/taxi_etl/trip_data/'

# for_hire = []
# green_taxi = []
# high_volume = []
yellow_taxi = []

for file in os.listdir(directory):
    if file.startswith('yellow'):
        yellow_taxi.append(file)

    '''
    if file.startswith('for'):
        for_hire.append(file)
    if file.startswith('green'):
        green_taxi.append(file)
    if file.startswith('high'):
        high_volume.append(file)
    '''

all_data = [yellow_taxi]
# all_data = [for_hire,green_taxi,high_volume,yellow_taxi]

In [0]:
def get_schema(data):

    file_list = []
    schema_list = []

    for files in data:
        df = spark.read.option('inferSchema','true').format('parquet').load(directory[5:]+files)
        file_list.append(files)
        schema_list.append(str(df.dtypes))

    list_zip = zip(file_list,schema_list)
    zipped_list = list(list_zip)

    df_schema = StructType([ \
        StructField("File",StringType(),True), \
        StructField("Schema",StringType(),True),
    ]) 

    df = spark.createDataFrame(zipped_list,schema= df_schema)
    df = df.groupBy("Schema").agg(F.collect_list('File'))
    
    data_str = data[0]
    name = data_str.split(' ')[0]
    
    df.write.json(f'/dbfs/mnt/taxi_etl/trip_data/{name}_schema.json')
    df.show()

In [0]:
# Run schema finder for groups (yellow,green,for-hire,high-volume) of data
for data in all_data:
    get_schema(data)

# Our result shows us that there are several schemas for each of the datasets which must be normalized

#### Normalize Schemas

##### Define Schemas

In [0]:
# Define Schemas for each type of data 

yellow_schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('pickup_datetime', TimestampType(), True),
    StructField('dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', StringType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', LongType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('airport_fee', IntegerType(), True),
    StructField('taxi_type', StringType(), True),
    ])

'''
green_schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('lpep_pickup_datetime', TimestampType(), True),
    StructField('lpep_dropoff_datetime', TimestampType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('ehail_fee', LongType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('payment_type', LongType(), True),
    StructField('trip_type', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('taxi_type', StringType(), True),
    ])

fhv_schema = StructType([
    StructField('dispatching_base_num', StringType(), True),
    StructField('pickup_datetime', TimestampType(), True),
    StructField('dropOff_datetime', TimestampType(), True),
    StructField('PULocationID', DoubleType(), True),
    StructField('DOLocationID', DoubleType(), True),
    StructField('SR_Flag', StringType(), True),
    StructField('Affiliated_base_number', StringType(), True),
    StructField('taxi_type', StringType(), True),
    ])

hv_schema = StructType([
    StructField('hvfhs_license_num', StringType(), True),
    StructField('dispatching_base_num', StringType(), True),
    StructField('originating_base_num', StringType(), True),
    StructField('request_datetime', TimestampType(), True),
    StructField('on_scene_datetime', TimestampType(), True),
    StructField('pickup_datetime', TimestampType(), True),
    StructField('dropoff_datetime', TimestampType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('trip_miles', DoubleType(), True),
    StructField('trip_time', LongType(), True),
    StructField('base_passenger_fare', DoubleType(), True),
    StructField('tolls', DoubleType(), True),
    StructField('bcf', DoubleType(), True),
    StructField('sales_tax', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('airport_fee', DoubleType(), True),
    StructField('tips', DoubleType(), True),
    StructField('driver_pay', DoubleType(), True),
    StructField('shared_request_flag', StringType(), True),
    StructField('shared_match_flag', StringType(), True),
    StructField('access_a_ride_flag', StringType(), True),
    StructField('wav_request_flag', StringType(), True),
    StructField('wav_match_flag', StringType(), True),
    StructField('taxi_type', StringType(), True),
    ])
'''

"\ngreen_schema = StructType([\n    StructField('VendorID', LongType(), True),\n    StructField('lpep_pickup_datetime', TimestampType(), True),\n    StructField('lpep_dropoff_datetime', TimestampType(), True),\n    StructField('store_and_fwd_flag', StringType(), True),\n    StructField('trip_distance', DoubleType(), True),\n    StructField('fare_amount', DoubleType(), True),\n    StructField('extra', DoubleType(), True),\n    StructField('mta_tax', DoubleType(), True),\n    StructField('tip_amount', DoubleType(), True),\n    StructField('tolls_amount', DoubleType(), True),\n    StructField('ehail_fee', LongType(), True),\n    StructField('improvement_surcharge', DoubleType(), True),\n    StructField('total_amount', DoubleType(), True),\n    StructField('payment_type', LongType(), True),\n    StructField('trip_type', DoubleType(), True),\n    StructField('congestion_surcharge', DoubleType(), True),\n    StructField('taxi_type', StringType(), True),\n    ])\n\nfhv_schema = StructType([\n

##### Cast Each Group's Schema and Union to Itself

In [0]:
'''
def make_yellow():

    emptyRDD = spark.sparkContext.emptyRDD()
    yellow_df = spark.createDataFrame(emptyRDD,schema=yellow_schema)

    yellow_list = []

    for file in os.listdir(directory):
        if file.startswith('yellow'):
            yellow_list.append(file)    

    for file in yellow_list:
        df_yellow = spark.read.option('inferSchema','true').parquet(f'{directory[5:]}{file}')
        df_yellow = df_yellow.withColumn('taxi_type',lit('yellow'))
        df_yellow = df_yellow.withColumnRenamed('tpep_pickup_datetime','pickup_datetime')\
            .withColumnRenamed('tpep_dropoff_datetime','dropoff_datetime')

        df_yellow.createOrReplaceTempView('Cast')

        df_yellow = spark.sql("SELECT BIGINT(VendorID),TIMESTAMP(pickup_datetime),\
            TIMESTAMP(dropoff_datetime),DOUBLE(passenger_count),DOUBLE(trip_distance),\
            BIGINT(RatecodeID),STRING(store_and_fwd_flag),BIGINT(PULocationID),BIGINT(DOLocationID),\
            BIGINT(payment_type),DOUBLE(fare_amount),DOUBLE(extra),DOUBLE(mta_tax),DOUBLE(tip_amount),\
            DOUBLE(tolls_amount),DOUBLE(improvement_surcharge),DOUBLE(total_amount),DOUBLE(congestion_surcharge),\
            DOUBLE(airport_fee),STRING(taxi_type) from Cast")

        yellow_df = df_yellow.union(yellow_df)
        print(f'{file} analyzed')

    yellow_df.printSchema()

    return yellow_df
'''

In [0]:
'''
def make_green():

    emptyRDD = spark.sparkContext.emptyRDD()
    green_df = spark.createDataFrame(emptyRDD,schema=green_schema)

    green_list = []

    for file in os.listdir(directory):
        if file.startswith('green'):
            green_list.append(file)    

    for file in green_list:    

        df_green = spark.read.option('inferSchema','true').parquet(f'{directory[5:]}{file}')
        df_green = df_green.withColumnRenamed('lpep_pickup_datetime','pickup_datetime')\
            .withColumnRenamed('lpep_dropoff_datetime','dropoff_datetime')
        df_green = df_green.withColumn('taxi_type',lit('green'))

        df_green.createOrReplaceTempView('Cast')

        df_green = spark.sql("SELECT BIGINT(VendorID),TIMESTAMP(pickup_datetime),\
            TIMESTAMP(dropoff_datetime),STRING(store_and_fwd_flag),DOUBLE(trip_distance),\
            DOUBLE(fare_amount),DOUBLE(extra),DOUBLE(mta_tax),DOUBLE(tip_amount),\
            DOUBLE(tolls_amount),BIGINT(ehail_fee),DOUBLE(improvement_surcharge),DOUBLE(total_amount),\
            BIGINT(payment_type),DOUBLE(trip_type),DOUBLE(congestion_surcharge),STRING(taxi_type) from Cast")

        green_df = df_green.union(green_df)
        print(f'{file} analyzed')        

    green_df.printSchema()

    return green_df
'''

In [0]:
'''
def make_fhv():

    emptyRDD = spark.sparkContext.emptyRDD()
    fhv_df = spark.createDataFrame(emptyRDD,schema=fhv_schema)

    fhv_list = []    

    for file in os.listdir(directory):
        if file.startswith('for'):
            fhv_list.append(file)    

    for file in fhv_list:    

        df_fhv = spark.read.option('inferSchema','true').parquet(f'{directory[5:]}{file}')
        df_fhv = df_fhv.withColumn('taxi_type',lit('for_hire'))
        df_fhv = df_fhv.withColumnRenamed('dropOff_datetime','dropoff_datetime')

        df_fhv.createOrReplaceTempView('Cast')

        df_fhv = spark.sql("SELECT STRING(dispatching_base_num),TIMESTAMP(pickup_datetime),\
            TIMESTAMP(dropoff_datetime),DOUBLE(PULocationID),DOUBLE(DOLocationID),STRING(SR_Flag),\
            STRING(Affiliated_base_number),STRING(taxi_type) from Cast")

        fhv_df = df_fhv.union(fhv_df)
        print(f'{file} analyzed')        

    fhv_df.printSchema()

    return fhv_df
'''

In [0]:
'''
def make_hv():
  
    # emptyRDD = spark.sparkContext.emptyRDD()
    # hv_df = spark.createDataFrame(emptyRDD,schema=hv_schema)
    hv_df = spark.read.schema(hv_schema).parquet('/mnt/taxi_etl/trip_data/high*.parquet')
    hv_df.printSchema()
    hv_df.write.mode('overwrite').parquet('/dbfs/mnt/taxi_etl/trip_data/all_hv')
    
    
    ### Try to read into one DF, using wildcard. If Schema fields are completely different, then you can split into separate DFs. Many unions will reshuffle and cause slowdowns. 
    

    hv_list = []    
    for file in os.listdir(directory):
        if file.startswith('high'):
            hv_list.append(file) 

    for file in hv_list:    

        df_hv = spark.read.option('inferSchema','true').parquet(f'{directory[5:]}{file}')
        df_hv = df_hv.withColumn('taxi_type',lit('high_volume'))
        df_hv = df_hv.withColumnRenamed('shared_request_flag','SR_Flag')

        df_hv.createOrReplaceTempView('Cast')

        df_hv = spark.sql("SELECT STRING(hvfhs_license_num),STRING(dispatching_base_num),STRING(originating_base_num),TIMESTAMP(request_datetime),\
            TIMESTAMP(on_scene_datetime),TIMESTAMP(pickup_datetime),TIMESTAMP(dropoff_datetime),\
            DOUBLE(PULocationID),DOUBLE(DOLocationID),DOUBLE(trip_miles),DOUBLE(trip_time),\
            DOUBLE(base_passenger_fare),DOUBLE(tolls),DOUBLE(bcf),DOUBLE(sales_tax),DOUBLE(congestion_surcharge),\
            DOUBLE(airport_fee),DOUBLE(tips),DOUBLE(driver_pay),STRING(SR_Flag),STRING(shared_match_flag),\
            STRING(access_a_ride_flag),STRING(wav_request_flag),STRING(wav_match_flag),STRING(taxi_type) from Cast")

        hv_df = df_hv.union(hv_df)
        print(f'{file} analyzed')        

    hv_df.printSchema()
    


    return hv_df
'''


root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-317088730497197>:45[0m
[1;32m     15[0m     [38;5;124;03m'''[39;00m
[1;32m     16[0m [38;5;124;03m    hv_list = []    [39;00m
[1;32m     17[0m [38;5;124;03m    for file in os.listdir(directory):[39;00m
[0;32m   (...)[0m
[1;32m     40[0m [38;5;124;03m    [39;00m
[1;32m     41[0m [38;5;124;03m    '''[39;00m
[1;32m     43[0m     [38;5;28;01mreturn[39;00m hv_df
[0;32m---> 45[0m make_hv()

File [0;32m<command-317088730497197>:10[0m, in [0;36mmake_hv[0;34m()[0m
[1;32m      5[0m hv_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mschema(hv_schema)[38;5;241m.[39mparquet([38;5;124m'[39m[38;5;124m/mnt/taxi_etl/trip_data/high*.parquet[39m[38;5;124m'[39m)
[1;32m      7[0m hv_df[38;5;241m.[39mprintSchema()
[0;32m---> 10[0m [43mhv_df[49m[38;

In [0]:
### TOO SLOW, TIMES OUT ### 

# Storing too much in memory? If I write immedaitely after creating the DF, will this solve the issue? 

def write_dfs():

    yellow_df = spark.read.schema(yellow_schema).format('parquet').load(directory[5:]+'yellow*.parquet')

    # yellow_df = spark.read.option('inferSchema','true').format('parquet').load(directory[5:]+'yellow*.parquet')

    yellow_df.show()

    # yellow_df = make_yellow()
    # green_df = make_green()
    # fh_df = make_fhv()
    # hv_df = make_hv()

    # ERROR ON WRITE (DON'T NEED TO WRITE TO SINGLE DF?)
    '''
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 12.0 failed 4 times, most recent failure: Lost task 84.3 in stage 12.0 (TID 590) (10.139.64.4 executor 0): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt/taxi_etl/trip_data/yellow_2018-10.parquet. Parquet column cannot be converted. Column: [passenger_count], Expected: LongType, Found: DOUBLE
    '''
    # yellow_df.write.mode('overwrite').parquet('/dbfs/mnt/taxi_etl/trip_data/all_yellow')


    # hv_df.write.parquet('/dbfs/mnt/taxi_etl/trip_data/all_hv')
    # fh_df.write.parquet('/dbfs/mnt/taxi_etl/trip_data/combined_dfs/all_fh')
    # green_df.write.parquet('/dbfs/mnt/taxi_etl/trip_data/all_green')
    
write_dfs()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2011-01-01 00:10:00|  2011-01-01 00:12:00|              4|          0.0|         1|              null|         145|         145|           1|        2.9|  0.5|    0.5|      0.2

#### Grab New York Weather Data

##### Import Packages

In [0]:
%pip install selenium

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting selenium
  Downloading selenium-4.9.1-py3-none-any.whl (6.6 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.6/6.6 MB 36.6 MB/s eta 0:00:00
Collecting trio-websocket~=0.9
  Downloading trio_websocket-0.10.2-py3-none-any.whl (17 kB)
Collecting trio~=0.17
  Downloading trio-0.22.0-py3-none-any.whl (384 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 384.9/384.9 kB 25.8 MB/s eta 0:00:00
Collecting sortedcontainers
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB)
Collecting async-generator>=1.9
  Downloading async_generator-1.10-py3-none-any.whl (18 kB)
Collecting exceptiongroup>=1.0.0rc9
  Downloading exceptiongroup-1.1.1-py3-none-any.whl (14 kB)
Collecting outcome
  Downloading outcome-1.2.0-py2.py3-none-any.whl (9.7 kB)
Collecting sniffio
  Downloading sniffio-1.3.0-py3-none-any.whl (10 kB)
Collecting wsproto>=0.14
  Downloading wsproto-1.2.0-py

In [0]:
import datetime
from bs4 import BeautifulSoup as BS
from selenium import webdriver
import pandas as pd
import time

##### Install Chrome and ChromeDriver

In [0]:
dbutils.fs.mkdirs("dbfs:/databricks/scripts/")
dbutils.fs.put("/databricks/scripts/selenium-install.sh","""
#!/bin/bash
%sh
LAST_VERSION="https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2FLAST_CHANGE?alt=media"
VERSION=$(curl -s -S $LAST_VERSION)
if [ -d $VERSION ] ; then
  echo "version already installed"
  exit
fi
 
rm -rf /tmp/chrome/$VERSION
mkdir -p /tmp/chrome/$VERSION
 
URL="https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2F$VERSION%2Fchrome-linux.zip?alt=media"
ZIP="${VERSION}-chrome-linux.zip"
 
curl -# $URL > /tmp/chrome/$ZIP
unzip /tmp/chrome/$ZIP -d /tmp/chrome/$VERSION
 
URL="https://www.googleapis.com/download/storage/v1/b/chromium-browser-snapshots/o/Linux_x64%2F$VERSION%2Fchromedriver_linux64.zip?alt=media"
ZIP="${VERSION}-chromedriver_linux64.zip"
 
curl -# $URL > /tmp/chrome/$ZIP
unzip /tmp/chrome/$ZIP -d /tmp/chrome/$VERSION
 
mkdir -p /tmp/chrome/chrome-user-data-dir
 
rm -f /tmp/chrome/latest
ln -s /tmp/chrome/$VERSION /tmp/chrome/latest
 
# to avoid errors about missing libraries
sudo apt-get update
sudo apt-get install -y libgbm-dev
""", True)
display(dbutils.fs.ls("dbfs:/databricks/scripts/"))

Wrote 1045 bytes.


path,name,size,modificationTime
dbfs:/databricks/scripts/selenium-install.sh,selenium-install.sh,1045,1683585238000


In [0]:
%sh
/dbfs/databricks/scripts/selenium-install.sh

/dbfs/databricks/scripts/selenium-install.sh: line 3: fg: no job control
#=#=#                                                                                                                                                    0.1%###                                                                        4.9%########                                                                  11.1%#############                                                             18.1%##################                                                        25.2%#######################                                                   32.4%############################                                              39.8%#################################                                         47.1%######################################                                    54.1%###########################################                               60.8%###############################################

Archive:  /tmp/chrome/1141045-chrome-linux.zip
  inflating: /tmp/chrome/1141045/chrome-linux/MEIPreload/manifest.json  
  inflating: /tmp/chrome/1141045/chrome-linux/MEIPreload/preloaded_data.pb  
  inflating: /tmp/chrome/1141045/chrome-linux/chrome  
  inflating: /tmp/chrome/1141045/chrome-linux/chrome-wrapper  
  inflating: /tmp/chrome/1141045/chrome-linux/chrome_100_percent.pak  
  inflating: /tmp/chrome/1141045/chrome-linux/chrome_200_percent.pak  
  inflating: /tmp/chrome/1141045/chrome-linux/chrome_crashpad_handler  
  inflating: /tmp/chrome/1141045/chrome-linux/chrome_sandbox  
  inflating: /tmp/chrome/1141045/chrome-linux/icudtl.dat  
  inflating: /tmp/chrome/1141045/chrome-linux/libEGL.so  
  inflating: /tmp/chrome/1141045/chrome-linux/libGLESv2.so  
  inflating: /tmp/chrome/1141045/chrome-linux/libvk_swiftshader.so  
  inflating: /tmp/chrome/1141045/chrome-linux/libvulkan.so.1  
  inflating: /tmp/chrome/1141045/chrome-linux/nacl_helper  
  inflating: /tmp/chrome/1141045/chrom

#=#=#                                                                                                                                                    0.2%######################################                                    53.6%######################################################################## 100.0%


Archive:  /tmp/chrome/1141045-chromedriver_linux64.zip
  inflating: /tmp/chrome/1141045/chromedriver_linux64/LICENSE.chromedriver  
  inflating: /tmp/chrome/1141045/chromedriver_linux64/chromedriver  
Hit:1 https://repos.azul.com/zulu/deb stable InRelease
Hit:2 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Reading package lists...


W: https://repos.azul.com/zulu/deb/dists/stable/InRelease: Key is stored in legacy trusted.gpg keyring (/etc/apt/trusted.gpg), see the DEPRECATION section in apt-key(8) for details.


Reading package lists...
Building dependency tree...
Reading state information...
libgbm-dev is already the newest version (22.2.5-0ubuntu0.1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 33 not upgraded.


In [0]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
s = Service('/tmp/chrome/latest/chromedriver_linux64/chromedriver')
options = webdriver.ChromeOptions()
options.binary_location = "/tmp/chrome/latest/chrome-linux/chrome"
options.add_argument('headless')
options.add_argument('--disable-infobars')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--no-sandbox')
options.add_argument('--remote-debugging-port=9222')
options.add_argument('--homedir=/tmp/chrome/chrome-user-data-dir')
options.add_argument('--user-data-dir=/tmp/chrome/chrome-user-data-dir')
prefs = {"download.default_directory":"/tmp/chrome/chrome-user-data-di",
         "download.prompt_for_download":False
}
options.add_experimental_option("prefs",prefs)
# driver = webdriver.Chrome(service=s, options=options)

##### Load Wunderground Data

In [0]:
# Find range of dates matching with Taxi Trip data
# Earliest = 01/01/2010
# Latest = 01/01/2023
def get_dates():
    d1 = datetime.date(2010,1,1)
    d2 = datetime.date(2023,1,1)
    dd = [d1 + datetime.timedelta(days=x) for x in range((d2-d1).days + 1)]
    date_list = []
    for d in dd:
        date_list.append(str(d))
    return date_list


In [0]:
# function to load wunderground data (without this it has no records to show)
def render_page(url):
    driver = webdriver.Chrome(service=s, options=options)    
    driver.get(url)
    time.sleep(3)
    r = driver.page_source
    driver.quit()
    return r

In [0]:
def list_transpose(data_list):
    res_list = [[item.replace('%', '') for item in lst] for lst in data_list]
    res_list = [[item.replace(u'\xa0', u'') for item in lst] for lst in res_list]
    res_list = [[item.replace('°F','') for item in lst] for lst in res_list]
    res_list = [[item.replace('°in','') for item in lst] for lst in res_list]
    res_list = [[item.replace('°%','') for item in lst] for lst in res_list]
    res_list = [[item.replace('°mph','') for item in lst] for lst in res_list]
    final_list = [[item.replace('°','') for item in lst] for lst in res_list]
    return final_list

In [0]:
def set_schema(df):
    # To Interger
    df[["Temperature","Dew_Point", "Humidity","Wind_Speed","Wind_Gust"]] = df[["Temperature","Dew_Point", "Humidity","Wind_Speed","Wind_Gust"]].apply(pd.to_numeric)
    df[['Pressure','Precipitation']] = df[['Pressure','Precipitation']].apply(pd.to_numeric)
    # To DateTime
    df['datetime'] = df['datetime'].apply(pd.to_datetime)
    # To String
    df[['Wind','Condition']] = df[['Wind','Condition']].applymap(str)
    return df

In [0]:
def scraper(page, dates):
    # function to scrape wunderground
    for d in dates:

        url = str(str(page) + str(d))

        r = render_page(url)

        soup = BS(r, "html.parser")
        container = soup.find('lib-city-history-observation')
        check = container.find('tbody')

        data = []
        try:
            for c in check.find_all('tr', class_='ng-star-inserted'):
                for i in c.find_all('td', class_='ng-star-inserted'):
                    trial = i.text
                    trial = trial.strip('  ')
                    data.append(trial)
            
            df_daily = []
            cols = ['Time','Temperature','Dew_Point','Humidity','Wind','Wind_Speed','Wind_Gust','Pressure','Precipitation','Condition','Date']
            for i in range(0,len(data),10):
                snip_data = []
                snip_data.append(data[i:i+10])
                # Strip of Weird Characters
                snip_data = list_transpose(snip_data)
                snip_data[0].append(d)
                df = pd.DataFrame(snip_data,columns=cols)
                df['datetime'] = df['Date'] + ' ' + df['Time']
                df = df.drop(['Date','Time'],axis=1) 
                # Set Schema
                df = set_schema(df)
                df_daily.append(df)

            df_daily = pd.concat(df_daily)
            path = 'C:/Users/Robert.Jones/OneDrive - Central Coast Energy Services, Inc/Desktop/Springboard/Capstone/wunderground/parquet_files'
                        
            df_daily.to_parquet(f'/dbfs/mnt/taxi_etl/weather_data/NY_Weather{d}.parquet')
            
        except AttributeError:
            continue

In [0]:
# Call Functions
dates = get_dates()
page = 'https://www.wunderground.com/history/daily/us/ny/new-york-city/KLGA/date/'

df = scraper(page, dates)
# Wrote 4745 files
# Size of 42M

In [0]:
%sh du -h /dbfs/mnt/taxi_etl/weather_data/

In [0]:
%sh ls /dbfs/mnt/taxi_etl/weather_data/ | wc -l

In [0]:
# View files in SpringBoard Container
dbutils.fs.mkdirs("/mnt/taxi_etl/weather_data/combined_df")

In [0]:
def combine_weather_dfs():
    path = '/mnt/taxi_etl/weather_data/*.parquet'
    df = spark.read.option('inferSchema','true').parquet(path)
    df = df.drop('__index_level_0__')
    df = df.withColumnRenamed("Temperature", "temp(f)")\
       .withColumnRenamed("Dew_Point", "dew_point(f)")\
       .withColumnRenamed("Humidity", "humidity(%)")\
       .withColumnRenamed("Wind", "wind_direction")\
       .withColumnRenamed("Wind_Speed", "wind_speed(mph)")\
       .withColumnRenamed("Wind_Gust", "wind_gust(mph)")\
       .withColumnRenamed("Pressure", "pressure(inHg)")\
       .withColumnRenamed("Precipitation", "precipitation(in)")\
       .withColumnRenamed("Condition", "condition")
    df.printSchema()
    df.show()
combine_weather_dfs()
# Total rows = 133652 

root
 |-- temp(f): long (nullable = true)
 |-- dew_point(f): long (nullable = true)
 |-- humidity(%): long (nullable = true)
 |-- wind_direction: string (nullable = true)
 |-- wind_speed(mph): long (nullable = true)
 |-- wind_gust(mph): long (nullable = true)
 |-- pressure(inHg): double (nullable = true)
 |-- precipitation(in): double (nullable = true)
 |-- condition: string (nullable = true)
 |-- datetime: timestamp_ntz (nullable = true)

+-------+------------+-----------+--------------+---------------+--------------+--------------+-----------------+------------------+-------------------+
|temp(f)|dew_point(f)|humidity(%)|wind_direction|wind_speed(mph)|wind_gust(mph)|pressure(inHg)|precipitation(in)|         condition|           datetime|
+-------+------------+-----------+--------------+---------------+--------------+--------------+-----------------+------------------+-------------------+
|     46|          43|         89|            NE|             18|             0|         29.63|  