In [1]:
import pandas as pd
import plotly.figure_factory as ff

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression, Lasso, ElasticNet, ElasticNetCV
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline


from sklearn.metrics import mean_squared_error

In [41]:
ls ../data

fhv_tripdata_2021-01.parquet    green_tripdata_2021-02.parquet
fhv_tripdata_2021-02.parquet    green_tripdata_2022-01.parquet
green_tripdata_2021-01.parquet


## Train model

In [73]:
train_set_filepath = "../data/fhv_tripdata_2021-01.parquet"
val_set_filepath = "../data/fhv_tripdata_2021-02.parquet"


dfs = []
for filepath in [train_set_filepath, val_set_filepath]:
    df = pd.read_parquet(filepath)
    
    df["duration"] = df.dropOff_datetime - df.pickup_datetime
    df["duration"] = df["duration"].dt.total_seconds() / 60
    
    print(df.shape)
    print(df.duration.mean())
    
    df_prepared = df.query("1 <= duration <= 60").copy()
    
    categorical = ['PUlocationID', 'DOlocationID']

    df_prepared[categorical] = df_prepared[categorical].astype(str)
    
    df_prepared["PU_DO"] = df_prepared['PUlocationID'] + "_" + df_prepared['DOlocationID']
    
    dfs.append(df_prepared)

df_train = dfs[0]
df_val = dfs[1]

(1154112, 8)
19.167224093791006
(1037692, 8)
20.706986225199763


In [82]:
categorical = ['PUlocationID', 'DOlocationID']
(df_train[categorical] == "nan").any(axis=1).sum() / len(df_train)

0.8453180949085712

In [83]:

numerical = ["trip_distance"]
target = "duration"

dv = DictVectorizer()

train_dicts = df_train[categorical].to_dict(orient="records")
X_train = dv.fit_transform(train_dicts)

val_dicts = df_val[categorical].to_dict(orient="records")
X_val = dv.transform(val_dicts)

y_train = df_train[target].values
y_val = df_val[target].values


In [84]:
X_train

<1109826x525 sparse matrix of type '<class 'numpy.float64'>'
	with 2219652 stored elements in Compressed Sparse Row format>

In [88]:
lr = LinearRegression()
lr.fit(X_train, y_train)

y_pred = lr.predict(X_val)

mean_squared_error(y_val, y_pred, squared=False)

11.014286566178097

In [None]:
lr = Lasso(alpha=0.01)
lr.fit(X_train, y_train)

y_pred = lr.predict(X_val)

mean_squared_error(y_val, y_pred, squared=False)

In [None]:
lr = ElasticNet()
lr.fit(X_train, y_train)

y_pred = lr.predict(X_val)

mean_squared_error(y_val, y_pred, squared=False)

In [None]:
fig = ff.create_distplot([y_pred, y_val], group_labels=["prediction", "actual"])
fig.update_layout(
    height=600
)

fig.show()

## Pipelining with Kedro

In [31]:
def read_dataframe(filepath):    
    df = pd.read_parquet(filepath)
    return df


def prepare_data(df):
    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df["duration"] = df["duration"].dt.total_seconds() / 60

    df_prepared = df.query("1 <= duration <= 60").copy()
    
    categorical = ['PULocationID', 'DOLocationID']

    df_prepared[categorical] = df_prepared[categorical].astype(str)
    
    df_prepared["PU_DO"] = df_prepared['PULocationID'] + "_" + df_prepared['DOLocationID']
    
    return df_prepared

def to_records(df):
    return df.to_dict(orient="records")


def get_X_y(df, features, target):
    return df[features], df[target]

def train_model(X_train, y_train, model_type: str = "lr"):
    if model_type == "lr":
        model = LinearRegression()
    else:
        raise NotImplementedError()

    p = Pipeline(
        [
            ("torecords", FunctionTransformer(to_records)),
            ("vectorizer", DictVectorizer()),
            ("model", model)
        ]
    )
    
    p.fit(X_train, y_train)
    
    return p

def predict(model, X):
    return model.predict(X)


def evaluate_model(y_val, y_pred):
    metrics = {"rmse":  mean_squared_error(y_val, y_pred, squared=False)}
    return metrics
    
    

In [30]:
target = "duration"
categorical = ['PULocationID', 'DOLocationID']
numerical = ["trip_distance"]
features = categorical + numerical

df_train = read_dataframe("../data/green_tripdata_2021-01.parquet")
df_val = read_dataframe("../data/green_tripdata_2021-02.parquet")

df_train = prepare_data(df_train)
df_val = prepare_data(df_val)

X_train, y_train = get_X_y(df_train, features, target)
X_val, y_val = get_X_y(df_val, features, target)

model = train_model(X_train, y_train)

y_pred = predict(model, X_val)

metrics =  evaluate_model(y_val, y_pred)

metrics

{'rmse': 10.499110710078236}

In [32]:
from kedro.pipeline import pipeline, node


training_pipeline = pipeline(
    [
        node(prepare_data, inputs={"df": "training_set"}, outputs="df_prepared"),
        node(
            get_X_y, 
            inputs={
                "df": "df_prepared",
                "features": "params:FEATURES", 
                "target": "params:TARGET"
            },
            outputs=["X_train", "y_train"]
        ),
        node(
            train_model,
            inputs=["X_train", "y_train"],
            outputs="model"
        ),
    ]
)

inference_pipeline = pipeline(
    [
        node(prepare_data, inputs={"df": "inference_set"}, outputs="df_prepared_inference"),
        node(
            get_X_y, 
            inputs={
                "df": "df_prepared_inference",
                "features": "params:FEATURES", 
                "target": "params:TARGET"
            },
            outputs=["X", "y"]
        ),
        node(
            predict,
            inputs={"model": "model", "X": "X"},
            outputs="y_pred"
        ),
    ]
)


evaluation_pipeline = pipeline(
    [
        inference_pipeline,
        node(evaluate_model, inputs={"y_pred": "y_pred", "y_val": "y"}, outputs="metrics")
    ]
)

training_and_evaluation_pipeline = pipeline(
    training_pipeline + evaluation_pipeline,
    inputs = {"training_set": "training_set", "inference_set": "validation_set"},
    outputs="metrics"
)

training_and_evaluation_pipeline

Pipeline([
Node(prepare_data, {'df': 'training_set'}, 'df_prepared', None),
Node(prepare_data, {'df': 'validation_set'}, 'df_prepared_inference', None),
Node(get_X_y, {'df': 'df_prepared_inference', 'features': 'params:FEATURES', 'target': 'params:TARGET'}, ['X', 'y'], None),
Node(get_X_y, {'df': 'df_prepared', 'features': 'params:FEATURES', 'target': 'params:TARGET'}, ['X_train', 'y_train'], None),
Node(train_model, ['X_train', 'y_train'], 'model', None),
Node(predict, {'model': 'model', 'X': 'X'}, 'y_pred', None),
Node(evaluate_model, {'y_pred': 'y_pred', 'y_val': 'y'}, 'metrics', None)
])

In [33]:
from kedro.runner import SequentialRunner
from kedro.io import DataCatalog
from kedro.extras.datasets.pandas import ParquetDataSet

runner = SequentialRunner()

catalog = DataCatalog(
    {
        "training_set": ParquetDataSet("../data/green_tripdata_2021-01.parquet"),
        "validation_set": ParquetDataSet("../data/green_tripdata_2021-01.parquet")
    },
    feed_dict = {
        "params:FEATURES": features,
        "params:TARGET": target
    }
)

runner.run(training_and_evaluation_pipeline, catalog)

{'metrics': {'rmse': 9.838799799829626}}