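# Labeled Stream Creator

A nuclio function that reads the metrics and predictions parquet tables and processes them as follows:

- It joins the label column (`label_col`, here `is_error`) with the prediction column (`prediction_col`, here `predictions`) on their shared index.
- It pushes the joined records as JSON to the v3io stream at `OUTPUT_STREAM`.

Once deployed, it runs on a 1-minute cron trigger.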

In [1]:
import nuclio

In [17]:
# Build-time dependency: pip-install / upgrade the v3io client package.
# NOTE(review): the install is unpinned; `-c` presumably restricts this command
# to the function's build configuration rather than the local kernel — confirm.
%nuclio cmd -c python -m pip install v3io --upgrade

In [58]:
%%nuclio env
METRICS_TABLE = /User/demo-network-operations/streaming/metrics
PREDICTIONS_TABLE = /User/demo-network-operations/streaming/predictions
OUTPUT_STREAM = /users/admin/demo-network-operations/streaming/labels_stream
prediction_col = predictions
label_col = is_error

%nuclio: setting 'METRICS_TABLE' environment variable
%nuclio: setting 'PREDICTIONS_TABLE' environment variable
%nuclio: setting 'OUTPUT_STREAM' environment variable
%nuclio: setting 'prediction_col' environment variable
%nuclio: setting 'label_col' environment variable


## Function

In [59]:
# nuclio: start-code

In [60]:
import os
import pandas as pd
import json
import v3io
import v3io.dataplane

In [61]:
def split_path(mntpath=''):
    """Split a v3io path into its container name and the path inside it.

    A single leading '/' is ignored. The first path component is the
    container, and everything after it (including its leading '/') is the
    sub-path. For example, '/users/admin/stream' -> ('users', '/admin/stream').

    :param mntpath: path of the form '[/]<container>[/<sub-path>]'.
    :returns: tuple ``(container, subpath)``. ``subpath`` is '' when the path
              names only a container. An empty path yields ('', '').
    """
    # startswith() instead of mntpath[0] so that an empty path (the default
    # argument) does not raise IndexError.
    if mntpath.startswith('/'):
        mntpath = mntpath[1:]
    container, separator, _ = mntpath.partition('/')
    # Keep the leading '/' of the sub-path, as callers pass it on unchanged.
    subpath = mntpath[len(container):] if separator else ''
    return container, subpath

In [62]:
def create_stream(context, path, shards=1):
    """Create a v3io stream at ``path``, tolerating a 409 response.

    :param context: nuclio context. Must expose ``logger`` and
                    ``v3io_client`` (the v3io.dataplane.Client set up in
                    init_context).
    :param path: stream path whose first component is the v3io container,
                 e.g. '/users/admin/.../labels_stream'.
    :param shards: number of shards to create the stream with (default 1).
    :returns: the response of ``v3io_client.create_stream``.
    """
    container, stream_path = split_path(path)
    context.logger.info(f'Creating stream in Container: {container} & Path {stream_path}')
    # The client call itself never raises (RaiseForStatus.never). The status is
    # then checked explicitly, accepting 204 and 409; 409 presumably means the
    # stream already exists, which makes re-initialization safe — confirm.
    response = context.v3io_client.create_stream(container=container,
                                                 path=stream_path,
                                                 shard_count=shards,
                                                 raise_for_status=v3io.dataplane.RaiseForStatus.never)
    response.raise_for_status([409, 204])
    return response

In [63]:
def push_to_stream(context, stream_path, data):
    """Push every row of a DataFrame to a v3io stream as a JSON record.

    Rows are serialized with ``DataFrame.to_json(orient='records')``. Each row
    is then sent as ``{'data': <row as JSON string>}``.

    :param context: nuclio context exposing ``logger`` and ``v3io_client``.
    :param stream_path: stream path whose first component is the container.
    :param data: pandas DataFrame whose rows become stream records.
    :returns: the ``put_records`` response, or None when ``data`` has no rows
              (nothing is sent in that case).
    """
    # Round-trip through to_json so that pandas handles non-JSON-native
    # values (e.g. timestamps) before each row is re-serialized on its own.
    rows = json.loads(data.to_json(orient='records'))
    records = [{'data': json.dumps(row)} for row in rows]
    if not records:
        # Avoid indexing records[0] below and skip an empty put_records call.
        context.logger.debug('No records to push, skipping')
        return None
    context.logger.debug(f'Logging {len(records)} records, Record example: {records[0]}')
    container, stream_path = split_path(stream_path)
    return context.v3io_client.put_records(container=container,
                                           path=stream_path,
                                           records=records)

In [64]:
def get_data_parquet(table, files_to_select=1):
    """Load the most recently modified parquet files in a directory.

    :param table: directory holding parquet files (e.g. a mounted /User/...
                  path). Only names ending in 'parquet' or 'pq' are read.
    :param files_to_select: how many of the newest files (by mtime) to load.
    :returns: the selected files concatenated in chronological order (oldest
              of the selection first). Each file's index is kept as stored.
    :raises ValueError: if no parquet file is selected (empty directory, or
                        ``files_to_select`` < 1).
    """
    parquet_files = [os.path.join(table, name) for name in os.listdir(table)
                     if name.endswith(('parquet', 'pq'))]
    # Sort newest first so the slice keeps the latest files rather than the
    # oldest ones; otherwise each cron run would re-read the same stale data.
    newest = sorted(parquet_files, key=os.path.getmtime, reverse=True)[:files_to_select]
    if not newest:
        raise ValueError(f'No parquet files selected from {table}')
    # Read the selection back in chronological order to keep time ordering.
    return pd.concat([pd.read_parquet(path) for path in reversed(newest)])

In [65]:
def init_context(context):
    """Nuclio init hook: cache configuration and a v3io client on ``context``.

    Reads METRICS_TABLE, PREDICTIONS_TABLE, OUTPUT_STREAM, label_col and
    prediction_col from the environment; a missing one raises KeyError.
    When the trigger is a cron trigger, the output stream is created before
    the column names are read.
    """
    env = os.environ
    context.metrics_table = env['METRICS_TABLE']
    context.predictions_table = env['PREDICTIONS_TABLE']
    context.output_stream = env['OUTPUT_STREAM']

    context.v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081',
                                                logger_verbosity='DEBUG',
                                                transport_verbosity='DEBUG')

    # Only a cron-triggered run creates the output stream.
    if context.trigger.kind == 'cron':
        create_stream(context, context.output_stream)

    context.label_col = env['label_col']
    context.prediction_col = env['prediction_col']

In [66]:
def handler(context, event):
    """Join the label and prediction columns and push the rows to the stream.

    Reads two files each from the metrics and predictions tables and keeps
    the configured label / prediction column, cast to int. The two columns
    are inner-joined on their index. The index is turned back into a column
    and every resulting row is pushed to ``context.output_stream``.
    ``event`` is not used.
    """
    files_per_table = 2

    metrics = (get_data_parquet(context.metrics_table, files_per_table)
               .loc[:, context.label_col]
               .astype('int'))
    predictions = (get_data_parquet(context.predictions_table, files_per_table)
                   .loc[:, context.prediction_col]
                   .astype('int'))
    context.logger.debug(f'Labeling metrics ({metrics.shape}) and predictions ({predictions.shape})')

    labeled = pd.merge(left=metrics, right=predictions,
                       left_index=True, right_index=True).reset_index()

    push_to_stream(context, context.output_stream, labeled)

In [12]:
# nuclio: end-code

## Test

In [254]:
# Local test of the init hook.
# NOTE(review): `context` is not defined anywhere in this notebook; it is
# presumably the mock context made available by `import nuclio` — confirm.
# The saved output below mentions '.../labeled_stream', not OUTPUT_STREAM's
# '.../labels_stream', so it looks stale; re-run to refresh it.
init_context(context)

Python> 2020-07-13 12:33:09,977 [info] Creating stream in Container: users & Path /admin/demo-network-operations/streaming/labeled_stream


In [263]:
# Invoke the handler once with an empty event. The handler ignores the event
# body and returns None, so `out` displays nothing.
event = nuclio.Event(body='')
out = handler(context, event)
out

## Stream test

In [48]:
from v3io.dataplane import Client
from pprint import pprint

In [49]:
# Module-level v3io client (default settings) used by print_stream.
v3io_client = Client()

In [50]:
# Optional cleanup: uncomment to delete the stream and start from scratch.
# NOTE(review): this path ends in 'labeled_stream', while OUTPUT_STREAM ends in
# 'labels_stream' — confirm which stream is meant before running it.
# v3io_client.delete_stream(container='users', path='/admin/demo-network-operations/streaming/labeled_stream')

In [51]:
def print_stream(path, shard='0', seek_type='EARLIEST', last=100):
    """Pretty-print records read from one shard of a v3io stream.

    Uses the module-level ``v3io_client`` defined in this notebook.

    :param path: stream path whose first component is the container.
    :param shard: shard id to read; a str or an int.
    :param seek_type: where the read cursor starts, passed to ``seek_shard``
                      (e.g. 'EARLIEST' or 'LATEST').
    :param last: maximum number of records to print. They are taken from the
                 start of the batch returned by a single ``get_records`` call.
    :raises: whatever ``raise_for_status()`` raises on a failed seek or read.
    """
    container, stream_path = split_path(path)
    # str() so an integer shard id does not break os.path.join.
    shard_path = os.path.join(stream_path, str(shard))

    # Position a cursor in the shard according to seek_type.
    seek_response = v3io_client.seek_shard(container=container,
                                           path=shard_path,
                                           seek_type=seek_type)
    seek_response.raise_for_status()

    # Fetch records starting from the location returned by the seek.
    records_response = v3io_client.get_records(container=container,
                                               path=shard_path,
                                               location=seek_response.output.location)
    records_response.raise_for_status()

    # Each record's payload is the JSON produced by push_to_stream.
    records = [json.loads(record.data) for record in records_response.output.records[:last]]
    pprint(records)

In [94]:
# Print up to 2 records from shard 0 of the output stream.
# NOTE(review): with seek_type='LATEST' the cursor presumably starts after the
# newest record, which would explain the empty result below; use 'EARLIEST'
# to read from the beginning of the shard.
print_stream('/users/admin/demo-network-operations/streaming/labels_stream', seek_type='LATEST', last=2)

2020-07-28 10:08:49,841 [info] Disconnected while attempting to send. Recreating connection: {'e': <class 'BrokenPipeError'>}
2020-07-28 10:08:49,841 [info] Disconnected while attempting to send. Recreating connection: {'e': <class 'BrokenPipeError'>}
2020-07-28 10:08:49,843 [info] Disconnected while attempting to send. Recreating connection: {'e': <class 'BrokenPipeError'>}
2020-07-28 10:08:49,843 [info] Disconnected while attempting to send. Recreating connection: {'e': <class 'BrokenPipeError'>}
[]


## Deploy

In [80]:
from mlrun import code_to_function, mount_v3io

In [81]:
# Package this notebook's code (between the `# nuclio: start-code` and
# `# nuclio: end-code` markers) as a nuclio function in the network-operations project.
fn = code_to_function('labeled-stream-creator',
                      kind='nuclio',
                      project='network-operations')
# Build on the mlrun/ml-models base image.
fn.spec.base_spec['spec']['build']['baseImage'] = 'mlrun/ml-models'
# Mount v3io so the /User/... parquet paths in the env config are reachable.
fn.apply(mount_v3io())
# Run every minute. Under a cron trigger, init_context also creates the output stream.
fn.add_trigger('cron', nuclio.triggers.CronTrigger(interval='1m'))

<mlrun.runtimes.function.RemoteRuntime at 0x7f99bed8ecd0>

In [82]:
# Save the function, then export its spec as YAML next to the project sources.
fn.save()
fn.export('../src/labeled_stream_creator.yaml')

> 2020-07-28 10:06:04,245 [debug] saving function: labeled-stream-creator, tag: 
> 2020-07-28 10:06:04,292 [info] function spec saved to path: ../src/labeled_stream_creator.yaml


<mlrun.runtimes.function.RemoteRuntime at 0x7f99bed8ecd0>

In [83]:
# Build and deploy the nuclio function; the cell output is its invocation URL.
fn.deploy(project='network-operations')

> 2020-07-28 10:06:05,422 [info] deploy started
[nuclio] 2020-07-28 10:06:09,516 (info) Build complete
[nuclio] 2020-07-28 10:06:15,568 (info) Function deploy complete
[nuclio] 2020-07-28 10:06:15,573 done updating network-operations-labeled-stream-creator, function address: 3.128.191.176:32655
> 2020-07-28 10:06:15,574 [debug] saving function: labeled-stream-creator, tag: 


'http://3.128.191.176:32655'

In [39]:
# Inspect the saved test-set predictions: feature columns plus the boolean
# `is_error` label and `predictions` columns.
df = pd.read_parquet('/User/demo-network-operations/artifacts/test_set_preds.parquet')
df

Unnamed: 0,cpu_utilization,latency,packet_loss,throughput,is_error,predictions
0,90.769881,6.404456,0.181525,234.612771,False,False
1,74.888627,0.000000,0.000000,265.233141,False,False
2,75.061580,0.000000,0.000000,191.831912,False,False
3,62.587761,0.000000,0.614938,275.569539,False,False
4,77.590353,2.637792,0.000000,259.586024,False,False
...,...,...,...,...,...,...
571,83.089233,1.145046,0.000000,244.012803,False,False
572,85.918216,13.216614,0.000000,246.553604,False,False
573,100.000000,3.697007,50.000000,251.470858,False,False
574,100.000000,100.000000,50.000000,253.348172,False,False


In [44]:
# The label and prediction columns cast to int, as the handler does before streaming.
df.loc[:, ['is_error', 'predictions']].astype('int')

Unnamed: 0,is_error,predictions
0,0,0
1,0,0
2,0,0
3,0,0
4,0,0
...,...,...
571,0,0
572,0,0
573,0,0
574,0,0
