In [95]:
import pandas as pd
import requests

import os 

import json

# Show up to 30 columns when a wide DataFrame is displayed.
# Use the canonical option name: 'display.max.columns' only worked because pandas
# treats the key as a regular expression (the '.' happened to match the '_').
pd.set_option('display.max_columns', 30)

from datetime import datetime
from dateutil.relativedelta import relativedelta 

In [18]:
""" 
1. get  the data from S3
2. weather_data transformations
3. taxi_trips transformations - Done
4. update payment_type_master - Done
5. update company_master - Done 
6. update taxi_trips with company and payment_type ids (replace the string values with ids from the latest master tables) - DONE
7. upload weather data to s3
8. upload taxi_data to s3
9. upload the newest payment_type_master and company_master  

"""

' \n1. get  the data from S3\n2. weather_data transformations\n3. taxi_trips transformations\n4. update payment_type_master\n5. update company_master\n6. update taxi_trips with company and payment_type ids (replace the string values with ids from the latest master tables)\n7. upload weather data to s3\n8. upload taxi_data to s3\n9. upload the newest payment_type_master and company_master  \n\n'

### taxi_trips transformation code

In [19]:
# Query a single day: the date exactly 8 months before today.
current_datetime = datetime.now() - relativedelta(months=8)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url = "https://data.cityofchicago.org/resource/wrvz-psew.json"

# Pass the SoQL query through `params` so requests URL-encodes it. The original
# hand-built URL glued the closing quote directly onto `AND` (no separating space).
params = {
    "$where": (
        f"trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
        f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'"
    ),
    "$limit": 30000,
}

#headers  = {"X-App-Token": os.environ.get("CHICAGO_API_TOKEN")}

# The timeout keeps the cell from hanging forever; raise_for_status() stops here
# on an HTTP error instead of turning an error payload into a DataFrame later.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()

data = response.json()

In [22]:
# Load the parsed JSON response of the taxi trips request into a DataFrame.
taxi_trips = pd.DataFrame(data)

In [23]:
# Build the cleaned frame with one non-mutating chain so the cell can be re-run.
# The original in-place drops raised KeyError on a second execution because the
# columns were already gone; errors='ignore' also covers a response that does not
# contain these columns at all.
taxi_trips = (
    taxi_trips
    .drop(
        columns=['pickup_census_tract', 'dropoff_census_tract',
                 'pickup_centroid_location', 'dropoff_centroid_location'],
        errors='ignore',
    )
    # Keep only trips that have a value in every remaining column.
    .dropna()
    .rename(columns={'pickup_community_area': 'pickup_community_area_id',
                     'dropoff_community_area': 'dropoff_community_area_id'})
    # Trip start truncated to the full hour.
    .assign(
        datetime_for_weather=lambda trips: pd.to_datetime(trips['trip_start_timestamp']).dt.floor('h')
    )
)


In [24]:
# Preview the first rows of the transformed trips.
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
0,9be5c6b78465f2c2d7e828e4539705c1858589a3,2ac720a815ee33389a480014503ab50ff33b7008f55397...,2023-12-09T23:45:00.000,2023-12-10T00:00:00.000,788,2.82,7,8,11.0,0.0,0,1,12.0,Cash,Taxicab Insurance Agency Llc,41.922686284,-87.649488729,41.899602111,-87.633308037,2023-12-09 23:00:00
1,00f01352b5038e992460f3fccf366e362148ad15,5cdde36a39ded2651da1686c7813baf589dac6eb873894...,2023-12-09T23:45:00.000,2023-12-10T00:00:00.000,840,0.1,8,28,11.0,0.0,0,1,12.0,Cash,Taxi Affiliation Services,41.899602111,-87.633308037,41.874005383,-87.66351755,2023-12-09 23:00:00
2,03bb341d718d7d7448d595497416a9812b3a528a,ca83178fa4ea3bcd66fe18442103abfb68222d7a440868...,2023-12-09T23:45:00.000,2023-12-10T00:00:00.000,173,0.51,8,32,4.75,0.0,0,1,5.75,Cash,Flash Cab,41.892042136,-87.63186395,41.884987192,-87.620992913,2023-12-09 23:00:00
3,06146c3fca9e8fdf2b1023d2c9cf2a02a9de09a6,e1cf6c401b0eff9128dad27435a344db33ed69d2be0a63...,2023-12-09T23:45:00.000,2023-12-10T00:00:00.000,317,2.99,8,7,17.25,5.44,0,0,22.69,Mobile,5 Star Taxi,41.899602111,-87.633308037,41.922686284,-87.649488729,2023-12-09 23:00:00
4,028b81029a35f04bac977638d16a0f1e500521f5,2a342ab47780aec1f79d04cc913ac20c099335d24571cd...,2023-12-09T23:45:00.000,2023-12-09T23:45:00.000,6,0.0,6,6,13.67,3.0,0,0,16.67,Mobile,Sun Taxi,41.944226601,-87.655998182,41.944226601,-87.655998182,2023-12-09 23:00:00


