# Nuclio - Training function

## Environment

In [13]:
# nuclio: ignore
import nuclio

### Configurations

In [2]:
%%nuclio config

# Trigger
spec.triggers.retrain.kind = "cron"
spec.triggers.retrain.attributes.interval = "1h"

# Base image
spec.build.baseImage = "python:3.6-jessie"

%nuclio: setting spec.triggers.retrain.kind to 'cron'
%nuclio: setting spec.triggers.retrain.attributes.interval to '1h'
%nuclio: setting spec.build.baseImage to 'python:3.6-jessie'


### Commands

In [69]:
%%nuclio cmd 

############
# installs #
############

# Utils
pip install pyyaml
pip install pyarrow --upgrade
pip install fastparquet
pip install pandas
pip install joblib

# Igz DB
pip install v3io_frames --upgrade

# Function
pip install scikit-learn==0.20.1
pip install xgboost --upgrade
pip install dask-ml["complete"] --upgrade
pip install cuml

Requirement already up-to-date: pyarrow in /conda/envs/rapids/lib/python3.6/site-packages (0.13.0)
Requirement already up-to-date: v3io_frames in /conda/envs/rapids/lib/python3.6/site-packages (0.5.4)
Requirement already up-to-date: xgboost in /conda/envs/rapids/lib/python3.6/site-packages (0.90)
Requirement already up-to-date: dask-ml[complete] in /conda/envs/rapids/lib/python3.6/site-packages (0.13.0)


In [14]:
%env V3IO_FRAMESD=''
%env V3IO_USERNAME=''
%env V3IO_ACCESS_KEY=''

env: V3IO_FRAMESD=''
env: V3IO_USERNAME=''
env: V3IO_ACCESS_KEY=''


### Variables

In [172]:
%%nuclio env

# DB Config
V3IO_FRAMESD=${V3IO_FRAMESD}
V3IO_USERNAME=${V3IO_USERNAME}
V3IO_ACCESS_KEY=${V3IO_ACCESS_KEY}

# Features
# FEATURES_TABLE=/v3io/bigdata/netops_features_parquet
FEATURES_TABLE=../netops_features
FROM_TSDB=0

# Training
TRAIN_ON_LAST=1d
TRAIN_SIZE=0.7

# Parallelizem
NUMBER_OF_SHARDS=4

# Model
MODEL_FILENAME=netops.v3.model
SAVE_TO=../models

%nuclio: setting 'V3IO_FRAMESD' environment variable
%nuclio: setting 'V3IO_USERNAME' environment variable
%nuclio: setting 'V3IO_ACCESS_KEY' environment variable
%nuclio: setting '# FEATURES_TABLE' environment variable
%nuclio: setting 'FEATURES_TABLE' environment variable
%nuclio: setting 'FROM_TSDB' environment variable
%nuclio: setting 'TRAIN_ON_LAST' environment variable
%nuclio: setting 'TRAIN_SIZE' environment variable
%nuclio: setting 'NUMBER_OF_SHARDS' environment variable
%nuclio: setting 'MODEL_FILENAME' environment variable
%nuclio: setting 'SAVE_TO' environment variable


%nuclio: cannot find "=" in line
%nuclio: cannot find "=" in line
%nuclio: cannot find "=" in line
%nuclio: cannot find "=" in line
%nuclio: cannot find "=" in line


## Function

### Imports

In [48]:
# Utils
import os
import time
import yaml
import pandas as pd
import datetime
import itertools
import pickle
import joblib

# DB Connection
import v3io_frames as v3f
import cudf

# Parallelization
from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client
import dask.dataframe as dd

# Function
import dask_ml.model_selection as dcv
import xgboost as xgb
from sklearn.model_selection import train_test_split

### Helper functions

In [17]:
def format_df_from_tsdb(context, df):
    df.index.names = ['timestamp', 'company', 'data_center', 'device']
    df = df.reset_index()
    df = dd.from_pandas(df, npartitions=context.shards)
    return df

In [18]:
def get_data_tsdb(context):
    df = context.v3f.read(backend='tsdb', query=f'select * from {context.features_table}',
                          start=f'now-{context.train_on_last}', end='now', multi_index=True)
    df = df.reset_index(drop=True)
    df = df[sorted(df.columns)]
    dd.from_pandas
    df = dd.from_pandas(df, npartitions=context.shards)
    return df

In [65]:
def get_data_parquet(context):
    # Get parquet files
    mpath = [os.path.join(context.features_table, file) for file in os.listdir(context.features_table)]
    
    # Get latest filename
    latest = max(mpath, key=os.path.getmtime)
    print(latest)
    context.logger.debug(f'Reading data from: {latest}')
    
    # Load parquet to dask
#     df = dd.read_parquet(latest, engine='fastparquet')
    df = cudf.read_parquet(latest)
    
    return df

