In [21]:
%reload_ext autoreload
%autoreload 2

In [2]:
import src.config as config
import hopsworks

In [3]:
# Connect to project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# Connect to feature store
feature_store = project.get_feature_store()

# Connect to the feature group
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/42132
Connected. Call `.close()` to terminate connection gracefully.


In [6]:
# Create feature view
# Feature view uses only one feature group
print(config.FEATURE_GROUP_NAME, config.FEATURE_VIEW_NAME )
try:
    # Create feature view if one doesn't exist
    feature_store.create_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION,
        query=feature_group.select_all()
    )

except:
    print('Feature view already existed. Skip creation...')


# Get feature view
feature_view = feature_store.get_feature_view(
    name=config.FEATURE_VIEW_NAME,
    version=config.FEATURE_VIEW_VERSION
)     

ts_hourly_feat_group ts_hourly_feat_view
Feature view already existed. Skip creation...


In [7]:
ts_data, _ = feature_view.training_data(
    description="Time series hourly taxi rides"
)

2023-05-16 20:03:11,993 INFO: USE `ml_prod_pipline_featurestore`
2023-05-16 20:03:12,980 INFO: SELECT `fg0`.`pickup_hr` `pickup_hr`, `fg0`.`rides` `rides`, `fg0`.`pickup_loc_id` `pickup_loc_id`
FROM `ml_prod_pipline_featurestore`.`ts_hourly_feat_group_1` `fg0`




In [8]:
ts_data

Unnamed: 0,pickup_hr,rides,pickup_loc_id
0,2022-10-02 10:00:00,0,15
1,2022-10-06 15:00:00,2,10
2,2022-10-19 05:00:00,3,43
3,2022-03-03 18:00:00,0,222
4,2022-02-22 17:00:00,0,122
...,...,...,...
2666369,2023-02-27 12:00:00,1,130
2666370,2022-08-10 06:00:00,0,159
2666371,2022-08-12 13:00:00,0,160
2666372,2023-02-03 10:00:00,172,142


In [11]:
from src.data import processData2FeatTgt

feats, tgts = processData2FeatTgt(
    ts_data,
    input_feat_len=24*28, # Month
    step_size=23,
)

feats_and_tgts = feats.copy()
feats_and_tgts['tgt_rides_nxt_hr'] = tgts

print(f"{feats_and_tgts.shape=}")

100%|██████████| 262/262 [00:42<00:00,  6.23it/s]

feats_and_tgts.shape=(108468, 675)





In [13]:
from datetime import date, timedelta
from pytz import timezone
import pandas as pd
from src.data_split import trainTestSplit

# Training data -> from Jan 2022 up until 2 months ago
# Test Data -> last 2 months

cutoff_date = pd.to_datetime(date.today() - timedelta(days=28*3))

print(f"{cutoff_date=}")


X_train, y_train, X_test, y_test = trainTestSplit(
    feats_and_tgts,
    cutoff_date,
    tgt_col_name="tgt_rides_nxt_hr"
)

print(f"{X_train.shape}")
print(f"{y_train.shape}")
print(f"{X_test.shape}")
print(f"{y_test.shape}")

cutoff_date=Timestamp('2023-02-21 00:00:00')
(106394, 674)
(106394,)
(2074, 674)
(2074,)


In [14]:
import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit
from sklearn.pipeline import make_pipeline
from sklearn.metrics import mean_absolute_error
import optuna

from src.model import getPipeline

def objective(trial: optuna.trial.Trial) -> float:
    """
    Given set of hyper-parameters, train a model and
    compute an average validation error based on a 
    TimeSeriesSplit 
    """
    
    # pick hyper-parameters
    hyperparams = {
        "metric" : "mae",
        "verbose": -1,
        "num_leaves": trial.suggest_int("num_leaves", 2, 256),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.2, 1.0),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.2, 1.0),
        "min_child_samples": trial.suggest_int("min_child_samples", 3, 100),
    }
    
    tss = TimeSeriesSplit(n_splits=2)
    scores = []
    
    for train_index, val_index in tss.split(X_train):
        
        # split data for training and validation
        X_train_, X_val_ = X_train.iloc[train_index,:],X_train.iloc[val_index,:]
        y_train_, y_val_ = y_train.iloc[train_index],y_train.iloc[val_index]
    
        # train the model
        pipeline = getPipeline(**hyperparams)
        pipeline.fit(X_train_, y_train_)
        
        # evaluate the model
        y_pred = pipeline.predict(X_val_)
        mae = mean_absolute_error(y_val_, y_pred)
        
        scores.append(mae)
        
    # Return mean score
    return np.array(scores).mean()

In [15]:
study  = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=1)

[32m[I 2023-05-16 21:03:52,648][0m A new study created in memory with name: no-name-9c466993-958e-4fb9-b2c2-1795a1d472c2[0m
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
[32m[I 2023-05-16 21:04:25,495][0m Trial 0 finished with value: 7.079075262216465 and parameters: {'num_leaves': 13, 'feature_fraction': 0.7917837664186249, 'bagging_fraction': 0.6978934586367291, 'min_child_samples': 61}. Best is trial 0 with value: 7.079075262216465.[0m


In [16]:
best_params = study.best_trial.params
print(f"{best_params=}")


best_params={'num_leaves': 13, 'feature_fraction': 0.7917837664186249, 'bagging_fraction': 0.6978934586367291, 'min_child_samples': 61}


In [17]:

pipeline = getPipeline(**best_params)
pipeline.fit(X_train,y_train)



In [18]:
predictions = pipeline.predict(X_test)
test_mae = mean_absolute_error(y_test, predictions)
print(f"{test_mae=:.4f}")

test_mae=6.3754


In [24]:
import joblib
from src.paths import MODELS_DIR

joblib.dump(pipeline, MODELS_DIR / "model.pkl")

['/home/ktk/taxi_demand_predictor/models/model.pkl']

In [25]:
from hsml.schema import Schema
from hsml.model_schema import ModelSchema

input_schema = Schema(X_train)
output_schema = Schema(y_train)
model_schema = ModelSchema(input_schema=input_schema, 
                           output_schema=output_schema)


In [26]:
model_registry = project.get_model_registry()

model = model_registry.sklearn.create_model(
    name="taxi_demand_predictor_next_hour",
    metrics={"test_mae": test_mae},
    description="LightGBM regressor with hyper-parameter tuning",
    input_example=X_train.sample(),
    model_schema=model_schema
)

model.save(MODELS_DIR / 'model.pkl')

Connected. Call `.close()` to terminate connection gracefully.


  0%|          | 0/6 [00:00<?, ?it/s]

Model created, explore it at https://c.app.hopsworks.ai:443/p/42132/models/taxi_demand_predictor_next_hour/1


Model(name: 'taxi_demand_predictor_next_hour', version: 1)