In [None]:
!pip install pandas_datareader
!pip3 install -U pip
!pip3 install -U setuptools wheel

# CPU version of pytorch has smaller footprint - see installation instructions in
# pytorch documentation - https://pytorch.org/get-started/locally/
!pip3 install torch==1.12+cpu torchvision==0.13.0+cpu torchtext==0.13.0 -f https://download.pytorch.org/whl/cpu/torch_stable.html

# !pip3 install autogluon
# !pip install yfinance sklearn
!pip install yfinance
!pip install autogluon

Collecting pandas_datareader
  Using cached pandas_datareader-0.10.0-py3-none-any.whl (109 kB)
Collecting lxml
  Using cached lxml-4.9.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl (7.1 MB)
Installing collected packages: lxml, pandas-datareader
Successfully installed lxml-4.9.2 pandas-datareader-0.10.0
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-ad9bef9c-0a4a-4add-a308-f9255bc8d842/bin/python -m pip install --upgrade pip' command.[0m
Collecting pip
  Using cached pip-22.3.1-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.2.4
    Uninstalling pip-21.2.4:
      Successfully uninstalled pip-21.2.4
Successfully installed pip-22.3.1
Collecting setuptools
  Using cached setuptools-65.6.3-py3-none-any.whl (1.2 MB)
Collecting wheel
  Using cached wheel-0.38.4-py3-none-any.whl (36 kB)
Installing collected packages: wheel, s

In [None]:
import warnings

import numpy as np
import datetime
from pandas_datareader import data as pdr
import yfinance as yf
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score

import mlflow.sklearn

# get data -> train -> register -> inference (from s3 bucket)


def acquire_training_data():
    yf.pdr_override()
    y_symbols = ["BTC-USD"]

    startdate = datetime.datetime(2022, 1, 1)
    enddate = datetime.datetime(2022, 12, 31)
    df = pdr.get_data_yahoo(y_symbols, start=startdate, end=enddate)
    df.to_csv()
    return df


def digitize(n):
    if n > 0:
        return 1
    return 0


def rolling_window(a, window):
    """
    Takes np.array 'a' and size 'window' as parameters
    Outputs an np.array with all the ordered sequences of values of 'a' of size 'window'
    e.g. Input: ( np.array([1, 2, 3, 4, 5, 6]), 4 )
         Output:
                 array([[1, 2, 3, 4],
                       [2, 3, 4, 5],
                       [3, 4, 5, 6]])
    """
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)


def prepare_training_data(data):

    """
    Return a prepared numpy dataframe
    input : Dataframe with expected schema

    """
    data["Delta"] = data["Close"] - data["Open"]
    data["to_predict"] = data["Delta"].apply(lambda d: digitize(d))
    return data


def register_model(model_name, rf_uri):

    model_uri = rf_uri + "/" + model_name
    new_model_version = mlflow.register_model(model_uri, model_name)
    return new_model_version


def pull_data():
    input_df = (
        spark.read.option("header", True)
        .option("inferschema", True)
        .csv(f"/dbfs/tmp/export.csv")
    )
    btc_mat = input_df.toPandas().to_numpy()

    WINDOW_SIZE = 14

    X = rolling_window(btc_mat[:, 7], WINDOW_SIZE)[:-1, :]
    Y = input_df.toPandas()["to_predict"].to_numpy()[WINDOW_SIZE:]

    X_train, X_test, y_train, y_test = train_test_split(
        X, Y, test_size=0.25, random_state=4284, stratify=Y
    )
    return X_test, y_test


def inference(model_name, model_version):
    X_test, y_test = pull_data()
    rf_model = mlflow.sklearn.load_model(f"models:/{model_name}/{model_version}")
    return rf_model.predict(X_test)


if __name__ == "__main__":
    warnings.filterwarnings("ignore")

    n_estimators = [50, 100, 200]
    criterion = ["gini", "entropy"]
    min_weight_fraction_leaf = [0.0, 0.1]

