In [61]:
import os, json,  pandas as pd, requests
from datetime import datetime
from dateutil.relativedelta import relativedelta
pd.set_option("display.max_columns", 30)

In [None]:
"""
1. get the data from S3
2. weather data transformations
3. taxi trips transformations - DONE
4. update payment_type_master - DONE
5. update company_master - DONE
6. update taxi_trips with company and payment_type ids (replace the string values with ids from the latest master tables) - DONE
7. upload weather data to s3
8. upload taxi data to s3
9. upload the newest payment_type_master and company_master
"""


'\n1. get the data from S3\n2. weather data transformations\n3. taxi trips transformations\n4. update payment_type_master\n5. update company_master\n6. update taxi_trips with company and payment_type ids (replace the string values with ids from the latest master tables)\n7. upload weather data to s3\n8. upload taxi data to s3\n9. upload the newest payment_type_master and company_master\n'

#### taxi_trips transformation

In [3]:
# Query a single day of trips: the calendar day two months before "now".
current_datetime = datetime.now() - relativedelta(months=2)

formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# HTTPS, so the app token header below is not sent in clear text.
url = f"https://data.cityofchicago.org/resource/ajtu-isnz.json?$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00' AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'&$limit=30000"

# Only send the token header when the environment variable is actually set.
app_token = os.environ.get("CHICAGO_API_TOKEN")
headers = {"X-App-Token": app_token} if app_token else {}

# Pass the headers with the request, bound the wait, and fail loudly on an
# HTTP error instead of trying to parse an error payload as trip data.
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()

data = response.json()



In [4]:
# Build a DataFrame from the decoded API response.
taxi_trips = pd.DataFrame(data)

In [5]:
# Exploratory cleaning: remove the census-tract and centroid-location columns,
# discard rows with any missing value, and give the community-area columns an
# "_id" suffix.
taxi_trips = (
    taxi_trips
    .drop(columns=["pickup_census_tract", "dropoff_census_tract",
                   "pickup_centroid_location", "dropoff_centroid_location"])
    .dropna()
    .rename(columns={"pickup_community_area": "pickup_community_area_id",
                     "dropoff_community_area": "dropoff_community_area_id"})
)

# Trip start time floored to the hour.
trip_start = pd.to_datetime(taxi_trips["trip_start_timestamp"])
taxi_trips["datetime_for_weather"] = trip_start.dt.floor("h")

In [6]:
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
0,dc1a1c8dfc7133b1d5d98679ee340910f3c4ae05,ae31f2151e8b85c5c1981e09c227aa451a6fda9b739d63...,2024-10-16T23:45:00.000,2024-10-16T23:45:00.000,9,0.04,32,32,50.0,7.58,0,0,58.08,Credit Card,Blue Ribbon Taxi Association,41.878865584,-87.625192142,41.878865584,-87.625192142,2024-10-16 23:00:00
1,e6dc8dad7d81d22aaa83db5f98d83e82e5de084a,46c4778c3a32454511f96752c509c608955e293da45dbf...,2024-10-16T23:45:00.000,2024-10-17T00:00:00.000,1380,17.8,76,8,44.0,9.7,0,4,57.7,Credit Card,Taxi Affiliation Services,41.980264315,-87.913624596,41.899602111,-87.633308037,2024-10-16 23:00:00
2,f1b57c2ec9082f32a5c25c8e4919ea7b2ef281db,f1595f3f9536df1d8d7df672cb3f0835fe79e1754f6c7a...,2024-10-16T23:45:00.000,2024-10-17T00:00:00.000,1135,6.52,3,8,19.75,0.0,0,0,19.75,Prcard,Flash Cab,41.96581197,-87.655878786,41.899602111,-87.633308037,2024-10-16 23:00:00
3,fbc5f1ddd31521f328abb59655a96d085830bbf0,6fa0364cac8d604e8a7613c08fdcbe5bfa05e361d4a758...,2024-10-16T23:45:00.000,2024-10-17T00:15:00.000,1747,14.7,28,50,38.25,0.0,0,0,38.25,Prcard,Flash Cab,41.874005383,-87.66351755,41.706125752,-87.598255838,2024-10-16 23:00:00
5,00f2ac10b0bd2f5a9792611b2535002f94ffecc8,5adbc97abe353b4ad223abfb31d0311bd30800e15f43a2...,2024-10-16T23:45:00.000,2024-10-17T00:15:00.000,1383,15.93,76,8,40.5,11.25,0,4,56.25,Credit Card,Taxicab Insurance Agency Llc,41.980264315,-87.913624596,41.899602111,-87.633308037,2024-10-16 23:00:00


