In [1]:
import warnings
warnings.simplefilter("ignore")


import pandas as pd
import io
import requests
import sys
import json
import plotly.express as px
sys.path.append("../modules")
from data_manager import DataManager


import pyspark.ml
import pyspark.sql.functions as f
import pyspark.sql.types as t

from pyspark.ml.tuning import CrossValidator
from pyspark.mllib.evaluation import MulticlassMetrics

import toniq
import hyperopt
from hyperopt import hp

from functools import partial

import tempfile
from shutil import make_archive


# Set up the experiment configuration

In [2]:
# Experiment-wide settings. `train_and_eval` reads the `mlflow` and `hyperopt`
# sections; `data` holds the DataManager.load_table keyword arguments per split.
# NOTE(review): `verbose` and `gt_column` are not read anywhere in this notebook.
config = dict(
    verbose=True,
    gt_column="income",
    # which test-set metric hyperopt optimises, and how many trials it runs
    hyperopt=dict(metric="fMeasure", max_evals=100),
    mlflow=dict(experiment_name="EXP2", tags={"version": "0.1.0"}),
    # one load_table(**kwargs) spec per data split
    data={
        split: dict(name=f"income_transformed_data_{split}", store="feature", partition=split)
        for split in ("train", "test")
    },
)

## Initialize DataManager

In [3]:
# DataManager comes from the project-local ../modules/data_manager.py (added to
# sys.path in the imports cell); later cells use `dm.load_table` and `dm.spark`.
# NOTE(review): the constructor printed an S3 endpoint although provider="gcp" —
# confirm which storage backend is actually used.
dm = DataManager(provider="gcp")

s3_endpoint is 10.2.3.167:9000


## Load the train/test feature tables listed in `config['data']` via the DataManager

In [4]:

# One Spark DataFrame per split ("train" / "test"), keyed by the split name.
dfs = {
    split: dm.load_table(**table_args)
    for split, table_args in config["data"].items()
}

In [5]:

# MLflow tracking client (toniq wrapper); train_and_eval uses it to create the
# experiment/run and to log params, metrics and artifacts.
mlflow_client = toniq.MlflowClient()


def calculate_metrics(df, positive_label=1.0, ndigits=3):
    """
    Compute classification metrics for one set of predictions.

    :params:

    df: Spark DataFrame with a `prediction` column and a `label` column
        (the label is cast to float before scoring).
    positive_label: class value the metrics are reported for (default 1.0).
    ndigits: number of decimal places each metric is rounded to (default 3).

    :returns:

    dict of rounded floats with keys `tpr`, `fpr`, `precision`, `recall`
    and `fMeasure` (config["hyperopt"]["metric"] selects one of these names).
    """
    # The RDD-based MulticlassMetrics expects an RDD of (prediction, label) tuples.
    preds_and_labels = df.select("prediction", f.col("label").cast(t.FloatType()))
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

    raw_metrics = {
        "tpr": metrics.truePositiveRate(label=positive_label),
        "fpr": metrics.falsePositiveRate(label=positive_label),
        "precision": metrics.precision(label=positive_label),
        "recall": metrics.recall(label=positive_label),
        "fMeasure": metrics.fMeasure(label=positive_label),
    }
    # Every value is a scalar float, so all of them are rounded.
    return {name: round(value, ndigits) for name, value in raw_metrics.items()}


def _get_or_create_experiment_id(experiment_name):
    """Return the id of MLflow experiment `experiment_name`, creating the experiment if it is missing."""
    # get_experiment_by_name returns None when no experiment has that name.
    experiment = mlflow_client.get_experiment_by_name(experiment_name)
    if experiment is not None:
        return experiment.experiment_id
    return mlflow_client.create_experiment(experiment_name)


def _test_predictions_to_pandas(pred_df):
    """Collect `label`, `probability` (as a float list) and `prediction` of `pred_df` into pandas."""
    pred_df.createOrReplaceTempView("pred_df")
    # `probability` is a Spark ML vector; convert it to a plain float array so the
    # resulting pandas frame can be serialised to JSON.
    dm.spark.udf.register("TOARRAY", lambda v: v.toArray().tolist(), t.ArrayType(t.FloatType()))
    return dm.spark.sql("""
        SELECT
            label,
            TOARRAY(probability) as probability,
            prediction
        FROM pred_df
    """).toPandas()


