In [None]:
import json
import os
from datetime import datetime, timedelta

import pandas as pd
import requests
from dateutil.relativedelta import relativedelta

pd.set_option("display.max_columns", 30)

In [None]:
#
def formated_day_back(days: int, reference: "datetime | None" = None) -> str:
    """Return the date `days` days before a reference moment as "YYYY-MM-DD".

    Parameters:
        days (int): How many days to go back from the reference moment.
        reference (datetime, optional): The moment to count back from.
            Defaults to the current local time (``datetime.now()``); pass an
            explicit value to get a reproducible result.

    Returns:
        str: The resulting date formatted with "%Y-%m-%d".
    """
    if reference is None:
        reference = datetime.now()

    target_datetime = reference - timedelta(days=days)
    return target_datetime.strftime("%Y-%m-%d")

In [None]:
"""
1. get the data from s3
2. weather data transformation
3. taxi trips transformation
4. update payment_type_master
5. update company_master
6. update taxi trips with company and payment_type ids (from master tables)
7. upload the newest payment_type_master and company_master
8. upload weather data to s3
9. upload taxi data to s3
"""

### Taxi trips transformation

In [None]:
# Request the trips that started on one single day, 60 days before today.
formated_datetime = formated_day_back(60)

url = "https://data.cityofchicago.org/resource/ajtu-isnz.json"
# The query keeps trips whose start timestamp falls on that day and caps the response at 30000 rows.
params = f"$where=trip_start_timestamp>='{formated_datetime}T00:00:00' AND trip_start_timestamp<='{formated_datetime}T23:59:59'&$limit=30000"
# The app token is read from the environment so it never appears in the notebook.
headers = {"X-App-Token": os.environ.get("CHICAGO_API_TOKEN")}

# Without a timeout a stalled connection would block the notebook indefinitely.
response = requests.get(url, headers=headers, params=params, timeout=60)
# Stop here on an HTTP error instead of turning an error payload into a DataFrame further down.
response.raise_for_status()

data = response.json()

In [None]:
# Wrap the decoded API response in a DataFrame for the transformations that follow.
taxi_trips = pd.DataFrame(data=data)

#### taxi_trips transformation function

In [None]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean and reshape the raw taxi trips DataFrame, modifying it in place.

    Steps, in order:
        1. drop the census tract and centroid location columns (when present),
        2. drop every row that has a missing value in any remaining column,
        3. rename the community area columns to ``*_community_area_id``,
        4. parse the trip start and end timestamps into datetimes,
        5. add ``datetime_for_weather``: the trip start floored to the full hour.

    The passed DataFrame is mutated and that same object is returned, so code
    that calls the function without using the return value still sees the
    transformed data. Running the function again on an already transformed
    frame is safe.

    Parameters:
        taxi_trips (pd.DataFrame): A DataFrame containing taxi trip data.

    Returns:
        pd.DataFrame: The cleaned, transformed DataFrame (the input object).

    Raises:
        TypeError: If ``taxi_trips`` is not a pandas DataFrame.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas Dataframe.")

    unused_columns = [
        "pickup_census_tract",
        "dropoff_census_tract",
        "pickup_centroid_location",
        "dropoff_centroid_location",
    ]
    # errors="ignore": a column may be absent, e.g. when the function is re-run
    # on a frame it has already transformed; that must not raise a KeyError.
    taxi_trips.drop(columns=unused_columns, errors="ignore", inplace=True)

    taxi_trips.dropna(inplace=True)

    taxi_trips.rename(
        columns={
            "pickup_community_area": "pickup_community_area_id",
            "dropoff_community_area": "dropoff_community_area_id",
        },
        inplace=True,
    )

    for timestamp_column in ("trip_start_timestamp", "trip_end_timestamp"):
        taxi_trips[timestamp_column] = pd.to_datetime(taxi_trips[timestamp_column])

    # Lowercase "h" is the hourly alias; the uppercase "H" spelling is deprecated in recent pandas.
    taxi_trips["datetime_for_weather"] = taxi_trips["trip_start_timestamp"].dt.floor("h")

    return taxi_trips

In [None]:
# The function mutates and returns the same frame; rebinding the name makes the data flow
# explicit, and head() shows a preview instead of the whole table.
taxi_trips = taxi_trips_transformations(taxi_trips)
taxi_trips.head()

### Updating the master tables

In [None]:
# Every distinct company found in the trips gets a sequential id starting at 1.
unique_companies = taxi_trips["company"].drop_duplicates().reset_index(drop=True)

company_master = pd.DataFrame({
    "company_id": range(1, len(unique_companies) + 1),
    "company": unique_companies,
})