#### taxi_trips transformation function

In [7]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean and reshape the daily taxi trips.

    The census-tract and centroid-location columns are removed, rows with any
    missing value are discarded, the community-area columns get an "_id"
    suffix, and a "datetime_for_weather" column (trip start floored to the
    hour) is added.

    The input frame is left untouched; a new DataFrame is returned.

    Args:
        taxi_trips (pd.DataFrame): the DataFrame holding the daily taxi trips

    Returns:
        pd.DataFrame: The cleaned, transformed DataFrame holding the daily taxi trips.

    Raises:
        TypeError: if taxi_trips is not a pandas DataFrame.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas DataFrame.")

    # Every step returns a new frame, so the caller's DataFrame is never
    # modified and the function can be re-run on the same raw input.
    transformed = (
        taxi_trips
        .drop(columns=["pickup_census_tract", "dropoff_census_tract",
                       "pickup_centroid_location", "dropoff_centroid_location"])
        .dropna()
        .rename(columns={"pickup_community_area": "pickup_community_area_id",
                         "dropoff_community_area": "dropoff_community_area_id"})
    )

    transformed["datetime_for_weather"] = pd.to_datetime(transformed["trip_start_timestamp"]).dt.floor("h")

    return transformed

#### company update codes

In [10]:
# Initial company master: one row per distinct company found in the trips,
# numbered from 1 in order of first appearance.
distinct_companies = taxi_trips["company"].drop_duplicates().reset_index(drop=True)
company_ids = range(1, distinct_companies.size + 1)

company_master = pd.DataFrame({"company_id": company_ids, "company": distinct_companies})
company_master.tail()

Unnamed: 0,company_id,company
29,30,Tac - Yellow Non Color
30,31,5167 - 71969 5167 Taxi Inc
31,32,Star North Taxi Management Llc
32,33,Metro Jet Taxi A.
33,34,4053 - 40193 Adwar H. Nikola


In [11]:
# Hand-written candidate companies used to try out the master-update logic.
candidate_company_names = ("6574 - Babylon Express Inc.", "x", "y")
new_company_data = [{"company": candidate} for candidate in candidate_company_names]

new_company_mapping = pd.DataFrame(new_company_data)
new_company_mapping

Unnamed: 0,company
0,6574 - Babylon Express Inc.
1,x
2,y


In [16]:
# Highest company_id currently present in the master table.
company_max_id = company_master["company_id"].max()
company_max_id

np.int64(34)

In [None]:
# Keep only the candidate companies that the master does not contain yet.
known_companies = company_master["company"].values

new_companies_list = []
for candidate in new_company_mapping["company"].values:
    if candidate not in known_companies:
        new_companies_list.append(candidate)

new_companies_list

['x', 'y']

In [17]:
# Number the unseen companies consecutively, continuing after the current maximum id.
first_new_id = company_max_id + 1
id_stop = first_new_id + len(new_companies_list)

new_companies_df = pd.DataFrame({
    "company_id": range(first_new_id, id_stop),
    "company": new_companies_list,
})

new_companies_df

Unnamed: 0,company_id,company
0,35,x
1,36,y


In [18]:
# Append the new companies below the existing master; ignore_index renumbers the row index.
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

updated_company_master.tail()

Unnamed: 0,company_id,company
31,32,Star North Taxi Management Llc
32,33,Metro Jet Taxi A.
33,34,4053 - 40193 Adwar H. Nikola
34,35,x
35,36,y


