# Stream to Parquet

In [5]:
import nuclio

In [6]:
# Define function spec: the function kind is "nuclio" and its image is built
# on top of the "mlrun/ml-models" base image.
%nuclio config kind = "nuclio"
%nuclio config spec.build.baseImage = "mlrun/ml-models"

%nuclio: setting kind to 'nuclio'
%nuclio: setting spec.build.baseImage to 'mlrun/ml-models'


In [7]:
# nuclio: start-code

In [8]:
import os
import pandas as pd
import numpy as np
import json
import datetime
import mlrun

In [9]:
def record_to_features(record):
    """Flatten one logged inference record into a single row dictionary.

    The row is laid out as: the record's ``when`` value under ``timestamp``,
    then every key/value pair of the first entry of
    ``record['request']['instances']``, then the record's ``resp`` value under
    ``predictions``. When keys clash the later entry wins, so a feature named
    ``timestamp`` replaces the record time, while ``predictions`` always holds
    ``resp``.

    :param record: mapping with ``request`` (containing ``instances``),
                   ``when`` and ``resp`` entries
    :returns: a new flat ``dict``; the input mapping is not modified
    :raises KeyError: when one of the expected keys is missing
    :raises IndexError: when ``instances`` is empty
    """
    first_instance = record['request']['instances'][0]
    when, response = record['when'], record['resp']

    row = {'timestamp': when}
    row = {**row, **first_instance}
    row['predictions'] = response
    return row

In [29]:
def init_context(context):
    """Initialise the state stored on ``context`` from environment variables.

    Attributes set on ``context``:

    * ``batch`` - empty list that accumulates incoming records
    * ``window`` - int from env ``window`` (default 10)
    * ``save_to`` - directory from env ``save_to``
      (default ``/bigdata/inference_pq/``); created if missing
    * ``virtual_drift_fn`` - function imported from ``hub://virtual_drift``
      with a v3io mount applied
    * ``base_dataset``, ``indexes`` (JSON list), ``predictions_col``,
      ``label_col``, ``results_tsdb_container``, ``results_tsdb_table`` -
      taken from the identically named env vars, except ``predictions_col``
      (see below)

    The MLRun configuration (``dbpath``, ``artifact_path``, ``hub_url``) is
    also updated from the environment when the matching variables are set.

    :param context: the function context object; only attribute assignment
                    is used on it here
    """
    # --- batching state -----------------------------------------------------
    context.batch = []
    context.window = int(os.getenv('window', 10))
    context.save_to = os.getenv('save_to', '/bigdata/inference_pq/')
    os.makedirs(context.save_to, exist_ok=True)

    # --- MLRun configuration --------------------------------------------------
    mlrun.mlconf.dbpath = mlrun.mlconf.dbpath or 'http://mlrun-api:8080'
    artifact_path = os.getenv('artifact_path', None)
    if artifact_path:
        mlrun.mlconf.artifact_path = artifact_path
    if 'hub_url' in os.environ:
        mlrun.mlconf.hub_url = os.environ['hub_url']

    # --- drift function -------------------------------------------------------
    virtual_drift_fn = mlrun.import_function('hub://virtual_drift')
    # NOTE(review): the defaults here ('~/' for mount_path, '/User' for remote)
    # look swapped relative to how mount_v3io is normally called
    # (remote='~/', mount_path='/User') - confirm against the mlrun docs
    # before changing, since deployed env values may rely on this.
    virtual_drift_fn.apply(mlrun.mount_v3io(mount_path=os.getenv('mount_path', '~/'),
                                            remote=os.getenv('mount_remote', '/User')))
    context.virtual_drift_fn = virtual_drift_fn

    # --- drift task parameters --------------------------------------------------
    # The prediction column used to be read only from the env var 'predictions',
    # while the deployment code in this notebook sets 'prediction_col', so the
    # configured value was silently ignored. 'predictions' stays the primary
    # key (unchanged behaviour whenever it is set); 'prediction_col' is the
    # fallback.
    # NOTE(review): record_to_features stores the model output in a column
    # named 'predictions' - confirm the configured value matches that column.
    predictions_col = os.getenv('predictions', os.getenv('prediction_col', None))
    label_col = os.getenv('label_col', None)
    context.base_dataset = os.getenv('base_dataset', '')
    context.indexes = json.loads(os.environ.get('indexes', '[]'))
    context.predictions_col = predictions_col
    context.label_col = label_col
    context.results_tsdb_container = os.getenv('results_tsdb_container', None)
    context.results_tsdb_table = os.getenv('results_tsdb_table', None)

