In [1]:
import json
import pandas as pd

In [2]:
input = {
    'trip_id': 712382,
    'trip_start_time': "1/1/2017 0:00",
    'trip_stop_time': "1/1/2017 0:03",
    'trip_duration_seconds': 223,
    'from_station_id': 7051,
    "from_station_name": "Wellesley St E / Yonge St Green P",
    'to_station_id': 7089,
    "to_station_name": "Church St  / Wood St",
    "user_type": "Member",
}

In [12]:
input_series = pd.Series(input)
input_series

trip_id                                 1
trip_start_time          2022/1/1 9:40 pm
trip_stop_time           2022/1/1 9:55 pm
trip_duration_seconds                 200
from_station_id                       333
from_station_name                    pape
to_station_id                         444
to_station_name                     bloor
user_type                          Casual
dtype: object

In [7]:
type(input_series) == pd.Series

True

In [14]:
params = {}
params['axis'] = 1
print(type(params))
print(params.get('axis'))

<class 'dict'>
1


In [26]:
params.pop('axis')

1

In [28]:
foo = input
foo.pop(['trip_id', 'user_type'])

TypeError: unhashable type: 'list'

In [17]:
import logging
import sys
from typing import Union
def preprocess(
    df_bikes: Union[pd.DataFrame, pd.Series], 
    logger_level: int = logging.DEBUG):
    """Preprocesses the bikeshare data

    Converts the datetimes from str obj to datetime objects, and extracts
    the time of day to convert into hour floats

    This step is prior to feeding the arrays into the pipeline.
    """
    logging.basicConfig(stream=sys.stdout, level=logger_level)
    df_bikes["dt_start"] = pd.to_datetime(
        df_bikes["trip_start_time"],
        infer_datetime_format=True,
    )
    logging.debug(df_bikes["dt_start"])

    df_bikes["dt_end"] = pd.to_datetime(
        df_bikes["trip_stop_time"],
        infer_datetime_format=True,
    )
    logging.debug(df_bikes["dt_end"])
    # get day of week
    params = {}
    if type(df_bikes) == pd.DataFrame:
        logging.debug("dataframe accepted")
        params['axis'] = 1
        
    df_bikes["day_of_week"] = df_bikes.apply(
        lambda x: x["dt_start"].day_of_week, **params
    )
    # get hours
    df_bikes["start_hour"] = df_bikes.apply(
        lambda x: x["dt_start"].hour + x["dt_start"].minute / 60,
        **params,
    )
    df_bikes["end_hour"] = df_bikes.apply(
        lambda x: x["dt_end"].hour + x["dt_end"].minute / 60,
        **params,
    )
    df_bikes["target"] = df_bikes["user_type"].apply(lambda type: type == "Member")
    drops = [
        "trip_start_time",
        "trip_stop_time",
        "from_station_name",
        "to_station_name",
        "dt_start",
        "dt_end",
        "user_type",
    ]
    df_bikes = df_bikes.drop(drops, **params)
    return df_bikes

`apply()` doesn't really extend to Series; instead of selecting the columns for the transformation, `pd.Series.apply()` will simply apply to every element, since it assumes it's already in column form. Instead of only selecting the `dt_start`, it's trying to *subscript* each element of my `bikes` data array, which would not make sense for int fields like `trip_id` or `duration`.

Pass it as a pd.DataFrame.

Other problem, having issues constructing dataframe from dict, as the dict should have the following structure:

```py
data = {'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
```

Whereas my sample data are not in a list

In [20]:
# This will fail
try:
    prep = preprocess(input_series)
except TypeError as e:
    print(e)

'int' object is not subscriptable


In [24]:
input_df = pd.DataFrame.from_dict(input)
prep = preprocess(input_df)

ValueError: If using all scalar values, you must pass an index

In [3]:
# from datetime import datetime
from datetime import datetime
date_fmt = """%d/%m/%Y %H:%M"""
dt_start = datetime.strptime(input['trip_start_time'], date_fmt)
dt_start

datetime.datetime(2017, 1, 1, 0, 0)

In [9]:
import mlflow
from mlflow.tracking import MlflowClient
import os
import logging