### taxi_trips transformation function

In [None]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform the daily taxi trips.

    The input frame is left untouched; a new DataFrame is returned. This makes
    the function safe to call repeatedly on the same frame.

    Steps, in order:

    1. drop the census tract and centroid location columns (columns that are
       not present are ignored),
    2. drop every row that still contains a missing value,
    3. rename the community area columns to ``*_id``,
    4. add ``datetime_for_weather``: ``trip_start_timestamp`` parsed as a
       datetime and floored to the hour.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame holding the daily taxi trips. It must contain a
        ``trip_start_timestamp`` column.

    Returns
    -------
    pd.DataFrame
        The cleaned, transformed DataFrame holding the daily taxi trips.

    Raises
    ------
    TypeError
        If ``taxi_trips`` is not a pandas DataFrame.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid pandas DataFrame.')

    columns_to_drop = ['pickup_census_tract', 'dropoff_census_tract',
                       'pickup_centroid_location', 'dropoff_centroid_location']

    transformed = (
        taxi_trips
        # errors='ignore': tolerate input that lacks some of these columns
        # (e.g. a frame that already went through this function).
        .drop(columns=columns_to_drop, errors='ignore')
        .dropna()
        .rename(columns={'pickup_community_area': 'pickup_community_area_id',
                         'dropoff_community_area': 'dropoff_community_area_id'})
        .assign(
            datetime_for_weather=lambda trips: pd.to_datetime(trips['trip_start_timestamp']).dt.floor('h')
        )
    )

    return transformed

### company_master update code

In [25]:
# Distinct company names from the trips, in order of first appearance.
unique_companies = taxi_trips['company'].drop_duplicates().reset_index(drop=True)

# Number the companies 1..n to form the initial master table.
company_master = pd.DataFrame({
    'company_id': range(1, len(unique_companies) + 1),
    'company': unique_companies,
})
company_master.tail()

Unnamed: 0,company_id,company
24,25,2733 - 74600 Benny Jona
25,26,6574 - Babylon Express Inc.
26,27,5167 - 71969 5167 Taxi Inc
27,28,Metro Jet Taxi A.
28,29,Petani Cab Corp


In [26]:
# Hand-made sample: one company name taken from the master output shown above
# plus two placeholders.
new_company_data = [
    {'company': name}
    for name in ('6574 - Babylon Express Inc.', 'x', 'y')
]

new_company_mapping = pd.DataFrame.from_records(new_company_data)

new_company_mapping

Unnamed: 0,company
0,6574 - Babylon Express Inc.
1,x
2,y


In [41]:
# Highest company_id currently in the master table.
company_max_id = company_master['company_id'].max()
company_max_id

np.int64(29)

In [42]:
# Collect the companies from the mapping that are not in the master table yet.
new_companies_list = []

for company in new_company_mapping['company'].values:
    if company not in company_master['company'].values:
        # Fixed: the original appended to the undefined name `new_company_list`,
        # which raises NameError on a fresh kernel.
        new_companies_list.append(company)


# one line
new_companies_list = [company for company in new_company_mapping['company'].values if company not in company_master['company'].values]

new_companies_list

['x', 'y']

In [43]:
# Give the new companies consecutive ids starting right after the current maximum.
new_companies_df = pd.DataFrame({
    'company_id': range(company_max_id + 1, company_max_id + len(new_companies_list) + 1),
    'company': new_companies_list
})

new_companies_df

Unnamed: 0,company_id,company
0,30,x
1,31,y


In [44]:
# Append the new companies to the master table. The result is named
# `updated_company_master` so that it does not share a name with the
# `update_company_master` function defined later in this notebook.
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

updated_company_master.tail()

Unnamed: 0,company_id,company
26,27,5167 - 71969 5167 Taxi Inc
27,28,Metro Jet Taxi A.
28,29,Petani Cab Corp
29,30,x
30,31,y


