# Nuclio - Data preparation function

## Environment

In [1]:
# nuclio: ignore
import nuclio

### Configurations

In [2]:
%%nuclio config

# Trigger: a cron-kind trigger named "retrain" with a one-hour interval
spec.triggers.retrain.kind = "cron"
spec.triggers.retrain.attributes.interval = "1h"

# Base image the function container is built from
spec.build.baseImage = "python:3.6-jessie"

%nuclio: setting spec.triggers.retrain.kind to 'cron'
%nuclio: setting spec.triggers.retrain.attributes.interval to '1h'
%nuclio: setting spec.build.baseImage to 'python:3.6-jessie'


### Commands

In [3]:
%%nuclio cmd -c

############
# installs #
############

# Utils
pip install pyarrow
pip install pandas

# Igz DB
pip install v3io_frames --upgrade

# Function
# Quote the extras spec (like the fsspec requirement below) so the shell
# never treats the square brackets as a glob pattern
pip install 'dask[complete]'
pip install 'fsspec>=0.3.3'

## Function

### Imports

In [4]:
# Utils
import os
import time
import pandas as pd
import itertools

# DB Connection
import v3io_frames as v3f

# Parallelization
import dask.dataframe as dd
from dask.distributed import Client

### Helper functions

In [5]:
def format_df(context, df, shards):
    """Flatten the metrics index into columns and convert the frame to Dask.

    The index levels of ``df`` are renamed to ``timestamp``, ``company``,
    ``data_center`` and ``device`` (the rename is applied to the frame that
    was passed in), then moved into regular columns.

    :param context: function context; not used by this helper.
    :param df:      pandas DataFrame whose index has one level per name above.
    :param shards:  number of Dask partitions to split the frame into.
    :returns: Dask DataFrame holding the former index levels as columns.
    """
    index_levels = ['timestamp', 'company', 'data_center', 'device']
    df.index.names = index_levels
    flat = df.reset_index()
    return dd.from_pandas(flat, npartitions=shards)

In [6]:
def get_data_tsdb(context, metrics_table, shards):
    """Read the last two hours of metrics from TSDB as a Dask DataFrame.

    The query goes through the frames client stored on ``context.v3f`` and
    requests a multi-index result, which ``format_df`` then flattens.

    :param context:       function context carrying the ``v3f`` client.
    :param metrics_table: TSDB table to select the metrics from.
    :param shards:        number of Dask partitions for the result.
    :returns: Dask DataFrame produced by ``format_df``.
    """
    query = f'select cpu_utilization, latency, packet_loss, throughput, is_error from {metrics_table}'
    raw = context.v3f.read(backend='tsdb',
                           query=query,
                           start='now-2h',
                           end='now',
                           multi_index=True)
    return format_df(context, raw, shards)

In [7]:
def get_data_parquet(context, metrics_table, shards):
    """Load the most recently modified Parquet entry of a directory into Dask.

    :param context:       function context, forwarded to ``format_df``.
    :param metrics_table: directory holding the metrics Parquet entries.
    :param shards:        number of Dask partitions for the result.
    :returns: Dask DataFrame produced by ``format_df``.
    :raises ValueError: if ``metrics_table`` contains no entries.
    """
    # Full paths of everything stored under the metrics directory
    candidates = (os.path.join(metrics_table, entry)
                  for entry in os.listdir(metrics_table))

    # Pick the entry with the newest modification time
    newest = max(candidates, key=os.path.getmtime)

    # Read it with pandas, then hand it over to Dask
    frame = pd.read_parquet(newest)
    return format_df(context, frame, shards)

In [8]:
def save_to_tsdb(context, features: pd.DataFrame, features_table):
    """Write the features frame to a TSDB table via the frames client.

    :param context:        function context carrying the ``v3f`` client.
    :param features:       frame holding the features to store.
    :param features_table: TSDB table to write into.
    """
    # NOTE(review): `preprocessor` hands this function the Dask frame it
    # built, although the annotation says pandas - confirm the frames client
    # accepts that.
    frames_client = context.v3f
    frames_client.write('tsdb', features_table, features)