In [30]:
def handler(context, event):
    """Buffer one stream event and flush the buffer once a full window is reached.

    Each event body is parsed as JSON, flattened with ``record_to_features``
    and appended to ``context.batch``. When the batch holds ``context.window``
    records it is written to a timestamp-named ``.pq`` file under
    ``context.save_to`` (indexed by ``context.indexes`` when that list is
    non-empty), and a ``drift_magnitude`` task comparing
    ``context.base_dataset`` with the new file is submitted without waiting
    for it to finish.

    :param context: function context prepared by ``init_context``
    :param event: incoming event; ``event.body`` must be a JSON document
    :returns: ``None``
    """
    context.logger.info(f'Adding {event.body}')
    context.batch.append(record_to_features(json.loads(event.body)))

    # Keep buffering until a full window has been collected. The previous
    # `len(batch) > window` test flushed only at window + 1 records.
    if len(context.batch) < context.window:
        return

    context.logger.info(context.batch[:1])
    context.logger.info(context.indexes)
    df = pd.DataFrame(context.batch)
    context.logger.info(f'df example: {df.head(1)}')
    if context.indexes:
        df = df.set_index(context.indexes)
    df_path = os.path.join(context.save_to, f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}.pq")
    df.to_parquet(df_path)

    # The records are persisted at this point, so start a fresh batch now.
    # Resetting only after the drift job was submitted meant a failed
    # submission left the batch in place, and every following event wrote the
    # same (growing) batch to yet another file.
    context.batch = []

    task = mlrun.NewTask(name='drift_magnitude',
                         handler='drift_magnitude',
                         params={'label_col': context.label_col,
                                 'prediction_col': context.predictions_col,
                                 'results_tsdb_container': context.results_tsdb_container,
                                 'results_tsdb_table': context.results_tsdb_table},
                         inputs={'t': context.base_dataset,
                                 'u': df_path},
                         artifact_path=mlrun.mlconf.artifact_path)

    # watch=False: submit the drift job and return without following it
    context.virtual_drift_fn.run(task,
                                 watch=False)

In [31]:
# nuclio: end-code

## Save to function YAML and deploy

In [44]:
import os
import json
from nuclio.triggers import V3IOStreamTrigger
from mlrun import mlconf, code_to_function, mount_v3io

# Keep an already configured MLRun API address, otherwise use the default service URL
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

# create the function object from the notebook code
fn = code_to_function("stream_to_parquet")
# run with exactly one replica
fn.spec.min_replicas = 1
fn.spec.max_replicas = 1

# add metadata (for templates and reuse)
fn.spec.default_handler = "handler"
fn.spec.description = "Saves a stream to Parquet and can launch drift detection task on it"
fn.metadata.categories = ["ml", "serve"]
fn.metadata.labels = {"author": "orz"}
# Export before the trigger, mount and env vars below are added, so that
# function.yaml does not contain these deployment-specific settings.
fn.export("function.yaml")

# Deployment-specific settings: stream trigger, v3io mount and environment.
# NOTE(review): the stream URL and TSDB table below hard-code the user 'orz';
# adjust them when deploying under a different user.
fn.add_trigger('labeled_stream', V3IOStreamTrigger(url=f'https://{os.environ["V3IO_API"]}/users/orz/mlrun-demos/demos/network-operations/streaming/labeled_stream@s2p1', seekTo='latest'))
fn.apply(mount_v3io())
projdir = '/User/mlrun-demos/demos/network-operations/'
fn.set_envs({'window': 10000,
             'indexes': json.dumps(['timestamp', 'company', 'data_center', 'device']),
             'save_to': os.path.join(projdir, 'streaming', 'inference_pq'),
             # NOTE(review): confirm the function's init_context reads this exact
             # env name and that the value matches the prediction column name
             # in the stored records.
             'prediction_col': 'prediction',
             'label_col': 'is_error',
             'base_dataset': os.path.join(projdir, 'artifacts', 'test_set_preds.parquet'),
             'results_tsdb_container': 'users',
             'results_tsdb_table': 'orz/mlrun-demos/demos/network-operations/streaming/s2p_tsdb',
             'mount_path': '/users/orz',
             'mount_remote': '/User',
             'artifact_path': os.path.join(projdir, 'streaming', 'drift_magnitude')})
fn.deploy(project='network-operations')

> 2020-12-24 14:34:39,340 [info] function spec saved to path: function.yaml
> 2020-12-24 14:34:39,350 [info] Starting remote function deploy
2020-12-24 14:34:39  (info) Deploying function
2020-12-24 14:34:39  (info) Building
2020-12-24 14:34:39  (info) Staging files and preparing base images
2020-12-24 14:34:39  (info) Building processor image
2020-12-24 14:34:40  (info) Build complete
2020-12-24 14:34:50  (info) Function deploy complete
> 2020-12-24 14:34:51,915 [info] function deployed, address=default-tenant.app.lewpwntlsyrb.iguazio-cd1.com:32225


'http://default-tenant.app.lewpwntlsyrb.iguazio-cd1.com:32225'