In [34]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with the companies it does not contain yet.

    Each unseen company is appended exactly once, in order of first appearance
    in the trips, no matter how many trips mention it. Missing company values
    (NaN/None) are ignored. New ids continue after the current maximum
    company_id, or start at 1 when the master is empty.

    Args:
        taxi_trips (pd.DataFrame):
            - DataFrame holding the daily taxi trips (needs a "company" column).
        company_master (pd.DataFrame):
            - DataFrame holding the company_master data
              ("company_id" and "company" columns).

    Returns:
        pd.DataFrame:
            - A new DataFrame: the master followed by any new companies, with
              a fresh 0..n-1 row index. The inputs are not modified.
    """

    # De-duplicate first: a real trips frame repeats each company on many rows.
    trip_companies = taxi_trips["company"].dropna().drop_duplicates()
    new_companies_list = trip_companies[~trip_companies.isin(company_master["company"])].tolist()

    if not new_companies_list:
        # Nothing to add; still hand back a fresh frame with a clean index.
        return company_master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot use.
    next_company_id = 1 if company_master.empty else int(company_master["company_id"].max()) + 1

    new_companies_df = pd.DataFrame({
        "company_id": range(next_company_id, next_company_id + len(new_companies_list)),
        "company": new_companies_list
    })

    updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

    return updated_company_master



    


In [35]:
# Small stand-in for the trips frame, used to exercise the company master update.
sample_companies = ["Star North Taxi Management Llc", "X", "Y"]

taxi_trips_company_only = pd.DataFrame({
    "company_id": list(range(1, len(sample_companies) + 1)),
    "company": sample_companies,
})

taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,Star North Taxi Management Llc
1,2,X
2,3,Y


In [36]:
# Try the function on the stand-in trips frame.
updated_company_master = update_company_master(taxi_trips=taxi_trips_company_only, company_master=company_master)

In [37]:
updated_company_master.tail()

Unnamed: 0,company_id,company
31,32,Star North Taxi Management Llc
32,33,Metro Jet Taxi A.
33,34,4053 - 40193 Adwar H. Nikola
34,35,X
35,36,Y


#### payment_type_master codes

In [41]:
# Initial payment type master: one row per distinct payment type found in the
# trips, numbered from 1 in order of first appearance.
distinct_payment_types = taxi_trips["payment_type"].drop_duplicates().reset_index(drop=True)

payment_type_master = pd.DataFrame({
    "payment_type_id": range(1, distinct_payment_types.size + 1),
    "payment_type": distinct_payment_types,
})

# Small stand-in for the trips frame, used to exercise the payment type master update.
sample_payment_types = ["Credit Card", "X", "Y"]

taxi_trips_payment_type_only = pd.DataFrame({
    "payment_type_id": list(range(1, len(sample_payment_types) + 1)),
    "payment_type": sample_payment_types,
})

taxi_trips_payment_type_only


Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,X
2,3,Y


In [42]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type master with the payment types it does not contain yet.

    Each unseen payment type is appended exactly once, in order of first
    appearance in the trips, no matter how many trips use it. Missing values
    (NaN/None) are ignored. New ids continue after the current maximum
    payment_type_id, or start at 1 when the master is empty.

    Args:
        taxi_trips (pd.DataFrame):
            - DataFrame holding the daily taxi trips (needs a "payment_type" column).
        payment_type_master (pd.DataFrame):
            - DataFrame holding the payment_type_master data
              ("payment_type_id" and "payment_type" columns).

    Returns:
        pd.DataFrame:
            - A new DataFrame: the master followed by any new payment types,
              with a fresh 0..n-1 row index. The inputs are not modified.
    """

    # De-duplicate first: a real trips frame repeats each payment type on many rows.
    trip_payment_types = taxi_trips["payment_type"].dropna().drop_duplicates()
    new_payment_types_list = trip_payment_types[
        ~trip_payment_types.isin(payment_type_master["payment_type"])
    ].tolist()

    if not new_payment_types_list:
        # Nothing to add; still hand back a fresh frame with a clean index.
        return payment_type_master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot use.
    if payment_type_master.empty:
        next_payment_type_id = 1
    else:
        next_payment_type_id = int(payment_type_master["payment_type_id"].max()) + 1

    new_payment_type_df = pd.DataFrame({
        "payment_type_id": range(next_payment_type_id, next_payment_type_id + len(new_payment_types_list)),
        "payment_type": new_payment_types_list
    })

    updated_payment_type_master = pd.concat([payment_type_master, new_payment_type_df], ignore_index=True)

    return updated_payment_type_master



    


