In [1]:
%reload_ext autoreload
%autoreload 2

In [42]:
import os
from pathlib import Path
import polars as pl
import pandas as pd
import requests
from tqdm import tqdm

from src.paths import *
from src.logger import get_logger

# Shared logger used by the helper functions defined in this notebook.
logger = get_logger()

# Download files

In [4]:

def download_file_from_source_into_raw_folder(year:int, month:int) -> Path:
    """
    Downloads the raw trip-data file for one month into the raw data folder.

    The remote URL is built from BASE_URL and the local file name from FILE_PATTERN,
    both formatted with `year` and `month`. The response is streamed to disk in chunks
    so the whole file never has to be held in memory, and it is first written to a
    temporary `.part` file so an interrupted download does not leave a truncated
    parquet file under the final name.

    Parameters:
    - year (int): The year of the file to download.
    - month (int): The month of the file to download.

    Returns:
    - Path: The path of the downloaded file inside RAW_DATA_DIR.

    Raises:
    - requests.exceptions.HTTPError: If the server answers with a 4xx/5xx status.
    - requests.exceptions.RequestException: On connection errors or timeouts.
    """
    file = RAW_DATA_DIR / Path(FILE_PATTERN.format(year=year, month=month))
    url = BASE_URL.format(year=year, month=month)

    # Make sure the destination folder exists before writing into it.
    file.parent.mkdir(parents=True, exist_ok=True)
    partial_file = file.with_name(file.name + ".part")

    logger.info(f"Downloading file from {url} to {file}")
    # (connect timeout, read timeout) in seconds: without it a stalled server
    # would block the notebook forever.
    with requests.get(url, stream=True, timeout=(10, 120)) as response:
        response.raise_for_status()
        with open(partial_file, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)

    # Only expose the file under its final name once it is complete.
    partial_file.replace(file)
    return file


# Fetch the January 2020 file; `path` is reused by the cells below.
path = download_file_from_source_into_raw_folder(2020, 1)

# Validate a file
- Timestamps must fall within the file's month, e.g. a file named `yellow_tripdata_2020-07.parquet` should only contain timestamps within July 2020.

In [5]:
# Load the downloaded parquet file and preview its first rows.
df = pl.read_parquet(path)
df.head()

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
i64,datetime[ns],datetime[ns],f64,f64,f64,str,i64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,null
1,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,1.2,1.0,"""N""",238,239,1,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5,
1,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,1.2,1.0,"""N""",239,238,1,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5,
1,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,0.6,1.0,"""N""",238,238,1,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5,
1,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,0.8,1.0,"""N""",238,151,1,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0,
2,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,0.0,1.0,"""N""",193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0,


In [39]:
def validate_file(file:Path, year:int, month:int) -> pl.DataFrame:
    """
    Keeps only the records of a parquet file whose pickup time falls in the given year and month.

    This function reads the 'tpep_pickup_datetime' and 'PULocationID' columns of the parquet
    file, drops every record whose 'tpep_pickup_datetime' is outside the specified year and
    month (records with a null pickup time are dropped too), logs how many records were
    removed, and returns the remaining records with the columns renamed.

    Parameters:
    - file (Path): The path to the parquet file to be validated.
    - year (int): The year to filter the 'tpep_pickup_datetime' by.
    - month (int): The month to filter the 'tpep_pickup_datetime' by.

    Returns:
    - pl.DataFrame: A DataFrame containing the 'pickup_datetime' and 'pickup_location_id' columns
      for records that fall within the specified year and month.
    """
    pickup_column = "tpep_pickup_datetime"
    location_column = "PULocationID"

    # Only these two columns are used, so skip reading the rest of the file.
    df = pl.read_parquet(file, columns=[pickup_column, location_column])

    in_requested_month = (
        (pl.col(pickup_column).dt.year() == year)
        & (pl.col(pickup_column).dt.month() == month)
    )

    clean_data = (
        df
        .filter(in_requested_month)
        .select([
            pl.col(pickup_column).alias("pickup_datetime")
            , pl.col(location_column).alias("pickup_location_id")
        ])
    )

    # Row counts come straight from the frames, so rows with a null pickup time
    # are part of the total and are reported as deleted.
    total_records = df.height
    records_in_range = clean_data.height
    # An empty file has nothing to keep; avoid dividing by zero.
    kept_percentage = records_in_range / total_records * 100 if total_records else 0.0

    logger.info(f"Validation for file: {file}")
    logger.info(f"Total records: {total_records}")
    logger.info(f"Records deleted: {total_records-records_in_range}")
    logger.info(f"Percentage: {kept_percentage:.2f}%")

    return clean_data