In [51]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with the companies it does not contain yet.

    Each new company is added exactly once, even if it appears in many trips,
    and receives the next free ``company_id``. New companies are appended in
    order of first appearance in ``taxi_trips``. Neither input is modified.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips; must have a ``company`` column.
    company_master : pd.DataFrame
        DataFrame holding the company master data; must have ``company_id``
        and ``company`` columns.

    Returns
    -------
    pd.DataFrame
        The updated company master with a fresh 0..n-1 index. If the taxi data
        contains no new company, the content equals ``company_master``.
    """

    # De-duplicate first: the same company normally occurs in many trips, and
    # every occurrence would otherwise become its own master row with its own id.
    trip_companies = taxi_trips['company'].drop_duplicates()
    new_companies_list = trip_companies[~trip_companies.isin(company_master['company'])].tolist()

    if not new_companies_list:
        return company_master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot use; start at 1 then.
    if len(company_master) == 0:
        first_new_id = 1
    else:
        first_new_id = int(company_master['company_id'].max()) + 1

    new_companies_df = pd.DataFrame({
        'company_id': range(first_new_id, first_new_id + len(new_companies_list)),
        'company': new_companies_list
    })

    updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

    return updated_company_master

In [52]:
# Small hand-made stand-in for the trips data, used to try out the
# company master update.
taxi_trips_company_only = pd.DataFrame(
    data=[
        (1, '6574 - Babylon Express Inc.'),
        (2, 'X'),
        (3, 'Y'),
    ],
    columns=['company_id', 'company'],
)


taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,6574 - Babylon Express Inc.
1,2,X
2,3,Y


In [53]:
# Try the function on the hand-made `taxi_trips_company_only` frame.
# NOTE(review): this rebinds the name `update_company_master` from the function to
# the DataFrame it returns, so running this cell a second time fails (a DataFrame
# is not callable). The name is kept because the next cell reads it.
update_company_master = update_company_master(taxi_trips=taxi_trips_company_only, company_master=company_master)

In [54]:
# Inspect the last rows of the result produced by the previous cell.
update_company_master.tail()

Unnamed: 0,company_id,company
26,27,5167 - 71969 5167 Taxi Inc
27,28,Metro Jet Taxi A.
28,29,Petani Cab Corp
29,30,X
30,31,Y


### payment_type_master update code

In [57]:
# Distinct payment types from the trips, in order of first appearance.
unique_payment_types = taxi_trips['payment_type'].drop_duplicates().reset_index(drop=True)

# Number the payment types 1..n to form the initial master table.
payment_type_master = pd.DataFrame({
    'payment_type_id': range(1, len(unique_payment_types) + 1),
    'payment_type': unique_payment_types,
})

# Small hand-made stand-in for the trips data, used to try out the
# payment type master update.
taxi_trips_payment_type_only = pd.DataFrame(
    data=[
        (1, 'Credit Card.'),
        (2, 'X'),
        (3, 'Y'),
    ],
    columns=['payment_type_id', 'payment_type'],
)

taxi_trips_payment_type_only

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card.
1,2,X
2,3,Y


In [58]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type master with the payment types it does not contain yet.

    Each new payment type is added exactly once, even if it appears in many
    trips, and receives the next free ``payment_type_id``. New payment types
    are appended in order of first appearance in ``taxi_trips``. Neither input
    is modified.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips; must have a ``payment_type``
        column.
    payment_type_master : pd.DataFrame
        DataFrame holding the payment type master data; must have
        ``payment_type_id`` and ``payment_type`` columns.

    Returns
    -------
    pd.DataFrame
        The updated payment type master with a fresh 0..n-1 index. If the taxi
        data contains no new payment type, the content equals
        ``payment_type_master``.
    """

    # De-duplicate first: the same payment type occurs in many trips, and every
    # occurrence would otherwise become its own master row with its own id.
    trip_payment_types = taxi_trips['payment_type'].drop_duplicates()
    new_payment_types_list = trip_payment_types[
        ~trip_payment_types.isin(payment_type_master['payment_type'])
    ].tolist()

    if not new_payment_types_list:
        return payment_type_master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot use; start at 1 then.
    if len(payment_type_master) == 0:
        first_new_id = 1
    else:
        first_new_id = int(payment_type_master['payment_type_id'].max()) + 1

    new_payment_type_df = pd.DataFrame({
        'payment_type_id': range(first_new_id, first_new_id + len(new_payment_types_list)),
        'payment_type': new_payment_types_list
    })

    updated_payment_type_master = pd.concat([payment_type_master, new_payment_type_df], ignore_index=True)

    return updated_payment_type_master

In [60]:
# Try the function on the hand-made `taxi_trips_payment_type_only` frame.
updated_payment_type_master = update_payment_type_master(taxi_trips=taxi_trips_payment_type_only, payment_type_master=payment_type_master)