In [44]:
# Try the function on the stand-in trips frame.
updated_payment_type_master = update_payment_type_master(taxi_trips=taxi_trips_payment_type_only, payment_type_master=payment_type_master)
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Prcard
2,3,Cash
3,4,Mobile
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,Y


### Creating a generic update master table function

In [None]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_columns: str, value_column: str) -> pd.DataFrame:
    """Extend a master DataFrame with the values it does not contain yet.

    Each unseen value is appended exactly once, in order of first appearance
    in the trips, so the id a value receives does not depend on set iteration
    order. Missing values (NaN/None) are ignored. New ids continue after the
    current maximum id, or start at 1 when the master is empty.

    Args:
        taxi_trips (pd.DataFrame):
            - DataFrame holding the daily taxi trips.
        master (pd.DataFrame):
            - DataFrame holding the master data.
        id_columns (str):
            - Name of the id column of the master DataFrame.
        value_column (str):
            - Name of the column holding the values; it must exist in both
              taxi_trips and master.

    Returns:
        pd.DataFrame:
            - A new DataFrame: the master followed by any new values, with a
              fresh 0..n-1 row index. The inputs are not modified.
    """

    # Ordered de-duplication keeps the id assignment reproducible between runs.
    trip_values = taxi_trips[value_column].dropna().drop_duplicates()
    new_values_list = trip_values[~trip_values.isin(master[value_column])].tolist()

    if not new_values_list:
        # Nothing to add; still hand back a fresh frame with a clean index.
        return master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot use.
    next_id = 1 if master.empty else int(master[id_columns].max()) + 1

    new_values_df = pd.DataFrame({
        id_columns: range(next_id, next_id + len(new_values_list)),
        value_column: new_values_list
    })

    updated_master = pd.concat([master, new_values_df], ignore_index=True)

    return updated_master



    


In [48]:
# Run the generic helper on the payment type stand-in frame.
test_payment_type_master = update_master(taxi_trips=taxi_trips_payment_type_only, master=payment_type_master, id_columns="payment_type_id", value_column="payment_type")

In [49]:
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Prcard
2,3,Cash
3,4,Mobile
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,Y


In [50]:
# Run the generic helper on the company stand-in frame.
test_company_master = update_master(taxi_trips=taxi_trips_company_only, master=company_master, id_columns="company_id", value_column="company")

In [51]:
test_company_master.tail()

Unnamed: 0,company_id,company
31,32,Star North Taxi Management Llc
32,33,Metro Jet Taxi A.
33,34,4053 - 40193 Adwar H. Nikola
34,35,X
35,36,Y


### Function: update taxi_trips with the most recent company_master and payment_type_master

