## Big Data Platforms Group project - GCP Read-Write data 


### This notebook reads in all the data needed for the model, transforms it, and creates the 
### file called Taxi_Uber_lyft_w_e_c_sdf that includes all the information

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
# NOTE: the wildcard import below puts Spark's sum/min/max/round/hour/when/... into the
# notebook namespace, hiding the Python built-ins of the same name. Other cells rely on
# names it provides, so it is kept; prefer the explicit `F.` prefix in new code.
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
import pandas as pd

# Create (or reuse) the session BEFORE deriving the SparkContext from it. Reading
# `spark.sparkContext` first only works on kernels that pre-inject a `spark` variable
# and raises NameError on a plain Python kernel.
spark = SparkSession.builder.appName('BDP-GroupProject').getOrCreate()
sc = spark.sparkContext

def read_data(path):
    """Read a CSV file with a header row into a Spark DataFrame.

    The schema is inferred from the data. Double quotes are used both as the
    quote and the escape character, leading whitespace is ignored, and
    multi-line records are enabled.

    Args:
        path: location of the CSV file, passed straight to the Spark CSV reader.

    Returns:
        The DataFrame produced by the notebook-level `spark` session.
    """
    reader_options = {
        "quote": "\"",
        "escape": "\"",
        "ignoreLeadingWhiteSpace": True,
        "multiline": True,
    }
    csv_reader = spark.read.options(**reader_options)
    return csv_reader.csv(path, inferSchema=True, header=True)

### Taxi and Rideshare Data
- Variables unique to taxi data: 'taxi_id','tolls','extras','payment_type','company'
- Variables unique to rideshare data: 'additional_charges','trips_pooled','shared_trip_authorized'

In [2]:
#----------
# Load rideshare parquet file
#----------
Uber_lyft_sdf = spark.read.parquet('gs://big-data-final/clean-parquet/rideshare.parquet')

# Normalise column names: lower-case, spaces replaced by underscores.
# The loop variable is deliberately NOT named `col`: that would rebind the imported
# pyspark.sql.functions.col to a plain string and break any later `col(...)` call.
for column_name in Uber_lyft_sdf.columns:
    Uber_lyft_sdf = Uber_lyft_sdf.withColumnRenamed(column_name,
                                                    column_name.lower().replace(' ','_'))

# Add the constant columns used to line this frame up with the taxi data, and rename
# 'tip' to 'tips'.
Uber_lyft_sdf = (
    Uber_lyft_sdf
    .withColumn('ride_type', F.lit('rideshare'))
    .withColumn('payment_type', F.lit('Mobile'))
    .withColumnRenamed('tip', 'tips')
)

print(Uber_lyft_sdf.columns)

                                                                                

['trip_id', 'trip_start_timestamp', 'trip_end_timestamp', 'trip_seconds', 'trip_miles', 'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area', 'dropoff_community_area', 'fare', 'tips', 'additional_charges', 'trip_total', 'shared_trip_authorized', 'trips_pooled', 'pickup_centroid_latitude', 'pickup_centroid_longitude', 'pickup_centroid_location', 'dropoff_centroid_latitude', 'dropoff_centroid_longitude', 'dropoff_centroid_location', 'ride_type', 'payment_type']



# NOTE(review): legacy loader. Reads one taxi CSV per year through read_data() and
# stacks the three frames positionally with union(). `Taxi_sdf` is first a dict keyed by
# year and is then rebound to the unioned DataFrame. The following cell reassigns
# `Taxi_sdf` from the parquet file, so confirm whether this snippet is still needed;
# the path below is specific to one user's home directory.
Taxi_sdf = {}
for years in [2019,2020,2021]:
    Taxi_sdf[years] = read_data(f"/user/josefinabollini/GroupProject/Taxi_Trips_-_{years}.csv")
  
Taxi_sdf = Taxi_sdf[2019].union(Taxi_sdf[2020]).union(Taxi_sdf[2021])


In [3]:
#----------
# Load taxi parquet file
#----------
Taxi_sdf = spark.read.parquet('gs://big-data-final/clean-parquet/taxi.parquet')

# 'additional_charges' = 'Tolls' + 'Extras'
# F.col is used (rather than a bare `col`) so this cell does not depend on whether the
# name `col` has been rebound elsewhere in the notebook.
# The result is null whenever either input is null.
Taxi_sdf = Taxi_sdf.withColumn('additional_charges', F.col('Tolls') + F.col('Extras'))

