In [6]:
import pickle

import pandas as pd

from sklearn.feature_extraction import DictVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import root_mean_squared_error
import mlflow
# pipeline
from sklearn.pipeline import make_pipeline
import os
import uuid

In [21]:
# Batch parameters: which month of which taxi dataset gets scored.
year = 2021
month = 2
taxi_type = 'green'

# Source parquet URL and the local parquet path the predictions are written to.
# Both use the same zero-padded integer specs (:04d / :02d) so the two names
# cannot drift apart, and a non-integer month fails loudly instead of being
# formatted into an unexpected file name.
input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'
output_file = f'output/{taxi_type}/{year:04d}-{month:02d}.parquet'
# Make sure the target directory exists before anything is written into it.
os.makedirs(os.path.dirname(output_file), exist_ok=True)

# MLflow run whose model is used for scoring; the RUN_ID environment variable
# overrides the hard-coded default.
RUN_ID = os.getenv('RUN_ID', 'm-73b1fea3e7c0444ebff7192f9d16ed53')


In [22]:
def generate_uuid(n):
    """Return a list of ``n`` freshly generated random UUID4 strings."""
    return [str(uuid.uuid4()) for _ in range(n)]


def read_dataframe(filename: str):
    """Load a trip parquet file and prepare it for scoring.

    Steps:
      * read ``filename`` with ``pd.read_parquet``;
      * add ``duration`` in minutes, computed as
        ``lpep_dropoff_datetime - lpep_pickup_datetime``;
      * keep only rows whose duration is between 1 and 60 minutes inclusive;
      * cast ``PULocationID`` and ``DOLocationID`` to ``str``;
      * add a ``ride_id`` column with one random UUID string per row.

    Args:
        filename: Path or URL handed to ``pd.read_parquet``.

    Returns:
        The filtered DataFrame with the ``duration`` and ``ride_id`` columns added.
    """
    df = pd.read_parquet(filename)

    trip_time = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df['duration'] = trip_time.dt.total_seconds() / 60

    # Take an explicit copy of the filtered rows: the column assignments below
    # would otherwise write into a slice of the original frame, which triggers
    # SettingWithCopyWarning on pandas versions without copy-on-write.
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    # The ids are random (uuid4), so they differ on every call.
    df['ride_id'] = generate_uuid(len(df))
    return df


def prepare_dictionaries(df: pd.DataFrame):
    """Build the per-ride feature dictionaries passed to the model.

    Each record has two keys:
      * ``PU_DO``: ``PULocationID`` and ``DOLocationID`` joined with ``'_'``
        (both columns must already be strings);
      * ``trip_distance``: copied from the input frame.

    The input frame is NOT modified: the combined ``PU_DO`` feature is built in
    a separate frame instead of being written back as a new column. This keeps
    the function free of side effects and avoids assigning into a frame that
    may itself be a slice of another one.

    Args:
        df: Frame with ``PULocationID``, ``DOLocationID`` and ``trip_distance``.

    Returns:
        A list with one ``{'PU_DO': ..., 'trip_distance': ...}`` dict per row.
    """
    features = pd.DataFrame({
        'PU_DO': df['PULocationID'] + '_' + df['DOLocationID'],
        'trip_distance': df['trip_distance'],
    })
    return features.to_dict(orient='records')

In [23]:
def load_model(run_id: str):
    """Load the MLflow pyfunc model stored under ``run_id``.

    The artifact location is built from a fixed S3 prefix plus ``run_id`` and
    passed to ``mlflow.pyfunc.load_model``; the loaded model is returned.
    """
    artifact_uri = f's3://koomi-mlflow-artifacts-remote/2/models/{run_id}/artifacts'
    return mlflow.pyfunc.load_model(artifact_uri)

def apply_model(input_file: str, run_id: str, output_file: str):
    """Score one input file with a logged model and save the predictions.

    Reads and prepares the rides from ``input_file``, loads the model for
    ``run_id``, predicts on the feature dictionaries and writes a parquet file
    to ``output_file`` (without the index) containing, per ride: the ride id,
    pickup time, pickup/dropoff location ids, actual and predicted duration,
    their difference, and the ``run_id`` as ``model_version``.
    """
    rides = read_dataframe(input_file)
    features = prepare_dictionaries(rides)

    predictions = load_model(run_id).predict(features)

    # Carry over the identifying columns, renaming the target to make clear it
    # is the observed value.
    kept_columns = [
        "ride_id",
        "lpep_pickup_datetime",
        "PULocationID",
        "DOLocationID",
        "duration",
    ]
    report = rides[kept_columns].rename(columns={"duration": "actual_duration"})

    report['predicted_duration'] = predictions
    report['diff'] = report['actual_duration'] - report['predicted_duration']
    report['model_version'] = run_id

    report.to_parquet(output_file, index=False)


In [25]:
# Score the configured month with the configured model run.
apply_model(input_file=input_file, run_id=RUN_ID, output_file=output_file)

Downloading artifacts: 100%|██████████| 5/5 [00:00<00:00, 12.93it/s]


In [27]:
# Shell check: list the output directory to confirm the predictions file exists.
!ls -lh output/green/

total 3.6M
-rw-rw-r-- 1 ubuntu ubuntu 3.6M Sep  5 17:52 2021-02.parquet
