# Labeled Stream Creator

## Environment

In [46]:
import nuclio

In [67]:
import os

base_path = os.path.abspath('../')
base_stream_path = f'/users/' + os.environ['V3IO_USERNAME']+ f'{base_path[5:]}'
data_path = os.path.join(base_path, 'data')
src_path = os.path.join(base_path, 'src')
streaming_path = os.path.join(base_stream_path, 'streaming')
fs_streaming_path = os.path.join(base_path, 'streaming')

os.environ['base_path'] = base_path
os.environ['data_path'] = data_path
os.environ['src_path'] = src_path
os.environ['streaming_path'] = streaming_path
os.environ['fs_streaming_path'] = os.path.join(base_path, 'streaming')

os.environ['METRICS_TABLE'] = fs_streaming_path + '/metrics'
os.environ['PREDICTIONS_TABLE'] = fs_streaming_path+'/predictions'
os.environ['OUTPUT_STREAM'] = streaming_path+'/labeled_stream'
os.environ['prediction_col'] = 'predictions'
os.environ['label_col'] = 'is_error'
os.environ['output_stream_shards'] = '1'
os.environ['BATCHES_TO_GENERATE'] = '20'

## Function

In [68]:
# nuclio: start-code

In [69]:
import os
import pandas as pd
import json
import v3io
import v3io.dataplane
import socket

In [70]:
def split_path(mntpath=''):
    if mntpath[0] == '/':
        mntpath = mntpath[1:]
    paths = mntpath.split('/')
    container = paths[0]
    subpath = ''
    if len(paths) > 1:
        subpath = mntpath[len(container):]
    return container, subpath

In [71]:
def create_stream(context, path, shards=1):
    # create a stream w shards
    container, stream_path = split_path(path)
    context.logger.info(f'Creating stream in Container: {container} & Path {stream_path}')
    response = context.v3io_client.stream.create(container=container,
                                        stream_path=stream_path, 
                                        shard_count=shards,
                                        raise_for_status=v3io.dataplane.RaiseForStatus.never)
    response.raise_for_status([409, 204])

In [72]:
def push_to_stream(context, stream_path, data):
    def restructure_stream_event(context, event):
        instances = [dict()]
        for key in data.keys():
            if key not in ['when', 'class', 'model', 'worker', 'hostname', context.prediction_col]:
                instances[0].update({key: event.pop(key)})
        event['request'] = {'instances': instances}
        event['resp'] = [int(event.pop(context.prediction_col))]
        return event
    
    records = json.loads(data.to_json(orient='records'))
    records = [{'data': json.dumps(restructure_stream_event(context, record))} for record in records]
    context.logger.info(f'Logging {len(records)} records, Record example: {records[0]}')
    container, stream_path = split_path(stream_path)
    # batch
    step = 10
    for idx in range(0, len(records), step):
        response = context.v3io_client.put_records(container=container,
                                                   path=stream_path, 
                                                   records=records[idx:idx+step])

In [73]:
def get_data_parquet(context, table, files_to_select=1):
    mpath = [os.path.join(table, file) for file in os.listdir(table) if file.endswith(('parquet', 'pq'))]
    files_by_updated = sorted(mpath, key=os.path.getmtime, reverse=False)
    context.logger.debug_with('Input', input_files=files_by_updated[:files_to_select])
    dfs = pd.concat([pd.read_parquet(file) for file in files_by_updated[:files_to_select]])
    return dfs

In [74]:
def init_context(context):
    
    # How many batches to create? (-1 will run forever)
    batches_to_generate = int(os.getenv('BATCHES_TO_GENERATE', 20))
    setattr(context, 'batches_to_generate', batches_to_generate)
    setattr(context, 'batches_generated', 0)
    
    # Set env vars
    setattr(context, 'metrics_table', os.environ['METRICS_TABLE'])
    setattr(context, 'predictions_table', os.environ['PREDICTIONS_TABLE'])
    setattr(context, 'output_stream', os.environ['OUTPUT_STREAM'])
    setattr(context, 'timestamp_col', os.getenv('timestamp_col', 'when'))
    setattr(context, 'orig_timestamp_col', os.getenv('orig_timestamp_col', 'timestamp'))
    
    v3io_client = v3io.dataplane.Client(logger_verbosity='DEBUG', transport_verbosity='DEBUG')
#     v3io_client.stream.create(container='users', stream_path='/orz/mlrun-demos/demos/network-operations/streaming/labeled_stream', shard_count=1)
    setattr(context, 'v3io_client', v3io_client)
    create_stream(context, context.output_stream)
    
    setattr(context, 'label_col', os.environ['label_col'])
    setattr(context, 'prediction_col', os.environ['prediction_col'])

In [75]:
def handler(context, event):
    
    # Limit the number of generated batches to save cluster resources
    # for people forgetting the demo running
    if (context.batches_to_generate == -1) or (context.batches_generated <= context.batches_to_generate):
        
        metrics = get_data_parquet(context, context.metrics_table, 2).loc[:, context.label_col].astype('int')
        metrics.index.names = list([name if name != context.orig_timestamp_col else context.timestamp_col for name in metrics.index.names])
        predictions = get_data_parquet(context, context.predictions_table, 2)
        context.logger.debug(f'Labeling metrics ({metrics.shape}) and predictions ({predictions.shape})')
        context.logger.debug_with('Indexes', metrics_index=metrics.index.names, predictions_index=predictions.index.names)

        print('metrics')
        print(metrics.head())
        print(type(metrics))
        metrics = pd.DataFrame(metrics)
        print('change')
        print(type(metrics))
        print(metrics.head())
        print(metrics.index.names)

        full_df = pd.merge(left=predictions, right=metrics, left_on=metrics.index.names, how='left', right_index=True)
        full_df = full_df.reset_index()
        context.logger.info(f'Fully labeled batch size is {full_df.shape}')
        context.logger.info(f'Indexes: {list(full_df.index.names)}')
        context.logger.info(f'Columns: {full_df.columns}')
        context.logger.info_with('sample', full_df=full_df.head(1))    
        push_to_stream(context, context.output_stream, full_df)
        
        # Update batches count
        context.batches_generated += 1

