# Flyte + SageMaker Operators demo
Trains a simple XGBoost classifier on the Pima Indians Diabetes dataset.
Dataset information: https://archive.ics.uci.edu/ml/support/diabetes

In [19]:
# Notebook setup: point flytekit at the Flyte control plane, choose the registration
# coordinates (project/domain/version), and register every task, workflow and launch
# plan found in the local `workflows` package (see the registration log printed below).
import os
# Flyte Admin endpoint; 127.0.0.1:8089 is presumably a local port-forward — TODO confirm.
os.environ['FLYTE_PLATFORM_URL'] = "127.0.0.1:8089"
# Named AWS credentials profile (the standard AWS_PROFILE variable read by AWS SDKs/CLI).
os.environ["AWS_PROFILE"] = "flytedemo"
# Registration coordinates: every entity registered in this notebook is namespaced by these.
project="aws-demo"
domain="development"
# NOTE(review): bump this whenever registered code changes; re-registering an existing
# version with different content is presumably rejected by Flyte Admin — TODO confirm.
version="v4.0"

from flytekit.configuration import set_flyte_config_file
# Container image (ECR) that registered tasks are packaged to run in.
os.environ["FLYTE_INTERNAL_IMAGE"] = "236416911133.dkr.ecr.us-east-1.amazonaws.com/awsdemoplugin:v1.0"
# Load the local config file for this notebook session.
set_flyte_config_file("../flyte.config")
# Then point the configuration path at the in-container location (/app/...), presumably so
# registered tasks reference the config baked into the image rather than this local path.
# NOTE(review): keep this AFTER set_flyte_config_file — that call may itself set this env var,
# which this assignment is meant to override. TODO confirm against the flytekit version in use.
os.environ["FLYTE_INTERNAL_CONFIGURATION_PATH"] = "/app/flyte.config"

# register_tasks_only is imported but not used in this cell.
from flytekit.clis.sdk_in_container.register import register_all, register_tasks_only
# Discover and register all entities under the `workflows` package at `version`.
# test=False presumably means a real (non dry-run) registration — TODO confirm.
register_all(project, domain, ["workflows"], version=version, test=False)

Running task, workflow, and launch plan registration for aws-demo, development, ['workflows'] with version v4.0
Registering Task:                workflows.sagemaker_xgboost_hpo.convert_to_sagemaker_csv
Registering Task:                workflows.sagemaker_xgboost_hpo.untar_xgboost
Registering Task:                workflows.sagemaker_xgboost_hpo.xgtrainer_task
Registering Workflow:            workflows.sagemaker_xgboost_hpo.StructuredSagemakerXGBoostHPO
Registering Launch Plan:         workflows.sagemaker_xgboost_hpo.fit_lp
Registering Task:                workflows.diabetes_xgboost.metrics
Registering Task:                workflows.diabetes_xgboost.get_traintest_splitdatabase
Registering Task:                workflows.diabetes_xgboost.predict
Registering Workflow:            workflows.diabetes_sagemaker_xgboost.DiabetesXGBoostModelOptimizer
Registering Launch Plan:         workflows.diabetes_sagemaker_xgboost.DiabetesXGBoostModelOptimizer
Registering Task:                workflows.diabe

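## PART I: Model training without hyperparameter tuning
Trains an XGBoost model with static hyperparameters; the model is trained in a single Python task.
Detailed example in `demo/workflows/diabetes_xgboost.py`. (Note: the workflow cell below currently wires in the SageMaker HPO launch plan instead; the static-hyperparameter `fit` call is kept there as a commented-out alternative.)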

### Flyte supports schematized, strictly typed inputs
Since we are working with a specific dataset, we will create a strictly typed schema for it.
If we wanted a generic data splitter, we could use a generic schema without any column name or type information.
Example file: https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv
CSV Columns
1. Number of times pregnant
2. Plasma glucose concentration at 2 hours in an oral glucose tolerance test
3. Diastolic blood pressure (mm Hg)
4. Triceps skin fold thickness (mm)
5. 2-Hour serum insulin (mu U/ml)
6. Body mass index (weight in kg/(height in m)^2)
7. Diabetes pedigree function
8. Age (years)
9. Class variable (0 or 1)

Example Row: 6,148,72,35,0,33.6,0.627,50,1

In [20]:
from flytekit.sdk.types import Types
# (column name, Flyte type) pairs, listed in the same order as the CSV columns.
TYPED_COLUMNS = [
    ('#preg', Types.Integer),
    ('pgc_2h', Types.Integer),
    ('diastolic_bp', Types.Integer),
    ('tricep_skin_fold_mm', Types.Integer),
    ('serum_insulin_2h', Types.Integer),
    ('bmi', Types.Float),
    ('diabetes_pedigree', Types.Float),
    ('age', Types.Integer),
    ('class', Types.Integer),
]

# Schema of a full input row: every feature column followed by the class label.
DATASET_SCHEMA = Types.Schema(TYPED_COLUMNS)

# Features are every column except the trailing class label.
FEATURES_SCHEMA = Types.Schema(TYPED_COLUMNS[:-1])

# Classes are the trailing column on its own (a single-column schema).
CLASSES_SCHEMA = Types.Schema(TYPED_COLUMNS[-1:])

### How to write a task: the fit task
A simple Python function that takes features (matching a feature schema), classes (a single-column vector) and some static hyperparameters, and fits an XGBoost model.

In [21]:
from workflows.diabetes_xgboost import XGBoostModelHyperparams
from flytekit.sdk.tasks import python_task, outputs, inputs
from xgboost import XGBClassifier
import xgboost as xgb
import pickle
import pandas as pd

@inputs(x=FEATURES_SCHEMA, y=CLASSES_SCHEMA, hyperparams=Types.Generic)  # TODO support arbitrary jsonifiable classes
@outputs(model=Types.Blob)
@python_task(cache_version='2.0', cache=True, memory_limit="200Mi")
def fit(ctx, x, y, hyperparams, model):
    """
    Train an XGBClassifier on the given features and class labels and publish it as a pickled Blob.

    NOTE: We have simplified the number of hyper parameters we take for demo purposes.
    NOTE(review): the workflow cell imports a ``fit`` from ``workflows.diabetes_xgboost``, which
    rebinds (shadows) this notebook-defined task; the workflow itself does not call this one.

    :param ctx: Flyte task context (unused here).
    :param x: schema input (FEATURES_SCHEMA) holding the feature columns.
    :param y: schema input (CLASSES_SCHEMA) holding the single class-label column.
    :param hyperparams: generic dict of hyper parameters, parsed with
        ``XGBoostModelHyperparams.from_dict``.
    :param model: Blob output; set to the path of the pickled, fitted classifier.
    """
    with x as r:
        x_df = r.read()
    with y as r:
        y_df = r.read()

    hp = XGBoostModelHyperparams.from_dict(hyperparams)
    # fit model on training data
    m = XGBClassifier(n_jobs=hp.n_jobs, max_depth=hp.max_depth, n_estimators=hp.n_estimators, booster=hp.booster,
                      objective=hp.objective, learning_rate=hp.learning_rate)
    # y is read as a one-column DataFrame; pass its only column as a 1-D label vector, which is
    # the shape sklearn-style estimators expect (a column vector triggers conversion warnings).
    m.fit(x_df, y_df.iloc[:, 0])

    # TODO model Blob should be a file like object
    # The pickle is written to the current working directory and then uploaded via model.set().
    fname = "model.pkl"
    with open(fname, "wb") as f:
        pickle.dump(m, f)
    model.set(fname)

### How to write a workflow: Workflow (pipeline)
Creates a pipeline that
`Gets Data & Split into training and validation`
 -> `Fit`
 -> `Predict`
 -> `Compute metrics`

In [30]:
from flytekit.sdk.workflow import workflow_class, Output, Input
from workflows.diabetes_xgboost import get_traintest_splitdatabase, predict, metrics, fit
from workflows.sagemaker_xgboost_hpo import fit_lp as SagemakerXGBoostHPO
    
@workflow_class
class DiabetesXGBoostModelTrainer(object):
    """
    This pipeline trains an XGBoost model for any given dataset that matches the schema as specified in
    https://github.com/jbrownlee/Datasets/blob/master/pima-indians-diabetes.names.

    Steps: split the dataset into train/test, fit a model using SageMaker hyperparameter tuning
    (the ``fit_lp`` launch plan imported as ``SagemakerXGBoostHPO``), predict on the test split,
    and score those predictions against the test labels.

    Outputs:
        model: the model produced by the tuning step, as a Blob.
        accuracy: accuracy reported by the ``metrics`` task on the test split.
    """

    # Inputs: the dataset, the fraction of the dataset held out for testing, and the seed used for the split
    dataset = Input(Types.CSV, default=Types.CSV.create_at_known_location(
        "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"),
                    help="A CSV File that matches the format https://github.com/jbrownlee/Datasets/blob/master/pima-indians-diabetes.names")

    # Presumably the fraction of rows that go to the test split (0.33 ≈ one third) — TODO confirm in the split task.
    test_split_ratio = Input(Types.Float, default=0.33, help="Ratio of how much should be test to Train")
    seed = Input(Types.Integer, default=7, help="Seed to use for splitting.")

    # Split the dataset into x_train / y_train / x_test / y_test outputs
    split = get_traintest_splitdatabase(dataset=dataset, seed=seed, test_split_ratio=test_split_ratio)

    # XGBoost training task (static hyper parameters, no tuning). If re-enabled, note that `fit` here
    # resolves to the name imported from workflows.diabetes_xgboost, not the notebook-defined task.
    #fit_task = fit(x=split.outputs.x_train, y=split.outputs.y_train, hyperparams=XGBoostModelHyperparams(max_depth=4,).to_dict())
    
    # This is the alternate task that performs Hyper parameter tuning; the test split doubles as its validation set
    fit_task = SagemakerXGBoostHPO(train_data=split.outputs.x_train, train_target=split.outputs.y_train, validation_data=split.outputs.x_test, validation_target=split.outputs.y_test)
    
    # Prediction task on the held-out features
    predicted = predict(model_ser=fit_task.outputs.model, x=split.outputs.x_test)
    
    # Score calculation task: compare predictions with the held-out labels
    score_task = metrics(predictions=predicted.outputs.predictions, y=split.outputs.y_test)

    # Outputs: the serialized model (later cells load it with pickle.load) and accuracy of the model
    model = Output(fit_task.outputs.model, sdk_type=Types.Blob)
    accuracy = Output(score_task.outputs.accuracy, sdk_type=Types.Float)

# Default launch plan for the workflow; it is registered and executed in the cells below.
DiabetesXGBoostModelTrainer_lp = DiabetesXGBoostModelTrainer.create_launch_plan()

### Now let us register the workflow
We will skip talking about launch plans for now. 

In [31]:
# Register the HPO variant of the workflow together with its launch plan under one entity name.
# (The non-tuned variant was previously registered as "DiabetesXGBoostModelTrainer".)
hpo_entity_name = "DiabetesXGBoostModelTrainer_HPO"
DiabetesXGBoostModelTrainer.register(project, domain, hpo_entity_name, version)
# Last expression: its return value (the launch-plan identifier) is displayed as the cell output.
DiabetesXGBoostModelTrainer_lp.register(project, domain, hpo_entity_name, version)

'lp:aws-demo:development:DiabetesXGBoostModelTrainer_HPO:v4.0'

### Let's start an execution
We will use the dataset available at https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv

In [32]:
# Launch the registered launch plan against the public Pima Indians diabetes CSV.
dataset_url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
execution = DiabetesXGBoostModelTrainer_lp.execute(project, domain, inputs=dict(dataset=dataset_url))

# Link to the Flyte console page for this execution.
execution_name = execution.id.name
print("Started execution: http://ac9bf898207d811ea83c716283142b67-1154220099.us-east-1.elb.amazonaws.com/console/projects/{}/domains/{}/executions/{}".format(project, domain, execution_name))

Started execution: http://ac9bf898207d811ea83c716283142b67-1154220099.us-east-1.elb.amazonaws.com/console/projects/aws-demo/domains/development/executions/f1eaf18a93dbd4395b02


### Now let's wait for the execution to complete and retrieve information
The execution is uniquely identified by its name. Let's print the outputs received and the accuracy.

In [33]:
from flytekit.common.workflow_execution import SdkWorkflowExecution
# Re-fetch an SdkWorkflowExecution handle for the run started above, then block until it finishes.
execution_name = execution.id.name
execution = SdkWorkflowExecution.fetch(project, domain, execution_name)
execution.wait_for_completion()

# Outputs are read only after completion; accuracy is a fraction, shown here as a percentage.
workflow_outputs = execution.outputs
print("Workflow produced outputs => {}".format(workflow_outputs.keys()))
accuracy_percent = workflow_outputs["accuracy"] * 100
print("Model Training computed accuracy => {}".format(accuracy_percent))

Workflow produced outputs => dict_keys(['accuracy', 'model'])
Model Training computed accuracy => 66.14173228346458


### Let's retrieve the model so we can run predictions
Flyte stores the model as a blob, and the API provides a handy way of retrieving it.

In [34]:
# Download the model locally to run predictions (written to ./model, replacing any earlier copy).
execution.outputs["model"].download("model", overwrite=True)
print("Downloaded the generated model.")
# NOTE: pickle.load can execute arbitrary code; only load model blobs produced by executions you trust.
with open("model", "rb") as f:
    model = pickle.load(f)
# Report which kind of object was unpickled (e.g. an XGBClassifier or a raw Booster).
print("Model {} loaded, ready to predict!".format(type(model).__name__))

Downloaded the generated model.
Model {} loaded, ready to predict!


### Let's use the model for prediction
Flyte makes it easy to retrieve the model, as shown in the previous cell. The model has been loaded with pickle, so now we can perform a sample classification.

In [35]:
import pandas as pd
# One unlabeled example: the "Example Row" from the schema section, without its class column.
df = pd.DataFrame({'#preg': [6], 'pgc_2h': [148], 'diastolic_bp': [72], 'tricep_skin_fold_mm': [35], 'serum_insulin_2h': [0], 'bmi': [33.6], 'diabetes_pedigree': [0.627], 'age': [50]})

# Predicted scores strictly above this cutoff are reported as class 1.
# NOTE(review): XGBClassifier.predict() labels class 1 at p > 0.5; this demo uses a stricter
# cutoff, so the "Class" printed here can differ from model.predict() for 0.5 < p <= 0.75.
DIABETES_PROBABILITY_THRESHOLD = 0.75

# Work with the raw Booster whether the pickle holds a Booster or an sklearn-style XGBClassifier.
booster = model
if not isinstance(model, xgb.core.Booster):
    booster = model.get_booster()
# Reuse the booster's own feature names so the DMatrix columns line up with training.
v = booster.predict(data=xgb.DMatrix(df, feature_names=booster.feature_names))

# v holds one score per row; assumes a binary:logistic-style objective so the score is a
# probability of class 1 — TODO confirm. Take the single row's value as a plain float so the
# comparison is a scalar test rather than an (ambiguous for >1 row) truth test on an array.
probability = float(v[0])
if probability > DIABETES_PROBABILITY_THRESHOLD:
    print("Class: 1, diabetes likely - {}%".format(probability*100))
else:
    print("Class: 0, diabetes unlikely {}%".format(probability*100))

Class: 1, diabetes likely - 86.44155859947205%


### WHY did they run so fast?