## Data Prep

## Function

### Imports

In [1]:
# Utils
import os
import time
import pandas as pd
import itertools

# DB Connection
import v3io_frames as v3f

# Parallelization
import dask.dataframe as dd
from dask.distributed import Client
from mlrun import get_or_create_ctx

In [2]:
# Get (or create) the MLRun run context named 'data-prep'. The cells below use
# it to read run parameters/inputs (get_param / get_input); creating it is what
# printed the "logging run results to" line in this cell's output.
mlruncontext = get_or_create_ctx('data-prep')

[mlrun] 2020-03-17 20:00:41,001 logging run results to: http://10.196.67.76:8080


In [8]:
# Start a Dask distributed client. It is bound to the name `dask` but is not
# referenced again in the cells shown here.
dask = Client()

# Number of partitions each pandas frame is split into when converted to Dask.
shards = int(mlruncontext.get_param('NUMBER_OF_SHARDS', 4))

# Directory holding the raw netops metrics parquet files: the DATA_DIR run
# input when provided, otherwise the DATA_DIR env var, otherwise '/netpp'.
# NOTE(review): '/netpp' vs the '/netapp' default below looks like a possible
# typo — confirm the intended mount point.
metrics_table = os.path.join(
    str(mlruncontext.get_input('DATA_DIR', os.getenv('DATA_DIR', '/netpp'))),
    mlruncontext.get_param('metrics_table',
                           os.getenv('metrics_table', 'netops_metrics_parquet')),
)

# Directory the generated netops features parquet files are written to.
features_table = os.path.join(
    str(mlruncontext.get_param('NETAPP_MOUNT_PATH',
                               os.getenv('NETAPP_MOUNT_PATH', '/netapp'))),
    mlruncontext.get_param('FEATURES_TABLE',
                           os.getenv('FEATURES_TABLE', 'netops_features_parquet')),
)

/netpp/netops_metrics


In [None]:
 # Create saving directory if needed
# exist_ok=True makes this idempotent and removes the race between a separate
# exists() check and makedirs() (e.g. two function replicas starting at once).
# If the path exists but is a regular file, makedirs raises FileExistsError
# here instead of failing later at write time.
# os.path.join() with a single argument is a no-op, so the path is used as-is;
# `filepath` is kept as a module-level name in case other cells read it.
filepath = features_table
os.makedirs(filepath, exist_ok=True)

In [23]:
def format_df_from_tsdb(df, npartitions=None):
    """Name the index levels of a metrics frame, flatten them into columns and
    wrap the result in a Dask DataFrame.

    Args:
        df: pandas DataFrame whose index has exactly four levels, in the order
            (timestamp, company, data_center, device).
        npartitions: number of Dask partitions to create; when omitted the
            module-level ``shards`` setting is used.

    Returns:
        A ``dask.dataframe.DataFrame`` with ``timestamp``, ``company``,
        ``data_center`` and ``device`` as regular columns.
    """
    if npartitions is None:
        npartitions = shards
    # rename_axis returns a new frame, so the caller's ``df`` keeps its index
    # names; the previous ``df.index.names = ...`` mutated the argument.
    flat = df.rename_axis(['timestamp', 'company', 'data_center', 'device']).reset_index()
    return dd.from_pandas(flat, npartitions=npartitions)

In [13]:
def get_data_parquet(path=None):
    """Load the most recently modified entry of the metrics directory.

    Args:
        path: directory to scan; defaults to the module-level ``metrics_table``.

    Returns:
        The loaded data converted by ``format_df_from_tsdb`` (a Dask DataFrame).

    Raises:
        FileNotFoundError: if the directory contains no non-hidden entries.
    """
    directory = metrics_table if path is None else path
    # Skip hidden entries (e.g. Jupyter's ``.ipynb_checkpoints``) so they are
    # never chosen as "latest" just because they were touched recently.
    candidates = [
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if not name.startswith('.')
    ]
    if not candidates:
        # max() on an empty list would raise an opaque ValueError instead.
        raise FileNotFoundError(f'No parquet files found in {directory!r}')

    latest = max(candidates, key=os.path.getmtime)
    df = pd.read_parquet(latest)
    return format_df_from_tsdb(df)