In [61]:
# Display the extended payment type master.
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Mobile
2,3,Credit Card
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Credit Card.
7,8,X
8,9,Y


### Creating a generic update master table function

In [69]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, values_column: str) -> pd.DataFrame:
    """Extend a master DataFrame with the values it does not contain yet.

    Each new value is added exactly once, even if it appears in many trips,
    and receives the next free id. New values are appended in order of first
    appearance in ``taxi_trips``. Neither input is modified.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips; must have a ``values_column``
        column.
    master : pd.DataFrame
        DataFrame holding the master data; must have ``id_column`` and
        ``values_column`` columns.
    id_column : str
        Name of the id column of ``master``.
    values_column : str
        Name of the column, present in both ``master`` and ``taxi_trips``,
        that contains the values.

    Returns
    -------
    pd.DataFrame
        The updated master data with a fresh 0..n-1 index. If the taxi data
        contains no new value, the content equals ``master``.
    """

    # De-duplicate first: the same value occurs in many trips, and every
    # occurrence would otherwise become its own master row with its own id.
    trip_values = taxi_trips[values_column].drop_duplicates()
    new_values_list = trip_values[~trip_values.isin(master[values_column])].tolist()

    if not new_values_list:
        return master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot use; start at 1 then.
    if len(master) == 0:
        first_new_id = 1
    else:
        first_new_id = int(master[id_column].max()) + 1

    new_values_df = pd.DataFrame({
        id_column: range(first_new_id, first_new_id + len(new_values_list)),
        values_column: new_values_list
    })

    updated_master = pd.concat([master, new_values_df], ignore_index=True)

    return updated_master

In [73]:
# Try the generic helper on the payment type data.
test_payment_type_master = update_master(taxi_trips=taxi_trips_payment_type_only, master=payment_type_master, id_column='payment_type_id', values_column='payment_type')

In [74]:
# Display the payment type master returned by the generic helper.
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Mobile
2,3,Credit Card
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Credit Card.
7,8,X
8,9,Y


In [76]:
# Try the generic helper on the company data.
# NOTE(review): `tast_company_master` looks like a typo for `test_company_master`;
# the name is kept because the next cell reads it.
tast_company_master = update_master(taxi_trips=taxi_trips_company_only, master=company_master, id_column='company_id', values_column='company')

In [78]:
# Inspect the last rows of the company master returned by the generic helper.
tast_company_master.tail()

Unnamed: 0,company_id,company
26,27,5167 - 71969 5167 Taxi Inc
27,28,Metro Jet Taxi A.
28,29,Petani Cab Corp
29,30,X
30,31,Y


### Function to update taxi_trips with the most recent company_master and payment_type_master