# Drop identifiers, the two columns folded into 'additional_charges', and the
# census-tract / centroid location columns.
Taxi_sdf = Taxi_sdf.drop('Company','Taxi_ID', 'Trip_ID',
                         'Tolls','Extras',
                         'Pickup_Census_Tract','Dropoff_Census_Tract',
                         'Pickup_Centroid_Latitude','Pickup_Centroid_Longitude','Pickup_Centroid_Location',
                         'Dropoff_Centroid_Latitude','Dropoff_Centroid_Longitude','Dropoff_Centroid__Location')

# Normalise column names: lower-case, spaces replaced by underscores.
# The loop variable is not called `col` so the Spark `col` function is not shadowed.
for column_name in Taxi_sdf.columns:
    Taxi_sdf = Taxi_sdf.withColumnRenamed(column_name, column_name.lower().replace(' ','_'))

# Add the columns that exist only in the rideshare data, filled with constants:
# every taxi trip is one trip (trips_pooled = 1) and is never a shared trip.
Taxi_sdf = (
    Taxi_sdf
    .withColumn('ride_type', F.lit('taxi'))
    .withColumn('trips_pooled', F.lit(1))
    # A boolean literal directly, instead of the string 'False' cast to BooleanType.
    .withColumn('shared_trip_authorized', F.lit(False))
)

print(Taxi_sdf.columns)

['trip_start_timestamp', 'trip_end_timestamp', 'trip_seconds', 'trip_miles', 'pickup_community_area', 'dropoff_community_area', 'fare', 'tips', 'trip_total', 'payment_type', 'additional_charges', 'ride_type', 'trips_pooled', 'shared_trip_authorized']


In [4]:
#----------
# join taxi and rideshare data
#----------
# Keep only the rideshare columns that also exist in the taxi frame, then stack the two
# frames. unionByName matches columns by name, so the result cannot be silently
# misaligned if the column order of either frame ever changes.
col_list = list(Taxi_sdf.columns)
Uber_lyft_sdf = Uber_lyft_sdf.select(col_list)
Taxi_Uber_lyft_sdf = Uber_lyft_sdf.unionByName(Taxi_sdf)

#-----
# format timestamp
#-----
RAW_TIMESTAMP_FORMAT = 'MM/dd/yyyy hh:mm:ss a'

def _parse_trip_timestamp(column_name):
    """Return a Column that parses `column_name` using RAW_TIMESTAMP_FORMAT into a timestamp."""
    epoch_seconds = F.unix_timestamp(column_name, RAW_TIMESTAMP_FORMAT)
    return F.from_unixtime(epoch_seconds, 'yyyy-MM-dd HH:mm:ss').cast('timestamp')

Taxi_Uber_lyft_sdf = (
    Taxi_Uber_lyft_sdf
    .withColumn('trip_start_timestamp', _parse_trip_timestamp('trip_start_timestamp'))
    .withColumn('trip_end_timestamp', _parse_trip_timestamp('trip_end_timestamp'))
    #-----
    # extract date (trip start)
    #-----
    # 'trip_start_timestamp' is already a timestamp at this point, so the date is taken
    # from it directly instead of being re-parsed with the raw string format.
    .withColumn('trip_start_date', F.to_date('trip_start_timestamp'))
    #-----
    # extract month, day, year, and dow from 'trip_start_date'
    #-----
    .withColumn('trip_start_month', F.month('trip_start_date'))
    .withColumn('trip_start_day', F.dayofmonth('trip_start_date'))
    .withColumn('trip_start_year', F.year('trip_start_date'))
    # Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday
    .withColumn('trip_start_dow', F.dayofweek('trip_start_date'))
)

#-----
# add weekend vs weekday dummy
#-----
# With 1 = Sunday ... 7 = Saturday this flags Sunday, Friday and Saturday.
# NOTE(review): Friday is treated as a weekend day here - confirm this is intended.
weekend_days = [1,6,7]
Taxi_Uber_lyft_sdf = Taxi_Uber_lyft_sdf.withColumn(
    'weekend_dummy',
    F.when(F.col('trip_start_dow').isin(weekend_days), 1).otherwise(0))

