In [17]:
import argparse
import datetime
import sys
import uuid
from datetime import date
from datetime import datetime, timedelta

import mlflow
import pandas as pd

In [3]:
def generate_uuids(len):
    """Return a list of ``len`` freshly generated random UUID4 strings.

    The parameter name shadows the builtin ``len`` inside this function; it is
    kept as-is so keyword callers keep working.
    """
    return [str(uuid.uuid4()) for _ in range(len)]

In [4]:
# load the data
def read_dataframe(year, month, taxi_type='yellow'):
    """Download one month of NYC TLC trip data and add duration / ride-id columns.

    Parameters
    ----------
    year : int
        Year in the TLC file name.
    month : int
        Month number; zero-padded to two digits in the file name.
    taxi_type : str
        Dataset prefix in the TLC file name, e.g. 'yellow' or 'green'.

    Returns
    -------
    pandas.DataFrame
        Only trips whose duration lies in [1, 60] minutes, with an added float
        ``duration`` column (minutes) and a string ``ride_id`` column.
    """
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_tripdata_{year}-{month:02d}.parquet'

    df = pd.read_parquet(url)

    # Green TLC files name the timestamps 'lpep_*', yellow files 'tpep_*'.
    # Hard-coding 'lpep_' made the default taxi_type='yellow' fail with KeyError.
    prefix = 'lpep' if 'lpep_pickup_datetime' in df.columns else 'tpep'
    pickup = df[f'{prefix}_pickup_datetime']
    dropoff = df[f'{prefix}_dropoff_datetime']

    df['duration'] = (dropoff - pickup).dt.total_seconds() / 60
    # .copy() makes the filtered frame independent, so adding ride_id below
    # does not write into a slice (avoids SettingWithCopyWarning).
    df = df[df['duration'].between(1, 60)].copy()

    df['ride_id'] = generate_uuids(len(df))

    return df

In [5]:
# prepare the data
def prepare_data_dict(df):
    """Build the per-row feature dicts that are passed to ``model.predict``.

    Side effect: adds a ``PU_DO`` column to ``df`` in place, holding the pickup
    and dropoff location IDs joined by '_' (e.g. '43_151').

    Returns
    -------
    list[dict]
        One dict per row of ``df`` with keys ``'PU_DO'`` and ``'trip_distance'``.
    """
    # Element-wise concatenation. The previous `'%s_%s' % (Series, Series)`
    # formatted the reprs of both *whole columns* into a single string and
    # broadcast that one value to every row.
    df['PU_DO'] = df['PULocationID'].astype(str) + '_' + df['DOLocationID'].astype(str)
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    return df[categorical + numerical].to_dict(orient='records')

In [25]:
# load the model
def load_model(run_id, artifact_root='/workspaces/mlops-training/04-deployment/web-service-mlflow/artifacts/1'):
    """Load the MLflow pyfunc model stored under ``run_id``.

    Parameters
    ----------
    run_id : str or None
        MLflow run ID whose ``artifacts/model`` directory is loaded. Previously
        this argument was ignored and run ``ea5d80a75c6548f7a42d857a6d412935``
        was always loaded; that run is still used as a fallback when
        ``run_id`` is None.
    artifact_root : str
        Local directory containing one sub-directory per run ID. The default is
        the directory that was previously hard-coded here.

    Returns
    -------
    The object returned by ``mlflow.pyfunc.load_model``.
    """
    if run_id is None:
        # Preserve the old hard-coded model for callers that pass no run ID
        # (e.g. the CLI when --run-id is omitted).
        run_id = 'ea5d80a75c6548f7a42d857a6d412935'
    logged_model = f'{artifact_root}/{run_id}/artifacts/model'
    return mlflow.pyfunc.load_model(logged_model)

In [7]:
def save_results(df, y_pred, run_id, output_file):
    """Write per-ride predictions next to the actual durations as a parquet file.

    Parameters
    ----------
    df : pandas.DataFrame
        Trip frame with ``ride_id``, ``PULocationID``, ``DOLocationID``,
        ``duration`` and either an ``lpep_pickup_datetime`` or a
        ``tpep_pickup_datetime`` column.
    y_pred : array-like
        Predicted durations, positionally aligned with the rows of ``df``.
    run_id : str
        Written to every row of the ``model_version`` column.
    output_file : str
        Destination passed to ``DataFrame.to_parquet`` (written without index).
    """
    # Green data carries 'lpep_pickup_datetime', yellow 'tpep_pickup_datetime'.
    # The column name is passed through unchanged, so green output keeps its
    # original schema while yellow data no longer raises KeyError.
    pickup_col = 'lpep_pickup_datetime' if 'lpep_pickup_datetime' in df.columns else 'tpep_pickup_datetime'

    df_result = df[['ride_id', pickup_col, 'PULocationID', 'DOLocationID']].copy()
    df_result['actual_duration'] = df['duration']
    df_result['predicted_duration'] = y_pred
    df_result['diff'] = df_result['actual_duration'] - df_result['predicted_duration']
    df_result['model_version'] = run_id

    df_result.to_parquet(output_file, index=False)

