# Accessing EFD aggregated data

Angelo Fausti, Simon Krughoff

August 10, 2020


## Introduction

In this notebook we show how to access data produced by the EFD aggregator. It was prepared as part of an internal demonstration of the EFD aggregator and uses the EFD Sandbox instance.

## The EFD aggregated streams

The EFD [aggregator](https://kafka-aggregator.lsst.io) consumes the EFD data streams and produces a new set of aggregated streams, which are then converted to Parquet files, partitioned by time, and stored in an object store.

To demonstrate the EFD aggregator we used the [aggregator example module](https://kafka-aggregator.lsst.io/configuration.html#example-module-configuration). 

In this experiment we initialize ten “example topics” and produce messages for them at 10 Hz. The aggregator groups the messages in windows of 1 s and, for each field in the source topic, adds the following summary statistics: `min`, `mean`, `median`, `stdev`, and `max`. See the [aggregator configuration settings](https://kafka-aggregator.lsst.io/configuration.html#configuration-settings) for more details.
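
To make the windowing concrete, the following is a minimal sketch of this kind of aggregation using only the Python standard library. It is not the aggregator's actual implementation: the function name `aggregate_windows`, the message layout, and the synthetic data are assumptions made for illustration. The output field names follow the `<statistic>_<field>` pattern seen in the aggregated topic later in this notebook (for example `mean_value1`).

```python
import statistics
from collections import defaultdict


def aggregate_windows(messages, field, window_size=1.0):
    """Group messages into fixed, non-overlapping windows and summarize one field.

    Hypothetical helper for illustration; not part of the EFD aggregator.
    """
    windows = defaultdict(list)
    for msg in messages:
        # Assign each message to the window that contains its timestamp.
        window_start = (msg["time"] // window_size) * window_size
        windows[window_start].append(msg[field])

    rows = []
    for window_start in sorted(windows):
        values = windows[window_start]
        rows.append({
            "time": window_start,
            f"min_{field}": min(values),
            f"mean_{field}": statistics.mean(values),
            f"median_{field}": statistics.median(values),
            # The sample standard deviation needs at least two values.
            f"stdev_{field}": statistics.stdev(values) if len(values) > 1 else 0.0,
            f"max_{field}": max(values),
        })
    return rows


# Two seconds of synthetic messages at 10 Hz (assumed data, not EFD output).
messages = [{"time": i / 10, "value1": float(i % 10)} for i in range(20)]
aggregated = aggregate_windows(messages, "value1")
```

With messages at 10 Hz and 1 s windows, each aggregated row summarizes ten source messages, so the aggregated stream carries one tenth as many rows as the source.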

A new set of aggregated topics is created in Kafka, and we use the Kafka [S3 Sink Connector](https://docs.confluent.io/current/connect/kafka-connect-s3/) to write the data to Parquet files, in this example on Amazon S3.

## Reading EFD Parquet files from S3


In [None]:
import io
import boto3
import pandas as pd
import pyarrow.parquet as pq
%matplotlib widget

In [None]:
BUCKET_NAME = "efd-sandbox.data"

The S3 credentials can be added to the `~/.aws/credentials` file. They are stored in SQuaRE 1Password; search for "EFD AWS S3 credentials". The S3 region can be added to the `~/.aws/config` file.

For example:
```
cat ~/.aws/credentials
[default]
aws_access_key_id = <the aws_access_key_id>
aws_secret_access_key = <the aws_secret_access_key>
```
and
```
cat ~/.aws/config 
[default]
region=us-east-1
```

In [None]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)

In this example the S3 Sink connector is configured to partition data by time on an hourly basis. The following helps to construct the path to find the Parquet files on S3 for one of the aggregated topics, in this example, `example-002-aggregated`.

In [None]:
topic = "example-002-aggregated"
year = "2020"
month = "08"
day = "07"
hour = "22"
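
As an alternative to setting each component by hand, the prefix can be derived from a single `datetime`. This is a sketch under the assumption that the keys follow the `topics/<topic>/year=…/month=…/day=…/hour=…` layout used in this example; `hourly_prefix` is a hypothetical helper, not part of boto3 or the connector.

```python
from datetime import datetime


def hourly_prefix(topic, when):
    """Build the S3 key prefix for one hourly partition of a topic.

    Hypothetical helper; assumes the key layout used in this notebook.
    """
    return (
        f"topics/{topic}/year={when:%Y}/month={when:%m}/"
        f"day={when:%d}/hour={when:%H}"
    )


prefix = hourly_prefix("example-002-aggregated", datetime(2020, 8, 7, 22))
print(prefix)
```

Formatting with `%m`, `%d`, and `%H` keeps the zero padding (`08`, `07`, `22`) that the partition names use.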

We use the `bucket.download_fileobj()` method to download each Parquet file into a buffer, then PyArrow to read it and convert it to a Pandas dataframe. The per-file dataframes are combined into a single dataframe.

In [None]:
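frames = []
prefix = f"topics/{topic}/year={year}/month={month}/day={day}/hour={hour}"
for obj in bucket.objects.filter(Prefix=prefix):
    buffer = io.BytesIO()
    bucket.download_fileobj(obj.key, buffer)
    buffer.seek(0)  # rewind before handing the buffer to PyArrow
    frames.append(pq.read_table(buffer).to_pandas())
    print(f"{bucket.name}:{obj.key}")
# DataFrame.append was removed in pandas 2.0; concatenate once instead
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()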

The S3 Sink connector is configured to commit files to S3 every 10 minutes (see the `rotate_interval_ms` configuration setting), which is why you see 6 files in this path.
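
The file count follows directly from the rotation interval and the hourly partitioning. A small sketch of the arithmetic, where the value 600000 ms is inferred from the 10 minutes stated above and a single Kafka partition for the topic is assumed:

```python
# Assumed value: 10 minutes expressed in milliseconds.
rotate_interval_ms = 10 * 60 * 1000
# The connector partitions the data into one path per hour.
partition_duration_ms = 60 * 60 * 1000

files_per_hour = partition_duration_ms // rotate_interval_ms
print(files_per_hour)
```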

In [None]:
df.head()

## Plotting the aggregated stream

In [None]:
# Plot the per-window mean, then shade the min-max range of each window
p = df.plot(x='time', y='mean_value1', c='white', figsize=(15,5))
p.fill_between(x='time', y1='min_value1', y2='max_value1', data=df)