from dotenv import load_dotenv
load_dotenv()

MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI", "sqlite:///mlflow.db")
MLFLOW_EXP_NAME = os.getenv("MLFLOW_EXP_NAME", "TO-bikeshare-classifier")
MLFLOW_REGISTERED_MODEL = os.getenv("MLFLOW_REGISTERED_MODEL", "TO-bikeshare-clf")

logger = logging.getLogger("MLFLOW")
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(console_handler)
# logger.basicConfig(stream=sys.stdout, level=logging.DEBUG)
def retrieve():
    """Retrieves and returns the latest version of the registered model"""
    client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
    # search string requires quotation marks around everything
    # only one search expression in 1.26
    mv_search = client.search_model_versions(f"name='{MLFLOW_REGISTERED_MODEL}'")
    for mv in mv_search:
        # pprint(dict(mv), indent=4)
        logger.debug(f"version: {mv.version}\nStage: {mv.current_stage}\nSource: {mv.source}")

    model_uris = [mv.source for mv in mv_search if mv.current_stage == "Production"]
    logger.debug(model_uris)

    # alternatively model_uri could also be direct path to s3 bucket:
    # s3://{MLFLOW_ARTIFACT_STORE}/<exp_id>/<run_id>/artifacts/models
    # the models:/model_name/Production uri is only useable if MLflow server is up
    model = mlflow.pyfunc.load_model(
        # model_uri=f'models:/{MLFLOW_REGISTERED_MODEL}/Production'
        model_uri=model_uris[0]
    )
    return model

In [24]:
model = retrieve()

DEBUG:urllib3.connectionpool:Resetting dropped connection: 13.215.46.159
DEBUG:urllib3.connectionpool:http://13.215.46.159:5000 "GET /api/2.0/preview/mlflow/model-versions/search?filter=name%3D%27TO-bikeshare-clf%27 HTTP/1.1" 200 2140
version: 1
Stage: Archived
Source: s3://mlflow-artifacts-remote-1212/4/fcf666f0460b4ad8b6b6ab2bfe14a902/artifacts/models
DEBUG:MLFLOW:version: 1
Stage: Archived
Source: s3://mlflow-artifacts-remote-1212/4/fcf666f0460b4ad8b6b6ab2bfe14a902/artifacts/models
version: 2
Stage: Archived
Source: s3://mlflow-artifacts-remote-1212/4/0838497703b54c4899907174add1162d/artifacts/models
DEBUG:MLFLOW:version: 2
Stage: Archived
Source: s3://mlflow-artifacts-remote-1212/4/0838497703b54c4899907174add1162d/artifacts/models
version: 3
Stage: Archived
Source: s3://mlflow-artifacts-remote-1212/4/0838497703b54c4899907174add1162d/artifacts/models
DEBUG:MLFLOW:version: 3
Stage: Archived
Source: s3://mlflow-artifacts-remote-1212/4/0838497703b54c4899907174add1162d/artifacts/models


In [7]:
def preprocess(ride):
    """Preprocess the json input"""
    logging.info("Preprocessing request")
    
    features = {}
    date_fmt = """%d/%m/%Y %H:%M"""
    dt_start = datetime.strptime(ride['trip_start_time'], date_fmt)
    dt_end = datetime.strptime(ride['trip_stop_time'], date_fmt)
    features['day_of_week'] = dt_start.weekday()
    features['start_hour'] = dt_start.hour
    features['end_hour'] = dt_end.hour
    features['target'] = ride['user_type'] == "Member"
    features['from_station_id'] = ride['from_station_id']
    features['to_station_id'] = ride['to_station_id']
    features['trip_duration_seconds'] = ride['trip_duration_seconds']
    logging.debug(f'Num features returned: {len(features)}')
    return features

In [8]:
features = preprocess(input)
features

INFO:root:Preprocessing request
DEBUG:root:Num features returned: 7


{'day_of_week': 6,
 'start_hour': 0,
 'end_hour': 0,
 'target': True,
 'from_station_id': 7051,
 'to_station_id': 7089,
 'trip_duration_seconds': 223}

In [12]:
features.keys()

