Read sensor data from a Pravega stream written by Pravega Sensor Collector.
Then analyze the gaps in the event timestamps.

## Install gRPC

In [None]:
!pip install grpcio

## Install Pravega gRPC Gateway Client

In [None]:
# Reinstall the Pravega gRPC Gateway Python client from source: uninstall any
# existing copy, delete the old checkout, clone the repository into /tmp,
# check out the master branch, and pip-install the Python package from the
# clone. Everything after `git clone` only runs if the previous step succeeded.
!pip uninstall -y pravega-grpc-gateway-client ; \
    rm -rf /tmp/pravega-grpc-gateway ; \
    git clone https://github.com/pravega/pravega-grpc-gateway /tmp/pravega-grpc-gateway && \
    cd /tmp/pravega-grpc-gateway && \
    git checkout master && \
    pip install pravega-grpc-gateway/src/main/python

In [None]:
import grpc
import imp
import pravega.grpc_gateway as pravega
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
from itertools import islice

In [None]:
class StreamBase:
    """Bind a Pravega gateway client to a single scope/stream pair.

    Every method builds the corresponding ``pravega.pb`` request message for
    this scope and stream, sends it through the gateway client, and returns
    the client's response unchanged.
    """

    def __init__(self, pravega_client, scope, stream):
        """Store the gateway client and the scope/stream names it acts on."""
        self.pravega_client = pravega_client
        self.scope = scope
        self.stream = stream

    def create_stream(self, min_num_segments=1):
        """Send a CreateStream request with the given minimum segment count."""
        policy = pravega.pb.ScalingPolicy(min_num_segments=min_num_segments)
        request = pravega.pb.CreateStreamRequest(
            scope=self.scope,
            stream=self.stream,
            scaling_policy=policy,
        )
        return self.pravega_client.CreateStream(request)

    def delete_stream(self):
        """Send a DeleteStream request for this scope/stream."""
        request = pravega.pb.DeleteStreamRequest(
            scope=self.scope,
            stream=self.stream,
        )
        return self.pravega_client.DeleteStream(request)

    def get_stream_info(self):
        """Send a GetStreamInfo request and return the gateway's response."""
        request = pravega.pb.GetStreamInfoRequest(
            scope=self.scope,
            stream=self.stream,
        )
        return self.pravega_client.GetStreamInfo(request)

    def truncate_stream(self):
        """Truncate the stream at the tail stream cut reported by the gateway."""
        tail_cut = self.get_stream_info().tail_stream_cut
        request = pravega.pb.TruncateStreamRequest(
            scope=self.scope,
            stream=self.stream,
            stream_cut=tail_cut,
        )
        return self.pravega_client.TruncateStream(request)

    def write_events(self, events_to_write):
        """Pass ``events_to_write`` straight to the client's WriteEvents call."""
        return self.pravega_client.WriteEvents(events_to_write)

In [None]:
class UnindexedStream(StreamBase):
    """Stream accessor that reads events through the gateway's ReadEvents call."""

    def __init__(self, pravega_client, scope, stream):
        """Forward all arguments to the ``StreamBase`` initializer."""
        super().__init__(pravega_client, scope, stream)

    def read_events(self, from_stream_cut=None, to_stream_cut=None, stop_at_tail=False):
        """Read events from a Pravega stream. Returned events will be byte arrays.

        When ``stop_at_tail`` is true, the ``to_stream_cut`` argument is
        replaced by the tail stream cut the gateway reports at call time.
        The result of the client's ``ReadEvents`` call is returned as is.
        """
        end_cut = (
            self.get_stream_info().tail_stream_cut if stop_at_tail else to_stream_cut
        )
        request = pravega.pb.ReadEventsRequest(
            scope=self.scope,
            stream=self.stream,
            from_stream_cut=from_stream_cut,
            to_stream_cut=end_cut,
        )
        return self.pravega_client.ReadEvents(request)

In [None]:
# Gateway address comes from the environment; a KeyError is raised here if
# PRAVEGA_GRPC_GATEWAY_ADDRESS is not set.
gateway = os.environ['PRAVEGA_GRPC_GATEWAY_ADDRESS']
# Pravega scope and stream that the rest of the notebook reads from.
scope = 'edge'
stream = 'pravega-sensor-collector-test3'

In [None]:
# Open an insecure (non-TLS) gRPC channel to the gateway, with the maximum
# receive message length set to 9*1024*1024 bytes (9 MiB).
pravega_channel = grpc.insecure_channel(gateway, options=[
        ('grpc.max_receive_message_length', 9*1024*1024),
    ])
