In [45]:
from io import StringIO
import os

import boto3
import pandas as pd

# Raise pandas' "display.max_columns" option to 50 for DataFrame output in this notebook.
pd.set_option("display.max_columns", 50)

In [32]:
# AWS credentials come from the environment; a variable that is not set
# yields None here instead of raising.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY")
aws_secret_key = os.environ.get("AWS_SECRET_KEY")

In [33]:
def read_csv_from_s3(bucket: str, path: str, filename: str, client=None) -> pd.DataFrame:
    """Download a CSV file from an S3 bucket and load it into a DataFrame.

    Parameters
    ----------
    bucket : str
        Name of the bucket that holds the file.
    path : str
        Key prefix ("folders") of the file. It is concatenated with
        ``filename`` without inserting a separator, so a non-empty path
        should already end with ``/``.
    filename : str
        Name of the file.
    client : optional
        An existing S3 client to reuse. When omitted, a new client is created
        from the module-level ``aws_access_key_id`` and ``aws_secret_key``.

    Returns
    -------
    pd.DataFrame
        A DataFrame parsed from the downloaded file.
    """
    if client is None:
        client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_key,
        )

    key = f"{path}{filename}"
    response = client.get_object(Bucket=bucket, Key=key)

    # The payload is decoded as UTF-8 text before being handed to pandas.
    csv_text = response["Body"].read().decode("utf-8")

    return pd.read_csv(StringIO(csv_text))

In [34]:
# Source bucket and the key prefix of each transformed dataset.
bucket = "cubix-chicago-taxi-bb-rita"

community_areas_path = "transformed_data/community_areas/"
company_path = "transformed_data/company/"
date_path = "transformed_data/date/"
payment_type_path = "transformed_data/payment_type/"
taxi_trips_path = "transformed_data/taxi_trips/"
weather_path = "transformed_data/weather/"

# Module-level S3 client built from the credentials read from the environment.
s3 = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_key,
)

In [35]:
# Load the four single-file lookup tables; every call spells out its
# arguments by keyword.
community_areas = read_csv_from_s3(
    bucket=bucket, path=community_areas_path, filename="community_areas_master.csv"
)
company = read_csv_from_s3(
    bucket=bucket, path=company_path, filename="company_master.csv"
)
date = read_csv_from_s3(
    bucket=bucket, path=date_path, filename="date_dimension.csv"
)
payment_type = read_csv_from_s3(
    bucket=bucket, path=payment_type_path, filename="payment_type_master.csv"
)

In [36]:
# Two independent, empty accumulators for the per-file DataFrames.
trips_list, weather_list = [], []

In [37]:
# Start from an empty list so that re-running this cell does not append the
# same files a second time.
trips_list = []

# A single list_objects call returns at most 1,000 keys; the paginator walks
# every page so no file is silently skipped once the prefix grows.
taxi_pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=taxi_trips_path)

for page in taxi_pages:
    # A page carries no "Contents" entry when nothing matches the prefix.
    for file in page.get("Contents", []):
        taxi_trip_key = file["Key"]
        filename = taxi_trip_key.split("/")[-1]

        # Skip keys that end in "/" (empty file name, presumably the folder
        # placeholder object) and anything that is not a .csv file.
        if filename.strip() == "" or taxi_trip_key.split(".")[-1] != "csv":
            continue

        trip = read_csv_from_s3(bucket, taxi_trips_path, filename)

        trips_list.append(trip)
        print(f"{filename} has been added.")

taxi_2024-10-03.csv has been added.
taxi_2024-10-04.csv has been added.
taxi_2024-10-05.csv has been added.
taxi_2024-10-06.csv has been added.
taxi_2024-10-07.csv has been added.
taxi_2024-10-08.csv has been added.
taxi_2024-10-09.csv has been added.
taxi_2024-10-10.csv has been added.
taxi_2024-10-11.csv has been added.
taxi_2024-10-12.csv has been added.
taxi_2024-10-13.csv has been added.
taxi_2024-10-14.csv has been added.
taxi_2024-10-15.csv has been added.
taxi_2024-10-16.csv has been added.
taxi_2024-10-17.csv has been added.
taxi_2024-10-18.csv has been added.
taxi_2024-10-19.csv has been added.
taxi_2024-10-20.csv has been added.
taxi_2024-10-21.csv has been added.


