In [None]:
import pandas as pd
import requests
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta
import json

In [None]:
"""
1. get the data from S3
2. weather data transformations
3. taxi trip transformations
4. update payment type master
5. update company master
6. update taxi trips with payment and company ids
7. upload weather data to s3
8. upload taxi data to s3
9. upload latest company and payment master
"""

### Taxi trip transformation

In [None]:
# Query window: the single calendar day exactly two months before "now".
# NOTE(review): presumably the lag exists because the portal publishes trips
# with a delay - confirm.
current_datetime = datetime.now() - relativedelta(months=2)

formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# Optional app token (Socrata expects the header name "X-App-Token"):
# headers = {"X-App-Token": os.environ.get("CHICAGO_API_TOKEN")}

url = "https://data.cityofchicago.org/resource/ajtu-isnz.json"

# Let requests URL-encode the SoQL filter instead of pasting raw spaces and
# quotes into the query string by hand.
taxi_query_params = {
    "$where": (
        f"trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
        f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'"
    ),
    "$limit": 30000,
}

# A timeout keeps a stalled connection from hanging the kernel forever, and
# raise_for_status() stops an HTTP error body from being parsed as trip rows.
response = requests.get(url, params=taxi_query_params, timeout=60)
response.raise_for_status()

data = response.json()

taxi_trips = pd.DataFrame(data)

In [None]:
# Clean the raw trips into the shape the rest of the notebook expects.
# Written as one chained expression (no inplace=True) with errors="ignore",
# so re-running this cell on an already-cleaned frame is a no-op instead of a
# KeyError on the columns that were dropped the first time.
taxi_trips = (
    taxi_trips
    .drop(
        columns=[
            "pickup_census_tract",
            "dropoff_census_tract",
            "pickup_centroid_location",
            "dropoff_centroid_location",
        ],
        errors="ignore",
    )
    # Remove every row that still has a missing value in any remaining column.
    .dropna()
    .rename(columns={
        "pickup_community_area": "pickup_community_area_id",
        "dropoff_community_area": "dropoff_community_area_id",
    })
    # Trip start floored to the hour.
    .assign(
        datetime_for_weather=lambda trips: pd.to_datetime(trips["trip_start_timestamp"]).dt.floor("h")
    )
)

#### Taxi trip transform: function