In [8]:
# @task(name="Scheduled run, get current year and month")
def get_current_year_month():
    """Return ``(year, month)`` of the current local date as a tuple of ints."""
    # struct_time's first two fields are tm_year and tm_mon.
    return datetime.today().timetuple()[:2]

In [29]:

# @task(name="Validate arguments")
def validate_args(year, month):
    """Check a (year, month) pair supplied by the caller.

    Either both values are None, or both are given. A given year must not be
    below 2020 or above the current year, and a given month must be in 1..12.

    Raises
    ------
    ValueError
        If any of these constraints is violated.
    """
    has_year = year is not None
    has_month = month is not None

    if has_year ^ has_month:
        raise ValueError("You must provide both year and month together, or neither.")

    if has_year:
        too_early = year < 2020
        if too_early or year > datetime.now().year:
            raise ValueError("Year must be between 2020 and current year.")

    if has_month:
        out_of_range = month < 1 or month > 12
        if out_of_range:
            raise ValueError("Month must be between 1 and 12")
    
def main_flow(taxi_type, year, month, run_id,
              output_file="/workspaces/mlops-training/04-deployment/batchmode/output/output.parquet"):
    """Score one month of taxi trips with an MLflow model and save the predictions.

    Parameters
    ----------
    taxi_type : str
        Dataset prefix passed to ``read_dataframe`` (e.g. 'yellow', 'green').
    year, month : int or None
        Month of data to score. When both are None, the month three months
        before the current one is used.
    run_id : str
        MLflow run ID passed to ``load_model``; also written to the output's
        ``model_version`` column.
    output_file : str
        Parquet destination. The default is the path previously hard-coded here.

    Raises
    ------
    ValueError
        From ``validate_args`` when year/month are inconsistent or out of range.
    """
    if year is None and month is None:
        print("None values for year and month, using the month 3 months prior.")
        year, month = get_current_year_month()
        # Step back three months, wrapping into the previous year. The old code
        # decremented the year for months 1-3 but left month at -2..0, which
        # validate_args then rejected.
        months_since_year_zero = year * 12 + (month - 1) - 3
        year, month = divmod(months_since_year_zero, 12)
        month += 1
    validate_args(year, month)

    df = read_dataframe(year=year, month=month, taxi_type=taxi_type)
    dicts = prepare_data_dict(df)

    model = load_model(run_id)
    y_pred = model.predict(dicts)

    save_results(df, y_pred, run_id, output_file)
    
    

    

In [30]:
 
# Interactive run: score March 2021 green-taxi trips with a fixed model run.
# NOTE(review): this call is unguarded, so it also executes if the notebook is
# exported and run as a script.
main_flow(
    taxi_type="green",
    year=2021,
    month=3,
    run_id="19fe02adfaaf40dfb0a8a5939491a737",
)

In [None]:
# Command-line entry point for the exported script. Skipped inside a Jupyter
# kernel: there __name__ is also "__main__", but sys.argv holds the kernel's own
# arguments (e.g. "-f <connection file>"), which argparse would reject.
if __name__ == "__main__" and "ipykernel" not in sys.modules:
    parser = argparse.ArgumentParser(description='Apply a trained model to predict taxi trip durations in batch.')
    parser.add_argument('--taxi-type', type=str, default="yellow", help='Taxi type (yellow/green)')
    parser.add_argument('--year', type=int, default=None, help='Year of the data to score (e.g. 2025)')
    parser.add_argument('--month', type=int, default=None, help='Month of the data to score (e.g. 1-12)')
    parser.add_argument('--run-id', type=str, help='MLflow run ID of the model to apply')
    args = parser.parse_args()

    # argparse stores "--taxi-type" / "--run-id" as taxi_type / run_id.
    # The old `args.taxi-type` parsed as `args.taxi - type` (AttributeError).
    main_flow(taxi_type=args.taxi_type, year=args.year, month=args.month, run_id=args.run_id)