In [38]:
# Stack the per-file frames into one table with a fresh 0..n-1 index.
trips = pd.concat(objs=trips_list, ignore_index=True)

In [39]:
# Only the last expression of a cell is rendered automatically, so the preview
# is shown explicitly; display() is provided by the IPython kernel.
display(trips.head())
trips.info()
trips.shape

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 332805 entries, 0 to 332804
Data columns (total 20 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   trip_id                     332805 non-null  object 
 1   taxi_id                     332805 non-null  object 
 2   trip_start_timestamp        332805 non-null  object 
 3   trip_end_timestamp          332805 non-null  object 
 4   trip_seconds                332805 non-null  int64  
 5   trip_miles                  332805 non-null  float64
 6   pickup_community_area_id    332805 non-null  int64  
 7   dropoff_community_area_id   332805 non-null  int64  
 8   fare                        332805 non-null  float64
 9   tips                        332805 non-null  float64
 10  tolls                       332805 non-null  float64
 11  extras                      332805 non-null  float64
 12  trip_total                  332805 non-null  float64
 13  pickup_centroi

(332805, 20)

In [40]:
# Start from an empty list so that re-running this cell does not append the
# same files a second time.
weather_list = []

# A single list_objects call returns at most 1,000 keys; the paginator walks
# every page so no file is silently skipped once the prefix grows.
weather_pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=weather_path)

for page in weather_pages:
    # A page carries no "Contents" entry when nothing matches the prefix.
    for file in page.get("Contents", []):
        weather_key = file["Key"]
        filename = weather_key.split("/")[-1]

        # Skip keys that end in "/" (empty file name, presumably the folder
        # placeholder object) and anything that is not a .csv file.
        if filename.strip() == "" or weather_key.split(".")[-1] != "csv":
            continue

        weather_daily = read_csv_from_s3(bucket, weather_path, filename)

        weather_list.append(weather_daily)
        print(f"{filename} has been added.")

weather_2024-10-03.csv has been added.
weather_2024-10-04.csv has been added.
weather_2024-10-05.csv has been added.
weather_2024-10-06.csv has been added.
weather_2024-10-07.csv has been added.
weather_2024-10-08.csv has been added.
weather_2024-10-09.csv has been added.
weather_2024-10-10.csv has been added.
weather_2024-10-11.csv has been added.
weather_2024-10-12.csv has been added.
weather_2024-10-13.csv has been added.
weather_2024-10-14.csv has been added.
weather_2024-10-15.csv has been added.
weather_2024-10-16.csv has been added.
weather_2024-10-17.csv has been added.
weather_2024-10-18.csv has been added.
weather_2024-10-19.csv has been added.
weather_2024-10-20.csv has been added.
weather_2024-10-21.csv has been added.


In [41]:
# Stack the per-file frames into one table with a fresh 0..n-1 index.
weather = pd.concat(objs=weather_list, ignore_index=True)

In [42]:
# Only the last expression of a cell is rendered automatically, so the preview
# is shown explicitly; display() is provided by the IPython kernel.
display(weather.head())
weather.info()
weather.shape

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 456 entries, 0 to 455
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   datetime       456 non-null    object 
 1   temperature    456 non-null    float64
 2   wind_speed     456 non-null    float64
 3   rain           456 non-null    float64
 4   precipitation  456 non-null    float64
dtypes: float64(4), object(1)
memory usage: 17.9+ KB


(456, 5)

In [46]:
# Attach the weather columns to each trip by matching the trip's
# "datetime_for_weather" value against the weather "datetime", then drop the
# right-hand key, which is no longer needed after the match.
# NOTE(review): this and the later joins are inner joins, so trips without a
# match are dropped; the stored outputs show 332,805 trips before and 332,329
# rows after the full merge chain. Confirm that loss is intended.
trips_full = (
    trips
    .merge(weather, left_on="datetime_for_weather", right_on="datetime", how="inner")
    .drop(columns=["datetime"])
)


In [47]:
trips_full = pd.merge(trips_full, company, left_on="company_id", right_on="company_id", how="inner")
trips_full = trips_full.drop(columns=["company_id"])


In [48]:

trips_full = pd.merge(trips_full, payment_type, left_on="payment_type_id", right_on="payment_type_id", how="inner")
trips_full = trips_full.drop(columns=["payment_type_id"])