company_master.tail()

In [None]:
# Small hand-written sample of company names used to try out the master-table update.
new_company_data = [
    {"company": name}
    for name in (
        "312 Medallion Management Corp",
        "6574 - Babylon Express Inc.",
        "XXX",
        "XY",
    )
]

new_company_mapping = pd.DataFrame(data=new_company_data)
new_company_mapping

In [None]:
# Highest id handed out so far; new companies continue numbering after it.
company_max_id = company_master["company_id"].max()

# Explicit-loop version: collect the sample companies that the master does not contain yet.
known_companies = company_master["company"].values
new_companies_list_more_lines = []

for company in new_company_mapping["company"].values:
    if company in known_companies:
        continue
    new_companies_list_more_lines.append(company)

new_companies_list_more_lines

In [None]:
# The same filter as the loop version, written as a single list comprehension.
new_companies_list = [
    company
    for company in new_company_mapping["company"].values
    if company not in company_master["company"].values
]

new_companies_list

In [None]:
# New companies are numbered consecutively, starting right after the current maximum id.
first_new_id = company_max_id + 1

new_companies_df = pd.DataFrame({
    "company_id": range(first_new_id, first_new_id + len(new_companies_list)),
    "company": new_companies_list,
})

new_companies_df

In [None]:
def update_company_master(taxi_trips, company_master):
    """
    Extends the company master dataframe with new companies found in the given taxi trips data.

    Each company that is not in the master yet is added exactly once, no matter how many
    trips mention it, and receives the next free id. Missing company values are ignored.
    The input dataframes are not modified.

    Args:
    - taxi_trips (DataFrame): the daily taxi trips data, where each row represents a trip.
        Must contain a "company" column.
    - company_master (DataFrame): the master list of taxi companies, with the columns
        "company_id" and "company".

    Returns:
    - updated_company_master (DataFrame): the updated master list of taxi companies after
        adding new companies found in the taxi trips data (with a fresh 0..n-1 index).
    """

    # Distinct, non-missing candidates in first-seen order: without the de-duplication a company
    # appearing in many trips would be appended many times, each time with a different id.
    candidate_companies = taxi_trips["company"].dropna().drop_duplicates()
    is_known = candidate_companies.isin(company_master["company"])
    new_companies_list = candidate_companies[~is_known].tolist()

    if not new_companies_list:
        # Nothing to add; still hand back a new frame with a clean index.
        return company_master.reset_index(drop=True)

    # An empty master has no maximum (max() would be NaN), so numbering starts at 1.
    company_max_id = int(company_master["company_id"].max()) if len(company_master) else 0

    new_companies_df = pd.DataFrame({
        "company_id": range(company_max_id + 1, company_max_id + len(new_companies_list) + 1),
        "company": new_companies_list,
    })

    updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

    return updated_company_master

In [None]:
# Try the function out with the small sample mapping standing in for the trips data.
updated_company_master = update_company_master(
    taxi_trips=new_company_mapping,
    company_master=company_master,
)
updated_company_master

In [None]:
# Every distinct payment type found in the trips gets a sequential id starting at 1.
unique_payment_types = taxi_trips["payment_type"].drop_duplicates().reset_index(drop=True)

payment_type_master = pd.DataFrame({
    "payment_type_id": range(1, len(unique_payment_types) + 1),
    "payment_type": unique_payment_types,
})

# Small hand-written frame used to try out the payment type master update.
taxi_trips_payment_type_only = pd.DataFrame({
    "payment_type_id": range(1, 5),
    "payment_type": ["Credit Card", "X", "Y", "Test"],
})

In [None]:
def update_payment_type_master(taxi_trips, payment_type_master):
    """
    Extends the payment_type master dataframe with new payment types found in the given taxi trips data.

    Each payment type that is not in the master yet is added exactly once, no matter how many
    trips use it, and receives the next free id. Missing payment type values are ignored.
    The input dataframes are not modified.

    Args:
    - taxi_trips (DataFrame): the daily taxi trips data, where each row represents a trip.
        Must contain a "payment_type" column.
    - payment_type_master (DataFrame): the master list of taxi payment types, with the columns
        "payment_type_id" and "payment_type".

    Returns:
    - updated_payment_type_master (DataFrame): the updated master list of taxi payment types
        after adding new payment types found in the taxi trips data (with a fresh 0..n-1 index).
    """

    # Distinct, non-missing candidates in first-seen order: without the de-duplication a payment
    # type used by many trips would be appended many times, each time with a different id.
    candidate_payment_types = taxi_trips["payment_type"].dropna().drop_duplicates()
    is_known = candidate_payment_types.isin(payment_type_master["payment_type"])
    new_payment_types_list = candidate_payment_types[~is_known].tolist()

    if not new_payment_types_list:
        # Nothing to add; still hand back a new frame with a clean index.
        return payment_type_master.reset_index(drop=True)

    # An empty master has no maximum (max() would be NaN), so numbering starts at 1.
    payment_type_max_id = (
        int(payment_type_master["payment_type_id"].max()) if len(payment_type_master) else 0
    )

    new_payment_types_df = pd.DataFrame({
        "payment_type_id": range(payment_type_max_id + 1, payment_type_max_id + len(new_payment_types_list) + 1),
        "payment_type": new_payment_types_list,
    })

    updated_payment_type_master = pd.concat([payment_type_master, new_payment_types_df], ignore_index=True)

    return updated_payment_type_master

