# Stream to Parquet Logger
--------------------------------------------------------------------

In [1]:
import nuclio

In [2]:
%%nuclio cmd -c

python -m pip install pandas
python -m pip install pyarrow

In [3]:
%%nuclio config
spec.build.baseImage = "mlrun/ml-models"
spec.readinessTimeoutSeconds = 200
kind = "nuclio"

%nuclio: setting spec.build.baseImage to 'mlrun/ml-models'
%nuclio: setting spec.readinessTimeoutSeconds to 200
%nuclio: setting kind to 'nuclio'


In [4]:
# nuclio: start-code

In [5]:
import os
import pandas as pd
import json
import datetime
from datetime import datetime

In [6]:
def _parse_name_list(raw):
    """Split a comma-separated env value into stripped, non-empty names.

    Returns None when the value is unset or contains no names, so callers can
    use ``is not None`` to detect whether the option was configured.
    """
    if raw is None:
        return None
    names = [name.strip() for name in raw.split(',') if name.strip()]
    return names or None


def init_context(context):
    """Initialise the logger's state on the nuclio ``context`` from env vars.

    Attributes set on ``context``:

    - ``batch`` (list): records buffered until ``batch_size`` is reached.
    - ``batch_size`` (int): ``BATCH_SIZE``, default 1024; must be positive.
    - ``timestamp_key`` (str or None): ``TS_KEY``, event field holding the event time.
    - ``timestamp_format`` (str): ``TS_FORMAT``, ``strptime`` format of that field.
    - ``pq_partitions`` (list of str): Parquet partition columns.
    - ``target_path`` (str): ``TARGET_PATH``, output directory (created if missing).
    - ``features`` / ``predictions`` (list of str or None): ``FEATURES`` /
      ``PREDICTIONS`` comma-separated column names, used to flatten inference
      events; None when not configured.

    Raises:
        ValueError: if ``TARGET_PATH`` is unset/empty, or ``BATCH_SIZE`` is not
            a positive integer.
    """
    context.batch = []

    batch_size = int(os.getenv('BATCH_SIZE', 1024))
    if batch_size <= 0:
        # With a non-positive size the handler's flush condition is meaningless.
        raise ValueError('BATCH_SIZE must be a positive integer, got %d' % batch_size)
    context.batch_size = batch_size

    context.timestamp_key = os.getenv('TS_KEY')
    context.timestamp_format = os.getenv('TS_FORMAT', '%Y-%m-%d %H:%M:%S.%f')

    context.pq_partitions = ['pq_year', 'pq_month', 'pq_day', 'pq_hour']

    target_path = os.getenv('TARGET_PATH')
    if not target_path:
        # Fail with a clear message instead of os.makedirs(None)'s TypeError.
        raise ValueError('TARGET_PATH environment variable must be set')
    context.target_path = target_path
    os.makedirs(target_path, exist_ok=True)

    # In case of an inference stream, set the names of features and predictions.
    context.features = _parse_name_list(os.getenv('FEATURES'))
    context.predictions = _parse_name_list(os.getenv('PREDICTIONS'))

In [7]:
def handler(context, event):
    """Buffer one stream event and flush the batch to Parquet when it is full.

    The event body is decoded to a dict, flattened when inference feature and
    prediction names are configured, tagged with ``pq_*`` time-partition
    fields, and appended to ``context.batch``. Once the batch holds at least
    ``context.batch_size`` records it is written via ``write_batch``.

    Args:
        context: nuclio context prepared by ``init_context``.
        event: nuclio event whose ``body`` is a dict or a JSON document.
    """
    body = event.body
    # The body may arrive already decoded (dict) or as a raw JSON payload.
    event_dict = body if isinstance(body, dict) else json.loads(body)

    context.logger.info_with('Got invoked',
                             trigger_kind=event.trigger.kind,
                             event_body=event_dict)

    # For inference events, lift features/predictions to top-level columns.
    if context.features is not None and context.predictions is not None:
        event_dict = flatten_inference_event(context, event_dict)

    # Add the incoming event to the current batch.
    context.batch.append(add_time_partition_attributes(context, event_dict))

    # `>=` rather than `==`: if an earlier flush raised, the batch keeps growing
    # past batch_size and an equality check would never trigger a flush again.
    if len(context.batch) >= context.batch_size:
        written_records = write_batch(context)
        context.logger.info_with('Written batch',
                                 written_records=written_records)


def flatten_inference_event(context, event):
    """Copy an inference event's inputs and outputs to top-level keys.

    The first entry of ``event['request']['instances']`` is paired positionally
    with ``context.features``, then ``event['resp']`` with
    ``context.predictions``. Each pair is written as a key/value on ``event``.
    Pairing stops at the shorter side, so unmatched names or values are ignored.

    Mutates ``event`` in place and returns it.
    """
    first_instance = event['request']['instances'][0]
    for feature_name, feature_value in zip(context.features, first_instance):
        event[feature_name] = feature_value

    for prediction_name, prediction_value in zip(context.predictions, event['resp']):
        event[prediction_name] = prediction_value

    return event


