# Nuclio
## Unified Data batching & Agg function

In [2]:
# nuclio: ignore
import nuclio

## Environment

### Base config

In [3]:
%%nuclio config

# Kafka Trigger
# spec.triggers.hakafka.kind = "kafka"
# spec.trigger.url = "1.1.1.1"
# spec.triggers.hakafka.attributes.topic = "haproxy"
# spec.triggers.hakafka.attributes.partitions = [0, 1, 2]
# spec.triggers.hakafka.attributes.sasl.enable: true
# spec.triggers.hakafka.attributes.sasl.user: ""
# spec.triggers.hakafka.attributes.sasl.password: ""

# HTTP Trigger      
spec.triggers.hahttp.kind="http"
spec.triggers.hahttp.maxWorkers=1
spec.triggers.hahttp.attributes.port=31001

# Ingestion verifyer 
spec.triggers.verifybatch.kind = "cron"
spec.triggers.verifybatch.attributes.interval = "1m"

# Base image
spec.build.baseImage = "rapidsai/rapidsai:cuda10.0-runtime-centos7"

%nuclio: setting spec.triggers.hahttp.kind to 'http'
%nuclio: setting spec.triggers.hahttp.maxWorkers to 1
%nuclio: setting spec.triggers.hahttp.attributes.port to 31001
%nuclio: setting spec.triggers.verifybatch.kind to 'cron'
%nuclio: setting spec.triggers.verifybatch.attributes.interval to '1m'
%nuclio: setting spec.build.baseImage to 'rapidsai/rapidsai:cuda10.0-runtime-centos7'


### Build commands

In [4]:
%%nuclio cmd
# None Needed at the moment

### Env variables

In [5]:
%nuclio env SINK_PATH=./sink
%nuclio env BATCHING_TIME_IN_SECONDS=60
%nuclio env METRIC_NAMES=cpu_utilization,latency,packet_loss,throughput

%nuclio: setting 'SINK_PATH' environment variable
%nuclio: setting 'BATCHING_TIME_IN_SECONDS' environment variable
%nuclio: setting 'METRIC_NAMES' environment variable


## Function

In [6]:
import os
import glob
from datetime import datetime, timedelta
import time
import cudf
import itertools
import json

## Helper functions

In [7]:
def add_log_to_batch(context, log):
    # No need to marshall json since that will happen when saving the batch
    context.batch.append(log)

In [8]:
def reset_batch(context):
    
    # Reset log list
    context.batch = list()
    
    # Reset batch end time
    context.batch_end_time += context.batch_interval

In [9]:
def _batch_to_df(context):
    '''
        Turns a json-string array to a full pandas dataframe
    '''
    df = cudf.read_json('\n'.join(context.batch), lines=True)
    df = df.reset_index(drop=True)
    return df

In [75]:
def df_to_parquet(context, df):
    filename = f'{time.time()}.parquet'
    filepath = os.path.join(context.sink, filename)

    new_cols = list(df.columns)
    new_index = [f'{e[0]}_{e[1]}' for e in new_cols]
    print(new_cols)
    print(new_index)
    df.columns = new_index
    
    df.to_pandas().to_parquet(filepath)

### Main function code

In [11]:
def init_context(context):
        sink = os.getenv('SINK_PATH', './sink')
        setattr(context, 'sink', sink)
        
        # Verify sink is available
        os.makedirs(context.sink, exist_ok=True)
        
        # Expose metric names
        metric_names = os.environ['METRIC_NAMES']
        metric_names = metric_names.split(',')
        setattr(context, 'metric_names', metric_names)
        
        batch_interval = int(os.getenv('BATCH_INTERVAL_IN_SECONDS', '60'))
        batch_interval = timedelta(seconds=batch_interval)
        setattr(context, 'batch_interval', batch_interval)
        
        batch_end_time = datetime.now() + batch_interval
        setattr(context, 'batch_end_time', batch_end_time)
        
        batch = list()
        setattr(context, 'batch', batch)

In [12]:
def handler(context, event):      
    add_log_to_batch(context, event.body)
    
    if datetime.now() >= context.batch_end_time:
        df = _batch_to_df(context)
        if not df.empty:
            df = df.groupby(['company']).\
                        agg({k: ['min', 'max', 'mean'] for k in context.metric_names})
        df_to_parquet(context, df)
        reset_batch(context)

## Test

In [13]:
# nuclio : ignore
init_context(context)

In [76]:
# nuclio : ignore
event = nuclio.Event(body='{"company":"Rios__Pope_and_Baird","cpu_utilization":70.6942165035,"cpu_utilization_is_error":false,"latency":3.1373003261,"latency_is_error":false,"packet_loss":0.0,"packet_loss_is_error":false,"throughput":249.7207880994,"throughput_is_error":false,"timestamp":1563795193534}')
out = handler(context, event)
out

[('cpu_utilization', 'min'), ('cpu_utilization', 'max'), ('cpu_utilization', 'mean'), ('latency', 'min'), ('latency', 'max'), ('latency', 'mean'), ('packet_loss', 'min'), ('packet_loss', 'max'), ('packet_loss', 'mean'), ('throughput', 'min'), ('throughput', 'max'), ('throughput', 'mean')]
['cpu_utilization_min', 'cpu_utilization_max', 'cpu_utilization_mean', 'latency_min', 'latency_max', 'latency_mean', 'packet_loss_min', 'packet_loss_max', 'packet_loss_mean', 'throughput_min', 'throughput_max', 'throughput_mean']
                      cpu_utilization_min  cpu_utilization_max  \
company                                                          
Rios__Pope_and_Baird            70.694217            70.694217   

                      cpu_utilization_mean  latency_min  latency_max  \
company                                                                
Rios__Pope_and_Baird             70.694217       3.1373       3.1373   

                      latency_mean  packet_loss_min  packet_loss

## Deploy (If a nuclio cluster is available)

In [None]:
%nuclio deploy -p nvidia -n batch_and_agg -c