In [1]:
#Import required packages
from bs4 import BeautifulSoup
import requests
import os.path
import urllib.request
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, desc
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampNTZType

In [2]:
#os.makedirs("Yellow Taxi Trip Data")

In [3]:
#Get the data from the website
url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

page = requests.get(url)

In [4]:
#Use BeautifulSoup for data scraping
soup = BeautifulSoup(page.content,"html.parser")
#print(soup)

In [5]:
#Get the links to download each parquet file
urls = []

#We have data from 2009-2023. This means there are 15 years of data for each month of the year 
for i in range(15):
    table_year = soup.find_all('table')[i] #find the table that is related to the year data

    yt = table_year.find_all('a',{"title":"Yellow Taxi Trip Records"}) #find the data related to Yellow Taxi Trip Records

    for month in range(len(yt)):
        #Get the link for each month of the year and add it to the urls list
        urls.append(table_year.find_all('a',{"title":"Yellow Taxi Trip Records"})[month].get("href"))


In [6]:
#Check if all links of data files are there 14years(from 2009-2022)*12months + 7months (2023) = 175
len(urls)

175

In [7]:
#Create a path for each data file 
filename_list = [] #all the name files will be added into that list
for url in urls:
    #cut the name of the url to get only the information about the date of the file
    test_name = url.rsplit('/', 1)[-1]
    final_name = test_name.split('.')[0]
    filename_list.append(final_name)

    filename = os.path.join('Yellow Taxi Trip Data', final_name) #create the filename path for each data file

    # Download the file if it does not exist
    if not os.path.isfile(filename):
        print('Downloading: ' + filename)
        try:
            urllib.request.urlretrieve(url, filename)
        except Exception as inst:
            print(inst)
            print('Encountered unknown error. Continuing.')

In [8]:
# Initialize a SparkSession
spark = SparkSession.builder.appName('YellowTaxiTripData').getOrCreate()

23/10/23 09:39:15 WARN Utils: Your hostname, Reveccas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.10 instead (on interface en0)
23/10/23 09:39:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/23 09:39:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/23 09:39:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [9]:
#Read and add all the parquet format files into a spark dataframe and then add them into a list
df_all = []
for file in range(len(filename_list)):
    parquet_file_path = "/Users/reveccachristou/Desktop/Aptitude/Yellow Taxi Trip Data/" + filename_list[file]
    df = spark.read.parquet(parquet_file_path)
    df_all.append(df)



CodeCache: size=131072Kb used=22660Kb max_used=22660Kb free=108411Kb
 bounds [0x00000001069e0000, 0x0000000108030000, 0x000000010e9e0000]
 total_blobs=8806 nmethods=7925 adapters=793
 compilation: disabled (not enough contiguous free space left)


In [10]:
#Check the first dataframe with data from July 2023
df1 = spark.read.parquet("/Users/reveccachristou/Desktop/Aptitude/Yellow Taxi Trip Data/" + filename_list[0])
df1.columns #get the column names of the dataframe

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [11]:
#Check all possible column names of all dataframes
col_list = []
for df in df_all: #for each dataframe
    cols = df.columns
    for c in cols:
        if c not in col_list: #if the specific column name does not exist, then add it to the list
            col_list.append(c) 
col_list

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'Airport_fee',
 'vendor_id',
 'pickup_datetime',
 'dropoff_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'rate_code',
 'dropoff_longitude',
 'dropoff_latitude',
 'surcharge',
 '__index_level_0__',
 'vendor_name',
 'Trip_Pickup_DateTime',
 'Trip_Dropoff_DateTime',
 'Passenger_Count',
 'Trip_Distance',
 'Start_Lon',
 'Start_Lat',
 'Rate_Code',
 'store_and_forward',
 'End_Lon',
 'End_Lat',
 'Payment_Type',
 'Fare_Amt',
 'Tip_Amt',
 'Tolls_Amt',
 'Total_Amt']

In [12]:
#Create a dictionary manually with all the possible names of the same column
#Not sure how this can be done automatically, since not all dataframes have the same schema and order of columns
#'pickup_longitude':['Start_Lon'],'pickup_latitude':['Start_Lat'],'dropoff_longitude':['End_Lon'], and
#'dropoff_latitude':['End_Lat'] do not exist at the first dataframe, however, they are essential to match with the
#remaining dataframes. We will create a new dataframe that has the schema of all the possible columns of all dataframes 

d = {'VendorID':['vendor_id','vendor_name'],
 'tpep_pickup_datetime':['pickup_datetime','Trip_Pickup_DateTime'],
 'tpep_dropoff_datetime':['dropoff_datetime','Trip_Dropoff_DateTime'],
 'passenger_count':['Passenger_Count'],
 'trip_distance':['Trip_Distance'],
 'RatecodeID':['rate_code','Rate_Code'],
 'store_and_fwd_flag':['store_and_forward'],
 'PULocationID':[],
 'DOLocationID':[],
 'payment_type':['Payment_Type'],
 'fare_amount':['Fare_Amt'],
 'extra':['surcharge'],
 'mta_tax':[],
 'tip_amount':['Tip_Amt'],
 'tolls_amount':['Tolls_Amt'],
 'improvement_surcharge':[],
 'total_amount':['Total_Amt'],
 'congestion_surcharge':[],
 'airport_fee':['Airport_fee'],
 'pickup_longitude':['Start_Lon'],
 'pickup_latitude':['Start_Lat'],
 'dropoff_longitude':['End_Lon'],
 'dropoff_latitude':['End_Lat'],
 '__index_level_0__':[]}