In [176]:
def get_train_test_sets_from_data(context, df):
    drop_columns = [col for col in df.columns if 'is_error' in col]
    index_columns = ['timestamp', 'company', 'data_center', 'device']
    X = df.drop(drop_columns + index_columns, axis=1)
    X = X[sorted(X.columns)]
    y = df.loc[:, ['is_error']]['is_error'].astype(bool)
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=context.train_size, test_size=1-context.train_size)
    return X_train, X_test, y_train, y_test

### Init context

In [131]:
def init_context(context):
    
    # Netops features table
    setattr(context, 'features_table', os.getenv('FEATURES_TABLE', 'netops_features'))
    
    # Get saving configuration
    is_from_tsdb = (int(os.getenv('FROM_TSDB', 1)) == 1)
    
    # Save to TSDB
    if is_from_tsdb:
        # Create our DB client
        v3io_client = v3f.Client(address='http://' + os.getenv('V3IO_FRAMESD', 'framesd:8081'), 
                            container='bigdata', 
                            password=os.environ['V3IO_ACCESS_KEY'], 
                            user=os.environ['V3IO_USERNAME'])
        setattr(context, 'v3f', v3io_client)
        
        # Create features table if neede
        context.v3f.create('tsdb', context.features_table, attrs={'rate': '1/s'}, if_exists=1)
        
        # Set TSDB reading function
        setattr(context, 'read', get_data_tsdb)
        
    # Save to Parquet
    else:
         # Create saving directory if needed
        filepath = os.path.join(context.features_table)
        if not os.path.exists(filepath):
            os.makedirs(filepath)
            
        # Set Parquet reading function
        setattr(context, 'read', get_data_parquet)
    
    # Set time to train on
    train_on_last = os.getenv('TRAIN_ON_LAST', '7d')
    setattr(context, 'train_on_last', train_on_last)
    
    # Set training set size
    train_set_size = float(os.getenv('TRAIN_SIZE', 0.7))
    setattr(context, 'train_size', train_set_size)
    
    # Dask shards / CV
    setattr(context, 'shards', int(os.getenv('NUMBER_OF_SHARDS', 4)))
    
    # Create save-to folder if needed
    model_filepath = os.getenv('SAVE_TO', '/v3io/bigdata/netops/models')
    if not os.path.exists(model_filepath):
        os.makedirs(model_filepath)
    setattr(context, 'model_filepath', os.path.join(model_filepath, os.getenv('MODEL_FILENAME', 'netops.model')))

### Handler

In [169]:
def handler(context, event):
    
    # Get data
    df = context.read(context) 

    # Split to Train / Test datasets
    X_train, X_test, y_train, y_test = get_train_test_sets_from_data(context, df)
    
    train = xgb.DMatrix(data=X_train.as_gpu_matrix(), label=y_train)
    test = xgb.DMatrix(data=X_test.as_gpu_matrix(), label=y_test)
        
    # Train
    params = {'objective': 'binary:logistic', # Specify multiclass classification
             'tree_method': 'gpu_hist' # Use GPU accelerated algorithm
    }
    
    res = {}
    model = xgb.train(params, train, evals=[(test, 'test')], evals_result=res)
    print(res)
    context.logger.debug(f'test results: {res}')
    
    # Save model
    pickle.dump(model, open(context.model_filepath + '.pickle', 'wb'))

## Test

In [177]:
# nuclio: ignore
init_context(context)

In [178]:
# nuclio: ignore
# init_context(context)
event = nuclio.Event(body='')
output = handler(context, event)
output

../netops_features/20190606T080717-20190607T080527.parquet
[0]	test-error:0.046933
[1]	test-error:0.044132
[2]	test-error:0.044567
[3]	test-error:0.042032
[4]	test-error:0.04167
[5]	test-error:0.041283
[6]	test-error:0.040945
[7]	test-error:0.039714
[8]	test-error:0.039038
[9]	test-error:0.038942
{'test': {'error': [0.046933, 0.044132, 0.044567, 0.042032, 0.04167, 0.041283, 0.040945, 0.039714, 0.039038, 0.038942]}}


## Deployment

In [13]:
%nuclio deploy -p netops -n training -c

%nuclio: ['deploy', '-p', 'netops', '-n', 'training', '-c', '/User/netops/tutorials/demos/netops/Nuclio-Training.ipynb']
%nuclio: [nuclio.deploy] 2019-05-20 10:54:16,684 (info) Building processor image
%nuclio: [nuclio.deploy] 2019-05-20 10:54:24,902 (info) Pushing image
%nuclio: [nuclio.deploy] 2019-05-20 10:54:24,902 (info) Build complete
%nuclio: [nuclio.deploy] 2019-05-20 10:54:36,603 done updating training, function address: 3.122.56.83:32342
%nuclio: function deployed