# Run the validation on the January 2020 file and preview the cleaned rows.
validate_file(path, 2020, 1).head()

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-01.parquet
Total records: 6405008
Records deleted: 212
Percentage: 100.00%


pickup_datetime,pickup_location_id
datetime[ns],i64
2020-01-01 00:28:15,238
2020-01-01 00:35:39,239
2020-01-01 00:47:41,238
2020-01-01 00:55:23,238
2020-01-01 00:01:58,193


In [46]:
from typing import Optional

def load_raw_data(year:int, months: list[int] | int | None = None) -> None:
    """
    Downloads, validates and saves the raw taxi trip data for a year and an optional list of months.

    For every requested month the raw file is downloaded, validated so that only records
    within that year and month remain, and the validated data is written in parquet format
    to the processed data directory. A month whose download fails is logged and skipped so
    the remaining months are still processed; the months that failed are reported at the end.

    Parameters:
    - year (int): The year for which to download and validate the data.
    - months (list[int] | int | None): The month(s) for which to download and validate the data.
      A single integer is treated as a one-element list. If None, all twelve months of the
      specified year are processed.

    Returns:
    None. The function saves the validated data into the processed data directory without returning any value.
    """

    logger.info(f"Downloading data for year {year}")

    if months is None:
        months = list(range(1, 13))
    elif isinstance(months, int):
        months = [months]
    else:
        months = list(months)

    # Make sure the output folder exists before the first write.
    Path(PROCESSED_DATA_DIR).mkdir(parents=True, exist_ok=True)

    failed_months = []
    for month in tqdm(months):
        try:
            file = download_file_from_source_into_raw_folder(year, month)
        except requests.exceptions.RequestException as e:
            # RequestException also covers connection errors and timeouts, not
            # only HTTP error statuses, so one bad month cannot abort the rest.
            logger.error(f"Error downloading data for year {year} and month {month}: {e}")
            failed_months.append(month)
            continue

        processed_file = PROCESSED_DATA_DIR / Path(FILE_PATTERN.format(year=year, month=month))
        validate_file(file, year, month).write_parquet(processed_file)

    if failed_months:
        logger.error(f"Data for year {year} could not be downloaded for months: {failed_months}")
    else:
        logger.info(f"Data for year {year} has been downloaded and validated")
    
# No months given, so every month of 2020 is downloaded and validated.
load_raw_data(2020)

  0%|          | 0/12 [00:00<?, ?it/s]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-01.parquet
Total records: 6405008
Records deleted: 212
Percentage: 100.00%


  8%|▊         | 1/12 [00:04<00:52,  4.73s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-02.parquet
Total records: 6299367
Records deleted: 303
Percentage: 100.00%


 17%|█▋        | 2/12 [00:10<00:55,  5.50s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-03.parquet
Total records: 3007687
Records deleted: 426
Percentage: 99.99%


 33%|███▎      | 4/12 [00:13<00:22,  2.84s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-04.parquet
Total records: 238073
Records deleted: 132
Percentage: 99.94%


 42%|████▏     | 5/12 [00:15<00:16,  2.30s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-05.parquet
Total records: 348415
Records deleted: 23
Percentage: 99.99%


 50%|█████     | 6/12 [00:15<00:10,  1.75s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-06.parquet
Total records: 549797
Records deleted: 9
Percentage: 100.00%


 58%|█████▊    | 7/12 [00:17<00:08,  1.76s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-07.parquet
Total records: 800412
Records deleted: 10
Percentage: 100.00%


 67%|██████▋   | 8/12 [00:19<00:07,  1.77s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-08.parquet
Total records: 1007286
Records deleted: 16
Percentage: 100.00%
Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-09.parquet
Total records: 1341017
Records deleted: 134
Percentage: 99.99%


 75%|███████▌  | 9/12 [00:21<00:05,  1.86s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-10.parquet
Total records: 1681132
Records deleted: 32
Percentage: 100.00%


 83%|████████▎ | 10/12 [00:24<00:04,  2.03s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-11.parquet
Total records: 1509000
Records deleted: 112
Percentage: 99.99%


 92%|█████████▏| 11/12 [00:26<00:02,  2.12s/it]

Validation for file:  C:\Users\selaf\OneDrive\Documentos\dev\real-ml-course\taxi_demand_predictor\data\raw\yellow_tripdata_2020-12.parquet
Total records: 1461898
Records deleted: 35
Percentage: 100.00%


100%|██████████| 12/12 [00:28<00:00,  2.38s/it]