In [14]:
def create_rolling_featuers(df, window_size: int):
    """Compute rolling-window aggregates of the netops metrics.

    Adds a ``key`` column ("<company>_<data_center>_<device>"), replaces
    cpu_utilization, latency, packet_loss and throughput with their rolling
    mean and is_error with its rolling max over ``window_size`` rows, then
    drops every row containing NaN (which includes the warm-up rows before a
    full window is available) and duplicate rows.

    NOTE(review): windows run over consecutive rows of the whole frame, not
    per device, so rows from different devices are mixed whenever the input
    interleaves devices. The former ``features.set_index('key')`` call
    discarded its result and never changed this; a per-key groupby-rolling is
    probably what was intended — confirm before changing feature semantics.

    Args:
        df: Dask (or pandas) DataFrame with company, data_center, device and
            the metric columns listed above.
        window_size: number of rows in each rolling window.

    Returns:
        A new DataFrame of the same kind as ``df``; ``df`` itself is untouched.
    """
    features = df.copy()
    # Vectorised concatenation replaces the row-wise apply; it should yield the
    # same text as the former f-string for string/integer identifiers, and no
    # longer runs a full compute() just to build the apply's ``meta``.
    features['key'] = (
        features['company'].astype(str) + '_'
        + features['data_center'].astype(str) + '_'
        + features['device'].astype(str)
    )
    for column in ('cpu_utilization', 'latency', 'packet_loss', 'throughput'):
        features[column] = features[column].rolling(window=window_size).mean()
    # Max over the window: with 0/1 flags (assumed), any error marks the window.
    features['is_error'] = features['is_error'].rolling(window=window_size).max()

    features = features.dropna()
    features = features.drop_duplicates()
    return features

In [15]:
def set_indexes(df):
    """Return a copy of ``df`` indexed by (timestamp, company, data_center, device).

    The four columns are moved from the data into the index; ``df`` itself is
    left unchanged.
    """
    index_levels = ['timestamp', 'company', 'data_center', 'device']
    return df.set_index(index_levels)

In [17]:
def save_to_parquet(df: pd.DataFrame):
    """Write a feature frame to ``features_table`` as a single parquet file.

    The file is named ``<first>-<last>.parquet``, where first/last are the
    earliest and latest timestamps in ``df`` formatted as ``%Y%m%dT%H%M%S``.

    Args:
        df: pandas DataFrame with ``timestamp``, ``company``, ``data_center``
            and ``device`` available as columns or index levels.

    Returns:
        The path of the written parquet file.

    Raises:
        ValueError: if ``df`` has no rows (there is no time range to name the
            file after).
    """
    print('Saving features to Parquet')
    if len(df) == 0:
        raise ValueError('No features to save: the DataFrame has no rows')

    # Need to fix timestamps from ns to ms if we write to parquet
    df = df.reset_index()
    df['timestamp'] = df['timestamp'].astype('datetime64[ms]')

    # Name the file after the real time range, not the first/last rows: the
    # frame produced by the Dask merges is not guaranteed to be time-ordered.
    first_timestamp = df['timestamp'].min().strftime('%Y%m%dT%H%M%S')
    last_timestamp = df['timestamp'].max().strftime('%Y%m%dT%H%M%S')

    # Fix indexes
    df = set_indexes(df)

    # Ensure the target directory exists even if the setup cell was not run.
    os.makedirs(features_table, exist_ok=True)
    filepath = os.path.join(features_table, f'{first_timestamp}-{last_timestamp}.parquet')
    df.to_parquet(filepath)
    return filepath

### Init context

### Handler

In [19]:
def _rolling_features_with_suffix(raw, window_size, suffix):
    """Return rolling features of ``raw`` with the four metric columns renamed to ``<metric>_<suffix>``."""
    metrics = ('cpu_utilization', 'latency', 'packet_loss', 'throughput')
    rolled = create_rolling_featuers(raw, window_size)
    return rolled.rename(columns={metric: f'{metric}_{suffix}' for metric in metrics})


def handler(context, event):
    """Build the netops feature vector from the latest metrics file and save it.

    Loads the newest metrics parquet, derives "minutely" (3-row window) and
    "hourly" (180-row window) rolling features, joins them with the raw
    metrics on (timestamp, company, data_center, device), drops the helper
    ``key`` columns and writes the result with ``save_to_parquet``.

    Args:
        context: runtime context passed by the platform (not used here).
        event: triggering event (not used here).
    """
    index_columns = ['timestamp', 'company', 'data_center', 'device']

    # Get data
    raw = get_data_parquet()

    # NOTE(review): the minute/hour naming implies one sample every 20s
    # (3 rows per minute, 180 rows per hour) — confirm the sampling rate.
    minute = _rolling_features_with_suffix(raw, 3, 'minutely')
    hour = _rolling_features_with_suffix(raw, 3 * 60, 'hourly')

    # Join raw metrics with both rolling feature sets; column names present on
    # both sides of a merge (e.g. is_error, key) are disambiguated by suffixes.
    features_rm = raw.merge(minute, on=index_columns, suffixes=('_raw', '_minute'))
    features = features_rm.merge(hour, on=index_columns, suffixes=('_raw', '_hourly'))
    # Materialise once. The intermediate merge used to be computed separately
    # and the result discarded, doubling the work.
    features = features.compute()

    # Drop every helper column whose name contains 'key', then restore the
    # four-level index before writing.
    features = features.reset_index(drop=True)
    key_columns = [col for col in features.columns if 'key' in col]
    features = features.drop(key_columns, axis=1)
    features = features.set_index(index_columns)

    # Save feature vector to parquet
    save_to_parquet(features)

In [20]:
# nuclio: end-code