In [12]:
# nuclio: end-code

## Test

In [13]:
init_context(context)

Python> 2021-10-03 14:49:45,141 [info] Creating stream in Container: users & Path /dani/test/demos/network-operations/streaming/labeled_stream
2021-10-03 14:49:45,142 [debug] Tx: {'connection_idx': 0, 'method': 'POST', 'path': '/users/dani/test/demos/network-operations/streaming/labeled_stream/', 'headers': {'X-v3io-function': 'CreateStream', 'X-v3io-session-key': '5b4a006e-d1e7-4f0b-8101-3e35bba895e7', 'Content-Type': 'application/json'}, 'body': '{"ShardCount":1,"RetentionPeriodHours":24}'}
2021-10-03 14:49:45,143 [debug] Rx: {'connection_idx': 0, 'status_code': 204, 'body': b''}


In [None]:
event = nuclio.Event(body='')
out = handler(context, event)
out

## Stream test

In [48]:
from v3io.dataplane import Client
from pprint import pprint

In [49]:
v3io_client = Client()

In [50]:
# v3io_client.delete_stream(container='users', path='/admin/demos/network-operations/streaming/labeled_stream')

In [51]:
def print_stream(path, shard='0', seek_type='EARLIEST', last=100):
    # seek the shard to the first record in it
    container, stream_path = split_path(path)
    shard_path = os.path.join(stream_path, shard)
    response = v3io_client.seek_shard(container=container,
                                      path=shard_path, 
                                      seek_type=seek_type)
    response.raise_for_status()

    # get records, starting from the location we got from seek
    response = v3io_client.get_records(container=container,
                                       path=shard_path, 
                                       location=response.output.location)
    response.raise_for_status()
    
    models = ['pagehinkley', 'eddm', 'ddm']
    result_record = response.output.records
    records = [json.loads(record.data) for record in result_record[:last]]
    pprint(records)

In [76]:
print_stream(context.output_stream, seek_type='EARLIEST', last=2)

[{'class': 'RandomForestClassifier',
  'hostname': 'jupyter-dani-86575fbc89-k9xvr',
  'model': 'netops_predictor_v1',
  'request': {'instances': [{'company': 'Cisneros__Fuentes_and_Nelson',
                             'cpu_utilization': 64.29,
                             'cpu_utilization_is_error': False,
                             'data_center': 'Vincent_Roads',
                             'device': '0698161311745',
                             'is_error_x': False,
                             'is_error_y': None,
                             'latency': 0.0,
                             'latency_is_error': False,
                             'packet_loss': 1.0,
                             'packet_loss_is_error': False,
                             'throughput': 226.29,
                             'throughput_is_error': False}]},
  'resp': [0],
  'when': 1633335347249,
  'worker': None},
 {'class': 'RandomForestClassifier',
  'hostname': 'jupyter-dani-86575fbc89-k9xvr',
  'model'

## Deploy

In [53]:
from mlrun import code_to_function, mount_v3io

In [60]:
fn = code_to_function('labeled-stream-creator',
                      kind='nuclio',
                      project='network-operations', image='mlrun/ml-models')
fn.spec.build.commands = ['pip install v3io']
fn.apply(mount_v3io())
fn.add_trigger('cron', nuclio.triggers.CronTrigger(interval='1m'))
fn.set_envs({'METRICS_TABLE' : fs_streaming_path + '/metrics',
             'PREDICTIONS_TABLE' : fs_streaming_path+'/predictions',
             'OUTPUT_STREAM' : streaming_path+'/labeled_stream',
             'prediction_col' : 'predictions',
             'label_col' : 'is_error',
             'output_stream_shards' : '1',
             'BATCHES_TO_GENERATE' : '20'})

<mlrun.runtimes.function.RemoteRuntime at 0x7f5a6d669250>

In [61]:
fn.save()
fn.export('../src/labeled_stream_creator.yaml')

> 2021-10-04 12:35:59,631 [info] function spec saved to path: ../src/labeled_stream_creator.yaml


<mlrun.runtimes.function.RemoteRuntime at 0x7f5a6d669250>

In [62]:
fn.deploy(project='network-operations')

> 2021-10-04 12:35:59,638 [info] Starting remote function deploy
2021-10-04 12:35:59  (info) Deploying function
2021-10-04 12:35:59  (info) Building
2021-10-04 12:36:00  (info) Staging files and preparing base images
2021-10-04 12:36:00  (info) Building processor image
2021-10-04 12:36:01  (info) Build complete
2021-10-04 12:36:07  (info) Function deploy complete
> 2021-10-04 12:36:08,807 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-network-operations-labeled-stream-creator.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['default-tenant.app.dev8.lab.iguazeng.com:30367']}


'http://default-tenant.app.dev8.lab.iguazeng.com:30367'

In [63]:
fn.invoke('')

> 2021-10-04 12:36:08,847 [info] invoking function: {'method': 'GET', 'path': 'http://nuclio-network-operations-labeled-stream-creator.default-tenant.svc.cluster.local:8080/'}


b''