# Client stub through which all gateway requests in this notebook are sent.
pravega_client = pravega.grpc.PravegaGatewayStub(pravega_channel)

In [None]:
# Reader for the configured scope/stream, backed by the gateway client.
unindexed_stream = UnindexedStream(pravega_client, scope=scope, stream=stream)

In [None]:
# raw_events = list(islice(unindexed_stream.read_events(stop_at_tail=True), 1000000))
# del raw_events
# len(raw_events)
# events = [json.loads(e.event) for e in raw_events]
# del raw_events
# timestamps = [t for e in events for t in e['TimestampsNanos']]
# del events

In [None]:
%%time
# Read events up to the stream's current tail, capped at 8*60*60 (28,800)
# events. Each event payload is parsed as JSON and its 'TimestampsNanos'
# array is flattened into one list of timestamps.
timestamps = [t for e in islice(unindexed_stream.read_events(stop_at_tail=True), 8*60*60)
                for t in json.loads(e.event)['TimestampsNanos']]

In [None]:
# Total number of timestamps collected from the events read.
len(timestamps)

In [None]:
# Skip first 1500 records: drop the first 1500 timestamps read from the
# stream before any further analysis.
timestamps = timestamps[1500:]

In [None]:
# Convert the record count to hours: records / 1600 / 60 / 60.
# NOTE(review): presumably 1600 is the sensor's sample rate in records per
# second; confirm against the collector configuration.
total_hours = len(timestamps) / 1600 / 60 / 60
total_hours

In [None]:
%%time
# Move the timestamps into a pandas Series, then delete the Python list so
# the data is not kept in memory twice.
timestamp_series = pd.Series(timestamps)
del timestamps

In [None]:
# Gap between each timestamp and the one before it. diff() yields NaN for the
# first entry (it has no predecessor), so that position is dropped. The source
# Series is then deleted to free memory.
gaps = timestamp_series.diff().iloc[1:]
del timestamp_series

In [None]:
# Skip the first 1500 gaps and convert to milliseconds (the source field is
# 'TimestampsNanos', so multiplying by 1e-6 gives ms).
# NOTE(review): 1500 timestamps were already dropped before the Series was
# built, so this is a second skip of 1500 records; confirm it is intended.
gaps_ms = gaps[1500:] * 1e-6
del gaps

In [None]:
# Summary statistics of the gaps, in milliseconds.
gaps_ms.describe()

In [None]:
# Line plot of every gap against the Series index (labelled as record number).
# The trailing semicolon on the last line suppresses the cell's text output.
ax = gaps_ms.plot(figsize=(12,4))
ax.set_xlabel("record number")
ax.set_ylabel("gap (milliseconds)");

In [None]:
# ax = gaps_ms.plot(figsize=(12,4), xlim=[2482650,2482680], style='x-')
# ax.set_xlabel("record number")
# ax.set_ylabel("gap (milliseconds)");

In [None]:
# Histogram of the gaps using 1 ms wide bins with edges 0..79 ms, drawn with a
# logarithmic count axis so rare large gaps stay visible.
ax = gaps_ms.hist(bins=range(0,80), log=True, figsize=(12,4))
ax.set_xlabel("gap (milliseconds)")
ax.set_ylabel("count");

In [None]:
# Percentage of gaps shorter than 1 ms (mean of the boolean mask, times 100).
(gaps_ms < 1).mean() * 100

In [None]:
# Percentage of gaps shorter than 2 ms (mean of the boolean mask, times 100).
(gaps_ms < 2).mean() * 100

In [None]:
# Keep only the gaps longer than 10 ms; the index still identifies the record
# at which each one occurred.
huge_gaps = gaps_ms[gaps_ms > 10]
huge_gaps

In [None]:
# Distance between consecutive huge gaps, measured in index positions and
# divided by 1600 and by 60.
# NOTE(review): presumably this yields minutes, assuming 1600 records per
# second; confirm the sample rate.
np.diff(huge_gaps.index.values) / 1600 / 60

In [None]:
# Discrete Fourier transform of the gap series. np.fft.fft returns complex
# coefficients, wrapped here in a Series for display and plotting.
fft = pd.Series(np.fft.fft(gaps_ms))

In [None]:
# Display the FFT coefficients.
fft

In [None]:
# Plot the magnitude spectrum against a logarithmic frequency-bin axis. The
# FFT coefficients are complex, so take their absolute value first rather than
# handing complex numbers to the plotting backend, which cannot draw them
# meaningfully on a real-valued axis.
fft.abs().plot(logx=True)