# 6. Stream to Parquet
--------------------------------------------------------------------


Store a stream to a set of parquet files.
The purpose is to log the input and inference streams in parquet format.

![Model deployment with streaming Real-time operational Pipeline](../../assets/images/model-deployment-with-streaming.png)

Each batch (1024 records by default) is stored in a parquet file, partitioned by event time (year, month, day, hour).

The default output paths are:
- `/User/examples/model-deployment-with-streaming/data/events-pq`
- `/User/examples/model-deployment-with-streaming/data/inference-pq`
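To make the partitioning concrete, the helper below derives the partition sub-directory that a single event would land in. It is a hypothetical illustration, not part of the function code, and it assumes the Hive-style `column=value` directory naming that pandas/pyarrow use for `partition_cols` (the `pq_year=2020` directory in the local test output follows this pattern).

```python
from datetime import datetime

# Partition columns and their strftime formats, in the same order as the
# function's pq_partitions list
PARTITION_FIELDS = [('pq_year', '%Y'), ('pq_month', '%m'), ('pq_day', '%d'), ('pq_hour', '%H')]


def partition_dir(event_time: str, ts_format: str = '%Y-%m-%d %H:%M:%S.%f') -> str:
    """Return the Hive-style partition sub-directory for an event timestamp.

    Hypothetical helper for illustration only; the deployed function lets
    pandas/pyarrow create these directories when it writes each batch.
    """
    dt = datetime.strptime(event_time, ts_format)
    return '/'.join(f'{name}={dt.strftime(fmt)}' for name, fmt in PARTITION_FIELDS)


print(partition_dir('2020-02-02 12:20:22.333332'))
# pq_year=2020/pq_month=02/pq_day=02/pq_hour=12
```

All events from the same hour therefore share one leaf directory, which keeps time-range queries on the parquet log cheap.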

## Initialize

Load the project

In [1]:
from mlrun import load_project
from os import path

project_path = path.abspath('conf')
project = load_project(project_path)

## Create and Test a Local Function 

[Nuclio](https://nuclio.io/) is a high-performance open-source and managed serverless framework, which is available as a predefined tenant-wide platform service (`nuclio`).
The demo uses Nuclio to create and deploy serverless functions.
Therefore, you need to import the Nuclio package and configure Nuclio for your project.

The platform's Jupyter Notebook service preinstalls the [nuclio-jupyter SDK](https://github.com/nuclio/nuclio-jupyter/blob/master/README.md) for creating and deploying Nuclio functions with Python and Jupyter Notebook.
The tutorial uses the Nuclio magic commands and annotation comments of this SDK to automate function code generation.
The magic commands are initialized when you import the `nuclio` package.<br>
The `%nuclio` magic commands are used to run Nuclio commands from Jupyter notebooks (`%nuclio <Nuclio command>`).
You can also use `%%nuclio` at the start of a cell to identify the entire cell as containing Nuclio code.
The `# nuclio: start-code`, `# nuclio: end-code`, and `# nuclio: ignore` section-marker annotations notify Nuclio of the beginning or end of code sections.
Nuclio ignores all notebook code before a `# nuclio: start-code` marker or after an `# nuclio: end-code` marker.
Nuclio translates all other notebook code sections into function code, except for sections that are marked with the `# nuclio: ignore` marker.
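The marker rules above can be modeled in a few lines of Python. This is a simplified sketch, not nuclio-jupyter's actual implementation: it assumes the notebook contains a `start-code` marker, that the `start-code` and `end-code` markers each sit in a cell of their own (as in this notebook), and the function name `extract_function_code` is hypothetical.

```python
# Simplified model of the section markers (not the real nuclio-jupyter code)
START, END, IGNORE = '# nuclio: start-code', '# nuclio: end-code', '# nuclio: ignore'


def extract_function_code(cells):
    """Return the cell sources that would be translated into function code."""
    kept = []
    started = False
    for source in cells:
        lines = [line.strip() for line in source.splitlines()]
        if START in lines:
            started = True          # code after this cell is processed
            continue
        if END in lines:
            break                   # everything after end-code is ignored
        if started and IGNORE not in lines:
            kept.append(source)
    return kept


cells = [
    'from mlrun import load_project',          # before start-code: dropped
    '# nuclio: start-code',
    'import os\nimport pandas as pd',          # kept
    '# nuclio: ignore\nprint("debug")',        # marked ignore: dropped
    'def handler(context, event):\n    pass',  # kept
    '# nuclio: end-code',
    'handler(context, event)',                 # after end-code: dropped
]
print(extract_function_code(cells))
```

Only the two cells between the markers that are not marked `ignore` survive, which is why local test code can safely live after the `end-code` cell.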

### Import Nuclio

The following code imports the `nuclio` Python package.

In [2]:
import nuclio

### Configure Nuclio

The following cells perform basic Nuclio function configuration: the `%%nuclio cmd` cell adds package installation commands (`pandas` and `pyarrow`), and the `%%nuclio config` cell defines the function's base container image (`mlrun/ml-models`), readiness timeout, and function kind (`nuclio`). The cell after them uses the `# nuclio: start-code` marker to instruct Nuclio to start processing code only from that location.

> **Note:** You can add code to define function dependencies and perform additional configuration after the `# nuclio: start-code` marker.

In [3]:
%%nuclio cmd -c

python -m pip install pandas
python -m pip install pyarrow

In [4]:
%%nuclio config
spec.build.baseImage = "mlrun/ml-models"
spec.readinessTimeoutSeconds = 200
kind = "nuclio"

%nuclio: setting spec.build.baseImage to 'mlrun/ml-models'
%nuclio: setting spec.readinessTimeoutSeconds to 200
%nuclio: setting kind to 'nuclio'
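Conceptually, a dotted key such as `spec.build.baseImage` in the `%%nuclio config` cell refers to a nested field of the function configuration. The sketch below is a simplified model of that mapping; the helper `set_dotted` is hypothetical and not a nuclio-jupyter API.

```python
def set_dotted(config: dict, dotted_key: str, value) -> dict:
    """Set a value in a nested dict using a dotted path such as 'spec.build.baseImage'."""
    node = config
    *parents, leaf = dotted_key.split('.')
    for key in parents:
        # create intermediate levels on demand
        node = node.setdefault(key, {})
    node[leaf] = value
    return config


config = {}
set_dotted(config, 'spec.build.baseImage', 'mlrun/ml-models')
set_dotted(config, 'spec.readinessTimeoutSeconds', 200)
set_dotted(config, 'kind', 'nuclio')
print(config)
# {'spec': {'build': {'baseImage': 'mlrun/ml-models'}, 'readinessTimeoutSeconds': 200}, 'kind': 'nuclio'}
```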


In [5]:
# nuclio: start-code

In [6]:
import os
import pandas as pd
import json
from datetime import datetime

In [7]:
def init_context(context):
    setattr(context, 'batch', [])
    setattr(context, 'batch_count', 0)
    setattr(context, 'batch_size', int(os.getenv('BATCH_SIZE', 1024)))

    setattr(context, 'timestamp_key', os.getenv('TS_KEY'))
    setattr(context, 'timestamp_format', os.getenv('TS_FORMAT', '%Y-%m-%d %H:%M:%S.%f'))

    setattr(context, 'pq_partitions', ['pq_year', 'pq_month', 'pq_day', 'pq_hour'])

    setattr(context, 'target_path', os.getenv('TARGET_PATH'))
    os.makedirs(context.target_path, exist_ok=True)

In [8]:
def handler(context, event):
    if isinstance(event.body, dict):
        event_dict = event.body
    else:
        event_dict = json.loads(event.body)

    context.logger.info_with('Got invoked',
                             trigger_kind=event.trigger.kind,
                             event_body=event_dict)

    event_with_time_partitions = add_time_partition_attributes(context, event_dict)

    # add the incoming event to the current batch
    context.batch.append(event_with_time_partitions)

    # check if batch size reached
    if context.batch_size == len(context.batch):
        context.logger.info_with('Writing batch',
                                 batch_count=context.batch_count,
                                 batch_size=len(context.batch))
        write_batch(context)
        context.logger.info_with('Written batch',
                                 batch_count=context.batch_count,
                                 batch_size=len(context.batch))
        # keep track of how many batches have been written
        context.batch_count += 1


def add_time_partition_attributes(context, event):
    if hasattr(context, 'timestamp_key') and event.get(context.timestamp_key) is not None:
        # parse the event time
        dt_object = datetime.strptime(event[context.timestamp_key], context.timestamp_format)
    else:
        # if event time is missing or not configured, use current datetime
        dt_object = datetime.now()

    # add the partition attributes
    event['pq_year'] = dt_object.strftime('%Y')
    event['pq_month'] = dt_object.strftime('%m')
    event['pq_day'] = dt_object.strftime('%d')
    event['pq_hour'] = dt_object.strftime('%H')

    return event


def write_batch(context):
    df = pd.DataFrame.from_records(context.batch)
    df.to_parquet(path=context.target_path, partition_cols=context.pq_partitions)
    # post-write cleanup: start a new, empty batch
    context.batch = []

The following cell uses the `# nuclio: end-code` marker to mark the end of a Nuclio code section and instruct Nuclio to stop parsing the notebook at this point.<br>
> **IMPORTANT:** Do not remove the end-code cell.

In [9]:
# nuclio: end-code

## Test Locally
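The handler's accumulate-and-flush behavior can also be checked without Nuclio, pandas, or a V3IO mount. The sketch below is a simplified stand-in, assuming a `types.SimpleNamespace` in place of the Nuclio context and a hypothetical `write_batch_stub` that records batches in memory instead of writing parquet files.

```python
from types import SimpleNamespace

# Stand-in for the Nuclio context: only the attributes the batching logic uses
ctx = SimpleNamespace(batch=[], batch_count=0, batch_size=10, written=[])


def write_batch_stub(context):
    """Hypothetical writer: keep the batch in memory instead of writing parquet."""
    context.written.append(list(context.batch))
    context.batch = []
    context.batch_count += 1


def handle(context, record):
    """Append a record and flush once the configured batch size is reached."""
    context.batch.append(record)
    if len(context.batch) >= context.batch_size:
        write_batch_stub(context)


for user_id in range(1, 10):
    handle(ctx, {'user_id': user_id})
print(len(ctx.written), len(ctx.batch))   # 0 9: nine events, nothing written yet

handle(ctx, {'user_id': 10})
print(len(ctx.written), len(ctx.batch))   # 1 0: the tenth event flushes the batch
```

The sketch compares with `>=` rather than the handler's `==`; both behave the same here, but `>=` still flushes if `batch_size` is ever lowered below the current batch length.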

In [10]:
# set a few parameters via environment variables
target_path = path.join(os.sep, 'v3io', project.params.get('CONTAINER'), project.params.get('EVENTS_PARQUET_TARGET_PATH'))
envs = {'TARGET_PATH' : target_path,
        'BATCH_SIZE': 1024,
        'TS_KEY': 'event_time',
        'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f'}

for key, value in envs.items():
    os.environ[key] = str(value)
init_context(context)
# reduce the batch size to 10
context.batch_size = 10


In [11]:
# trigger with 9 events:

nine_events = [b'{"user_id" : 1 , "event_type": "spin", "event_time": "2020-02-02 12:20:22.333332"}',
              b'{"user_id" : 2 , "event_type": "spin", "event_time": "2020-02-02 12:20:23.333332"}',
              b'{"user_id" : 3 , "event_type": "spin", "event_time": "2020-02-02 12:20:24.333332"}',
              b'{"user_id" : 4 , "event_type": "spin", "event_time": "2020-02-02 12:20:25.333332"}',
              b'{"user_id" : 5 , "event_type": "spin", "event_time": "2020-02-02 12:20:26.333332"}',
              b'{"user_id" : 6 , "event_type": "spin", "event_time": "2020-02-02 12:20:27.333332"}',
              b'{"user_id" : 7 , "event_type": "spin", "event_time": "2020-02-02 12:20:28.333332"}',
              b'{"user_id" : 8 , "event_type": "spin", "event_time": "2020-02-02 12:20:29.333332"}',
              b'{"user_id" : 9 , "event_type": "spin", "event_time": "2020-02-02 12:20:30.333332"}']

for e in nine_events:
    event = nuclio.Event(body=e)
    handler(context, event)

Python> 2020-10-05 10:05:51,196 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 1, 'event_type': 'spin', 'event_time': '2020-02-02 12:20:22.333332'}}
Python> 2020-10-05 10:05:51,198 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 2, 'event_type': 'spin', 'event_time': '2020-02-02 12:20:23.333332'}}
Python> 2020-10-05 10:05:51,198 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 3, 'event_type': 'spin', 'event_time': '2020-02-02 12:20:24.333332'}}
Python> 2020-10-05 10:05:51,199 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 4, 'event_type': 'spin', 'event_time': '2020-02-02 12:20:25.333332'}}
Python> 2020-10-05 10:05:51,199 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 5, 'event_type': 'spin', 'event_time': '2020-02-02 12:20:26.333332'}}
Python> 2020-10-05 10:05:51,200 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 6, 'event_type': 'spin', 'event_time': '2020-02-02 12:20

In [14]:
# check whether a parquet has been created
!ls -l {target_path}

ls: cannot access '/User/examples/model-deployment-with-streaming/data/events-pq': No such file or directory


In [15]:
# send the tenth event, which should trigger the creation of the parquet file
tenth_event = b'{"user_id" : 10 , "event_type": "spin", "event_time": "2020-02-02 12:20:31.333332"}'
event = nuclio.Event(body=tenth_event)
handler(context, event)

Python> 2020-10-05 10:06:39,411 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 10, 'event_type': 'spin', 'event_time': '2020-02-02 12:20:31.333332'}}
Python> 2020-10-05 10:06:39,412 [info] Writing batch: {'batch_count': 0, 'batch_size': 10}
Python> 2020-10-05 10:06:39,500 [info] Written batch: {'batch_count': 0, 'batch_size': 0}


In [16]:
# check whether a parquet file has been created
!ls -l {target_path}

total 0
drwxr-xr-x 2 51 nogroup 0 Oct  5 10:06 'pq_year=2020'


In [17]:
# cleanup
!rm -rf {target_path}/

## Nuclio Deploy

Nuclio leverages consumer groups. When one or more Nuclio replicas join a consumer group, each replica receives its equal share of the shards, based on the number of replicas that are defined in the function.
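As an illustration of "equal share", the sketch below spreads stream shards over replicas round-robin. The shard and replica counts are made up, and the function `assign_shards` is hypothetical; Nuclio's actual assignment logic may differ.

```python
def assign_shards(num_shards: int, replicas: list) -> dict:
    """Round-robin shard assignment: each replica gets an (almost) equal share."""
    assignment = {replica: [] for replica in replicas}
    for shard in range(num_shards):
        assignment[replicas[shard % len(replicas)]].append(shard)
    return assignment


print(assign_shards(8, ['replica-0', 'replica-1', 'replica-2']))
# {'replica-0': [0, 3, 6], 'replica-1': [1, 4, 7], 'replica-2': [2, 5]}
```

Because a shard is never split, shares can differ by one when the shard count is not divisible by the replica count, and replicas beyond the number of shards receive no shards at all.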

We set up the input stream URLs below. A consumer-group URL has the form `http://v3io-webapi:8081/<container name>/<stream path>@<consumer group name>`. In this case, we use the `WEB_API_USERS` project parameter as the URL prefix (`http://v3io-webapi:8081/<container name>`) and a consumer group named **`stream2pq`**.

For more information, refer to the [Nuclio v3iostream trigger reference documentation](https://nuclio.io/docs/latest/reference/triggers/v3iostream/).
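To make the URL format concrete, the hypothetical helper below splits a consumer-group URL back into its parts, using the events-stream URL printed by the next cell as input. It assumes the container is the first path segment and that the consumer group follows the last `@`.

```python
from urllib.parse import urlsplit


def parse_consumer_group_url(url: str) -> dict:
    """Split '<endpoint>/<container>/<stream path>@<consumer group>' into its parts."""
    if '@' not in url:
        raise ValueError(f'no consumer group in {url!r}')
    # The consumer group is whatever follows the last '@'
    stream_url, _, consumer_group = url.rpartition('@')
    parts = urlsplit(stream_url)
    # Assumption: the first path segment is the container name
    container, _, stream_path = parts.path.lstrip('/').partition('/')
    return {'endpoint': f'{parts.scheme}://{parts.netloc}',
            'container': container,
            'stream_path': stream_path,
            'consumer_group': consumer_group}


url = ('http://v3io-webapi:8081/users/michaelk/examples/'
       'model-deployment-with-streaming/data/incoming-events-stream@stream2pq')
print(parse_consumer_group_url(url))
```

Every function that uses the same consumer-group name shares the stream's shards; the events and inference functions can both use `stream2pq` because they read different streams.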

In [18]:
def get_stream_url(name):
    input_stream = project.params.get('STREAM_CONFIGS').get(name)
    input_stream_path = input_stream.get('path')
    print(f'Input {name} path: {input_stream_path}')
    WEB_API_USERS = project.params.get('WEB_API_USERS')
    input_stream_url = path.join(WEB_API_USERS, input_stream_path) + "@stream2pq"
    print(f'Input {name} URL: {input_stream_url}')
    return input_stream_url

events_stream_url = get_stream_url('incoming-events-stream')
inference_stream_url = get_stream_url('inference-stream')

Input incoming-events-stream path: michaelk/examples/model-deployment-with-streaming/data/incoming-events-stream
Input incoming-events-stream URL: http://v3io-webapi:8081/users/michaelk/examples/model-deployment-with-streaming/data/incoming-events-stream@stream2pq
Input inference-stream path: michaelk/examples/model-deployment-with-streaming/data/inference-stream
Input inference-stream URL: http://v3io-webapi:8081/users/michaelk/examples/model-deployment-with-streaming/data/inference-stream@stream2pq


Define the `target_path`s for the parquet files

In [19]:
events_target_path = path.join(os.sep, 'v3io', project.params.get('CONTAINER'), project.params.get('EVENTS_PARQUET_TARGET_PATH'))
inference_target_path = path.join(os.sep, 'v3io', project.params.get('CONTAINER'), project.params.get('INFERENCE_PARQUET_TARGET_PATH'))
print(f'Events target path: {events_target_path}\nInference target path: {inference_target_path}')

Events target path: /User/examples/model-deployment-with-streaming/data/events-pq
Inference target path: /User/examples/model-deployment-with-streaming/data/inference-pq


## Environment Variables

Define a dictionary of environment variables to initialize each of the functions.
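Environment variables are always strings: the local test converts each value with `str(value)`, and `init_context` converts `BATCH_SIZE` back with `int()`. A minimal sketch of that round trip, using a hypothetical local path `/tmp/events-pq` instead of the real target path:

```python
import os

events_envs = {'TARGET_PATH': '/tmp/events-pq',   # hypothetical local path
               'BATCH_SIZE': 1024,
               'TS_KEY': 'event_time',
               'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f'}

# os.environ only accepts strings, so non-string values must be converted
for key, value in events_envs.items():
    os.environ[key] = str(value)

# ...and the function converts them back, as init_context does for BATCH_SIZE
batch_size = int(os.getenv('BATCH_SIZE', 1024))
print(batch_size, type(batch_size).__name__)   # 1024 int
```

Assigning the integer directly (`os.environ['BATCH_SIZE'] = 1024`) raises a `TypeError`, which is why the conversion is needed in the local test.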

In [20]:
events_envs = {'TARGET_PATH': events_target_path,
               'BATCH_SIZE': 1024,
               'TS_KEY': 'event_time',
               'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f'}

inference_envs = {'TARGET_PATH': inference_target_path,
                  'BATCH_SIZE': 1024,
                  'TS_KEY': 'when',
                  'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f'}

### Convert code to function

We use MLRun `code_to_function` to convert the Python code in this notebook into a Nuclio function. We then set the relevant environment variables and streaming trigger.

In [21]:
from mlrun import code_to_function, mount_v3io

gen_func = code_to_function(name='stream2pq', kind = 'nuclio')
project.set_function(gen_func)

<mlrun.runtimes.function.RemoteRuntime at 0x7f195e8b3350>

In [22]:
project.save()

### Configure function instances
Here we configure a separate function instance for each of the streams that we want to write to parquet.
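Each instance needs its own name, environment variables, and trigger without modifying the shared `stream2pq` template. The plain-Python sketch below shows that pattern with `copy.deepcopy` on a hypothetical dict template rather than MLRun objects; it assumes the template's `.copy()` yields an independent object, and the shortened stream URLs are made up.

```python
import copy

# Hypothetical template standing in for the shared stream2pq function definition
template = {'name': 'stream2pq', 'env': {}, 'triggers': {}}


def configure_instance(name, envs, stream_url):
    """Derive a per-stream configuration without mutating the template."""
    instance = copy.deepcopy(template)
    instance['name'] = name
    instance['env'].update({k: str(v) for k, v in envs.items()})
    instance['triggers']['stream2pq'] = {'url': stream_url}
    return instance


events = configure_instance('events-s2pq', {'TS_KEY': 'event_time'},
                            'http://v3io-webapi:8081/users/events@stream2pq')
inference = configure_instance('inference-s2pq', {'TS_KEY': 'when'},
                               'http://v3io-webapi:8081/users/inference@stream2pq')
print(template['env'], events['env'], inference['env'])
# {} {'TS_KEY': 'event_time'} {'TS_KEY': 'when'}
```

With a shallow `copy.copy`, the nested `env` dict would be shared, so the second instance's `TS_KEY` would overwrite the first one's and leak into the template.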

In [23]:
def configure_stream2pq_function(name, envs, input_stream_url):
    stream2pq = project.func('stream2pq').copy()
    stream2pq.metadata.name = name
    stream2pq.set_envs(envs)
    stream2pq.add_trigger('stream2pq',
                          nuclio.triggers.V3IOStreamTrigger(url=input_stream_url,
                                                            access_key=os.getenv('V3IO_ACCESS_KEY'),
                                                            maxWorkers=10))
    # Configure a mount on the nuclio function from '/User' to our home directory '~/'.
    stream2pq.apply(mount_v3io())
    return stream2pq

events_s2pq = configure_stream2pq_function('events-s2pq', events_envs, events_stream_url)
inference_s2pq = configure_stream2pq_function('inference-s2pq', inference_envs, inference_stream_url)

### Deploy

In [24]:
events_s2pq.deploy()

> 2020-10-05 10:08:13,238 [info] deploy started
[nuclio] 2020-10-05 10:08:15,342 (info) Build complete
[nuclio] 2020-10-05 10:10:34,736 (info) Function deploy complete
[nuclio] 2020-10-05 10:10:34,742 done updating model-deployment-with-streaming-michaelk-events-s2pq, function address: 192.168.226.12:31566


'http://192.168.226.12:31566'

In [25]:
inference_s2pq.deploy()

> 2020-10-05 10:11:15,842 [info] deploy started
[nuclio] 2020-10-05 10:11:15,922 (info) Building processor image
[nuclio] 2020-10-05 10:11:17,943 (info) Build complete
[nuclio] 2020-10-05 10:13:33,287 (info) Function deploy complete
[nuclio] 2020-10-05 10:13:33,293 done updating model-deployment-with-streaming-michaelk-inference-s2pq, function address: 192.168.226.12:32270


'http://192.168.226.12:32270'

## Done