In [7]:
import os
import sys
import requests

In [10]:
# --- Download configuration -------------------------------------------------
# Base URL of the GitHub release downloads that host the trip-data archives.
URL_PREFIX = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"

# Dataset to fetch, e.g. "yellow" or "green".
TAXI_TYPE = "yellow"

# Four-digit year, kept as a string because it is spliced into URLs and
# filesystem paths further down.
YEAR = "2020"

In [12]:
# Download one gzip-compressed CSV per month of YEAR for TAXI_TYPE into
# data/raw/<taxi type>/<year>/<month>/, skipping months already on disk.
for month in range(1, 13):
    # Zero-pad the month so it matches the remote file names (01 .. 12).
    fmonth = f"{month:02d}"

    # Remote file name uses a hyphen between year and month.
    url = f"{URL_PREFIX}/{TAXI_TYPE}/{TAXI_TYPE}_tripdata_{YEAR}-{fmonth}.csv.gz"

    # Local file name uses an underscore between year and month.
    local_prefix = os.path.join("data", "raw", TAXI_TYPE, YEAR, fmonth)
    local_file = f"{TAXI_TYPE}_tripdata_{YEAR}_{fmonth}.csv.gz"
    local_path = os.path.join(local_prefix, local_file)

    # Nothing to do when a previous run already fetched this month.
    if os.path.exists(local_path):
        print(f"File already exists: {local_path}. Skipping download.")
        continue  # Skip to the next iteration

    # Create directory if it doesn't exist
    os.makedirs(local_prefix, exist_ok=True)

    # Write to a temporary file first and rename it only once the download has
    # finished, so an interrupted run never leaves a truncated archive at
    # local_path that the existence check above would later mistake for a
    # complete one. The leading dot keeps the temporary file hidden from
    # directory-based readers that skip dot-files.
    partial_path = os.path.join(local_prefix, f".{local_file}.part")

    print(f"Downloading {url} to {local_path}")
    try:
        # stream=True avoids holding the whole archive in memory; the timeout
        # (connect seconds, read seconds) prevents hanging on a stalled server.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            if response.status_code != 200:
                print(f"Failed to download {url}. HTTP Status Code: {response.status_code}")
                continue
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        # Atomic rename: local_path appears only when the file is complete.
        os.replace(partial_path, local_path)
    except requests.RequestException as exc:
        # Report network-level failures the same way as bad status codes and
        # carry on with the remaining months.
        print(f"Failed to download {url}. Error: {exc}")
    finally:
        # Remove any leftover partial file (failed or interrupted download).
        if os.path.exists(partial_path):
            os.remove(partial_path)

File already exists: data/raw/yellow/2020/01/yellow_tripdata_2020_01.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/02/yellow_tripdata_2020_02.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/03/yellow_tripdata_2020_03.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/04/yellow_tripdata_2020_04.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/05/yellow_tripdata_2020_05.csv.gz. Skipping download.
Downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-06.csv.gz to data/raw/yellow/2020/06/yellow_tripdata_2020_06.csv.gz
File already exists: data/raw/yellow/2020/07/yellow_tripdata_2020_07.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/08/yellow_tripdata_2020_08.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/09/yellow_tripdata_2020_09.csv.gz. Skipping download.
File already exists: data/raw/yellow/2020/10/yellow_tripdata

In [13]:
import pyspark
from pyspark.sql import SparkSession

In [14]:
# Obtain a Spark session (creating one if none exists yet) with master
# "local[*]" and application name "test".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 09:31:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/06 09:31:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [15]:
import pandas as pd

In [16]:
from pyspark.sql import types

In [17]:
# Explicit schema applied when reading the green taxi CSV files.
# Each entry is (column name, Spark type class), in declaration order;
# every column is declared nullable.
_green_columns = [
    ("VendorID", types.IntegerType),
    ("lpep_pickup_datetime", types.TimestampType),
    ("lpep_dropoff_datetime", types.TimestampType),
    ("store_and_fwd_flag", types.StringType),
    ("RatecodeID", types.IntegerType),
    ("PULocationID", types.IntegerType),
    ("DOLocationID", types.IntegerType),
    ("passenger_count", types.IntegerType),
    ("trip_distance", types.DoubleType),
    ("fare_amount", types.DoubleType),
    ("extra", types.DoubleType),
    ("mta_tax", types.DoubleType),
    ("tip_amount", types.DoubleType),
    ("tolls_amount", types.DoubleType),
    ("ehail_fee", types.DoubleType),
    ("improvement_surcharge", types.DoubleType),
    ("total_amount", types.DoubleType),
    ("payment_type", types.IntegerType),
    ("trip_type", types.IntegerType),
    ("congestion_surcharge", types.DoubleType),
]

green_schema = types.StructType(
    [
        types.StructField(column_name, spark_type(), True)
        for column_name, spark_type in _green_columns
    ]
)