In [49]:

# Resolve the pickup area id against the community-area table, drop both join
# keys, and rename the looked-up name so it is tied to the pickup side.
trips_full = (
    trips_full
    .merge(community_areas, left_on="pickup_community_area_id", right_on="area_code", how="inner")
    .drop(columns=["pickup_community_area_id", "area_code"])
    .rename(columns={"community_name": "pickup_community_area_name"})
)


In [50]:

# Resolve the dropoff area id against the community-area table, drop both join
# keys, and rename the looked-up name so it is tied to the dropoff side.
trips_full = (
    trips_full
    .merge(community_areas, left_on="dropoff_community_area_id", right_on="area_code", how="inner")
    .drop(columns=["dropoff_community_area_id", "area_code"])
    .rename(columns={"community_name": "dropoff_community_area_name"})
)

In [53]:
# Parse both columns as datetimes so the date join that follows compares
# values of the same dtype.
date["date"] = pd.to_datetime(date["date"])
trips_full["trip_start_timestamp"] = pd.to_datetime(trips_full["trip_start_timestamp"])

# normalize() sets the time part to midnight while staying datetime64, which
# avoids converting every row to a Python date object and back.
trips_full["trip_start_date"] = trips_full["trip_start_timestamp"].dt.normalize()


In [54]:

# Add the columns of the date table for the day each trip started; the
# right-hand key is dropped after the match.
trips_full = (
    trips_full
    .merge(date, left_on="trip_start_date", right_on="date", how="inner")
    .drop(columns=["date"])
)

In [55]:
# Only the last expression of a cell is rendered automatically, so the preview
# is shown explicitly; display() is provided by the IPython kernel.
display(trips_full.head())
trips_full.info()
trips_full.shape

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 332329 entries, 0 to 332328
Data columns (total 30 columns):
 #   Column                       Non-Null Count   Dtype         
---  ------                       --------------   -----         
 0   trip_id                      332329 non-null  object        
 1   taxi_id                      332329 non-null  object        
 2   trip_start_timestamp         332329 non-null  datetime64[ns]
 3   trip_end_timestamp           332329 non-null  object        
 4   trip_seconds                 332329 non-null  int64         
 5   trip_miles                   332329 non-null  float64       
 6   fare                         332329 non-null  float64       
 7   tips                         332329 non-null  float64       
 8   tolls                        332329 non-null  float64       
 9   extras                       332329 non-null  float64       
 10  trip_total                   332329 non-null  float64       
 11  pickup_centroid_latitude  

(332329, 30)

In [8]:
# Preview the first rows of the community-area table.
# NOTE(review): this cell's execution count (8) is lower than that of the cell
# that loads the table (35), so the stored output may be stale; re-run to confirm.
community_areas.head()

Unnamed: 0,area_code,community_name
0,1,Rogers Park
1,2,West Ridge
2,3,Uptown
3,4,Lincoln Square
4,5,North Center


In [12]:
# Preview the last rows of the company table.
# NOTE(review): this cell's execution count (12) is lower than that of the cell
# that loads the table (35), so the stored output may be stale; re-run to confirm.
company.tail()

Unnamed: 0,company_id,company
30,31,3556 - 36214 RC Andrews Cab
31,32,6574 - Babylon Express Inc.
32,33,Tac - Yellow Non Color
33,34,Metro Jet Taxi A.
34,35,3591 - 63480 Chuks Cab


In [10]:
# Preview the first rows of the date table.
# NOTE(review): this cell's execution count (10) is lower than that of the cell
# that loads the table (35), so the stored output may be stale; re-run to confirm.
date.head()

Unnamed: 0,date,year,month,day,day_of_week,is_weekend
0,2023-01-01,2023,1,1,7,True
1,2023-01-02,2023,1,2,1,False
2,2023-01-03,2023,1,3,2,False
3,2023-01-04,2023,1,4,3,False
4,2023-01-05,2023,1,5,4,False


In [11]:
# Preview the first rows of the payment-type table.
# NOTE(review): this cell's execution count (11) is lower than that of the cell
# that loads the table (35), so the stored output may be stale; re-run to confirm.
payment_type.head()

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Cash
2,3,Unknown
3,4,Prcard
4,5,Mobile