In [59]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Replace the payment type and company strings of the trips with their master ids.

    Both lookups use the default (inner) merge, so a trip whose payment_type
    or company is missing from the corresponding master is not present in the
    result.

    Args:
        taxi_trips (pd.DataFrame):
            - The DataFrame with the daily taxi trips.
        payment_type_master (pd.DataFrame):
            - The payment type master table.
        company_master (pd.DataFrame):
            - The company master table.

    Returns:
        pd.DataFrame:
            - The taxi trips carrying the id columns brought in from the two
              masters, with the "payment_type" and "company" string columns
              removed.
    """
    return (
        taxi_trips
        .merge(payment_type_master, on="payment_type")
        .merge(company_master, on="company")
        .drop(columns=["payment_type", "company"])
    )

In [60]:
# Swap the payment_type / company strings of the trips for the ids held in the two master tables.
taxi_trips_id = update_taxi_trips_with_master_data(taxi_trips=taxi_trips, payment_type_master=payment_type_master, company_master=company_master)

taxi_trips_id.sample(5)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
10557,9eca159e80f068cd5f5dd1adcfd7c5f3082a9dfb,f8f44c9d76773e3c761356078d786b151d2f206685454f...,2024-10-16T13:45:00.000,2024-10-16T14:00:00.000,420,1.1,8,8,6.75,0.0,0,1,7.75,41.890922026,-87.618868355,41.891971508,-87.612945414,2024-10-16 13:00:00,3,2
10477,e09e0cb5c24b94834b124b09c2d2ed26b05b1558,0e8e342e2e2db3801515e84538a3901dbea9df9cd1716b...,2024-10-16T13:45:00.000,2024-10-16T14:15:00.000,2100,17.1,32,76,42.5,0.0,0,8,50.5,41.880994471,-87.632746489,41.97907082,-87.903039661,2024-10-16 13:00:00,3,2
13396,a328bd521739f084a96c1cc92b499248705dc690,fc68d8f57ebef02a03efc9ed6c5ebd4488403fa6458091...,2024-10-16T11:30:00.000,2024-10-16T11:45:00.000,591,2.01,33,8,9.0,2.96,0,0,11.96,41.859349715,-87.617358006,41.890922026,-87.618868355,2024-10-16 11:00:00,4,4
421,bec9a2263753656a4eee9324581b355277c16734,356744753023949fa63be6a6d7e0aef31f2d50dd3b0258...,2024-10-16T22:30:00.000,2024-10-16T23:00:00.000,1493,2.9,46,51,15.75,0.0,0,0,15.75,41.741242728,-87.551428197,41.690633347,-87.570058269,2024-10-16 22:00:00,2,3
8759,0c08c207b3fa6294d682e3dde66fd40f6bb606e2,19b4ae2f19dd457dced6a79a9a5feaf09ce871a00d2374...,2024-10-16T15:15:00.000,2024-10-16T15:15:00.000,435,0.9,8,8,9.0,2.3,0,0,11.3,41.899602111,-87.633308037,41.899602111,-87.633308037,2024-10-16 15:00:00,4,10


### weather transformations function

In [62]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Turn the decoded daily weather API response into a DataFrame.

    Args:
        weather_data (dict):
            - The decoded JSON response. It must have an "hourly" mapping with
              the keys "time", "temperature_2m", "wind_speed_10m", "rain" and
              "precipitation".

    Returns:
        pd.DataFrame:
            - One row per entry of the hourly series, with the columns
              "datetime" (parsed to datetimes), "tempretaure", "wind_speed",
              "rain" and "precipitation".

    Raises:
        KeyError: if "hourly" or one of the expected hourly keys is missing.
    """

    hourly = weather_data["hourly"]

    # NOTE(review): "tempretaure" is misspelled, but it is the column name this
    # function has always produced; rename it only together with every consumer.
    weather_data_filtered = {
        "datetime": hourly["time"],
        "tempretaure": hourly["temperature_2m"],
        "wind_speed": hourly["wind_speed_10m"],
        "rain": hourly["rain"],
        "precipitation": hourly["precipitation"]
    }

    weather_df = pd.DataFrame(weather_data_filtered)

    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df


In [63]:
# Test: fetch one day of hourly weather (the calendar day two months before
# "now") and run it through transform_weather_data.
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url = "https://archive-api.open-meteo.com/v1/era5"

params = {
    "latitude":41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,precipitation,rain"
}

# Bound the wait and fail on an HTTP error here, rather than as a confusing
# KeyError inside the transform when an error payload comes back.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()

weather_data = response.json()

weather_data_df = transform_weather_data(weather_data)

In [64]:
weather_data_df.head()

Unnamed: 0,datetime,tempretaure,wind_speed,rain,precipitation
0,2024-10-16 00:00:00,9.0,27.1,0.0,0.0
1,2024-10-16 01:00:00,8.2,22.3,0.0,0.0
2,2024-10-16 02:00:00,7.6,26.1,0.0,0.0
3,2024-10-16 03:00:00,7.4,25.3,0.0,0.0
4,2024-10-16 04:00:00,7.1,24.2,0.0,0.0