In [28]:
# Explicit schema applied when reading the yellow taxi CSV files.
# Each entry is (column name, Spark type class), in declaration order;
# every column is declared nullable.
_yellow_columns = [
    ("VendorID", types.IntegerType),
    ("tpep_pickup_datetime", types.TimestampType),
    ("tpep_dropoff_datetime", types.TimestampType),
    ("passenger_count", types.IntegerType),
    ("trip_distance", types.DoubleType),
    ("RatecodeID", types.IntegerType),
    ("store_and_fwd_flag", types.StringType),
    ("PULocationID", types.IntegerType),
    ("DOLocationID", types.IntegerType),
    ("payment_type", types.IntegerType),
    ("fare_amount", types.DoubleType),
    ("extra", types.DoubleType),
    ("mta_tax", types.DoubleType),
    ("tip_amount", types.DoubleType),
    ("tolls_amount", types.DoubleType),
    ("improvement_surcharge", types.DoubleType),
    ("total_amount", types.DoubleType),
    ("congestion_surcharge", types.DoubleType),
]

yellow_schema = types.StructType(
    [
        types.StructField(column_name, spark_type(), True)
        for column_name, spark_type in _yellow_columns
    ]
)

In [19]:
# Convert each month of raw green taxi CSV data for `year` into Parquet.
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    # Month directories are zero-padded (01 .. 12).
    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    # Read with the explicit schema instead of letting Spark infer types.
    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    # mode("overwrite") makes the cell re-runnable: the default save mode
    # raises an error as soon as the output directory already exists.
    df_green \
        .repartition(4) \
        .write \
        .mode("overwrite") \
        .parquet(output_path)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3


                                                                                

processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [29]:
# Convert each month of raw yellow taxi CSV data for `year` into Parquet.
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    # Month directories are zero-padded (01 .. 12).
    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'

    # Bound to its own name so the yellow data does not silently replace
    # whatever `df_green` currently refers to.
    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    # mode("overwrite") makes the cell re-runnable: the default save mode
    # raises an error as soon as the output directory already exists.
    df_yellow \
        .repartition(4) \
        .write \
        .mode("overwrite") \
        .parquet(output_path)

processing data for 2020/1


                                                                                

processing data for 2020/2


                                                                                

processing data for 2020/3


                                                                                

processing data for 2020/4


                                                                                

processing data for 2020/5


                                                                                

processing data for 2020/6


                                                                                

processing data for 2020/7


                                                                                

processing data for 2020/8


                                                                                

processing data for 2020/9


                                                                                

processing data for 2020/10


                                                                                

processing data for 2020/11


                                                                                

processing data for 2020/12


                                                                                

In [24]:
# Load only the first 100 rows of the January 2020 yellow archive with pandas
# for a quick look at the raw columns.
df_pandas = pd.read_csv(
    "data/raw/yellow/2020/01/yellow_tripdata_2020_01.csv.gz",
    compression="gzip",
    nrows=100,
)

In [25]:
# Show the sample frame; as the last expression in the cell it is rendered as
# the cell's output.
df_pandas

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2020-01-01 00:28:15,2020-01-01 00:33:03,1,1.20,1,N,238,239,1,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5
1,1,2020-01-01 00:35:39,2020-01-01 00:43:04,1,1.20,1,N,239,238,1,7.0,3.0,0.5,1.50,0.0,0.3,12.30,2.5
2,1,2020-01-01 00:47:41,2020-01-01 00:53:52,1,0.60,1,N,238,238,1,6.0,3.0,0.5,1.00,0.0,0.3,10.80,2.5
3,1,2020-01-01 00:55:23,2020-01-01 01:00:14,1,0.80,1,N,238,151,1,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0
4,2,2020-01-01 00:01:58,2020-01-01 00:04:16,1,0.00,1,N,193,193,2,3.5,0.5,0.5,0.00,0.0,0.3,4.80,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2020-01-01 00:53:56,2020-01-01 01:13:51,1,3.75,1,N,236,107,1,15.5,0.5,0.5,2.00,0.0,0.3,21.30,2.5
96,1,2020-01-01 00:14:05,2020-01-01 00:40:50,2,2.10,1,N,48,107,1,16.5,3.0,0.5,4.05,0.0,0.3,24.35,2.5
97,2,2020-01-01 00:14:44,2020-01-01 00:23:35,1,1.78,1,N,161,141,2,8.0,0.5,0.5,0.00,0.0,0.3,11.80,2.5
98,2,2020-01-01 00:25:31,2020-01-01 00:38:12,1,3.16,1,N,141,224,2,11.5,0.5,0.5,0.00,0.0,0.3,15.30,2.5


In [27]:
# Number of columns in the sample (second entry of the frame's shape);
# presumably used to cross-check the field count of yellow_schema.
df_pandas.shape[1]

18