#     mlflow.set_experiment(experiment_name='/Users/ashish.duhan@thoughtworks.com/demo1')

    for n_est in n_estimators:
        for crit in criterion:
            for mwfl in min_weight_fraction_leaf:

                print("*" * 100)
                model_name = "random_forest_model_{}_{}_{}".format(n_est, crit, mwfl)
                print(
                    "Triggering training with params :: estimators : {}, criterion : {} and min_weight_fraction : {}".format(
                        n_est, crit, mwfl
                    )
                )
                with mlflow.start_run(
                    run_name=f"stock_estimator_{n_est}_{crit}_{mwfl}"
                ) as run:
                    training_data = acquire_training_data()

                    mlflow.sklearn.autolog()

                    prepared_training_data_df = prepare_training_data(training_data)

                    btc_mat = prepared_training_data_df.to_numpy()

                    WINDOW_SIZE = 14

                    X = rolling_window(btc_mat[:, 7], WINDOW_SIZE)[:-1, :]
                    Y = prepared_training_data_df["to_predict"].to_numpy()[WINDOW_SIZE:]

                    X_train, X_test, y_train, y_test = train_test_split(
                        X, Y, test_size=0.25, random_state=4284, stratify=Y
                    )

                    # clf is my Model
                    clf = RandomForestClassifier(
                        bootstrap=True,
                        criterion=crit,
                        min_samples_split=2,
                        min_weight_fraction_leaf=mwfl,
                        n_estimators=n_est,
                        random_state=4284,
                        verbose=0,
                    )

                    # training
                    clf.fit(X_train, y_train)

                    print(" --- Model Predict ---- ")
                    # inference
                    predicted = clf.predict(X_test)
                    #                     mlflow.sklearn.log_model(clf, model_name)

                    print(classification_report(y_test, predicted))

                    #                     mlflow.log_metric(
                    #                         "precision_label_0", precision_score(y_test, predicted, pos_label=0)
                    #                     )
                    #                     mlflow.log_metric(
                    #                         "recall_label_0", recall_score(y_test, predicted, pos_label=0)
                    #                     )
                    #                     mlflow.log_metric("f1score_label_0", f1_score(y_test, predicted, pos_label=0))
                    #                     mlflow.log_metric(
                    #                         "precision_label_1", precision_score(y_test, predicted, pos_label=1)
                    #                     )
                    #                     mlflow.log_metric(
                    #                         "recall_label_1", recall_score(y_test, predicted, pos_label=1)
                    #                     )
                    #                     mlflow.log_metric("f1score_label_1", f1_score(y_test, predicted, pos_label=1))

                    rf_uri = run.info.artifact_uri

****************************************************************************************************
Triggering training with params :: estimators : 50, criterion : gini and min_weight_fraction : 0.0
[*********************100%***********************]  1 of 1 completed
 --- Model Predict ---- 
              precision    recall  f1-score   support

           0       0.47      0.47      0.47        47
           1       0.39      0.39      0.39        41

    accuracy                           0.43        88
   macro avg       0.43      0.43      0.43        88
weighted avg       0.43      0.43      0.43        88

****************************************************************************************************
Triggering training with params :: estimators : 50, criterion : gini and min_weight_fraction : 0.1
[*********************100%***********************]  1 of 1 completed
 --- Model Predict ---- 
              precision    recall  f1-score   support

           0       0.50     

In [None]:
def find_best_run(experiment_name: int, metric: str = "training_f1_score"):
    client = mlflow.MlflowClient()
    experiment = client.search_experiments(filter_string= "name = '{}'".format(experiment_name))
    experiment_runs = client.search_runs(experiment_ids=[experiment[0].experiment_id])
    best_run_id = None
    best_artifact_uri = None
    best_metric_score = None
    for run in experiment_runs:
        if not metric in run.data.metrics:
            raise Exception("Bad metric passed for evaluation.")
        else:
            metrics = run.data.metrics
            if best_run_id is None:
                best_run_id = run.info.run_id
                best_artifact_uri = run.info.artifact_uri
                best_metric_score = metrics[metric]
            else:
                if metrics[metric] >= best_metric_score:
                    best_run_id = run.info.run_id
                    best_artifact_uri = run.info.artifact_uri
                    best_metric_score = metrics[metric]
                    
    return best_run_id, best_artifact_uri, best_metric_score