In [13]:
#The final dataframe will have 24 different columns
cols = list(d.keys())
len(list(d.keys()))

24

In [14]:
print(cols)

['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', '__index_level_0__']


In [15]:
#Check the data type of each column of the first dataframe with data from July 2023
column_data_types = df1.dtypes
print(column_data_types)
df1_types = []
for i in column_data_types:
    df1_types.append(i[1])

#The last 5 column names that do not exist in df1 have double type
for i in range(5):
    df1_types.append('double') 

[('VendorID', 'bigint'), ('tpep_pickup_datetime', 'timestamp_ntz'), ('tpep_dropoff_datetime', 'timestamp_ntz'), ('passenger_count', 'double'), ('trip_distance', 'double'), ('RatecodeID', 'double'), ('store_and_fwd_flag', 'string'), ('PULocationID', 'bigint'), ('DOLocationID', 'bigint'), ('payment_type', 'bigint'), ('fare_amount', 'double'), ('extra', 'double'), ('mta_tax', 'double'), ('tip_amount', 'double'), ('tolls_amount', 'double'), ('improvement_surcharge', 'double'), ('total_amount', 'double'), ('congestion_surcharge', 'double'), ('airport_fee', 'double')]


In [16]:
df1_types

['bigint',
 'timestamp_ntz',
 'timestamp_ntz',
 'double',
 'double',
 'double',
 'string',
 'bigint',
 'bigint',
 'bigint',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double',
 'double']

In [17]:
#Adjust the data types to match the spark datatypes to create a schema
data_types = [IntegerType(),TimestampNTZType(),TimestampNTZType(),DoubleType(),DoubleType(),DoubleType(),StringType(),IntegerType(),IntegerType(),IntegerType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType()]
print(data_types)

[IntegerType(), TimestampNTZType(), TimestampNTZType(), DoubleType(), DoubleType(), DoubleType(), StringType(), IntegerType(), IntegerType(), IntegerType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType()]


In [18]:
#Create a StructType schema based on the column name and data type list we created above
schema = StructType([StructField(name, data_type, nullable=True) for name, data_type in zip(cols, data_types)])

In [19]:
#Create a new dataframe where all the dataframes will be joined
final_df = spark.createDataFrame([], schema=schema)
#final_df.show()

# ***** For the following cell, I tried to reduce the number of merged dataframes to check if errors still occur, but still getting the same errors.

In [20]:
for df in df_all: #***** Reduce the number of merged dataframes to check if no errors occur, but still the same issue
    #Check if the column name of the newly created schema matches the column name of the existing dataframe
    for col in df.columns:
        if col in d.keys(): #if the column name matches the schema column name 
            continue
        else:
            #if it doesn't match, then rename the column name according to the schema
            for key, value in d.items(): 
                if col in value:
                    df = df.withColumnRenamed(col, key)
                    
    #if the dataframe doesn't have all the columns from the new schema, then add the column to the dataframe and
    #populate the columns with NULL values
    for c in cols:
        if c not in df.columns:
        # If it doesn't exist, add the column with null values
            df = df.withColumn(c, lit(None).cast("string"))   
    
    #Since all the appropriate changes have been made in order each dataframe to match the new schema, we can perform the merge of the dataframe to the final dataframe
    final_df = final_df.union(df)

In [21]:
#Check how the final dataframe looks like after the merging of all 175 dataframes
#final_df.show()

In [22]:
#Find how many records has the final dataframe
row_count = final_df.count()
print(row_count)



1721158822


                                                                                

In [23]:
#We need only the top 10% trips based on distance travelled
#We need to calculate how many records are 10% of all data
keep10 = int(row_count*10/100)
print(keep10)

172115882


In [24]:
#We filter the trip_distance column in descending order to have the longest trips at the top of the dataframe
final_df = final_df.orderBy(desc('trip_distance'))

In [25]:
#We keep only the top 10% trips based on distance travelled 
final_df = final_df.limit(keep10)
#final_df.show()
#final_df.count()

In [26]:
final_df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable =

In [27]:
final_df.describe('total_amount').head(5)

23/10/23 09:40:11 WARN DAGScheduler: Broadcasting large task binary with size 1056.3 KiB
23/10/23 09:40:14 ERROR Executor: Exception in task 35.0 in stage 179.0 (TID 1584)
java.lang.OutOfMemoryError: Java heap space
23/10/23 09:40:14 ERROR Executor: Exception in task 43.0 in stage 179.0 (TID 1592)
java.lang.OutOfMemoryError: Java heap space
23/10/23 09:40:14 ERROR Executor: Exception in task 27.0 in stage 179.0 (TID 1576)
java.lang.OutOfMemoryError: Java heap space
23/10/23 09:40:14 ERROR Executor: Exception in task 11.0 in stage 179.0 (TID 1560)
java.lang.OutOfMemoryError: Java heap space
23/10/23 09:40:14 ERROR Executor: Exception in task 51.0 in stage 179.0 (TID 1600)
java.lang.OutOfMemoryError: Java heap space
	at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:657)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:42)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$doExecute$10(limit.scala:324)
	at org.apache.spark.sql.executio

23/10/23 09:40:15 ERROR Executor: Exception in task 59.0 in stage 179.0 (TID 1608)
java.lang.OutOfMemoryError: Java heap space
23/10/23 09:40:15 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 59.0 in stage 179.0 (TID 1608),5,main]
java.lang.OutOfMemoryError: Java heap space
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 62161)
Traceback (most recent call last):
  File "/Users/reveccachristou/anaconda3/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/reveccachristou/anaconda3/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/Users/reveccachristou/anaconda3/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/User

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
spark.stop()