In [38]:
def save_to_parquet(context, df: 'dd.DataFrame', features_table):
    """Write the features frame to a Parquet dataset named after its time span.

    The dataset is written under ``features_table`` as
    ``<first>-<last>`` where both parts are the minimum / maximum value of
    the ``timestamp`` column formatted as ``%Y%m%dT%H%M%S``, and is then
    logged as the ``features`` artifact. Failures while writing or logging
    the artifact are reported through the context logger and not raised.

    :param context:        context providing ``logger`` and ``log_artifact``.
    :param df:             lazily evaluated (Dask) frame with a ``timestamp``
                           column; the body calls ``.compute()`` on its
                           aggregations, so a plain pandas frame does not work.
    :param features_table: directory the dataset is written into.
    """
    print('Saving features to Parquet')

    # Need to fix timestamps from ns to ms if we write to parquet
    df = df.reset_index()
    df['timestamp'] = df['timestamp'].astype('datetime64[ms]')
    # The frame is not re-indexed by timestamp: set_index returns a new frame
    # rather than working in place, and the bounds below as well as the
    # written dataset rely on `timestamp` staying a regular column.

    # Every .compute() evaluates the frame again, so resolve each bound once
    first = df['timestamp'].min().compute()
    context.logger.info(first)
    first_timestamp = first.strftime('%Y%m%dT%H%M%S')
    context.logger.info(first_timestamp)
    last = df['timestamp'].max().compute()
    last_timestamp = last.strftime('%Y%m%dT%H%M%S')

    # Save parquet
    filename = first_timestamp + '-' + last_timestamp
    filepath = os.path.join(features_table, filename)
    try:
        df.to_parquet(filepath, engine='pyarrow')
        context.log_artifact('features', local_path=filepath)
    except Exception as e:
        # Best effort: keep the run alive, but report the failure as an error
        # so it is not lost among informational messages
        context.logger.error(f'Could not write Parquet results\n{e}')

### Handler

In [39]:
def preprocessor(context,
            windows={'minutely': 3,
                     'hourly': 3*60},
            metrics=[],
            labels=[],
            save_to_tsdb=0,
            metrics_table='/v3io/bigdata/netops_metrics_parquet',
            features_table='/v3io/bigdata/netops_features_parquet',
            dask_shards=4):
    """Build rolling-window features from the metrics and persist them.

    The metrics are read either from TSDB or from the newest Parquet entry of
    ``metrics_table``. For every window size a rolling mean is added per
    metric and a rolling max per label, and rows with missing values or
    duplicates are dropped. The result is written with the writer matching
    the chosen backend.

    :param context:        run context; ``read`` and ``write`` (and ``v3f``
                           in TSDB mode) are attached to it as attributes.
    :param windows:        mapping of a label to a rolling window size. Only
                           the sizes are used: as the ``window`` argument and
                           as the suffix of the generated column names.
    :param metrics:        columns that get a ``<metric>_<size>`` rolling mean.
    :param labels:         columns that get a ``<label>_<size>`` rolling max.
    :param save_to_tsdb:   truthy to read from and write to TSDB, falsy to
                           use Parquet for both.
    :param metrics_table:  source table / directory of the raw metrics.
    :param features_table: target table / directory for the features.
    :param dask_shards:    number of Dask partitions to load the data into.

    The mutable defaults are kept for interface compatibility; they are only
    read here, never modified.
    """
    # Setup context
    if save_to_tsdb:
        # The `save_to_tsdb` flag shadows the module-level writer function of
        # the same name inside this body, so look the writer up in the module
        # namespace instead of registering the flag itself as `context.write`.
        tsdb_writer = globals()['save_to_tsdb']

        # Create V3IO connection
        v3io_client = v3f.Client(address='framesd:8081', container='bigdata')
        setattr(context, 'v3f', v3io_client)

        # Create features table if needed
        context.v3f.create('tsdb',
                           features_table,
                           attrs={'rate': '1/s'},
                           if_exists=1)

        # Set TSDB reading & saving function
        setattr(context, 'read', get_data_tsdb)
        setattr(context, 'write', tsdb_writer)

    # Save to Parquet
    else:
        # Create saving directory if needed
        os.makedirs(features_table, exist_ok=True)

        # Set Parquet reading & saving function
        setattr(context, 'read', get_data_parquet)
        setattr(context, 'write', save_to_parquet)

    # Setup Dask
    dask_client = Client()
    try:
        df = context.read(context, metrics_table, dask_shards)

        # Describe the new column with an explicit `meta` instead of
        # materialising the whole frame just to obtain its dtypes
        df['key'] = df.apply(lambda row: f'{row["company"]}_{row["data_center"]}_{row["device"]}',
                             axis=1,
                             meta=('key', 'object'))
        # NOTE(review): the frame is not re-indexed by `key`. Dask's set_index
        # returns a new frame, so a bare `df.set_index('key')` would only cost
        # time; indexing by key for real would change the written output -
        # confirm with the consumers of the features before doing so.

        # Create rolling features per defined windows
        for duration in windows.values():
            for metric in metrics:
                df[f'{metric}_{duration}'] = df[metric].rolling(window=duration).mean()
            for label in labels:
                df[f'{label}_{duration}'] = df[label].rolling(window=duration).max()

            # Applied per window, so later windows are computed on the
            # already reduced frame
            df = df.dropna()
            df = df.drop_duplicates()

        # Discard the current index (the `key` column itself is kept)
        df = df.reset_index(drop=True)

        # Save with the writer selected above (TSDB or Parquet)
        context.write(context, df, features_table)
    finally:
        # Close connection, also when reading, processing or saving failed
        dask_client.close()