In [90]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Replace the payment type and company strings of the trips with master ids.

    The trips are merged with the payment type master on ``payment_type`` and
    then with the company master on ``company``. Both merges use pandas'
    default (inner) join, so a trip whose payment type or company is missing
    from the corresponding master table does not appear in the result.
    Afterwards the two string columns are removed.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame with the daily taxi trips.
    payment_type_master : pd.DataFrame
        The payment type master table.
    company_master : pd.DataFrame
        The company master table.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with the trips data plus the columns brought in from
        the two master tables, without the ``payment_type`` and ``company``
        columns.
    """

    # Step 1: attach the payment type master columns.
    with_payment_type = pd.merge(taxi_trips, payment_type_master, on='payment_type')

    # Step 2: attach the company master columns.
    with_both_masters = pd.merge(with_payment_type, company_master, on='company')

    # Step 3: the string values are now represented by the master columns.
    return with_both_masters.drop(columns=['payment_type', 'company'])

In [92]:
# Replace the string columns of the real trips with ids from the two master tables.
taxi_trips_id_id = update_taxi_trips_with_master_data(taxi_trips=taxi_trips, payment_type_master=payment_type_master,company_master=company_master)

# Random preview; no random_state is passed, so the rows differ between runs.
taxi_trips_id_id.sample(5)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
9149,9fae03b50107a9ac153b05e967fdffe58c429d74,3a4ea954d4316a9f964becc37dbd73ecd4cd47cb9b8948...,2023-12-09T12:30:00.000,2023-12-09T12:45:00.000,1105,9.43,73,38,26.25,0,0,0,26.25,41.717493036,-87.648895072,41.812948939,-87.617859676,2023-12-09 12:00:00,4,3
12601,41c20bc548fa9a64a61182ae47bdd6e8c17a9b67,dff38396cdba045286033d298070468e3bfcd63550d4bd...,2023-12-09T01:45:00.000,2023-12-09T02:00:00.000,857,6.28,8,5,18.75,0,0,1,19.75,41.899602111,-87.633308037,41.947791586,-87.683834942,2023-12-09 01:00:00,1,5
12123,270c32e7333ad422d3ae0f846026ecbc3419bb95,6541195d72fc76851f5588fc585d48acbc5228936f6c9a...,2023-12-09T05:30:00.000,2023-12-09T05:45:00.000,1199,10.88,54,41,27.75,0,0,0,27.75,41.660136051,-87.60284764,41.794090253,-87.592310855,2023-12-09 05:00:00,4,9
4766,a4407b2ea3c08d94f269677ab3e425af5135405a,8aa56dbaf30cae892a59e732f54d5eb6694d054fd459e3...,2023-12-09T17:30:00.000,2023-12-09T18:00:00.000,1876,10.71,35,62,29.25,0,0,0,29.25,41.835117986,-87.618677767,41.792981903,-87.724208194,2023-12-09 17:00:00,4,3
2469,fb72af70260d4921f7bd5a4e0446bc10a0975cb6,e30325d7d21a95e76154ac70fd5f3cc84547b05fc66042...,2023-12-09T19:45:00.000,2023-12-09T20:00:00.000,1087,2.62,8,28,11.75,2,0,0,14.25,41.892042136,-87.63186395,41.88528132,-87.6572332,2023-12-09 19:00:00,3,7


### Weather transformations function

In [97]:
def transform_weather_data(weather_data: "dict | str") -> pd.DataFrame:
    """Turn the hourly part of the daily weather API response into a DataFrame.

    Parameters
    ----------
    weather_data : dict or str
        The daily weather data from the Open Meteo API, either already parsed
        (a dict) or as raw JSON text (``str`` / ``bytes``). It must contain an
        ``'hourly'`` entry holding the keys ``'time'``, ``'temperature_2m'``,
        ``'wind_speed_10m'``, ``'precipitation'`` and ``'rain'``.

    Returns
    -------
    pd.DataFrame
        A DataFrame representation of the data with the columns ``datatime``
        (parsed to datetimes), ``temperature``, ``wind_speed``,
        ``precipitation`` and ``rain``.

    Raises
    ------
    ValueError
        If the ``'hourly'`` entry or one of the required hourly keys is missing.
    """
    # Type check: decode raw JSON text (str or bytes) before use.
    if isinstance(weather_data, (str, bytes, bytearray)):
        weather_data = json.loads(weather_data)

    # Key check
    if 'hourly' not in weather_data:
        raise ValueError("Missing 'hourly' key in weather_data")

    hourly = weather_data['hourly']

    # Output column -> key in the API's hourly section.
    # NOTE(review): 'datatime' (sic) is kept as-is because it is the column name
    # this function returns today; renaming it would change the output schema.
    column_sources = {
        'datatime': 'time',
        'temperature': 'temperature_2m',
        'wind_speed': 'wind_speed_10m',
        'precipitation': 'precipitation',
        'rain': 'rain',
    }

    for key in column_sources.values():
        if key not in hourly:
            raise ValueError(f"Missing '{key}' key in weather_data['hourly']")

    weather_df = pd.DataFrame({column: hourly[key] for column, key in column_sources.items()})

    weather_df['datatime'] = pd.to_datetime(weather_df['datatime'])

    return weather_df

In [101]:
#test 

# Request the hourly weather for a single day (the date exactly 8 months before
# today) and run the response through transform_weather_data.
current_datetime = datetime.now() - relativedelta(months=8)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url ='https://archive-api.open-meteo.com/v1/era5'

params = {
    'latitude': 41.85,
    'longitude': -87.65,
    'start_date': formatted_datetime,
    'end_date': formatted_datetime,
    'hourly': 'temperature_2m,wind_speed_10m,precipitation,rain'
}

# The timeout keeps the cell from hanging forever; raise_for_status() reports an
# HTTP error here instead of as a missing-key error inside the transformation.
response = requests.get(url, params = params, timeout=60)
response.raise_for_status()

weather_data = response.json()

weather_data_df = transform_weather_data(weather_data)

In [102]:
# Preview the first rows of the transformed weather data.
weather_data_df.head()

Unnamed: 0,datatime,temperature,wind_speed,precipitation,rain
0,2023-12-10 00:00:00,5.4,23.9,0.0,0.0
1,2023-12-10 01:00:00,4.4,29.5,0.0,0.0
2,2023-12-10 02:00:00,3.5,26.3,0.0,0.0
3,2023-12-10 03:00:00,3.3,26.0,0.0,0.0
4,2023-12-10 04:00:00,2.6,26.4,0.0,0.0
