In [None]:
import json
from io import StringIO
import boto3
import pandas as pd

def read_json_from_s3(bucket: str, key_name: str) -> dict:
    """Fetch an object from S3 and parse its body as JSON.

    Parameters
    ----------
    bucket: str
        Name of the bucket that holds the object.

    key_name: str
        Key of the object inside the bucket.

    Returns
    -------
    dictionary
        Whatever ``json.loads`` produces for the object's body.
    """
    client = boto3.client("s3")
    s3_object = client.get_object(Bucket=bucket, Key=key_name)
    raw_body = s3_object["Body"].read()

    return json.loads(raw_body)

def taxi_trip_tranformation(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean the taxi trip data and add the hourly key used for weather data.

    The input frame is not modified; every step works on a new frame, so the
    caller keeps its original data.

    Steps performed:
      * drop the census tract and centroid location columns,
      * drop every row that still contains a missing value,
      * rename the community area columns to ``*_community_area_id``,
      * add ``datetime_for_weather``: ``trip_start_timestamp`` parsed as a
        datetime and floored to the hour.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        dataframe holding the daily taxi trip records

    Returns
    -------
    pd.DataFrame
        cleaned, transformed dataframe (a new object)

    Raises
    ------
    TypeError
        If ``taxi_trips`` is not a pandas DataFrame.
    KeyError
        If one of the columns to drop, or ``trip_start_timestamp``, is missing.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi trips is not a valid dataframe")

    unused_columns = [
        "pickup_census_tract",
        "dropoff_census_tract",
        "pickup_centroid_location",
        "dropoff_centroid_location",
    ]
    renamed_columns = {
        "pickup_community_area": "pickup_community_area_id",
        "dropoff_community_area": "dropoff_community_area_id",
    }

    # Non-inplace calls: each returns a new frame, leaving the argument as-is.
    cleaned = (
        taxi_trips.drop(columns=unused_columns)
        .dropna()
        .rename(columns=renamed_columns)
    )

    hourly_start = pd.to_datetime(cleaned["trip_start_timestamp"]).dt.floor("h")

    return cleaned.assign(datetime_for_weather=hourly_start)

def update_taxi_trips_with_masters(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Replace the payment type and company values of the trips with master ids.

    The trips are merged with the payment type master on ``payment_type`` and
    with the company master on ``company`` (default merge, so only trips with
    a match in both masters are kept). The two text columns are then removed
    from the result.

    Args:
        taxi_trips (pd.DataFrame): trips with ``payment_type`` and ``company`` columns
        payment_type_master (pd.DataFrame): master table with a ``payment_type`` column
        company_master (pd.DataFrame): master table with a ``company`` column

    Returns:
        pd.DataFrame: merged trips without the ``payment_type`` and ``company`` columns
    """
    with_payment_type = taxi_trips.merge(payment_type_master, on="payment_type")
    with_both_masters = with_payment_type.merge(company_master, on="company")

    return with_both_masters.drop(columns=["payment_type", "company"])

def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend a master table with the values of the trips it does not know yet.

    Each new value is added exactly once, in order of first appearance in
    ``taxi_trips``, and receives the next free id after the current maximum
    of ``id_column``. Missing values in the trips are ignored.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        Trips holding the ``value_column`` to check against the master.

    master: pd.DataFrame
        Current master table with ``id_column`` and ``value_column``.

    id_column: str
        Name of the id column in the master.

    value_column: str
        Name of the value column present in both dataframes.

    Returns
    -------
    pd.DataFrame
        A new master table (fresh 0..n-1 index) containing the old rows
        followed by one row per new value. The input master is not modified.
    """
    # De-duplicate first: a value occurring in many trips must yield one row.
    candidates = taxi_trips[value_column].dropna().drop_duplicates()
    new_values = candidates[~candidates.isin(master[value_column])].tolist()

    if not new_values:
        return master.reset_index(drop=True)

    # An empty master has no maximum (NaN); start numbering at 1 in that case.
    max_id = master[id_column].max()
    first_new_id = 1 if pd.isna(max_id) else int(max_id) + 1

    new_rows = pd.DataFrame({
        id_column: range(first_new_id, first_new_id + len(new_values)),
        value_column: new_values,
    })

    return pd.concat([master, new_rows], ignore_index=True)

def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Turn a parsed weather API response into an hourly DataFrame.

    Only the ``"hourly"`` section of the response is used.

    Parameters:
        weather_data (dict): parsed JSON response; ``weather_data["hourly"]``
            must provide the ``time``, ``temperature_2m``, ``wind_speed_10m``,
            ``rain`` and ``precipitation`` series.

    Returns:
        pd.DataFrame: one row per entry of the hourly series, with the columns
            ``date_time`` (converted with ``pd.to_datetime``), ``temperature``,
            ``wind_speed``, ``rain`` and ``precipitation``.

    Raises:
        KeyError: if ``"hourly"`` or one of the series listed above is missing.
    """
    hourly = weather_data["hourly"]

    # Output column name -> name of the series in the API response.
    column_sources = {
        "date_time": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    weather_df = pd.DataFrame(
        {column: hourly[source] for column, source in column_sources.items()}
    )
    weather_df["date_time"] = pd.to_datetime(weather_df["date_time"])

    return weather_df

def read_csv_from_s3(bucket: str, path: str, filename: str) -> pd.DataFrame:
    """Load a csv object from an S3 bucket into a DataFrame.

    Parameters
    ----------
    bucket : str
        The bucket that holds the file.

    path : str
        Key prefix of the file; it is concatenated with ``filename`` as-is,
        without inserting a separator.

    filename : str
        Name of the file.

    Returns
    -------
    pd.DataFrame
        The object's body, decoded as UTF-8 and parsed by ``pd.read_csv``.
    """
    client = boto3.client("s3")
    object_key = f"{path}{filename}"

    response = client.get_object(Bucket=bucket, Key=object_key)
    csv_text = response["Body"].read().decode("utf-8")

    return pd.read_csv(StringIO(csv_text))

def upload_dataframe_to_s3(dataframe: pd.DataFrame, bucket: str, path: str):
    """Write a DataFrame to S3 as a csv object.

    Parameters
    ----------
    dataframe : pd.DataFrame
        The data to upload; it is serialised with ``to_csv(index=False)``.

    bucket : str
        Name of the target bucket.

    path : str
        Key under which the csv is stored.

    Returns
    -------
    None
    """
    client = boto3.client("s3")

    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, index=False)

    client.put_object(Bucket=bucket, Key=path, Body=csv_buffer.getvalue())


def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame):
    """
    Store a new version of a master table on S3, keeping a copy of the old one.

    The object currently stored at ``{path}{file_type}_master.csv`` is first
    copied to
    ``transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv``
    in the same bucket; the dataframe is then uploaded to the original key.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket that holds the master files.

    path : str
        Key prefix of the master file inside the bucket.

    file_type : str
        Prefix of the master file name (the handler in this file passes
        "Payment_type" and "Company").

    dataframe : pd.DataFrame
        The new master table to upload.

    Returns
    -------
    None
    """
    current_key = f"{path}{file_type}_master.csv"
    backup_key = f"transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv"

    client = boto3.client("s3")

    # Back up the existing master before it gets overwritten below.
    client.copy_object(
        Bucket=bucket,
        Key=backup_key,
        CopySource={"Bucket": bucket, "Key": current_key},
    )

    upload_dataframe_to_s3(dataframe=dataframe, bucket=bucket, path=current_key)

def upload_and_move_file_on_s3(
    dataframe: pd.DataFrame,
    datetime_col: str,
    bucket: str,
    file_type: str,
    filename: str,
    source_path: str,
    target_path_raw: str,
    target_path_transformed: str,
):
    """Upload a transformed dataframe and copy its raw source file.

    The dataframe is stored as
    ``{target_path_transformed}{file_type}_{YYYY-MM-DD}.csv``, where the date
    comes from the first row of ``datetime_col``. The raw object
    ``{source_path}{filename}`` is then copied to ``{target_path_raw}{filename}``
    within the same bucket.

    Parameters
    ----------
    dataframe : pd.DataFrame
        The transformed data to upload.

    datetime_col : str
        Column whose first value provides the date used in the file name.

    bucket : str
        Bucket used for both the upload and the copy.

    file_type : str
        Prefix of the transformed file name.

    filename : str
        Name of the raw file to copy.

    source_path : str
        Key prefix where the raw file currently is.

    target_path_raw : str
        Key prefix the raw file is copied to.

    target_path_transformed : str
        Key prefix for the transformed csv.

    Returns
    -------
    None
    """
    client = boto3.client("s3")

    first_timestamp = dataframe[datetime_col].iloc[0]
    date_suffix = first_timestamp.strftime("%Y-%m-%d")
    transformed_key = f"{target_path_transformed}{file_type}_{date_suffix}.csv"

    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=transformed_key)

    client.copy_object(
        Bucket=bucket,
        Key=f"{target_path_raw}{filename}",
        CopySource={"Bucket": bucket, "Key":f"{source_path}{filename}"},
    )

    # NOTE: the raw file is only copied; removing the source is disabled:
    #s3.delete_object(Bucket=bucket, Key=f"{source_path}{filename}")


def _list_json_keys(s3, bucket: str, prefix: str) -> list:
    """Return the keys below ``prefix`` in ``bucket`` that end with ``.json``."""
    listing = s3.list_objects(Bucket=bucket, Prefix=prefix)

    # "Contents" is missing from the response when nothing matches the prefix,
    # so an empty folder must yield an empty list instead of a KeyError.
    # NOTE(review): a single list_objects call returns a limited number of
    # keys (1000 per the boto3 docs); paginate if the folders can grow larger.
    keys = (entry["Key"] for entry in listing.get("Contents", []))

    # endswith also skips the "folder" placeholder keys (they end with "/")
    # and works for keys containing no dot or several dots.
    return [key for key in keys if key.endswith(".json")]


def lambda_handler(event, context):
    """Transform the raw taxi trip and weather files waiting on S3.

    For every ``.json`` object below the taxi ``to_process`` prefix:
      * read and transform the trips,
      * extend the company and payment type masters with unseen values,
      * replace the text values of the trips with the master ids,
      * upload the transformed trips and copy the raw file to the processed
        prefix,
      * upload both masters (the previous versions are backed up).

    For every ``.json`` object below the weather ``to_process`` prefix the
    response is transformed, uploaded and the raw file is copied to the
    processed prefix.

    Parameters
    ----------
    event
        Not used by this function.

    context
        Not used by this function.

    Returns
    -------
    None
    """
    s3 = boto3.client('s3')

    bucket = "cubix-taxi-data-tm"
    raw_weather_folder = "raw_data/to_process/weather_data/"
    raw_taxi_trips_folder = "raw_data/to_process/taxi_data/"
    target_taxi_trips_folder = "raw_data/processed/taxi_data/"
    target_weather_folder = "raw_data/processed/weather_data/"

    transformed_taxi_trips_folder = "transformed_data/taxi_trips/"
    transformed_weather_folder = "transformed_data/weather/"

    payment_type_master_folder = "transformed_data/payment_type/"
    company_master_folder = "transformed_data/company/"

    payment_type_master_filename = "Payment_type_master.csv"
    company_type_master_filename = "Company_master.csv"

    payment_type_master = read_csv_from_s3(bucket=bucket, path=payment_type_master_folder, filename=payment_type_master_filename)
    company_master = read_csv_from_s3(bucket=bucket, path=company_master_folder, filename=company_type_master_filename)

    # TAXI TRIP DATA transform and loading
    for taxi_trip_key in _list_json_keys(s3, bucket, raw_taxi_trips_folder):
        filename = taxi_trip_key.split("/")[-1]

        taxi_trip_data_json = read_json_from_s3(bucket=bucket, key_name=taxi_trip_key)

        taxi_trip_data_raw = pd.DataFrame(taxi_trip_data_json)
        taxi_trips_transformed = taxi_trip_tranformation(taxi_trip_data_raw)

        # Carry the masters forward from file to file: restarting from the
        # initially loaded tables would hand out the same ids again and the
        # last upload would drop the entries added for earlier files.
        company_master = update_master(taxi_trips_transformed, company_master, "company_id", "company")
        payment_type_master = update_master(taxi_trips_transformed, payment_type_master, "payment_type_id", "payment_type")

        taxi_trips = update_taxi_trips_with_masters(taxi_trips_transformed, payment_type_master, company_master)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col="datetime_for_weather",
            bucket=bucket,
            file_type="taxi",
            filename=filename,
            source_path=raw_taxi_trips_folder,
            target_path_raw=target_taxi_trips_folder,
            target_path_transformed=transformed_taxi_trips_folder,
        )

        upload_master_data_to_s3(bucket=bucket, path=payment_type_master_folder, file_type="Payment_type", dataframe=payment_type_master)
        upload_master_data_to_s3(bucket=bucket, path=company_master_folder, file_type="Company", dataframe=company_master)

    # WEATHER DATA transform and loading
    for weather_key in _list_json_keys(s3, bucket, raw_weather_folder):
        filename = weather_key.split("/")[-1]

        weather_data_json = read_json_from_s3(bucket=bucket, key_name=weather_key)

        weather_data = transform_weather_data(weather_data_json)

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col="date_time",
            bucket=bucket,
            file_type="weather",
            filename=filename,
            source_path=raw_weather_folder,
            target_path_raw=target_weather_folder,
            target_path_transformed=transformed_weather_folder,
        )