Taxi_Uber_lyft_sdf.show(2,truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+-------------------+------------+----------+---------------------+----------------------+----+----+----------+------------+------------------+---------+------------+----------------------+---------------+----------------+--------------+---------------+--------------+-------------+
|trip_start_timestamp|trip_end_timestamp |trip_seconds|trip_miles|pickup_community_area|dropoff_community_area|fare|tips|trip_total|payment_type|additional_charges|ride_type|trips_pooled|shared_trip_authorized|trip_start_date|trip_start_month|trip_start_day|trip_start_year|trip_start_dow|weekend_dummy|
+--------------------+-------------------+------------+----------+---------------------+----------------------+----+----+----------+------------+------------------+---------+------------+----------------------+---------------+----------------+--------------+---------------+--------------+-------------+
|2020-05-22 07:45:00 |2020-05-22 08:00:00|809         |1.4       |7                    |

                                                                                

In [5]:
# Display the (column name, Spark type) pairs of the combined taxi + rideshare frame.
Taxi_Uber_lyft_sdf.dtypes

[('trip_start_timestamp', 'timestamp'),
 ('trip_end_timestamp', 'timestamp'),
 ('trip_seconds', 'int'),
 ('trip_miles', 'double'),
 ('pickup_community_area', 'int'),
 ('dropoff_community_area', 'int'),
 ('fare', 'double'),
 ('tips', 'double'),
 ('trip_total', 'double'),
 ('payment_type', 'string'),
 ('additional_charges', 'double'),
 ('ride_type', 'string'),
 ('trips_pooled', 'int'),
 ('shared_trip_authorized', 'boolean'),
 ('trip_start_date', 'date'),
 ('trip_start_month', 'int'),
 ('trip_start_day', 'int'),
 ('trip_start_year', 'int'),
 ('trip_start_dow', 'int'),
 ('weekend_dummy', 'int')]

In [6]:
#Taxi_Uber_lyft_sdf.count() #462314865 for all years

                                                                                

462314865

In [7]:
#Taxi_Uber_lyft_sdf.groupBy('trip_start_year').count().show()

                                                                                

+---------------+---------+
|trip_start_year|    count|
+---------------+---------+
|           2018| 38193541|
|           2015| 32385875|
|           2022| 49072066|
|           2013| 27217716|
|           null|        4|
|           2014| 37395436|
|           2019|125190465|
|           2020| 40998888|
|           2016| 31759339|
|           2017| 24988003|
|           2021| 55113532|
+---------------+---------+



In [6]:
# Keep only trips whose start year is 2019, 2020 or 2021; rows with a null start year
# do not match and are removed as well.
Taxi_Uber_lyft_sdf = Taxi_Uber_lyft_sdf.filter(F.col('trip_start_year').isin(2019, 2020, 2021))

In [9]:
#Taxi_Uber_lyft_sdf.groupBy('trip_start_year').count().show()



+---------------+---------+
|trip_start_year|    count|
+---------------+---------+
|           2019|125190465|
|           2020| 40998888|
|           2021| 55113532|
+---------------+---------+



                                                                                

In [24]:
#Taxi_Uber_lyft_sdf.count() #expect 250 mill, this is the actual number 221,302,885

                                                                                

221302885

### Weather Data

***Station names in the weather data do NOT align with community areas. To solve this issue, create a "Chicagoland view" by summing all stations' values for each date.***

NOTE: 9’s in a field (e.g.9999) indicate missing data or data that has not been received.

The five core values are:
- PRCP = Precipitation (mm or inches as per user preference, inches to hundredths on Daily Form pdf file)
- SNOW = Snowfall (mm or inches as per user preference, inches to tenths on Daily Form pdf file)
- SNWD = Snow depth (mm or inches as per user preference, inches on Daily Form pdf file)
- TMAX = Maximum temperature (Fahrenheit or Celsius as per user preference, Fahrenheit to tenths on Daily Form pdf file)
- TMIN = Minimum temperature (Fahrenheit or Celsius as per user preference, Fahrenheit to tenths on Daily Form pdf file)

In [7]:
#----------
# Load Weather csv files
#----------
W2021_sdf = read_data('gs://big-data-final/raw-weather/2021 weather data.csv')
W1920_sdf = read_data('gs://big-data-final/raw-weather/2019-2020 weather data.csv')

# Kept so that the bare name `col` is the Spark function again after this cell, in case
# it was rebound to a string by a loop variable in an earlier cell.
from pyspark.sql.functions import col

def align_weather_data(datain):
    """Keep the station name, date, precipitation and snowfall columns of a weather frame.

    Only PRCP and SNOW are kept as measurements (the original note in this cell states
    that temperature is null). 'DATE' is parsed from 'yyyy-MM-dd' text into a date.

    Args:
        datain: weather DataFrame containing 'NAME', 'DATE', 'PRCP' and 'SNOW'.

    Returns:
        DataFrame with columns NAME, DATE (date type), PRCP, SNOW.
    """
    return (
        datain
        .select('NAME', 'DATE', 'PRCP', 'SNOW')
        .withColumn('DATE', F.to_date(F.col('DATE'), 'yyyy-MM-dd'))
    )

# Stack both files and replace missing numeric readings with 0 before aggregating.
Weather_all_sdf = (
    align_weather_data(W2021_sdf)
    .union(align_weather_data(W1920_sdf))
    .fillna(0)
)

#-----
# sum precipitation and snow by date, for all Chicago stations
#-----
Weather_all_sdf = Weather_all_sdf.groupBy("DATE").agg(
    F.sum("PRCP").alias("PRCP"),
    F.sum("SNOW").alias("SNOW"),
)

Weather_all_sdf.show(2)

22/11/26 06:58:52 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[Stage 7:>                                                          (0 + 2) / 2]

+----------+------------------+----+
|      DATE|              PRCP|SNOW|
+----------+------------------+----+
|2021-06-22|              0.02| 0.0|
|2021-08-27|1.7200000000000002| 0.0|
+----------+------------------+----+
only showing top 2 rows



                                                                                

In [8]:
#----------
# merge weather and taxi/rideshare data
#----------
# Register both frames as SQL views so they can be joined with spark.sql.
Weather_all_sdf.createOrReplaceTempView("weather")
Taxi_Uber_lyft_sdf.createOrReplaceTempView("taxi")

# The join key is the trip START date: that is when the rider decided to order a ride.
weather_join_query = """SELECT t.*, w.*
                                    FROM 
                                    taxi t left join weather w 
                                    on t.trip_start_date = w.DATE """

# The weather 'DATE' column duplicates trip_start_date after the join, so it is removed.
Taxi_Uber_lyft_w_sdf = spark.sql(weather_join_query).drop('DATE')

Taxi_Uber_lyft_w_sdf.dtypes

[('trip_start_timestamp', 'timestamp'),
 ('trip_end_timestamp', 'timestamp'),
 ('trip_seconds', 'int'),
 ('trip_miles', 'double'),
 ('pickup_community_area', 'int'),
 ('dropoff_community_area', 'int'),
 ('fare', 'double'),
 ('tips', 'double'),
 ('trip_total', 'double'),
 ('payment_type', 'string'),
 ('additional_charges', 'double'),
 ('ride_type', 'string'),
 ('trips_pooled', 'int'),
 ('shared_trip_authorized', 'boolean'),
 ('trip_start_date', 'date'),
 ('trip_start_month', 'int'),
 ('trip_start_day', 'int'),
 ('trip_start_year', 'int'),
 ('trip_start_dow', 'int'),
 ('weekend_dummy', 'int'),
 ('PRCP', 'double'),
 ('SNOW', 'double')]

In [None]:
#Taxi_Uber_lyft_w_sdf.count()

### Events Data

Events data includes Chicago sporting events.

NOTE: `nb_code` is equivalent to `community_area_code`

In [9]:
#----------
# Load Events csv file
#----------
# 'nb_code' is renamed to 'event_community_area', then the events are counted per
# (DAY, MONTH, YEAR, community area). The resulting count column is named 'count'.
All_events_data_sdf = (
    read_data('gs://big-data-final/raw-events/All_events_aligned.csv')
    .withColumnRenamed('nb_code', 'event_community_area')
    .groupby('DAY', 'MONTH', 'YEAR', 'event_community_area')
    .count()
)

All_events_data_sdf.show(5)
All_events_data_sdf.dtypes

+---+-----+----+--------------------+-----+
|DAY|MONTH|YEAR|event_community_area|count|
+---+-----+----+--------------------+-----+
| 19|    5|2019|                  34|    1|
| 27|    5|2019|                  34|    2|
| 29|    5|2021|                   6|    1|
| 20|   12|2021|                  33|    1|
|  2|    9|2021|                  34|    1|
+---+-----+----+--------------------+-----+
only showing top 5 rows



[('DAY', 'int'),
 ('MONTH', 'int'),
 ('YEAR', 'int'),
 ('event_community_area', 'int'),
 ('count', 'bigint')]

In [10]:
# Merge the per-day, per-community event counts into the taxi/rideshare data.
All_events_data_sdf.createOrReplaceTempView("events")
Taxi_Uber_lyft_w_sdf.createOrReplaceTempView("taxi2")

# 1. left join events data by ride start date and PICKUP COMMUNITY
pickup_events_query = """SELECT t.*,
                                             e.*
                                      FROM taxi2 t left join events e 
                                          on t.trip_start_day = e.DAY and 
                                             t.trip_start_month = e.MONTH and
                                             t.trip_start_year = e.YEAR and
                                             t.pickup_community_area = e.event_community_area"""

# Remove the join keys brought in from the events view and give the event count a
# name that says it refers to the pickup community.
Taxi_Uber_lyft_w_e_sdf = (
    spark.sql(pickup_events_query)
    .drop('DAY', 'MONTH', 'YEAR', 'event_community_area')
    .withColumnRenamed('count', 'pickup_community_eventCnt')
)

In [11]:
# 2. left join events data by date and DROPOFF COMMUNITY
Taxi_Uber_lyft_w_e_sdf.createOrReplaceTempView("taxi_e")

Taxi_Uber_lyft_w_e_sdf = spark.sql("""SELECT t.*,
                                             e.*
                                      FROM taxi_e t left join events e 
                                          on t.trip_start_day = e.DAY and 
                                             t.trip_start_month = e.MONTH and
                                             t.trip_start_year = e.YEAR and
                                             t.dropoff_community_area = e.event_community_area""")

Taxi_Uber_lyft_w_e_sdf = Taxi_Uber_lyft_w_e_sdf.drop('DAY','MONTH','YEAR','event_community_area')
Taxi_Uber_lyft_w_e_sdf = Taxi_Uber_lyft_w_e_sdf.withColumnRenamed('count','dropoff_community_eventCnt')

# Rows without a matching event have a null count after the left joins; turn those into 0.
# Note that fillna(0) applies to EVERY numeric column of the frame, not only the two
# event counts, so null community area codes (and other numeric nulls) become 0 as well.
Taxi_Uber_lyft_w_e_sdf = Taxi_Uber_lyft_w_e_sdf.fillna(0)

#-----
# create a total count of the events occurring in a rider's pickup & dropoff community
# if the pickup and dropoff community are the same, do NOT double count events
#-----
# F.col is used instead of a bare `col`: several cells of this notebook use `col` as a
# loop variable, which rebinds it to a string and would make `col(...)` fail here.
same_community = F.col('pickup_community_area') == F.col('dropoff_community_area')
Taxi_Uber_lyft_w_e_sdf = Taxi_Uber_lyft_w_e_sdf.withColumn(
    'community_eventCnt',
    F.when(same_community, F.col('dropoff_community_eventCnt'))
     .otherwise(F.col('dropoff_community_eventCnt') + F.col('pickup_community_eventCnt')))

Taxi_Uber_lyft_w_e_sdf.dtypes

22/11/26 06:59:00 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[('trip_start_timestamp', 'timestamp'),
 ('trip_end_timestamp', 'timestamp'),
 ('trip_seconds', 'int'),
 ('trip_miles', 'double'),
 ('pickup_community_area', 'int'),
 ('dropoff_community_area', 'int'),
 ('fare', 'double'),
 ('tips', 'double'),
 ('trip_total', 'double'),
 ('payment_type', 'string'),
 ('additional_charges', 'double'),
 ('ride_type', 'string'),
 ('trips_pooled', 'int'),
 ('shared_trip_authorized', 'boolean'),
 ('trip_start_date', 'date'),
 ('trip_start_month', 'int'),
 ('trip_start_day', 'int'),
 ('trip_start_year', 'int'),
 ('trip_start_dow', 'int'),
 ('weekend_dummy', 'int'),
 ('PRCP', 'double'),
 ('SNOW', 'double'),
 ('pickup_community_eventCnt', 'bigint'),
 ('dropoff_community_eventCnt', 'bigint'),
 ('community_eventCnt', 'bigint')]

Check transformation
```
Taxi_Uber_lyft_w_e_sdf.filter(Taxi_Uber_lyft_w_e_sdf.community_eventCnt>0
                             ).select('dropoff_community_area',
                                      'pickup_community_area',
                                      'pickup_community_eventCnt',
                                      'dropoff_community_eventCnt',
                                      'community_eventCnt').show(5)

Taxi_Uber_lyft_w_e_sdf.filter((Taxi_Uber_lyft_w_e_sdf.community_eventCnt>0) &
                             (Taxi_Uber_lyft_w_e_sdf.dropoff_community_area==Taxi_Uber_lyft_w_e_sdf.pickup_community_area
                             )).select('dropoff_community_area',
                                       'pickup_community_area',               
                                       'pickup_community_eventCnt',
                                       'dropoff_community_eventCnt',
                                       'community_eventCnt').show(5)
```

### Add Communities data 

In [12]:
#----------
# Load Community Names parquet file
#----------
chi_comunities_sdf = spark.read.parquet("gs://big-data-final/clean-parquet/chi_communities.parquet")

chi_comunities_sdf.createOrReplaceTempView("chicom")
Taxi_Uber_lyft_w_e_sdf.createOrReplaceTempView("taxi")

# 1. merge pickup_community_name: look up the name for the pickup community area code
pickup_name_query = """SELECT t.*,
                                        c.community_area_name as pickup_community_name
                                      FROM 
                                        taxi t left join chicom c 
                                        on t.pickup_community_area = c.community_area"""
Taxi_Uber_lyft_w_e_sdf = spark.sql(pickup_name_query)

# 2. merge dropoff_community_name: same lookup for the dropoff community area code.
# The view is registered only now, so that it already contains pickup_community_name.
Taxi_Uber_lyft_w_e_sdf.createOrReplaceTempView("taxi2")
dropoff_name_query = """SELECT t.*,
                                        c.community_area_name as dropoff_community_name
                                      FROM 
                                        taxi2 t left join chicom c 
                                        on t.dropoff_community_area = c.community_area"""
Taxi_Uber_lyft_w_e_sdf = spark.sql(dropoff_name_query)

In [13]:
# Dummy for rides that started or finished outside Chicagoland
def dummy_communities(dat,var_in,dummy_out):
    """Add a 0/1 indicator column derived from a community area code column.

    Args:
        dat: input DataFrame.
        var_in: name of the column holding the community area code.
        dummy_out: name of the indicator column to add.

    Returns:
        `dat` with `dummy_out` added: 0 where `var_in` equals 0, 1 in every other case.
    """
    code_is_zero = F.col(var_in) == 0
    indicator = F.when(code_is_zero, 0).otherwise(1)
    return dat.withColumn(dummy_out, indicator)

# chicago_pickup / chicago_dropoff: 0 when the community area code is 0, otherwise 1.
# NOTE(review): this presumes a code of 0 means "no Chicago community area" - the
# earlier fillna(0) turns missing codes into 0; confirm against the source data.
Taxi_Uber_lyft_w_e_sdf = dummy_communities(Taxi_Uber_lyft_w_e_sdf,'pickup_community_area','chicago_pickup')
Taxi_Uber_lyft_w_e_sdf = dummy_communities(Taxi_Uber_lyft_w_e_sdf,'dropoff_community_area','chicago_dropoff')

# outside_chicago_ride = 1 when the ride started OR finished outside Chicago, else 0.
# The previous expression produced 0 only when BOTH ends were outside and 1 for every
# other ride (including rides entirely inside Chicago), i.e. the flag was inverted
# relative to its name.
started_or_ended_outside = (F.col('chicago_pickup') == 0) | (F.col('chicago_dropoff') == 0)
Taxi_Uber_lyft_w_e_sdf = Taxi_Uber_lyft_w_e_sdf.withColumn(
    'outside_chicago_ride',
    F.when(started_or_ended_outside, 1).otherwise(0))

### Add Covid data

In [14]:
#----------
# Load Covid-19 csv file
#----------
covid_sdf = read_data('gs://big-data-final/raw-covid/covid.csv')
covid_sdf = covid_sdf.select('Date','Cases - Total','Deaths - Total','Hospitalizations - Total')

# rename columns, e.g. 'Cases - Total' -> 'cases_total'
# The loop variable is not called `col`: that would rebind the imported
# pyspark.sql.functions.col to a string for every cell that runs after this one.
for raw_name in ['Cases - Total','Deaths - Total','Hospitalizations - Total']:
    covid_sdf = covid_sdf.withColumnRenamed(raw_name, raw_name.lower().replace('-','').replace('  ','_'))

# parse the 'MM/dd/yyyy' text date into a proper date column
covid_sdf = covid_sdf.withColumn('date_to_use',F.from_unixtime(F.unix_timestamp('Date','MM/dd/yyyy')\
                                                                   ,'yyyy-MM-dd').cast('date'))
# drop Date string variable
covid_sdf = covid_sdf.drop("Date")

# sort by date_to_use, ascending
covid_sdf = covid_sdf.sort("date_to_use")

# create month, year, and day columns
covid_sdf1 = (
    covid_sdf
    .withColumn('MONTH', F.month('date_to_use'))
    .withColumn('YEAR', F.year('date_to_use'))
    .withColumn('day', F.dayofmonth('date_to_use'))
)

# drop rows with null value of "date_to_use" 
covid_sdf1 = covid_sdf1.na.drop(subset=["date_to_use"])

#-----
# create rolling average of Covid-19 metrics
#-----
# Each average covers the current row and the 6 rows before it, ordered by date.
# NOTE(review): this is a 7-DAY average only if there is exactly one row per calendar
# day with no gaps - confirm against the source file.
covid_sdf1.createOrReplaceTempView("covid1")
covid_sdf2 = spark.sql("""SELECT  *, 
                              avg(`cases_total`) OVER(ORDER BY date_to_use
                              ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
                               as covid_cases_sma7,
                              avg(`hospitalizations_total`) OVER(ORDER BY date_to_use
                              ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
                               as covid_hosp_sma7,
                              avg(`deaths_total`) OVER(ORDER BY date_to_use
                              ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
                                as covid_deaths_sma7
                          FROM covid1 """)

# replace null numeric values (raw metrics and averages) with 0
covid_sdf2 = covid_sdf2.fillna(0)

In [15]:
# Attach the Covid-19 rolling averages to every trip, matching on the trip start date.
covid_sdf2.createOrReplaceTempView("covid2")
Taxi_Uber_lyft_w_e_sdf.createOrReplaceTempView("taxi2")

covid_join_query = """SELECT t.*,c.*
                                        FROM 
                                        taxi2 t left join covid2 c 
                                        on t.trip_start_date = c.date_to_use """

# Of the columns that come from the covid view, only the three *_sma7 averages are kept:
# the date parts, the join key and the raw totals are removed.
covid_columns_to_remove = ['day', 'MONTH', 'YEAR', 'date_to_use',
                           'cases_total', 'deaths_total', 'hospitalizations_total']
Taxi_Uber_lyft_w_e_c_sdf = spark.sql(covid_join_query).drop(*covid_columns_to_remove)

Taxi_Uber_lyft_w_e_c_sdf.dtypes

[('trip_start_timestamp', 'timestamp'),
 ('trip_end_timestamp', 'timestamp'),
 ('trip_seconds', 'int'),
 ('trip_miles', 'double'),
 ('pickup_community_area', 'int'),
 ('dropoff_community_area', 'int'),
 ('fare', 'double'),
 ('tips', 'double'),
 ('trip_total', 'double'),
 ('payment_type', 'string'),
 ('additional_charges', 'double'),
 ('ride_type', 'string'),
 ('trips_pooled', 'int'),
 ('shared_trip_authorized', 'boolean'),
 ('trip_start_date', 'date'),
 ('trip_start_month', 'int'),
 ('trip_start_day', 'int'),
 ('trip_start_year', 'int'),
 ('trip_start_dow', 'int'),
 ('weekend_dummy', 'int'),
 ('PRCP', 'double'),
 ('SNOW', 'double'),
 ('pickup_community_eventCnt', 'bigint'),
 ('dropoff_community_eventCnt', 'bigint'),
 ('community_eventCnt', 'bigint'),
 ('pickup_community_name', 'string'),
 ('dropoff_community_name', 'string'),
 ('chicago_pickup', 'int'),
 ('chicago_dropoff', 'int'),
 ('outside_chicago_ride', 'int'),
 ('covid_cases_sma7', 'double'),
 ('covid_hosp_sma7', 'double'),
 ('cov

In [None]:
#Taxi_Uber_lyft_w_e_c_sdf.count() #221,302,885

                                                                                

221302885

In [16]:
# Replace the nulls left in numeric columns with 0 (for example the Covid averages of
# trips whose start date has no matching row in the Covid data).
Taxi_Uber_lyft_w_e_c_sdf = Taxi_Uber_lyft_w_e_c_sdf.fillna(0)

In [None]:
#Taxi_Uber_lyft_w_e_c_sdf.write.parquet('gs://big-data-final/model-data/final-model2.parquet')
#print('done')

22/11/26 05:42:41 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 05:42:44 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 05:42:44 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 05:42:44 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 05:42:44 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

done


# Machine Learning Models

## Feature Engineering

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer,IndexToString,VectorAssembler,OneHotEncoder

In [18]:
# filter for rides with non-zero fare (strictly positive, so negative fares are removed too)
modData = Taxi_Uber_lyft_w_e_c_sdf.filter(F.col('fare') > 0)

modData = (
    modData
    # 'label': 1 = rider leaves a tip, 0 = rider does not leave a tip
    .withColumn('label', F.when(F.col('tips') > 0, 1).otherwise(0))
    # 'ride_type' as integer: 1 = rideshare, 0 = taxi
    .withColumn('ride_type',
                F.when(F.col('ride_type') == 'rideshare', 1).otherwise(0).cast('integer'))
    # 'fare_add' = fare + additional charges (*note, trip_total includes tip)
    .withColumn('fare_add', F.col('fare') + F.col('additional_charges'))
    # 'tip_pct' = tip dollar value / (fare + additional_charges)
    .withColumn('tip_pct', F.col('tips') / F.col('fare_add'))
    # 'add_charge_pct' = share of the pre-tip ride cost coming from additional_charges
    .withColumn('add_charge_pct', F.col('additional_charges') / F.col('fare_add'))
    # hour of day at which the trip started
    .withColumn('hour', F.hour('trip_start_timestamp'))
    # 'rain_snow': 1 when the day had any precipitation or snowfall
    .withColumn('rain_snow', F.when((F.col('PRCP') + F.col('SNOW')) > 0, 1).otherwise(0))
)

# Season dummies. Seasons are defined here as calendar quarters of the trip start month
# (winter = Jan-Mar, spring = Apr-Jun, summer = Jul-Sep, autumn = Oct-Dec).
SEASON_MONTHS = {
    'winter': [1, 2, 3],
    'spring': [4, 5, 6],
    'summer': [7, 8, 9],
    'autumn': [10, 11, 12],
}
for season_name, season_months in SEASON_MONTHS.items():
    modData = modData.withColumn(
        season_name,
        F.when(F.col('trip_start_month').isin(season_months), 1).otherwise(0))

# Day-of-week dummies. 'trip_start_dow' follows Spark's dayofweek numbering:
# 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
DOW_NAMES = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday']
for dow_number, dow_name in enumerate(DOW_NAMES, start=1):
    modData = modData.withColumn(
        dow_name,
        F.when(F.col('trip_start_dow') == dow_number, 1).otherwise(0))

# convert 'shared_trip_authorized' from boolean to integer (null is mapped to 0)
modData = modData.withColumn(
    'shared_trip_authorized',
    F.when(F.col('shared_trip_authorized') == True, 1).otherwise(0).cast('integer'))

# drop unneeded columns
modData = modData.drop('trip_end_timestamp',
                       'pickup_community_area', 'dropoff_community_area', #numeric codes
                       'pickup_community_eventCnt','dropoff_community_eventCnt', #keeping combined count
                       'chicago_pickup','chicago_dropoff', #keeping combined 'outside_chicago_ride' flag
                       'PRCP','SNOW' #keeping binary 'rain_snow' variable
                      )
modData.dtypes

[('trip_start_timestamp', 'timestamp'),
 ('trip_seconds', 'int'),
 ('trip_miles', 'double'),
 ('fare', 'double'),
 ('tips', 'double'),
 ('trip_total', 'double'),
 ('payment_type', 'string'),
 ('additional_charges', 'double'),
 ('ride_type', 'int'),
 ('trips_pooled', 'int'),
 ('shared_trip_authorized', 'int'),
 ('trip_start_date', 'date'),
 ('trip_start_month', 'int'),
 ('trip_start_day', 'int'),
 ('trip_start_year', 'int'),
 ('trip_start_dow', 'int'),
 ('weekend_dummy', 'int'),
 ('community_eventCnt', 'bigint'),
 ('pickup_community_name', 'string'),
 ('dropoff_community_name', 'string'),
 ('outside_chicago_ride', 'int'),
 ('covid_cases_sma7', 'double'),
 ('covid_hosp_sma7', 'double'),
 ('covid_deaths_sma7', 'double'),
 ('label', 'int'),
 ('fare_add', 'double'),
 ('tip_pct', 'double'),
 ('add_charge_pct', 'double'),
 ('hour', 'int'),
 ('rain_snow', 'int'),
 ('winter', 'int'),
 ('spring', 'int'),
 ('summer', 'int'),
 ('autumn', 'int'),
 ('sunday', 'int'),
 ('monday', 'int'),
 ('tuesday',

In [19]:
# this is the final data, use this
# Writes the engineered feature table (modData) as parquet to the project's GCS bucket.
# NOTE(review): no save mode is specified, so Spark's default applies - confirm how a
# re-run behaves when the target path already exists.
modData.write.parquet('gs://big-data-final/model-data/final-model-with-feature.parquet')
# Printed once the write call has returned.
print('done')

22/11/26 07:00:05 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 07:00:08 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 07:00:09 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 07:00:09 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/26 07:00:09 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

done


In [23]:
#modData.count() #221,302,885, this is #220,167,785

                                                                                

220167785