# Model Training

In this exercise we will perform feature engineering on our training set then train a model, deploy it and write a [model card](https://doi.org/10.1145/3287560.3287596) to provide reporting on the model.

## Task 0 - Setup

These are just some preliminary steps for getting started with this section.

### 🔄 Task

- Load any environment variables located in `.env`
- Get the username for content deployed to Posit Connect

### 🧑‍💻 Code

In [None]:
import datetime
import os
import shutil
from pathlib import Path

import duckdb
import hvplot.polars
import pins
import polars as pl
import polars.selectors as cs
import vetiver
from dotenv import load_dotenv
from posit.connect import Client
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from vetiver import VetiverModel


In [2]:
if Path(".env").exists():
    load_dotenv()

In [None]:
with Client() as client:
    username = client.me.username

print(f"Connect username is: '{username}'")

## Task 1 - Reading the data

First we will read in the data prepared in [02-data-exploration-and-validation](../02-data-exploration-and-validation/notebook.ipynb) and take a quick look at it prepare for modeling.

### 🔄 Task

- Read in and glimpse the vessel history data
- Read in and glimpse the vessel verbose data
- Read in and glimpse the weather data

### 🧑‍💻 Code

In [None]:
con = duckdb.connect('md:', read_only=True)
database_name = "washington_ferries"
con.execute(f"USE {database_name}")

In [None]:
vessel_history = con.sql("SELECT * FROM vessel_history").pl()
vessel_history

In [None]:
vessel_verbose = con.sql("SELECT * FROM vessel_verbose").pl()
vessel_verbose

In [None]:
weather = con.sql("SELECT * FROM terminal_weather").pl()
weather

In [8]:
con.close()

## Task 2 - Feature Engineering

### 🔄 Task

- Join the `vessel_history`, `vessel_verbose` and `weather` data into a form useful for modeling
- Transform the columns in new ones we can use for modeling

### 🧑‍💻 Code

First we compute a departure delay based on the difference between the actual and scheduled departure times and derive new features representing the day of the week and hour of departure.

In [None]:
trips_combined = vessel_history.with_columns(
    (pl.col("ActualDepart") - pl.col("ScheduledDepart"))
    .dt.total_seconds()
    .alias("Delay"),
    pl.col("Date").dt.weekday().alias("Weekday"),
    pl.col("Date").dt.hour().alias("Hour"),
).drop("EstArrival")

trips_combined.head(3)

Taking a look at `Delay` shows it is very skewed with a lot of values near zero and some even less than zero.

In [None]:
trips_combined.hvplot.hist("Delay", bin_range=(-1800, 7200), bins=30)

Since we're not interested in negative values here, we'll clip the values below zero. We can then log them, turning into it a nicer, more "normal" distribution. 

In [None]:
trips_combined = trips_combined.select(
    pl.exclude("Delay"),
    pl.col("Delay").map_elements(lambda x: max(x, 1), return_dtype=pl.Float64).log().alias("LogDelay")
)

trips_combined.hvplot.hist("LogDelay")

Now we prepare the data describing the individual vessels by only keeping the columns we're interested in and extracting the year from `YearBuilt` and `YearRebuilt`.

In [None]:
vessel_info = vessel_verbose.select(
    pl.col("VesselName"),
    pl.col("ClassName"),
    # we can also select multiple columns in one `pl.col(...)`
    pl.col(
        "SpeedInKnots",
        "EngineCount",
        "Horsepower",
        "MaxPassengerCount",
        "PassengerOnly",
        "FastFerry",
        "PropulsionInfo",
    ),
    pl.col("YearBuilt", "YearRebuilt").dt.year(),
)

vessel_info.head(3)

The trips and vessel data are joined together.

In [None]:
trips_combined = trips_combined.join(
    vessel_info, left_on="Vessel", right_on="VesselName", how="left", coalesce=True
)

trips_combined.head(3)

Joining in the weather data is a little harder since the weather data has a temporal resolution of one-hour whereas the trips can leave at any arbitrary time, making a naive join on timestamps not work. Here, we round the the departure time of the trips to the nearest hour and then join on the weather data.

In [None]:
weather = weather.select(
    pl.col(
        "time",
        "weather_code",
        "temperature_2m",
        "precipitation",
        "cloud_cover",
        "wind_speed_10m",
        "wind_direction_10m",
        "wind_gusts_10m",
        "terminal_name",
    )
)

trips_combined = (
    trips_combined.with_columns(pl.col("Date").dt.round("1h").alias("time"))
    .join(
        weather.rename(lambda col_name: f"departing_{col_name}"),
        how="left",
        left_on=["Departing", "time"],
        right_on=["departing_terminal_name", "departing_time"],
        coalesce=True,
    )
    .join(
        weather.rename(lambda col_name: f"arriving_{col_name}"),
        how="left",
        left_on=["Arriving", "time"],
        right_on=["arriving_terminal_name", "arriving_time"],
        coalesce=True,
    )
    .select(pl.exclude("time"))
)

trips_combined.head(3)

Let's take a look at how many `null`s there are in the data.

In [None]:
trips_combined.null_count()

We want the drop the `null`s except for `YearRebuilt` since we expect to have weather data available. We can expect `YearRebuilt` in the case of vessels that have not yet been rebuilt.

In [None]:
trips_combined = trips_combined.drop_nulls(subset=cs.exclude("YearRebuilt"))

trips_combined.null_count()

Now we enumerate all our numerical and categorical features. This will be needed later in setting up preprocessing for our model. For the categorical features we take a count of how often each category occurs to make sure our model isn't dependant on categories that are uncommon in the data.

In [None]:
numeric_features = [
    "SpeedInKnots",
    "EngineCount",
    "Horsepower",
    "MaxPassengerCount",
    "YearBuilt",
    "YearRebuilt",
    "departing_temperature_2m",
    "departing_cloud_cover",
    "departing_wind_speed_10m",
    "departing_wind_direction_10m",
    "departing_wind_gusts_10m",
    "arriving_temperature_2m",
    "arriving_cloud_cover",
    "arriving_wind_speed_10m",
    "arriving_wind_direction_10m",
    "arriving_wind_gusts_10m",
]

categorical_features = [
    "Vessel",
    "Weekday",
    "Hour",
    "Departing",
    "Arriving",
    "ClassName",
    "PropulsionInfo",
    "departing_weather_code",
    "arriving_weather_code",
]

for cf in categorical_features:
    print(
        f"feature: '{cf}', count:",
        trips_combined.group_by(cf).agg(pl.len()).sort("len"),
    )

We can see that some of the weather codes don't occur as frequently as would be ideal and decide to keep note of which codes these are.

In [None]:
low_count_weather_codes = set(
    [
        *trips_combined.group_by("departing_weather_code")
        .agg(pl.len())
        .sort("len")
        .filter(pl.col("len") < 300)["departing_weather_code"]
        .to_list(),
        *trips_combined.group_by("arriving_weather_code")
        .agg(pl.len())
        .sort("len")
        .filter(pl.col("len") < 300)["arriving_weather_code"]
        .to_list(),
    ]
)

low_count_weather_codes

These less common weather codes are binned into their own category.

In [19]:
def recode_weather_codes(code):
    return "other" if code in low_count_weather_codes else str(code)


trips_combined = trips_combined.with_columns(
    pl.col("departing_weather_code").map_elements(
        recode_weather_codes, return_dtype=pl.String
    ),
    pl.col("arriving_weather_code").map_elements(
        recode_weather_codes, return_dtype=pl.String
    ),
)

Finally, some unnecessary columns are dropped and the `Date` column is turned into a proper date.

In [None]:
trips_combined = trips_combined.select(
    cs.exclude("ScheduledDepart", "ActualDepart")
).with_columns(pl.col("Date").dt.date())

trips_combined

## Task 3 - Model Training

### 🔄 Task

Define a `scikit-learn` pipeline that

- Transform the data for the model to ingest
- Trains a gradient boosted machine model to predict the logged departure delay

### 🧑‍💻 Code

Here a column transformer is defined to preprocess our data. Numerical and categorical columns are treated differently with the former being passed through as-is and the latter being one-hot encoded.

In [21]:
column_transformer = ColumnTransformer(
    [
        # this just passes the variables through as-is
        ("numeric_features", "passthrough", numeric_features),
        # this one-hot encodes the variables
        ("categorical_features", OneHotEncoder(), categorical_features),
    ]
)

We next define our actual model, a [gradient boosted machine](https://scikit-learn.org/stable/modules/ensemble.html#histogram-based-gradient-boosting) akin to [LightGBM](https://github.com/Microsoft/LightGBM).

In [22]:
regressor = HistGradientBoostingRegressor(verbose=2, random_state=2)

Our model requires non-sparse data, so a custom transformer is defined to take a sparse dataset and turn it into a dense one. See [toarray](https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.toarray.html#scipy.sparse.csr_matrix.toarray).

In [23]:
class DenseTransformer(TransformerMixin, BaseEstimator):
    def fit(self, X, y=None, **params):
        return self

    def transform(self, X, y=None, **params):
        return X.toarray()

Now the transformers and model are combined into a single pipeline.

In [24]:
model = Pipeline(
    [
        ("column-transformer", column_transformer),
        ("densify", DenseTransformer()),
        ("regressor", regressor),
    ]
)

We set aside more recent for [model monitoring](../04-model-monitoring/monitoring_dashboard.qmd).

In [25]:
train_test_data = trips_combined.filter(
    pl.col("Date") < (datetime.date.today() - datetime.timedelta(weeks=4))
)

monitoring_data = trips_combined.filter(
    pl.col("Date") >= (datetime.date.today() - datetime.timedelta(weeks=4))
)

In [26]:
con = duckdb.connect('md:', read_only=False)
database_name = "washington_ferries"
con.execute(f"USE {database_name}")

table_name = "monitoring_data"
con.sql(f'DROP TABLE IF EXISTS {table_name}')
con.sql(f'CREATE TABLE {table_name} as SELECT * FROM {table_name}') 

con.close()

Now the remaining data is split into a training set and testing set for evaluating the model.

In [None]:
X = train_test_data.drop("LogDelay", "Date")
y = train_test_data["LogDelay"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=2)

print(f"Number of rows of training data: {X_train.shape[0]}")
print(f"Number of rows testing data:  {X_test.shape[0]}")

In [28]:
con = duckdb.connect('md:', read_only=False)
database_name = "washington_ferries"
con.execute(f"USE {database_name}")

table_name = "x_test"
con.sql(f'DROP TABLE IF EXISTS {table_name}')
con.sql(f'CREATE TABLE {table_name} as SELECT * FROM X_test') 

con.close()

Finally, our model is trained. This also trains the encodings for our one-hot encoder.

In [None]:
%%time
model.fit(X_train.to_pandas(), y_train)

The model is evaluated.

In [None]:
model.score(X_test, y_test)

## Task 4 - Model Deployment

### 🔄 Task

- Deploy the model using `vetiver` and `pins` onto Posit Connect
- Deploy an API around the model onto Posit

### 🧑‍💻 Code

Our model is wrapped up in a [`VetiverModel` object for serving it](https://rstudio.github.io/vetiver-python/stable/reference/VetiverModel.html#vetiver.VetiverModel).

In [31]:
v = VetiverModel(
    model, model_name=f"{username}/ferry_delay", prototype_data=X.to_pandas()
)

Our model is now deployed to Connect, making it available to use elsewhere.

In [None]:
model_board = pins.board_connect(allow_pickle_read=True)
vetiver.vetiver_pin_write(model_board, model=v)

In order to more easily use our model, we need to define an API that loads and serves it. This API will also be deployed to Connect.

In [None]:
Path("api").mkdir(parents=True, exist_ok=True)

# vetiver.write_app(model_board, f"{username}/ferry_delay", file="api/app.py")
shutil.copy2("requirements.txt", "api/requirements.txt")

In [None]:
%%time

api_guid = os.getenv("API_GUID")
if api_guid is None:
    print("Creating a new deployment")
    !rsconnect deploy fastapi --entrypoint app:api --title "{username}/ferry_delay_vetiver" api/
else:
    print(f"Updating existing deployment {api_guid}")
    !rsconnect deploy fastapi --app-id "{api_guid}" --entrypoint app:api --title "{username}/ferry_delay_vetiver" api/

## Task 5 - Model Card

### 🔄 Task

- Use a model card to describe various metrics for how the model performs
- Deploy the card to Connect

### 🧑‍💻 Code

In [35]:
# vetiver.templates.model_card()

In [None]:
print("✅ Complete")