def add_time_partition_attributes(context, event):
    """Tag ``event`` with string time-partition fields used as Parquet partitions.

    The time source is the value of ``event[context.timestamp_key]``, parsed
    with ``context.timestamp_format``. If the context has no timestamp key, or
    the event lacks the field (or it is None), ``datetime.now()`` is used.

    Adds ``pq_year`` (``%Y``), ``pq_month`` (``%m``), ``pq_day`` (``%d``) and
    ``pq_hour`` (``%H``) to ``event`` in that order; mutates and returns it.
    """
    key_configured = hasattr(context, 'timestamp_key')
    raw_timestamp = event.get(context.timestamp_key) if key_configured else None

    if raw_timestamp is None:
        # Event time is missing or not configured: fall back to the current time.
        moment = datetime.now()
    else:
        moment = datetime.strptime(raw_timestamp, context.timestamp_format)

    for field, fmt in (('pq_year', '%Y'),
                       ('pq_month', '%m'),
                       ('pq_day', '%d'),
                       ('pq_hour', '%H')):
        event[field] = moment.strftime(fmt)

    return event


def write_batch(context):
    """Write the records buffered in ``context.batch`` as a partitioned Parquet dataset.

    Records are written under ``context.target_path``, partitioned by
    ``context.pq_partitions``. After a successful write the batch is reset to
    an empty list. If the write raises, the reset line is not reached and the
    records stay buffered.

    Returns:
        int: number of records written (0 when the batch was empty).
    """
    if not context.batch:
        # An empty frame has none of the partition columns, so the partitioned
        # write would fail; there is nothing to persist anyway.
        return 0

    batch_df = pd.DataFrame.from_records(context.batch)
    batch_df.to_parquet(path=context.target_path, partition_cols=context.pq_partitions)
    # Post-write cleanup.
    context.batch = []
    return len(batch_df.index)


In [8]:
# nuclio: end-code

## Export the function spec to `function.yaml`

In [9]:
from mlrun import code_to_function, mount_v3io

# Build an MLRun function object from this notebook's code section.
fn = code_to_function(name='stream-to-parquet-logger')

# Metadata for templates and reuse.
fn.metadata.categories = ["serve", "stream"]
fn.metadata.labels = {"author": "michaelk"}
fn.spec.description = "Saves a stream to Parquet log"

# Entry point invoked for each event.
fn.spec.default_handler = "handler"

fn.export("function.yaml")

> 2020-10-08 15:37:49,211 [info] function spec saved to path: function.yaml


<mlrun.runtimes.function.RemoteRuntime at 0x7fe98864ef90>

In [10]:
# Configure the function through environment variables. Values are passed
# as strings (env var values are text); init_context converts BATCH_SIZE to int.
fn.set_envs({
    'TARGET_PATH': '/User/path/to/parquet/log',  # output directory for the partitioned Parquet dataset
    'BATCH_SIZE': '1024',                        # records buffered before each Parquet write
    'TS_KEY': 'event_time',                      # event field holding the event timestamp
    'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f',         # strptime format of the TS_KEY field
    'FEATURES': 'feat1,feat2,feat3',             # optional, for inference streams
    'PREDICTIONS': 'pred1,pred2',                # optional, for inference streams
})

<mlrun.runtimes.function.RemoteRuntime at 0x7fe98864ef90>

In [11]:
# Attach a V3IO stream trigger. The URL's '@consumergroup' suffix names the
# consumer group.
# NOTE(review): with maxWorkers=10, init_context presumably runs once per
# worker, so each worker buffers its own BATCH_SIZE records before writing.
# Confirm that the flush latency and unflushed-data-on-restart risk are acceptable.
fn.add_trigger(
    'stream2pq',
    nuclio.triggers.V3IOStreamTrigger(
        url='http://v3io-webapi:8081/users/path/to/stream@consumergroup',
        access_key=os.getenv('V3IO_ACCESS_KEY'),
        maxWorkers=10,
    ),
)

# Configure a mount on the nuclio function from '/User' to our home directory '~/'.
fn.apply(mount_v3io())

<mlrun.runtimes.function.RemoteRuntime at 0x7fe98864ef90>

### Deploy

In [13]:
# Build and deploy the function under the given MLRun project.
fn.deploy(project="my-project")

> 2020-10-08 15:38:31,537 [info] deploy started
[nuclio] 2020-10-08 15:38:31,555 project name not found created new (my-project)
[nuclio] 2020-10-08 15:38:36,665 (info) Build complete
[nuclio] 2020-10-08 15:41:10,166 done creating my-project-stream-to-parquet-logger, function address: 192.168.226.12:30221


'http://192.168.226.12:30221'