In [29]:
# nuclio: end-code

## Test locally

In [None]:
# No earlier cell defines `context`, so create a local MLRun context for the
# test run (this cell sits after `# nuclio: end-code`, so it is not part of
# the deployed function)
from mlrun import get_or_create_ctx

context = get_or_create_ctx('netops-preprocessing')
preprocessor(context)

## Deploy to cluster

In [40]:
from mlrun import code_to_function, mount_v3io, mlconf

In [41]:
# Address MLRun uses for its run database (presumably the in-cluster MLRun
# API service - adjust when running elsewhere)
mlconf.dbpath = 'http://mlrun-api:8080'

In [42]:
# Turn the notebook code into an MLRun 'job' function whose handler is
# `preprocessor`, then apply the v3io mount to it
preprocessing = code_to_function(
    name='netopspreprocessing',
    runtime='job',
    project='netops',
    handler='preprocessor',
).apply(mount_v3io())

In [None]:
# Attach the notebook code to the function object and deploy it
preprocessing.with_code().deploy()

In [45]:
# Arguments for the `preprocessor` handler; the keys match its parameters
params = dict(
    windows=dict(minutely=3, hourly=3 * 60),
    metrics=['cpu_utilization', 'throughput', 'latency', 'packet_loss'],
    labels=['is_error'],
    save_to_tsdb=0,
    metrics_table='/User/v3io/bigdata/netops_metrics_parquet',
    features_table='/User/v3io/bigdata/netops_features_parquet',
    dask_shards=4,
)

# Launch the job and follow its logs until it finishes
run = preprocessing.run(params=params, watch=True, handler='preprocessor')

[mlrun] 2020-01-06 10:20:13,935 starting run preprocessor uid=c843744c1bed49e9b056e15d3c3611f2  -> http://mlrun-api:8080
Saving features to Parquet
[mlrun] 2020-01-06 10:20:33,229 2020-01-02 07:47:01.982000
[mlrun] 2020-01-06 10:20:33,581 20200102T074701

[mlrun] 2020-01-06 10:20:35,901 run executed, status=completed
final state: succeeded


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...3611f2,0,Jan 06 10:20:29,completed,netopspreprocessing,host=preprocessor-wb9bjkind=jobowner=admin,,"dask_shards=4features_table=/User/netops_features_parquetlabels=['is_error']metrics=['cpu_utilization', 'throughput', 'latency', 'packet_loss']metrics_table=/User/v3io/bigdata/netops_metrics_parquetsave_to_tsdb=0windows={'hourly': 180, 'minutely': 3}",,features


to track results use .show() or .logs() or in CLI: 
!mlrun get run c843744c1bed49e9b056e15d3c3611f2  , !mlrun logs c843744c1bed49e9b056e15d3c3611f2 
[mlrun] 2020-01-06 10:20:43,289 run executed, status=completed