dict_keys(['day_of_week', 'start_hour', 'end_hour', 'target', 'from_station_id', 'to_station_id', 'trip_duration_seconds'])

In [13]:
features.values()

dict_values([6, 0, 0, True, 7051, 7089, 223])

In [18]:
df = pd.DataFrame(data=None, columns=features.keys())
df.head()

Unnamed: 0,day_of_week,start_hour,end_hour,target,from_station_id,to_station_id,trip_duration_seconds


In [21]:
df.iloc[0, :] = list(features.values())

IndexError: iloc cannot enlarge its target object

In [17]:
df

Unnamed: 0,day_of_week,start_hour,end_hour,target,from_station_id,to_station_id,trip_duration_seconds,0
0,,,,,,,,6
1,,,,,,,,0
2,,,,,,,,0
3,,,,,,,,True
4,,,,,,,,7051
5,,,,,,,,7089
6,,,,,,,,223


In [22]:
s3_path = "s3://to-bikeshare-data/source/2017/q1.csv"
df_sample = pd.read_csv(s3_path)

DEBUG:botocore.hooks:Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
DEBUG:botocore.hooks:Changing event name from before-call.apigateway to before-call.api-gateway
DEBUG:botocore.hooks:Changing event name from request-created.machinelearning.Predict to request-created.machine-learning.Predict
DEBUG:botocore.hooks:Changing event name from before-parameter-build.autoscaling.CreateLaunchConfiguration to before-parameter-build.auto-scaling.CreateLaunchConfiguration
DEBUG:botocore.hooks:Changing event name from before-parameter-build.route53 to before-parameter-build.route-53
DEBUG:botocore.hooks:Changing event name from request-created.cloudsearchdomain.Search to request-created.cloudsearch-domain.Search
DEBUG:botocore.hooks:Changing event name from docs.*.autoscaling.CreateLaunchConfiguration.complete-section to docs.*.auto-scaling.CreateLaunchConfiguration.complete-section
DEBUG:botocore.hooks:Changing event name from before-parameter-buil

In [25]:
def preprocess(df_bikes: pd.DataFrame):
    """Preprocesses the bikeshare data

    Converts the datetimes from str obj to datetime objects, and extracts
    the time of day to convert into hour floats

    This step is prior to feeding the arrays into the pipeline.
    """
    df_bikes["dt_start"] = pd.to_datetime(
        df_bikes["trip_start_time"],
        infer_datetime_format=True,
    )
    df_bikes["dt_end"] = pd.to_datetime(
        df_bikes["trip_stop_time"],
        infer_datetime_format=True,
    )
    # get day of week
    df_bikes["day_of_week"] = df_bikes.apply(
        lambda x: x["dt_start"].day_of_week, axis=1
    )
    # get hours
    df_bikes["start_hour"] = df_bikes.apply(
        lambda x: x["dt_start"].hour + x["dt_start"].minute / 60,
        axis=1,
    )
    df_bikes["end_hour"] = df_bikes.apply(
        lambda x: x["dt_end"].hour + x["dt_end"].minute / 60,
        axis=1,
    )
    df_bikes["target"] = df_bikes["user_type"].apply(lambda type: type == "Member")
    drops = [
        "trip_start_time",
        "trip_stop_time",
        "from_station_name",
        "to_station_name",
        "dt_start",
        "dt_end",
        "user_type",
    ]
    df_bikes = df_bikes.drop(drops, axis=1)
    return df_bikes

In [26]:
df_bikes = preprocess(df_sample)

In [28]:
y_preds = model.predict(df_bikes.drop('target', axis=1))

In [30]:
score = sum(y_preds == df_bikes['target'])/len(y_preds)
print(f'{score:.3%}')

87.243%


In [32]:
from sklearn.metrics import roc_auc_score
try:
    y_score = model.predict_proba(df_bikes.drop('target', axis=1))
    roc = roc_auc_score(df_bikes['target'], y_score)
except AttributeError as e:
    print(e)
    

'PyFuncModel' object has no attribute 'predict_proba'


[Need wrapper function to include .predict_proba()](https://github.com/mlflow/mlflow/issues/694)