---
# Demo Use Case - Ingest Live Video Streams (Part 1)

A data scientist is part of a team developing an advanced driver-assistance system (ADAS).
They continuously collect video, lidar, and other sensor data from their fleet of test vehicles.
The data scientist wants to test a new object detection model on video collected today.

The purpose of this Jupyter notebook is to simulate the ingestion of real-time video into Streaming Data Platform (SDP).

To avoid the need for a set of live cameras for this demo, we play back video from a series of PNG files on disk
and write each video frame to SDP.
These videos are part of the [KITTI Vision Benchmark Suite](http://www.cvlibs.net/datasets/kitti/raw_data.php).

- We simulate up to 8 simultaneous cameras from different vehicles.
- Each video frame is 1242 x 374 pixels, RGB color, PNG format.
- PNG image sizes average 821 KB with the largest at 964 KB.
- Each camera records at 0.5 frames per second.
- The total ingest rate is 4.3 MB/sec.
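
As a rough cross-check of the figures above, the sketch below recomputes the aggregate ingest rate from the camera count, the frame rate, and the average image size. It assumes 1 KB = 1,000 bytes and that the quoted rate includes the base64 expansion applied when each frame is wrapped in a JSON event; both are assumptions, and `ingest_rate_mb_per_sec` is a hypothetical helper that is not part of this notebook.

```python
import math

def ingest_rate_mb_per_sec(cameras, fps, avg_image_bytes, base64_encoded=True):
    """Estimate the aggregate ingest rate in MB/sec (1 MB = 1,000,000 bytes)."""
    bytes_per_frame = avg_image_bytes
    if base64_encoded:
        # base64 emits 4 output characters for every 3 input bytes (with padding).
        bytes_per_frame = 4 * math.ceil(avg_image_bytes / 3)
    return cameras * fps * bytes_per_frame / 1e6

# 8 cameras, 0.5 frames per second each, 821 KB average image size.
raw_rate = ingest_rate_mb_per_sec(8, 0.5, 821_000, base64_encoded=False)
encoded_rate = ingest_rate_mb_per_sec(8, 0.5, 821_000)
print(f"raw: {raw_rate:.2f} MB/sec, base64: {encoded_rate:.2f} MB/sec")
```

Under these assumptions the raw rate is about 3.3 MB/sec and the base64-encoded rate about 4.4 MB/sec, which is in the neighborhood of the figure quoted above. The small JSON metadata overhead per event is ignored.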

---

### Prerequisites

1. Run [install_dependencies.ipynb](install_dependencies.ipynb).
2. Run [download_kitti.ipynb](download_kitti.ipynb).

### How to use this Notebook
1. Click *Kernel* -> *Restart Kernel and Run All Cells*.

### Import dependencies

In [None]:
%load_ext autoreload
%autoreload 2

import grpc
import importlib
import pravega.grpc_gateway as pravega
import pravega.video as video
from pravega.video import UnindexedStream, OutputStream, opencv_image_to_mpl
import cv2
from matplotlib import pyplot as plt
import glob
import base64
import time
import json
import itertools
import os
import numpy as np
import pandas as pd

# Pick up local edits to pravega.video without restarting the kernel.
# importlib replaces the deprecated imp module (removed in Python 3.12).
importlib.reload(video);

### Define Pravega stream parameters

In [None]:
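# Use the PRAVEGA_GRPC_GATEWAY_ADDRESS environment variable if it is set;
# otherwise fall back to the development gateway address below.
gateway = os.environ.get(
    'PRAVEGA_GRPC_GATEWAY_ADDRESS',
    'pravega-grpc-gateway.examples.frightful-four.eaglemonk.intranet.nautilus-platform-dev.com:80',
)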
scope = 'examples'
stream = 'object-detector-input-video'

### Initialize connection to Pravega GRPC Gateway

In [None]:
pravega_channel = grpc.insecure_channel(gateway)
pravega_client = pravega.grpc.PravegaGatewayStub(pravega_channel)

In [None]:
#pravega_client.CreateScope(pravega.pb.CreateScopeRequest(scope=scope))

### Create Pravega stream

In [None]:
output_stream = OutputStream(pravega_client, scope, stream)
# output_stream.delete_stream()
output_stream.create_stream()

In [None]:
#output_stream.truncate_stream()

### Ingest KITTI data (multiple cameras)

In [None]:
prefix = '../../data/kitti/'
camera_filespecs = [
    prefix + '2011_09_26/2011_09_26_drive_0005_sync/image_02/data/*.jpg',
    prefix + '2011_09_26/2011_09_26_drive_0009_sync/image_02/data/*.jpg',
    prefix + '2011_09_26/2011_09_26_drive_0011_sync/image_02/data/*.jpg',
    prefix + '2011_09_26/2011_09_26_drive_0051_sync/image_02/data/*.jpg',
#     '../../../kitti/2011_09_26/2011_09_26_drive_0009_sync/image_02/data/*.png',
#     '../../../kitti/2011_09_26/2011_09_26_drive_0011_sync/image_02/data/*.png',
#     '../../../kitti/2011_09_26/2011_09_26_drive_0014_sync/image_02/data/*.png',
#     '../../../kitti/2011_09_26/2011_09_26_drive_0051_sync/image_02/data/*.png',
#     '../../../kitti/2011_09_26/2011_09_26_drive_0059_sync/image_02/data/*.png',
#     '../../../kitti/2011_09_26/2011_09_26_drive_0084_sync/image_02/data/*.png',
#     '../../../kitti/2011_09_29/2011_09_29_drive_0071_sync/image_02/data/*.png',
]

In [None]:
camera_files = [sorted(glob.glob(f)) for f in camera_filespecs]
images_per_camera = [len(f) for f in camera_files]
images_per_camera

In [None]:
file_sizes = [os.path.getsize(f) for c in camera_files for f in c]
pd.Series(file_sizes).describe()

In [None]:
frame_iterators = [itertools.cycle(f) for f in camera_files]
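
To illustrate why `itertools.cycle` suits playback, here is a minimal sketch with hypothetical file names. Each camera's file list repeats indefinitely, so cameras with different numbers of frames can be stepped together without any of them running out.

```python
import itertools

# Hypothetical file lists; the real lists come from glob in the cells above.
demo_camera_files = [
    ['cam0_000.png', 'cam0_001.png', 'cam0_002.png'],
    ['cam1_000.png', 'cam1_001.png'],
]
demo_iterators = [itertools.cycle(files) for files in demo_camera_files]

# Step every camera once per frame, as the write generator does.
demo_frames = [[next(it) for it in demo_iterators] for _ in range(4)]
for frame_number, filenames in enumerate(demo_frames):
    print(frame_number, filenames)
```

Note that cycling an empty list (for example, when a glob pattern matches no files) still raises `StopIteration` on the first `next` call, so it is worth checking `images_per_camera` before starting the ingest.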

In [None]:
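def get_image_data(filename, size=None):
    """Return the encoded image bytes for a frame, optionally resized to size=(width, height)."""
    if size is None:
        # No resize requested: pass the file contents through unchanged.
        with open(filename, 'rb') as f:
            return f.read()
    img = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
    if img is None:
        # cv2.imread returns None instead of raising when a file cannot be read.
        raise IOError(f'unable to read image {filename}')
    img = cv2.resize(img, size, interpolation=cv2.INTER_NEAREST)
    success, png_array = cv2.imencode('.png', img)
    if not success:
        raise RuntimeError(f'unable to encode {filename} as PNG')
    return png_array.tobytes()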

In [None]:
# Define frames per second per camera
fps = 10
fps
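
The write generator in the next cell derives each frame's timestamp from the frame number rather than from the moment of the write, so timestamps stay evenly spaced even when a write is delayed. A minimal sketch of that schedule, using a fixed hypothetical start time in place of `time.time()` (`frame_timestamp_ms` is an illustrative helper, not part of the notebook):

```python
def frame_timestamp_ms(frame_number, fps, t0_ms):
    """Scheduled timestamp in milliseconds, using the same formula as the write generator."""
    return int(frame_number / (fps / 1000.0) + t0_ms)

demo_t0_ms = 1_600_000_000_000.0  # hypothetical start time (ms since the epoch)
demo_timestamps = [frame_timestamp_ms(n, 10, demo_t0_ms) for n in range(4)]

# Offsets from the start time show the spacing between consecutive frames.
demo_offsets = [t - int(demo_t0_ms) for t in demo_timestamps]
print(demo_offsets)
```

At 10 frames per second, consecutive frames are scheduled 100 ms apart. The generator sleeps until each scheduled time and logs a warning if it falls more than 5 seconds behind.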

In [None]:
import logging

def video_frame_write_generator(fps=fps):
    """Yield one WriteEventsRequest per camera per frame, paced at fps frames per second."""
    frame_number = 0
    t0_ms = time.time() * 1000.0
    while True:
        # Scheduled timestamp (ms since the epoch) for this frame.
        timestamp = int(frame_number / (fps / 1000.0) + t0_ms)
        sleep_sec = timestamp / 1000.0 - time.time()
        if sleep_sec > 0.0:
            time.sleep(sleep_sec)
        elif sleep_sec < -5.0:
            logging.warning(f"can't keep up with real-time. sleep_sec={sleep_sec}")
        for camera, frame_iterator in enumerate(frame_iterators):
            filename = next(frame_iterator)
            png_bytes = get_image_data(filename)
            event_dict = dict(
                camera=camera,
                data=base64.b64encode(png_bytes).decode(encoding='UTF-8'),
                frameNumber=frame_number,
                ssrc=0,
                timestamp=timestamp,
                )
            event_json = json.dumps(event_dict)
            event_bytes = event_json.encode(encoding='UTF-8')
            event_to_write = pravega.pb.WriteEventsRequest(
                scope=scope,
                stream=stream,
                event=event_bytes,
                routing_key=str(camera),
            )
            yield event_to_write
        frame_number += 1
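
Each event is a JSON document whose `data` field holds the base64-encoded image bytes. As a sketch of how a consumer might reverse that encoding, the round trip below builds a payload in the same shape and decodes it again. The `encode_event` and `decode_event` helpers are hypothetical and are not part of `pravega.video`; the image bytes are a placeholder.

```python
import base64
import json

def encode_event(camera, frame_number, timestamp, image_bytes):
    """Build a JSON event payload in the same shape as the write generator."""
    event_dict = dict(
        camera=camera,
        data=base64.b64encode(image_bytes).decode('UTF-8'),
        frameNumber=frame_number,
        ssrc=0,
        timestamp=timestamp,
    )
    return json.dumps(event_dict).encode('UTF-8')

def decode_event(event_bytes):
    """Parse an event payload and return (metadata dict, raw image bytes)."""
    event_dict = json.loads(event_bytes.decode('UTF-8'))
    image_bytes = base64.b64decode(event_dict.pop('data'))
    return event_dict, image_bytes

demo_image = b'\x89PNG\r\n\x1a\n' + bytes(16)  # placeholder bytes, not a valid image
demo_event = encode_event(2, 7, 1_600_000_000_700, demo_image)
demo_meta, demo_decoded = decode_event(demo_event)
print(demo_meta, demo_decoded == demo_image)
```

Embedding the image as base64 text keeps the whole event valid JSON, at the cost of roughly one third more bytes per frame than the raw image.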

In [None]:
events_to_write = video_frame_write_generator()
output_stream.write_events(events_to_write)