In [37]:
import csv
import pandas as pd
from datetime import datetime, timedelta
import os
from time import time

In [38]:
# Wall-clock start (seconds since the epoch) of the whole pipeline; the last cell prints the elapsed time since this point.
process_start = time()

In [39]:
# Raw trip records (the yellow_tripdata 2014-01 CSV), read below in chunks.
file_taxi_rides = "./data/yellow_tripdata_2014-01/yellow_tripdata_2014-01.csv"

# Output path templates passed to store_groups; "{0}" is replaced by the group tag.
(
    file_rides_groupped_hours,
    file_rides_groupped_days,
    file_rides_groupped_weekdays,
    file_rides_groupped_weather,
) = (
    "./data/processing/hours_{0}.csv",
    "./data/processing/days_{0}.csv",
    "./data/processing/weekdays_{0}.csv",
    "./data/processing/weather_{0}.csv",
)

In [40]:
def _group_tag(group):
    """Return the file-name tag used by store_groups for one group key.

    * integer keys (Python or NumPy ints) and floats holding a whole number
      (e.g. codes stored as float because the column contained NaN) -> ``int``;
    * other floats -> ``str`` (so 2.5 and 2.0 no longer collide on "2");
    * date/time keys (anything with ``strftime``) -> ``"%Y%m%d_%H"``; plain
      ``datetime.date`` keys therefore always end in ``_00``;
    * anything else -> ``str(key)``.
    """
    if pd.api.types.is_integer(group):
        return int(group)
    if pd.api.types.is_float(group) and float(group).is_integer():
        return int(group)
    if hasattr(group, "strftime"):
        return group.strftime("%Y%m%d_%H")
    return str(group)


def store_groups(filename_template, grouped_df):
    """Append every group of ``grouped_df`` to its own CSV file.

    Each group is written to ``filename_template.format(tag)`` where ``tag`` is
    derived from the group key by ``_group_tag``. Files are opened in append
    mode so successive chunks accumulate in the same file; the CSV header is
    written only when the file does not exist yet. The DataFrame index is not
    written.

    :param filename_template: path containing a ``{0}`` placeholder.
    :param grouped_df: a pandas ``DataFrameGroupBy``.
    """
    for group, frame in grouped_df:
        file_group = filename_template.format(_group_tag(group))
        # mode='a' creates a missing file, so only the header depends on existence.
        frame.to_csv(file_group, mode='a', header=not os.path.exists(file_group), index=False)

In [41]:
# Weather table indexed by its timestamp; the trip-processing cell joins it on each ride's pickup hour.
file_weather = "./data/prepared_weather.csv"
df_weather = pd.read_csv(file_weather)
df_weather['datetime'] = pd.to_datetime(df_weather['datetime'], format="%Y-%m-%d %H:%M:%S")
# Move the parsed timestamps straight into the index (no reset_index/drop round trip, which
# would delete a data column that happens to be called "index").
df_weather = df_weather.set_index("datetime")
# A repeated timestamp would silently duplicate every ride of that hour in the trip/weather join.
assert df_weather.index.is_unique, "prepared_weather.csv has duplicate datetime rows"

In [42]:
# Trip columns kept for the aggregations below; every other raw column is dropped.
extrating_columns = [
        'pickup_datetime',
        'dropoff_datetime',
        'passenger_count',
        'trip_distance',
        'payment_type',
        'tip_amount']

chunk_size = 1000000 * 3  # rows per read_csv chunk, bounds peak memory
chunks_to_process = None  # set to an int to stop after that many chunks (quick test runs)


def _reset_output_files(*templates):
    """Prepare the output location of each ``"<dir>/<prefix>{0}<suffix>"`` template for a fresh run.

    ``store_groups`` appends to existing files, so outputs left by a previous
    run would otherwise be duplicated. This creates ``<dir>`` if it is missing
    and deletes the files in it named ``<prefix>*<suffix>``.
    """
    for template in templates:
        directory, pattern = os.path.split(template)
        directory = directory or os.curdir
        prefix, _, suffix = pattern.partition("{0}")
        os.makedirs(directory, exist_ok=True)
        for name in os.listdir(directory):
            if name.startswith(prefix) and name.endswith(suffix):
                os.remove(os.path.join(directory, name))


def _prepare_chunk(raw_chunk, weather, keep_columns=extrating_columns):
    """Select trip columns, derive the grouping keys and attach weather.

    Returns a new frame with the kept columns (names whitespace-stripped), plus
    ``by_weekday`` (Monday = 0), ``by_hour`` (pickup time floored to the hour),
    ``by_day`` (pickup date) and the columns of ``weather`` matched on
    ``by_hour`` (left join: every ride is kept; unmatched hours get NaN).
    """
    trimmed = raw_chunk.rename(columns=lambda col: col.strip())
    trimmed = trimmed.drop([col for col in trimmed.columns if col not in keep_columns], axis=1)

    # Vectorized .dt accessors instead of per-row Python lambdas.
    pickup = pd.to_datetime(trimmed['pickup_datetime'], format="%Y-%m-%d %H:%M:%S")
    prepared = trimmed.assign(
        pickup_datetime=pickup,
        by_weekday=pickup.dt.weekday,
        by_hour=pickup.dt.floor("h"),
        by_day=pickup.dt.date,
    )
    return prepared.join(weather, on="by_hour")


_reset_output_files(
    file_rides_groupped_hours,
    file_rides_groupped_days,
    file_rides_groupped_weekdays,
    file_rides_groupped_weather,
)

count = 0
for chunk in pd.read_csv(file_taxi_rides, chunksize=chunk_size):
    start = time()

    chunk = _prepare_chunk(chunk, df_weather)

    # mapping
    hourly_groups = chunk.groupby("by_hour")
    daily_groups = chunk.groupby("by_day")
    weekly_groups = chunk.groupby("by_weekday")
    weather_groups = chunk.groupby("weather_description_code")

    store_groups(file_rides_groupped_hours, hourly_groups)
    store_groups(file_rides_groupped_days, daily_groups)
    store_groups(file_rides_groupped_weekdays, weekly_groups)
    store_groups(file_rides_groupped_weather, weather_groups)

    count += 1
    print(f"processed {count} chunk in {time()-start} sec")

    if chunks_to_process is not None and count >= chunks_to_process:
        break

processed 1 chunk in 301.39654302597046 sec
processed 2 chunk in 390.7491683959961 sec
processed 3 chunk in 301.82050490379333 sec
processed 4 chunk in 302.9894509315491 sec
processed 5 chunk in 185.72009801864624 sec


In [43]:
# Total wall-clock seconds since process_start was recorded at the top of the notebook.
print(f"Done in {time()-process_start} sec")

Done in 1525.367035150528 sec