In [None]:
experiment_name = '/Users/ashish.duhan@thoughtworks.com/GiLeadPOC/InferencePipeline'
best_run_id, best_artifact_uri, best_metric_score = find_best_run(experiment_name=experiment_name)

model_version = register_model(model_name='model', rf_uri=best_artifact_uri)
print(f"model registeration complete {model_name}:{model_version.version}")

Registered model 'model' already exists. Creating a new version of this model...
2023/01/06 15:03:48 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: model, version 7
model registeration complete random_forest_model_200_entropy_0.1:7
Created version '7' of model 'model'.


In [None]:
client = mlflow.MlflowClient()
client.transition_model_version_stage(
    name='model',
    version=model_version.version,
    stage='STAGING'
)

Out[12]: <ModelVersion: creation_timestamp=1673017428202, current_stage='Staging', description='', last_updated_timestamp=1673017686394, name='model', run_id='', run_link='', source='dbfs:/databricks/mlflow-tracking/294838824832781/72422b3ec15c456aa9dffb6ac27d91ab/artifacts/model', status='READY', status_message='', tags={}, user_id='910574891456575', version='7'>

In [None]:
client.transition_model_version_stage(
    name='model',
    version=model_version.version,
    stage='PRODUCTION'
)

Out[13]: <ModelVersion: creation_timestamp=1673017428202, current_stage='Production', description='', last_updated_timestamp=1673017750929, name='model', run_id='', run_link='', source='dbfs:/databricks/mlflow-tracking/294838824832781/72422b3ec15c456aa9dffb6ac27d91ab/artifacts/model', status='READY', status_message='', tags={}, user_id='910574891456575', version='7'>

In [None]:
print(" --- Model Inference ---- ")
model_name = 'model'
print(f"\ntriggering inference with {model_name}:{model_version.version}")
predictions = inference(model_name, model_version.version)
print(predictions, "\n")

 --- Model Inference ---- 

triggering inference with model:7
[1 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 1 0 0 0 1 1 0 1 1 1 0 0 1 0 0 1 0 0 1 0 1
 1 0 1 1 0 1 0 0 1 1 1 0 0 0 0 0 1 0 0 0 1 0 1 0 0 0 1 1 1 0 0 1 1 1 1 1 1
 0 1 1 0 1 1 0 1 1 1 1 0 0 1] 



In [None]:
best_run_id, best_artifact_uri, best_metric_score = find_best_run(experiment_name=experiment_name, metric='acc')
model_version = register_model(model_name='model', rf_uri=best_artifact_uri)
print(f"model registeration complete {model_name}:{model_version.version}")

Registered model 'model' already exists. Creating a new version of this model...
2023/01/06 13:17:18 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: model, version 5
model registeration complete model:5
Created version '5' of model 'model'.


In [None]:
print(" --- Model Inference ---- ")
model_name = 'model'
print(f"\ntriggering inference with {model_name}:{model_version.version}")
X_test, y_test = pull_data()
loaded_model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version.version}")
loaded_model.predict(X_test)
# rf_model = mlflow.sklearn.load_model(f"models:/{model_name}/{model_version}")
print(predictions, "\n")

 --- Model Inference ---- 

triggering inference with model:5
This means that the predictor was fit in a version `<=0.3.1`.


[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
File [0;32m<command-3072912389488897>:5[0m
[1;32m      3[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124mtriggering inference with [39m[38;5;132;01m{[39;00mmodel_name[38;5;132;01m}[39;00m[38;5;124m:[39m[38;5;132;01m{[39;00mmodel_version[38;5;241m.[39mversion[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m      4[0m X_test, y_test [38;5;241m=[39m pull_data()
[0;32m----> 5[0m loaded_model [38;5;241m=[39m mlflow[38;5;241m.[39mpyfunc[38;5;241m.[39mload_model([38;5;124mf[39m[38;5;124m"[39m[38;5;124mmodels:/[39m[38;5;132;01m{[39;00mmodel_name[38;5;132;01m}[39;00m[38;5;124m/[39m[38;5;132;01m{[39;00mmodel_version[38;5;241m.[39mversion[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m      6[0m loaded_model[38;5;241m.[39mpredict(X_test