def train_and_eval(model_config, args):
    """
    Hyperopt objective: fit one Spark ML model, evaluate it and log the run to MLflow.

    :params:

    model_config: dict with keys `type` (pyspark.ml submodule, e.g. "classification"),
        `name` (estimator class in that submodule) and `params` (keyword
        arguments for the estimator constructor).
    args: base experiment config (the notebook passes the global `config`); must
        contain `mlflow.experiment_name` and `hyperopt.metric`. It is not mutated:
        a per-run copy with `model` and `metrics` added is what gets logged.
        If None, the global `config` is used.

    :returns:

    The negated test-set value of args["hyperopt"]["metric"]. It is negated
    because hyperopt minimises, so this maximises the metric.

    Side effects: creates an MLflow run, and the experiment too if it does not exist.
    It logs test predictions, the fitted model, the run config, metrics and params.
    The run is then marked FINISHED. It is marked KILLED or FAILED if the objective
    is interrupted or raises.
    """
    import os  # only needed to build artifact paths inside the temp directory

    base_config = config if args is None else args

    estimator_cls = getattr(getattr(pyspark.ml, model_config["type"]), model_config["name"])
    estimator = estimator_cls(**model_config["params"])

    experiment_id = _get_or_create_experiment_id(base_config["mlflow"]["experiment_name"])
    run_id = mlflow_client.create_run(
        experiment_id, tags=base_config["mlflow"].get("tags")
    ).info.run_id

    try:
        fitted_model = estimator.fit(dfs["train"])

        # Inference and metrics on every loaded split (train and test).
        pred_dfs = {mode: fitted_model.transform(df) for mode, df in dfs.items()}
        metric_results = {mode: calculate_metrics(pred_df) for mode, pred_df in pred_dfs.items()}

        # Per-run snapshot of the config; the shared base config stays untouched.
        run_config = {**base_config, "model": model_config, "metrics": metric_results}

        # A temp directory gives artifacts stable names and avoids re-opening a
        # still-open NamedTemporaryFile by path, which fails on Windows.
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Only test-set predictions are saved, to keep artifact size down.
            preds_path = os.path.join(tmp_dir, "test_predictions.json")
            _test_predictions_to_pandas(pred_dfs["test"]).to_json(preds_path)
            mlflow_client.log_artifact(run_id, preds_path, artifact_path="predictions")

            # Spark writes the model as a directory, which is logged as a whole.
            model_path = os.path.join(tmp_dir, "spark_model")
            fitted_model.write().overwrite().save(model_path)
            mlflow_client.log_artifact(run_id, model_path, artifact_path="model")

            config_path = os.path.join(tmp_dir, "config.json")
            with open(config_path, "w") as fp:
                json.dump(run_config, fp)
            mlflow_client.log_artifact(run_id, config_path, artifact_path="config")

        for mode, mode_metrics in metric_results.items():
            for metric_key, metric_val in mode_metrics.items():
                mlflow_client.log_metric(run_id, f"{mode}-{metric_key}", metric_val)

        for param_name, param_val in model_config["params"].items():
            mlflow_client.log_param(run_id, param_name, param_val)
    except KeyboardInterrupt:
        # Stopping the notebook cell mid-trial: do not leave the run RUNNING.
        mlflow_client.set_terminated(run_id, status="KILLED")
        raise
    except Exception:
        mlflow_client.set_terminated(run_id, status="FAILED")
        raise

    mlflow_client.set_terminated(run_id)

    return -metric_results["test"][base_config["hyperopt"]["metric"]]
    

In [None]:

# Search space: hp.choice picks one estimator per trial, and each branch samples
# that estimator's hyper-parameters. hyperopt labels must be unique across the
# whole space, hence the `_GBT` suffixes on the second branch.
#
# Example of one sampled point (the shape train_and_eval expects):
#   {
#       'type': 'classification',
#       'name': 'RandomForestClassifier',
#       'params': dict(maxDepth=10, maxBins=49, minInstancesPerNode=2, numTrees=10),
#   }
space = hp.choice('model', [
    {
        'type': 'classification',
        'name': 'RandomForestClassifier',
        'params': {
            "maxDepth": hp.quniform("maxDepth", 5, 10, 1),
            "maxBins": hp.quniform("maxBins", 45, 60, 1),
            "minInstancesPerNode": hp.quniform("minInstancesPerNode", 40, 60, 1),
            "numTrees": hp.quniform("numTrees", 40, 60, 1),
        },
    },
    {
        'type': 'classification',
        'name': 'GBTClassifier',
        'params': {
            "maxDepth": hp.quniform("maxDepth_GBT", 5, 10, 1),
            "maxBins": hp.quniform("maxBins_GBT", 45, 60, 1),
            "minInstancesPerNode": hp.quniform("minInstancesPerNode_GBT", 40, 60, 1),
        },
    },
])

# Minimise the objective (negated test-set metric) over the space with TPE.
# Uses the top-level `hyperopt` import instead of a mid-notebook import.
# NOTE(review): no `rstate` seed is passed, so the search is not reproducible run-to-run.
best = hyperopt.fmin(
    partial(train_and_eval, args=config),
    space,
    algo=hyperopt.tpe.suggest,
    max_evals=config["hyperopt"]["max_evals"],
)

print(best)
print("Best ", hyperopt.space_eval(space, best))

  6%|▌         | 6/100 [01:46<25:50, 16.49s/trial, best loss: -0.683]

In [None]:
# Removed a leftover debugging command that listed /tmp/tmpnwcp94sq.pth/.
# That is a temporary path from an earlier kernel session, so the listing does not
# survive Restart & Run All. Inspect logged artifacts through the MLflow client or UI instead.