In [None]:
def taxi_trip_tranformation(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean the daily taxi trips and add the hourly weather key.

    The census tract and centroid location columns are removed, rows with any
    missing value are discarded, the community area columns get an "_id"
    suffix, and "datetime_for_weather" is added (trip start floored to the hour).

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        dataframe holding the daily taxi infos; it is not modified

    Returns
    -------
    pd.DataFrame
        cleaned, transformed copy of the input

    Raises
    ------
    TypeError
        if taxi_trips is not a pandas DataFrame
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi trips is not a valid dataframe")

    unused_columns = [
        "pickup_census_tract",
        "dropoff_census_tract",
        "pickup_centroid_location",
        "dropoff_centroid_location",
    ]

    return (
        taxi_trips
        # errors="ignore": a column missing from the input must not abort the
        # whole transformation with a KeyError.
        .drop(columns=unused_columns, errors="ignore")
        .dropna()
        .rename(columns={
            "pickup_community_area": "pickup_community_area_id",
            "dropoff_community_area": "dropoff_community_area_id",
        })
        .assign(
            datetime_for_weather=lambda trips: pd.to_datetime(trips["trip_start_timestamp"]).dt.floor("h")
        )
    )

#### company update codes

In [None]:
# Seed the company master: one row per distinct company found in the trips,
# numbered from 1 in order of first appearance.
distinct_companies = taxi_trips["company"].drop_duplicates(ignore_index=True)

company_master = pd.DataFrame({
    "company_id": range(1, len(distinct_companies) + 1),
    "company": distinct_companies,
})

company_master.tail()

In [None]:
# Hand-made sample of incoming company names used to try out the master update.
new_company_data = [{"company": company_name} for company_name in ("Sun Taxi", "X", "Y")]

new_company_mapping = pd.DataFrame.from_records(new_company_data)

new_company_mapping

In [None]:
# Highest id already handed out; new companies are numbered after it.
company_max_id = company_master.loc[:, "company_id"].max()

In [None]:
# Keep only the incoming names the master does not know yet.
# The known names are collected once into a set, so the master column is not
# re-read and scanned linearly for every candidate.
known_companies = set(company_master["company"])

new_companies_list = [
    company_name
    for company_name in new_company_mapping["company"]
    if company_name not in known_companies
]


In [None]:
# Number the new companies consecutively, starting right after the highest
# id that is already in the master.
first_new_company_id = company_max_id + 1

new_companies_df = pd.DataFrame({
    "company_id": range(first_new_company_id, first_new_company_id + len(new_companies_list)),
    "company": new_companies_list,
})

new_companies_df

In [None]:
# Stack the new rows under the existing master and renumber the row index.
company_frames = [company_master, new_companies_df]
updated_company_master = pd.concat(company_frames, ignore_index=True)

updated_company_master.tail()

In [None]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with companies first seen in the trips.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        trips data; only its "company" column is read
    company_master: pd.DataFrame
        current master with "company_id" and "company" columns

    Returns
    -------
    pd.DataFrame
        new dataframe holding the existing master rows followed by one row per
        previously unknown company, numbered after the highest existing id
        (from 1 when the master is empty); the inputs are not modified
    """
    known_companies = set(company_master["company"])

    # The trips hold many rows per company: de-duplicate first so a new company
    # gets exactly one master row / id. Missing names are never added.
    new_companies_list = [
        company
        for company in taxi_trips["company"].dropna().drop_duplicates()
        if company not in known_companies
    ]

    if not new_companies_list:
        # Nothing to add; still hand back a fresh, re-indexed frame.
        return company_master.reset_index(drop=True)

    # max() of an empty id column is NaN, which range() cannot use.
    next_company_id = 1 if company_master.empty else int(company_master["company_id"].max()) + 1

    new_companies_df = pd.DataFrame({
        "company_id": range(next_company_id, next_company_id + len(new_companies_list)),
        "company": new_companies_list,
    })

    return pd.concat([company_master, new_companies_df], ignore_index=True)

In [None]:
# Ad-hoc check of update_company_master with a tiny stand-in for the trips table.
taxi_trips_only_company = pd.DataFrame(
    data=[(1, "Sun Tax"), (2, "X"), (3, "Y")],
    columns=["company_id", "company"],
)

updated_company_master = update_company_master(
    taxi_trips=taxi_trips_only_company,
    company_master=company_master,
)

### payment type master codes

In [None]:
# Seed the payment type master: one row per distinct payment type found in
# the trips, numbered from 1 in order of first appearance.
distinct_payment_types = taxi_trips["payment_type"].drop_duplicates(ignore_index=True)

payment_type_master = pd.DataFrame({
    "payment_type_id": range(1, len(distinct_payment_types) + 1),
    "payment_type": distinct_payment_types,
})

# Tiny hand-made stand-in for the trips table, for trying out the master update.
taxi_trips_paymant_type_only = pd.DataFrame(
    data=[(1, "Credit Card"), (2, "X"), (3, "Y")],
    columns=["payment_type_id", "payment_type"],
)


In [None]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type master with payment types first seen in the trips.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        trips data; only its "payment_type" column is read
    payment_type_master: pd.DataFrame
        current master with "payment_type_id" and "payment_type" columns

    Returns
    -------
    pd.DataFrame
        new dataframe holding the existing master rows followed by one row per
        previously unknown payment type, numbered after the highest existing
        id (from 1 when the master is empty); the inputs are not modified
    """
    known_payment_types = set(payment_type_master["payment_type"])

    # De-duplicate first so a payment type that occurs in many trips gets
    # exactly one master row / id. Missing values are never added.
    new_payment_type_list = [
        payment_type
        for payment_type in taxi_trips["payment_type"].dropna().drop_duplicates()
        if payment_type not in known_payment_types
    ]

    if not new_payment_type_list:
        # Nothing to add; still hand back a fresh, re-indexed frame.
        return payment_type_master.reset_index(drop=True)

    # max() of an empty id column is NaN, which range() cannot use.
    if payment_type_master.empty:
        next_payment_type_id = 1
    else:
        next_payment_type_id = int(payment_type_master["payment_type_id"].max()) + 1

    # The id range is sized from the list of new values (the frame being built
    # here does not exist yet while its own columns are evaluated).
    new_payment_type_df = pd.DataFrame({
        "payment_type_id": range(next_payment_type_id, next_payment_type_id + len(new_payment_type_list)),
        "payment_type": new_payment_type_list,
    })

    return pd.concat([payment_type_master, new_payment_type_df], ignore_index=True)

### Create general master table function

In [None]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend a master table with values first seen in the trips.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        trips data; only its `value_column` column is read
    master: pd.DataFrame
        current master table with `id_column` and `value_column` columns
    id_column: str
        name of the integer id column in the master
    value_column: str
        name of the value column present in both the master and the trips

    Returns
    -------
    pd.DataFrame
        new dataframe holding the existing master rows followed by one row per
        previously unknown value, numbered after the highest existing id
        (from 1 when the master is empty); the inputs are not modified
    """
    known_values = set(master[value_column])

    # De-duplicate first so a value that occurs in many trips gets exactly one
    # master row / id. Missing values are never added.
    new_values_list = [
        value
        for value in taxi_trips[value_column].dropna().drop_duplicates()
        if value not in known_values
    ]

    if not new_values_list:
        # Nothing to add; still hand back a fresh, re-indexed frame.
        return master.reset_index(drop=True)

    # max() of an empty id column is NaN, which range() cannot use.
    next_id = 1 if master.empty else int(master[id_column].max()) + 1

    new_values_df = pd.DataFrame({
        id_column: range(next_id, next_id + len(new_values_list)),
        value_column: new_values_list,
    })

    return pd.concat([master, new_values_df], ignore_index=True)

### update taxi_trips with most recent masters

In [None]:
def update_taxi_trips_with_masters(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Swap the payment type and company names in the trips for their master ids.

    Args:
        taxi_trips (pd.DataFrame): trips with "payment_type" and "company" columns.
        payment_type_master (pd.DataFrame): master joined on "payment_type".
        company_master (pd.DataFrame): master joined on "company".

    Returns:
        pd.DataFrame: the trips joined to both masters with the "payment_type"
        and "company" columns removed. Both joins use merge's default (inner)
        mode, so trips without a match in either master are not in the result.
    """
    return (
        taxi_trips
        .merge(payment_type_master, on="payment_type")
        .merge(company_master, on="company")
        .drop(columns=["payment_type", "company"])
    )

In [None]:
# Replace the payment type / company names in the trips with their master ids.
taxi_trips_id = update_taxi_trips_with_masters(
    taxi_trips=taxi_trips,
    payment_type_master=payment_type_master,
    company_master=company_master,
)

taxi_trips_id.sample(n=5)

### weather transformation function

In [None]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Turn a daily weather API response into an hourly dataframe.

    Parameters:
        weather_data (dict): decoded JSON response. It must contain an
            "hourly" mapping with the lists "time", "temperature_2m",
            "wind_speed_10m", "rain" and "precipitation".

    Returns:
        pd.DataFrame: one row per entry of the hourly lists, with the columns
        "date_time" (parsed to datetime), "temperature", "wind_speed", "rain"
        and "precipitation".

    Raises:
        KeyError: if the "hourly" section or one of its expected lists is missing.
    """
    if "hourly" not in weather_data:
        # Same exception type as a plain lookup, but with a message that says
        # what is wrong with the payload.
        raise KeyError("weather_data has no 'hourly' section; the API response may be an error payload")

    hourly = weather_data["hourly"]

    # Output column name -> name of the list inside the "hourly" section.
    column_sources = {
        "date_time": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    weather_df = pd.DataFrame({column: hourly[source] for column, source in column_sources.items()})

    weather_df["date_time"] = pd.to_datetime(weather_df["date_time"])

    return weather_df


In [None]:
# Test

url_mod = "https://archive-api.open-meteo.com/v1/era5"

# Same one-day window as the taxi query: the calendar day two months ago.
current_datetime = datetime.now() - relativedelta(months=2)

formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# NOTE(review): the coordinates presumably point at Chicago - confirm.
params = {
    "latitude": 41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,rain,precipitation",
}

# A timeout keeps a stalled connection from hanging the kernel, and
# raise_for_status() surfaces HTTP errors here instead of as a missing-key
# error inside the transformation.
response_mod = requests.get(url_mod, params=params, timeout=60)
response_mod.raise_for_status()

weather_data = response_mod.json()

weather_data_df = transform_weather_data(weather_data=weather_data)

In [None]:
# Spot-check a few random hourly rows of the transformed weather data.
weather_data_df.sample(n=5)