In [None]:
# Try the function out with the small sample frame standing in for the trips data.
updated_payment_type_master = update_payment_type_master(
    taxi_trips=taxi_trips_payment_type_only,
    payment_type_master=payment_type_master,
)
updated_payment_type_master

### Creating a generic update master table function

In [None]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """
    Extends the master DataFrame with new values found in the given taxi trips data.

    Each value that is not in the master yet is added exactly once, no matter how many
    trips contain it, and receives the next free id. Missing values are ignored.
    The input DataFrames are not modified.

    Args:
    - taxi_trips (DataFrame): the daily taxi trips data, where each row represents a trip.
        Must contain the column named by value_column.
    - master (DataFrame): the master list to extend; must contain id_column and value_column.
    - id_column (str): the id column of the master DataFrame
    - value_column (str): the value column of the master DataFrame

    Returns:
    - updated_master (DataFrame): the updated master list
        after adding new values found in the taxi trips data (with a fresh 0..n-1 index).
    """

    # Distinct, non-missing candidates in first-seen order: without the de-duplication a value
    # appearing in many trips would be appended many times, each time with a different id.
    candidate_values = taxi_trips[value_column].dropna().drop_duplicates()
    is_known = candidate_values.isin(master[value_column])
    new_values_list = candidate_values[~is_known].tolist()

    if not new_values_list:
        # Nothing to add; still hand back a new frame with a clean index.
        return master.reset_index(drop=True)

    # An empty master has no maximum (max() would be NaN), so numbering starts at 1.
    max_id = int(master[id_column].max()) if len(master) else 0

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list,
    })

    updated_master = pd.concat([master, new_values_df], ignore_index=True)

    return updated_master

In [None]:
# Same check as before, this time through the generic function with the column names passed in.
updated_payment_type_master = update_master(
    taxi_trips=taxi_trips_payment_type_only,
    master=payment_type_master,
    id_column="payment_type_id",
    value_column="payment_type",
)
updated_payment_type_master

### Update taxi_trips with the most recent master ids

In [None]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame,
                                       company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Replaces the payment type and company text columns of the trips with the master ids.

    The trips are joined to both master tables (inner joins on "payment_type" and on
    "company"), then the two text columns are dropped. Because the joins are inner joins,
    a trip whose payment type or company is missing from its master table does not appear
    in the result. The input DataFrames are not modified.

    Args:
    - taxi_trips (DataFrame): the trips data; must contain "payment_type" and "company".
    - payment_type_master (DataFrame): master table containing a "payment_type" column
        plus the id column(s) to bring in.
    - company_master (DataFrame): master table containing a "company" column
        plus the id column(s) to bring in.

    Returns:
    - taxi_trips_id (DataFrame): the trips with the master columns added and the
        "payment_type" and "company" columns removed.

    Raises:
    - pandas.errors.MergeError: if a master table lists the same payment type or company
        more than once (the join would otherwise silently duplicate trips).
    """

    # TODO (original note): ds c197 0:29

    # validate="many_to_one": many trips may share one master row, but each master key must be unique.
    taxi_trips_id = (
        taxi_trips
        .merge(payment_type_master, on="payment_type", validate="many_to_one")
        .merge(company_master, on="company", validate="many_to_one")
    )

    return taxi_trips_id.drop(columns=["payment_type", "company"])

In [None]:
taxi_trips_id = update_taxi_trips_with_master_data(
    taxi_trips=taxi_trips,
    payment_type_master=payment_type_master,
    company_master=company_master,
)

# Preview at most 5 rows: sample() raises when asked for more rows than the frame has,
# and the fixed random_state keeps the preview identical across re-runs.
taxi_trips_id.sample(n=min(5, len(taxi_trips_id)), random_state=0)