# DFI Quick Start Guide - Large Synthetic Dataset of 92B Records

This notebook will guide you through querying a large synthetic traffic dataset
in the Data Flow Index from [General System](https://www.generalsystem.com).

OpenAPI specification documentation is available at
<https://api.dataflowindex.io/docs/api>.

Please refer to <https://github.com/thegeneralsystem/dfi-client-examples> for
the most up-to-date companion documentation.

Additional resources and help are available at <https://support.generalsystem.com>.

## Get ready

In [None]:
# Install Python modules if they are not already present.
!python3 -m pip install requests tabulate sseclient-py pydeck pandas

In [None]:
# Import required modules.
import json
from typing import List

import requests
import sseclient
from tabulate import tabulate

In [None]:
import pandas as pd

# This tutorial uses PyDeck to visualise the data on a map.
# If you want to visualise data, please install PyDeck following the instructions:
#     https://deckgl.readthedocs.io/en/latest/installation.html
# You do not need a Mapbox API key (skip this step).
# You DO need to enable pydeck for Jupyter (follow this step in the guide).
import pydeck as pdk

In [None]:
# First set your API token to access the DFI API.
#
# Access to the DFI demonstration servers requires an API token, which may be
# obtained free of charge by enrolling at <https://eap.generalsystem.com>. Once
# enrolled, your API token may be redeemed from <https://tokens.dataflowindex.io/>.

from getpass import getpass

api_token = getpass("Enter your API token: ")

# Set authorisation headers:
headers = {
    "Authorization": f"Bearer {api_token}",
    "accept": "application/json",
    "content-type": "application/json",
}
base_url = "https://api.dataflowindex.io"
query_timeout = 60

#### In this tutorial we will be querying a large synthetic data set

This synthetic data set represents traffic moving across London.

| Property | Value |
| -------- | ----- |
| Total records | 92,435,312,835 |
| Distinct UUIDs | 1,578,544 |
| Start time | 2022-01-01 00:00:00 |
| End time | 2022-08-26 07:12:00 |

Bounding box of all data:

|      | Longitude  | Latitude |
| ---- | ---------- | -------- |
| Min  | -0.5120832 | 51.2810883 |
| Max  | 0.322123   | 51.6925997 |
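
A query region that lies entirely outside this extent should match no records, so it can be worth checking a query box against the extent before sending it. The following is a minimal sketch and not part of the DFI API: the names `DATA_BOUNDS` and `overlaps_data` are hypothetical, and the bounds are copied from the table above.

```python
# Dataset extent in degrees, copied from the bounding box table above.
DATA_BOUNDS = {"minLng": -0.5120832, "minLat": 51.2810883, "maxLng": 0.322123, "maxLat": 51.6925997}


def overlaps_data(box: dict) -> bool:
    """Return True if `box` intersects the dataset's bounding box."""
    return (
        box["minLng"] <= DATA_BOUNDS["maxLng"]
        and box["maxLng"] >= DATA_BOUNDS["minLng"]
        and box["minLat"] <= DATA_BOUNDS["maxLat"]
        and box["maxLat"] >= DATA_BOUNDS["minLat"]
    )


# Illustrative boxes: one around central London, one around (0, 0).
central_london = {"minLng": -0.2, "minLat": 51.45, "maxLng": 0.0, "maxLat": 51.55}
origin_box = {"minLng": -1, "minLat": -1, "maxLng": 1, "maxLat": 1}
print(overlaps_data(central_london), overlaps_data(origin_box))
```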



#### Hardware
- The dataset runs on a single server hosted on AWS
- The server is storage optimised, with 192 GB RAM and 2 x 7.5 TB NVMe SSDs

#### Note: this is a shared instance; you cannot add data to it or delete data from it.

In [None]:
# Get list of instances associated with your API token.
r = requests.get(f"{base_url}/instances", headers=headers, timeout=query_timeout)
print(r.json())

In [None]:
# Next select the DFI instance you will be accessing.
namespace = "gs_eap_demo"
instance_name = "eap-1"
params = {"instance": f"{namespace}.{instance_name}"}

## Query the data

In [None]:
# We have created a set of interesting polygons that you can use to query the
# London Traffic dataset. These include the London Borough areas,
# the Congestion Charging Zone area and more.
# The code below lists the polygons available:
r = requests.get(f"{base_url}/namespaces/{namespace}/polygons", headers=headers, timeout=query_timeout)
if r.status_code != 200:
    print(f"Status code: {r.status_code}")
    print(f"Response:\n{r.text}")
    r.raise_for_status()
data = [[polygon["name"], polygon["count"]] for polygon in r.json()["polygons"]]
print(tabulate(data, ["name", "vertices"], tablefmt="pretty"))

In [None]:
# This is a helper function that displays a polygon on a map.
def show_polygon(query_polygon: str) -> pdk.Deck:
    """Visualise a polygon on a map."""
    r = requests.get(
        f"{base_url}/namespaces/{namespace}/polygons/{query_polygon}", headers=headers, timeout=query_timeout
    )
    # Fail early with a clear error if the polygon could not be fetched.
    r.raise_for_status()
    coordinates = r.json()["vertices"]
    geo_json = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "properties": {},
                "geometry": {"coordinates": [coordinates], "type": "Polygon"},
            }
        ],
    }

    geo_json_pdk = pdk.Layer(
        "GeoJsonLayer",
        geo_json,
        opacity=0.2,
        stroked=False,
        filled=True,
        extruded=False,
        wireframe=True,
        get_elevation="0",
        get_fill_color="[255, 255, 0]",
        get_line_color=[255, 255, 255],
        pickable=True,
    )
    view_state = pdk.ViewState(longitude=-0.1, latitude=51.5, zoom=10, min_zoom=5, max_zoom=15, pitch=0, bearing=0)
    return pdk.Deck(layers=[geo_json_pdk], initial_view_state=view_state)


show_polygon("uk_congestion_charge_zone")

### Introducing the streaming API

The dataset we are querying contains 92 billion records, so most queries take longer than 60 seconds to produce results.

To prevent the API call from timing out, we use a streaming protocol (server-sent events), demonstrated below.
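
To illustrate what arrives over the wire, here is a minimal, offline sketch of how a `text/event-stream` payload is split into events. The sample payload is made up for illustration; only the event names `keepAlive`, `message` and `finish` are taken from the cells below, and `parse_sse` is a hypothetical helper rather than part of `sseclient`.

```python
from typing import Iterator, Tuple


def parse_sse(text: str) -> Iterator[Tuple[str, str]]:
    """Yield (event, data) pairs from a text/event-stream payload.

    Simplified on purpose: only `event:` and `data:` fields are handled,
    and a blank line marks the end of an event.
    """
    event, data = "message", []
    for line in text.splitlines() + [""]:
        if line == "":
            # A blank line terminates the current event, if there is one.
            if data or event != "message":
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())


# Made-up sample stream: a keep-alive, one result, then the end marker.
sample = "event: keepAlive\ndata:\n\nevent: message\ndata: 42\n\nevent: finish\ndata:\n\n"
events = list(parse_sse(sample))
print(events)
```

In the notebook itself, `sseclient.SSEClient` does this parsing, and keep-alive events stop the connection from going idale while the query runs.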

In [None]:
# In the HTTP headers we specify we would like to stream the results.
streaming_headers = {"Authorization": f"Bearer {api_token}", "accept": "text/event-stream"}

In [None]:
# Count how many records there are inside a polygon.
polygon = "uk_congestion_charge_zone"
time_params = {
    "startTime": "2022-01-01T08:00:00.000Z",
    "endTime": "2022-01-01T09:30:00.000Z",
}
r = requests.get(
    f"{base_url}/polygon/{namespace}.{polygon}/count",
    params=params | time_params,
    headers=streaming_headers,
    stream=True,
    timeout=query_timeout,
)
r.raise_for_status()

client = sseclient.SSEClient(r)
results = ""  # Stays empty if the stream ends without a "message" event.
for index, event in enumerate(client.events(), start=1):
    print(f"Message no. {index} of type {event.event}")
    if event.event == "keepAlive":
        continue
    if event.event == "finish":
        break
    if event.event == "message":
        # We got some data!
        results = event.data
        continue
    print("Unexpected event in bagging area")
if results:
    print(f"The polygon '{polygon}' has {results} records in it.")
else:
    print(f"The polygon '{polygon}' does not contain any points.")

In [None]:
# Let's add helper functions to simplify our code:
from typing import Any


def receive_stream_count(response: requests.models.Response) -> Any:
    """Return the decoded payload of the last "message" event (the count)."""
    client = sseclient.SSEClient(response)
    results = []
    for event in client.events():
        if event.event == "keepAlive":
            continue
        if event.event == "finish":
            break
        if event.event == "message":
            # We got some data!
            results = json.loads(event.data)
            continue
        print("Unexpected event in bagging area")
    return results


def receive_stream_entities(response: requests.models.Response) -> List[Any]:
    """Collect one decoded item per "message" event."""
    client = sseclient.SSEClient(response)
    results = []
    for event in client.events():
        if event.event == "keepAlive":
            continue
        if event.event == "finish":
            break
        if event.event == "message":
            # We got some data!
            results += [json.loads(event.data)]
            continue
        print("Unexpected event in bagging area")
    return results


def receive_stream_history(response: requests.models.Response) -> List[Any]:
    """Concatenate the decoded list carried by each "message" event."""
    client = sseclient.SSEClient(response)
    results = []
    for event in client.events():
        if event.event == "keepAlive":
            continue
        if event.event == "finish":
            break
        if event.event == "message":
            # We got some data!
            results += json.loads(event.data)
            continue
        print("Unexpected event in bagging area")
    return results

In [None]:
# Count how many unique entities there are inside a polygon.
polygon = "uk_congestion_charge_zone"
time_params = {
    "startTime": "2022-01-01T08:00:00.000Z",
    "endTime": "2022-01-01T09:30:00.000Z",
}
r = requests.get(
    f"{base_url}/polygon/{namespace}.{polygon}/entities",
    params=params | time_params,
    headers=streaming_headers,
    stream=True,
    timeout=query_timeout,
)
r.raise_for_status()

results = receive_stream_entities(r)
print(f"Unique entities found: {len(results)}")

In [None]:
def show_history(history: List[List[float]]) -> pdk.Deck:
    """Show history on a map."""
    df = pd.DataFrame(history, columns=["Longitude", "Latitude"])

    history_pdk = pdk.Layer(
        "ScatterplotLayer",
        df,
        get_position=["Longitude", "Latitude"],
        auto_highlight=True,
        elevation_scale=500,
        pickable=True,
        elevation_range=[0, 300],
        extruded=True,
        filled=True,
        opacity=0.8,
        radius_scale=6,
        radius_min_pixels=1,
        radius_max_pixels=100,
        line_width_min_pixels=1,
        get_fill_color=[255, 0, 0],
        get_line_color=[255, 0, 0],
        coverage=1,
    )
    view_state = pdk.ViewState(longitude=-0.1, latitude=51.5, zoom=10, min_zoom=5, max_zoom=15, pitch=0, bearing=0)
    r = pdk.Deck(layers=[history_pdk], initial_view_state=view_state)
    return r

In [None]:
# List all records inside a polygon.
polygon = "uk_congestion_charge_zone"
time_params = {
    "startTime": "2022-01-01T08:00:00.000Z",
    "endTime": "2022-01-01T09:30:00.000Z",
}
r = requests.get(
    f"{base_url}/polygon/{namespace}.{polygon}/history",
    params=params | time_params,
    headers=streaming_headers,
    stream=True,
    timeout=query_timeout,
)
r.raise_for_status()

results = receive_stream_history(r)
print(f"Points returned: {len(results)}")
history = [[item["coordinate"][0], item["coordinate"][1]] for item in results]
show_history(history)

In [None]:
# We can also query by supplying the polygon's vertices directly.
# Vertices must be listed in counter-clockwise order, as mandated by the GeoJSON standard.
payload = {"vertices": [[-1.1, +1.1], [-1.1, -1.1], [+1.1, -1.1], [+1.1, +1.1], [-1.1, +1.1]]}
r = requests.post(
    f"{base_url}/polygon/count",
    json=payload,
    headers=streaming_headers,
    stream=True,
    params=params,
    timeout=query_timeout,
)
r.raise_for_status()

results = receive_stream_count(r)
print(f"Points found: {results}")

## Adding polygons
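
Polygon vertices must be listed in counter-clockwise order. As a minimal sketch (the helper name `is_counter_clockwise` is hypothetical and not part of the DFI API), the shoelace formula gives a signed area whose sign reveals the winding order. It treats longitude and latitude as planar x and y, which is an approximation that suits small polygons, and it assumes the ring is closed, with the first vertex repeated at the end.

```python
from typing import List


def is_counter_clockwise(vertices: List[List[float]]) -> bool:
    """Return True if a closed ring of [lng, lat] vertices winds counter-clockwise."""
    # Shoelace formula: twice the signed area is positive for counter-clockwise rings.
    twice_area = sum(
        x1 * y2 - x2 * y1
        for (x1, y1), (x2, y2) in zip(vertices, vertices[1:])
    )
    return twice_area > 0


ring = [[-1.1, 1.1], [-1.1, -1.1], [1.1, -1.1], [1.1, 1.1], [-1.1, 1.1]]
print(is_counter_clockwise(ring))        # the ring used in this guide
print(is_counter_clockwise(ring[::-1]))  # the same ring reversed
```

If the check fails, reversing the vertex list with `vertices[::-1]` flips the winding order.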

In [None]:
# Polygons can be defined, named and stored for later use.
# Polygons are used in "points in polygon" queries. As polygon definitions may
# be large and complex, they can be stored and referred to by name in queries.
# A polygon could be, for instance, the boundary of a country and be several MB in size.
# Here we create a new polygon.
# Vertices must be listed in counter-clockwise order, as mandated by the GeoJSON standard.
payload = {
    "name": "my-first-polygon",
    "vertices": [[-1.1, +1.1], [-1.1, -1.1], [+1.1, -1.1], [+1.1, +1.1], [-1.1, +1.1]],
}
r = requests.post(f"{base_url}/polygons", json=payload, headers=headers, timeout=query_timeout)
print(f"Status code: {r.status_code}")

In [None]:
# List the stored polygons.
r = requests.get(f"{base_url}/polygons", headers=headers, timeout=query_timeout)
if r.status_code != 200:
    print(f"Status code: {r.status_code}")
    print(f"Response:\n{r.text}")
    r.raise_for_status()

data = [[polygon["name"], polygon["count"]] for polygon in r.json()["polygons"]]
print(tabulate(data, ["name", "vertices"], tablefmt="pretty"))

## Bounding box query methods
The user supplies a bounding box by giving its minimum and maximum longitude and latitude. The DFI finds all points (observations) that lie within it. There are three types of query:

* `count` - Computes how many points lie within the bounding box
* `history` - Returns the details of the points that lie within the bounding box
* `entities` - Returns the list of unique sensor IDs that lie within the bounding box

All queries optionally accept a time range and can restrict the search to a list of sensor IDs.
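
As a minimal sketch of preparing such a query, the hypothetical helpers below build a bounding-box payload from a set of points and format a time range. The payload keys match the cells that follow. That the bounding-box endpoints accept the same `startTime` and `endTime` query parameters as the polygon endpoints is an assumption, and the parameter for sensor IDs is left out because its name does not appear in this guide.

```python
from datetime import datetime, timezone
from typing import Dict, List


def bounding_box_payload(points: List[List[float]]) -> Dict[str, float]:
    """Return the smallest box enclosing a list of [lng, lat] points."""
    lngs = [p[0] for p in points]
    lats = [p[1] for p in points]
    return {"minLng": min(lngs), "minLat": min(lats), "maxLng": max(lngs), "maxLat": max(lats)}


def time_range_params(start: datetime, end: datetime) -> Dict[str, str]:
    """Format a time range as UTC strings in the style used earlier in this guide.

    Sub-second precision is dropped; the milliseconds are always written as 000.
    """
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return {
        "startTime": start.astimezone(timezone.utc).strftime(fmt),
        "endTime": end.astimezone(timezone.utc).strftime(fmt),
    }


# Illustrative points in central London and the time window used earlier.
example_payload = bounding_box_payload([[-0.14, 51.50], [-0.08, 51.52], [-0.11, 51.49]])
example_time_params = time_range_params(
    datetime(2022, 1, 1, 8, 0, tzinfo=timezone.utc),
    datetime(2022, 1, 1, 9, 30, tzinfo=timezone.utc),
)
print(example_payload)
print(example_time_params)
```

Under that assumption, the result could be passed as `json=example_payload` and `params=params | example_time_params` in the requests below.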

In [None]:
payload = {"minLng": -1, "minLat": -1, "maxLng": 1, "maxLat": 1}
r = requests.post(
    f"{base_url}/bounding-box/count",
    json=payload,
    headers=streaming_headers,
    stream=True,
    params=params,
    timeout=query_timeout,
)
r.raise_for_status()

results = receive_stream_count(r)
print(f"The bounding box has {results} records in it.")

In [None]:
payload = {"minLng": -1, "minLat": -1, "maxLng": 1, "maxLat": 1}
r = requests.post(
    f"{base_url}/bounding-box/history",
    json=payload,
    headers=streaming_headers,
    stream=True,
    params=params,
    timeout=query_timeout,
)
r.raise_for_status()

results = receive_stream_history(r)
history = [[item["coordinate"][0], item["coordinate"][1]] for item in results]
print(f"Records found: {len(history)}")

In [None]:
payload = {"minLng": -1, "minLat": -1, "maxLng": 1, "maxLat": 1}
r = requests.post(
    f"{base_url}/bounding-box/entities",
    json=payload,
    headers=streaming_headers,
    stream=True,
    params=params,
    timeout=query_timeout,
)
r.raise_for_status()

results = receive_stream_entities(r)
print(f"Unique entities found: {len(results)}")