# Experiments with timescaledb

The goal here is to set up reasonable scenarios showing how we could use timescaledb + tiledb to manage sensor data.

The notebook is divided into three parts:

 * boilerplate needed to set up the db infrastructure;
 * a possible set of tables as a quick implementation;
 * using the quick implementation on a set of use cases.

## Boilerplate configuration

The following are configuration details to set up timescaledb with the postgis extension.

### Postgres extensions loading

In [1]:
import psycopg2 as psy
from datetime import datetime, timedelta
import json

from osgeo import ogr
from osgeo import osr

import numpy as np
import tiledb
import os


In [2]:
# create testdb
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

con = psy.connect("dbname=postgres host=timescaledb user=postgres password=foobar")
# DROP/CREATE DATABASE cannot run inside a transaction block, so autocommit is required.
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with con.cursor() as cur:
    cur.execute("DROP DATABASE IF EXISTS testdb;")
    cur.execute("CREATE DATABASE testdb;")
con.close()

# Extensions are per database, so timescaledb must be created inside testdb itself.
# CREATE EXTENSION is transactional and does not need autocommit.
con = psy.connect("dbname=testdb host=timescaledb user=postgres password=foobar")
with con:
    with con.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")
con.close()

In [3]:
con = psy.connect("dbname=testdb host=timescaledb user=postgres password=foobar")

with con:
    with con.cursor() as cur:   
        cur.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
        cur.execute("SELECT postgis_version();")
        print(cur.fetchall())
        cur.execute("SELECT srid, auth_name, proj4text FROM spatial_ref_sys LIMIT 3;")
        for x in cur.fetchall():
            print(x)
        cur.execute("SELECT srid, auth_name, proj4text FROM spatial_ref_sys WHERE srid = 3003")
        print(cur.fetchall())
con.close()

[('2.5 USE_GEOS=1 USE_PROJ=1 USE_STATS=1',)]
(3819, 'EPSG', '+proj=longlat +ellps=bessel +towgs84=595.48,121.69,515.35,4.115,-2.9383,0.853,-3.408 +no_defs ')
(3821, 'EPSG', '+proj=longlat +ellps=aust_SA +no_defs ')
(3824, 'EPSG', '+proj=longlat +ellps=GRS80 +towgs84=0,0,0,0,0,0,0 +no_defs ')
[(3003, 'EPSG', '+proj=tmerc +lat_0=0 +lon_0=9 +k=0.9996 +x_0=1500000 +y_0=0 +ellps=intl +towgs84=-104.1,-49.1,-9.9,0.971,-2.917,0.714,-11.68 +units=m +no_defs ')]


### Geographic reference system

We would like to be able to easily express concepts such as "all objects within a <distance> radius from point p", with the point p defined by its (longitude, latitude) pair in the [World Geodetic System (GPS, EPSG:4326)](https://en.wikipedia.org/wiki/World_Geodetic_System). However, this does not mean that we need to store geographical information internally using that coordinate system. Since we are focusing on a region centered on the island of Sardinia, the natural choice is a local reference system, here EPSG:3003 (Monte Mario / Italy zone 1, see [this document](http://www.sardegnageoportale.it/documenti/40_583_20150625090153.pdf)). In this system the Earth's surface is, in practice, mapped onto a local plane, and object positions are expressed as distances in meters, in the North and East directions, from a reference point.

In the latter coordinate system, distances between objects can be computed directly with Pythagoras' theorem, to a very good approximation within the region the projection is designed for.
On the other hand, it is still useful to express object positions using their GPS coordinates. Therefore, we need to be able to convert from one coordinate system to the other.
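The following sketch illustrates why a local plane is good enough at this scale. It compares the great-circle (haversine) distance between two points with Pythagoras' theorem applied in a local plane. The first point is the reference point used later in this notebook, and the second point is made up, about 1 km away. As a simplifying assumption, the plane is a plain equirectangular projection centered on the first point, not the Gauss-Boaga transverse Mercator projection behind EPSG:3003.

```python
import math

EARTH_RADIUS_M = 6371008.8  # mean Earth radius, in meters


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (lon, lat) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def to_local_plane(lon, lat, lon0, lat0):
    """Equirectangular (east, north) offsets in meters from (lon0, lat0).

    Illustrative assumption only: this is NOT the EPSG:3003 projection.
    """
    x = EARTH_RADIUS_M * math.radians(lon - lon0) * math.cos(math.radians(lat0))
    y = EARTH_RADIUS_M * math.radians(lat - lat0)
    return x, y


def planar_distance_m(p, q):
    """Pythagoras' theorem on planar (east, north) coordinates."""
    return math.hypot(q[0] - p[0], q[1] - p[1])


lon0, lat0 = 9.220304, 39.276140   # reference point used in this notebook
lon1, lat1 = 9.230304, 39.280140   # made-up point roughly 1 km away

d_sphere = haversine_m(lon0, lat0, lon1, lat1)
d_plane = planar_distance_m(to_local_plane(lon0, lat0, lon0, lat0),
                            to_local_plane(lon1, lat1, lon0, lat0))
print(d_sphere, d_plane)
```

At this distance the two values agree to well within 0.1%. In the database, PostGIS would perform the actual EPSG:3003 computation.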

In [4]:
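import itertools as it
import psycopg2.sql as sql
from osgeo import ogr
from osgeo import osr
import json

source = osr.SpatialReference()
source.ImportFromEPSG(4326)

target = osr.SpatialReference()
target.ImportFromEPSG(3003)

# GDAL >= 3 follows the authority axis order, i.e. (lat, lon) for EPSG:4326.
# Force the traditional (lon, lat) order used by the WKT strings in this notebook.
if hasattr(osr, 'OAMS_TRADITIONAL_GIS_ORDER'):
    source.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
    target.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)

transform = osr.CoordinateTransformation(source, target)


def map_to_monte_mario(wkt):
    "Reproject a WKT geometry from EPSG:4326 (lon lat) to EPSG:3003 and return it as WKT."
    geom = ogr.CreateGeometryFromWkt(wkt)
    geom.Transform(transform)
    return geom.ExportToWkt()


def take_by_n(a, n):
    "Split the iterable a into consecutive lists of at most n elements."
    # The key is True for n elements, then False for the next n, and so on,
    # so groupby starts a new group every n elements.
    c = it.cycle(range(2*n))
    for k, g in it.groupby(a, lambda _: next(c) < n):
        yield list(g)


def format_to_sql_tuple(t):
    "Convert tuple t to an SQL.Composable."
    return sql.SQL("({})").format(sql.SQL(', ').join(sql.Literal(v) for v in t))


def execute_in_context(conn, SQL):
    "Execute SQL in a transaction: committed on success, rolled back on error."
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL)


def load_data_by_chunks(conn, data, chunk_size, into, format_to_sql_tuple):
    """Insert data with one multi-row 'INSERT ... VALUES' per chunk of chunk_size rows.

    Each chunk is committed in its own transaction.
    """
    values = take_by_n(data, chunk_size)
    for v in values:
        s = sql.SQL(into) + sql.SQL(' VALUES ')
        s += sql.SQL(', ').join(format_to_sql_tuple(_) for _ in v)
        execute_in_context(conn, s)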
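
`take_by_n` relies on a stateful `groupby` key to cut the input every `n` elements. An equivalent and arguably more direct formulation uses `itertools.islice`. The following is a sketch with a hypothetical name, `chunked`, which should yield the same lists as `take_by_n` and could replace the call inside `load_data_by_chunks`.

```python
from itertools import islice


def chunked(iterable, n):
    """Yield successive lists of at most n items taken from iterable (hypothetical helper)."""
    if n < 1:
        raise ValueError("n must be >= 1")
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))  # consume up to n items from the shared iterator
        if not chunk:
            return
        yield chunk


print(list(chunked(range(7), 3)))
```

On Python 3.12+, `itertools.batched(iterable, n)` does the same thing but yields tuples.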

## A prototype implementation

In this section we define a prototype implementation that is based on the following model:

 * a **sensor** is an identifiable device of a given **sensor_type** that generates a well-defined datum at each acquisition event; it has a geographical footprint and an associated set of metadata (e.g., is the sensor broken?);
 * a **sensor_type** describes a class of sensors;
 * a **node** is a physical object that controls a group of sensors;
 * a **station** is a physical object that controls a group of nodes;
 * a **measure** represents the datum collected at an acquisition event.
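
To make the relationships in this model explicit, here is a sketch of it as plain Python dataclasses. The class and field names are hypothetical and loosely mirror the tables defined below. In the database, the references between objects are the internal integer ids rather than Python objects.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class SensorType:
    tag: str                                          # external UUID
    description: dict = field(default_factory=dict)


@dataclass
class Station:
    tag: str


@dataclass
class Node:
    tag: str
    station: Station                                  # station controlling this node
    wkt: str                                          # footprint, WKT in EPSG:4326
    description: dict = field(default_factory=dict)


@dataclass
class Sensor:
    tag: str
    stype: SensorType
    node: Node                                        # node controlling this sensor
    wkt: str
    description: dict = field(default_factory=dict)   # metadata, e.g. {'broken': False}


@dataclass
class Measure:
    time: datetime
    sensor: Sensor
    value: Optional[float] = None                     # scalar datum, if any
    url: Optional[str] = None                         # tiledb array holding the datum, if any
    indx: Optional[int] = None                        # frame/position index, if any


# Made-up tags and values, for illustration only.
st = Station(tag="station-1")
nd = Node(tag="node-1", station=st, wkt="POINT(9.220304 39.276140)")
temperature = SensorType(tag="type-temperature", description={"unit": "C"})
s = Sensor(tag="sensor-1", stype=temperature, node=nd,
           wkt="POINT(9.220304 39.276140)", description={"broken": False})
m = Measure(time=datetime(2019, 2, 21, 11, 32, 8), sensor=s, value=21.5)
```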

### Sensor type

A sensor type is uniquely identified externally by its uuid tag. Internally, it is indexed and referred to by its db-generated id.
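
External tags arrive as strings, for example from CSV files. psycopg2 returns `UUID` columns as plain strings unless `psycopg2.extras.register_uuid()` is called. The following is a minimal sketch, assuming we want uuid-to-id lookups that tolerate upper-case or brace-wrapped tags. It normalizes both sides with Python's `uuid` module, and the helper names are hypothetical.

```python
import uuid


def normalize_tag(tag):
    """Canonical lower-case hyphenated form of a UUID; raises ValueError if invalid."""
    return str(uuid.UUID(str(tag)))


def build_tag_index(rows):
    """Map normalized tag -> internal id, from (id, tag) rows as returned by the db."""
    return {normalize_tag(tag): id_ for id_, tag in rows}


def lookup_id(index, tag):
    """Internal id for an external tag, with a clear error for unknown tags."""
    try:
        return index[normalize_tag(tag)]
    except KeyError:
        raise KeyError("unknown tag: {}".format(tag)) from None


idx = build_tag_index([(1, '2e174cf0-fbb2-4e2d-b1ab-230c87a3166c')])
print(lookup_id(idx, '2E174CF0-FBB2-4E2D-B1AB-230C87A3166C'))
```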

In [5]:
def create_sensor_types_table(conn):
    """The sensor types table maintains the definition of the sensor types.
        <id(int)>,<stype uuid>,<json>
    """
    SQL = """
    CREATE TABLE  sensor_types (
           id          SERIAL PRIMARY KEY,
           tag         UUID UNIQUE,
           description JSONB);
    """
    execute_in_context(conn, SQL)    


def destroy_sensor_types_table(conn):
    """
    Can be safely called even when the sensor_types table does not exist.
    """
    SQL = """
    DROP TABLE IF EXISTS sensor_types;
    """
    execute_in_context(conn, SQL)


def get_sensor_types_mapping(conn):
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id, tag from sensor_types;")
            uuid_to_id = {}
            for r in cur.fetchall():
                uuid_to_id[r[1]] = r[0]
            return uuid_to_id


def load_sensor_types(conn, data, validate=False, chunk_size=1000):
    """
    Load an iterable of sensor types definitions.
    Each element of data should be an iterable with 2 fields:
    (<uuid of sensor_type>, <desc>)

    e.g.,
      data = [('2e174cf0-fbb2-4e2d-b1ab-230c87a3166c', {'key': 'value'}),]
    """
    def fix_json(t):
        t2 = (t[0], json.dumps(t[1]))
        return format_to_sql_tuple(t2)
    into = "INSERT INTO sensor_types (tag, description)"
    load_data_by_chunks(conn, data, chunk_size, into, fix_json)


In [6]:
import uuid

def simulate_sensor_types(n):
    return [(str(uuid.uuid4()), {'key': 'value{}'.format(_)}) for _ in range(n)]        


In [7]:
con = psy.connect("dbname=testdb host=timescaledb user=postgres password=foobar")

In [8]:
destroy_sensor_types_table(con)

In [9]:
create_sensor_types_table(con)

In [10]:
data = simulate_sensor_types(100)

In [11]:
load_sensor_types(con, data)

In [12]:
def check_sensor_types(conn, data):
    with conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * from sensor_types;')
            rdata = cur.fetchall()
            return rdata

In [13]:
rdata = check_sensor_types(con, data)

In [14]:
vdata = [(1 + _,) + d for _, d in enumerate(data)]

In [15]:
vdata[0], rdata[0]

((1, 'cbca3889-2731-4d2c-b3e9-4b6f1c233bfe', {'key': 'value0'}),
 (1, 'cbca3889-2731-4d2c-b3e9-4b6f1c233bfe', {'key': 'value0'}))

In [16]:
rdata[0] == vdata[0], rdata[50] == vdata[50]

(True, True)
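
The check above compares rows by position. A `SELECT` without `ORDER BY` does not guarantee any row order, so a sturdier check can match rows on their tag instead. The following sketch uses a hypothetical helper, `diff_by_tag`. It assumes database rows shaped like those returned by `check_sensor_types`, i.e. `(id, tag, description)`, and expected rows shaped like the input of `load_sensor_types`.

```python
def diff_by_tag(returned, expected):
    """Compare db rows (id, tag, description) with input rows (tag, description).

    Returns three sorted lists of tags: missing from the db, unexpected in the db,
    and present in both but with a different description.
    """
    got = {tag: desc for _id, tag, desc in returned}
    want = {tag: desc for tag, desc in expected}
    missing = sorted(set(want) - set(got))
    unexpected = sorted(set(got) - set(want))
    changed = sorted(t for t in set(want) & set(got) if want[t] != got[t])
    return missing, unexpected, changed


# Made-up tags: the rows come back in a different order than they were inserted.
expected = [('a-tag', {'key': 'value0'}), ('b-tag', {'key': 'value1'})]
returned = [(2, 'b-tag', {'key': 'value1'}), (1, 'a-tag', {'key': 'value0'})]
print(diff_by_tag(returned, expected))
```

In the notebook, this check would be `diff_by_tag(rdata, data)`.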

### Nodes

A node is uniquely identified externally by its uuid tag. Internally, it is indexed and referred to by its db-generated id.

In [17]:
def create_nodes_table(conn):
    """The nodes table maintains the definition of the nodes.
        <id(int)>,<node uuid>,<station id>,<geom>,<json>
    """
    SQL = """
    CREATE TABLE  nodes (
           id          SERIAL PRIMARY KEY,
           tag         UUID UNIQUE,
           station     INT4 NOT NULL,
           geom        GEOMETRY,
           description JSONB);
    """
    execute_in_context(conn, SQL)    


def destroy_nodes_table(conn):
    """
    Can be safely called even when the nodes table does not exist.
    """
    SQL = """
    DROP TABLE IF EXISTS nodes;
    """
    execute_in_context(conn, SQL)

def get_nodes_mapping(conn):
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id, tag from nodes;")
            nodes_uuid_to_id = {}
            for r in cur.fetchall():
                nodes_uuid_to_id[r[1]] = r[0]
            return nodes_uuid_to_id

def load_nodes(conn, data, validate=False, chunk_size=1000):
    """
    Load an iterable of nodes definitions.
    Each element of data should be an iterable with 4 fields:
    (<uuid of node>, <station id>, <position>, <desc>)

    e.g.,
      data = [('2e174cf0-fbb2-4e2d-b1ab-230c87a3166c', 1, 'POINT(9.220304 39.276140)', {'key': 'value'}),]
    """
    def fix_geom_and_json(t):
        # Reproject the EPSG:4326 position to EPSG:3003 and let psycopg2 quote the WKT.
        return sql.SQL("({})").format(sql.SQL(', ').join([
            sql.Literal(t[0]), sql.Literal(t[1]),
            sql.SQL("ST_GeomFromText({}, 3003)").format(sql.Literal(map_to_monte_mario(t[2]))),
            sql.Literal(json.dumps(t[3]))]))
    into = "INSERT INTO nodes (tag, station, geom, description)"
    load_data_by_chunks(conn, data, chunk_size, into, fix_geom_and_json)


In [18]:
destroy_nodes_table(con)

In [19]:
create_nodes_table(con)

### Sensors

Each sensor is connected to a node, and each node to a station. The geometrical footprint is expressed using the following [OpenGIS WKT](https://postgis.net/docs/using_postgis_dbmanagement.html) notation:
```
'POINT(9.220304 39.276140)' # a single point
'LINESTRING(0 0, 1 1, 1 2)' # a polyline
'POLYGON((0 0, 4 0, 4 4, 0 4, 0 0), (1 1, 2 1, 2 2, 1 2,1 1))' # a polygon with a hole.
'MULTIPOINT((0 0), (1 2))' # a set of points
```

All the coordinates should be in EPSG:4326, with longitude first and latitude second.
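
Positions are entered in EPSG:4326 but stored in EPSG:3003. A query such as "all sensors within a radius of point p" therefore has to reproject the query point first. The following is a possible sketch, assuming the `sensors` table defined next and PostGIS's `ST_DWithin`. For `geometry` arguments, `ST_DWithin` measures in the units of the SRID, which are meters for EPSG:3003. `WITHIN_RADIUS_SQL` and `within_radius_params` are hypothetical names.

```python
# Parameters are passed separately from the query, so psycopg2 does the quoting.
WITHIN_RADIUS_SQL = """
SELECT id, tag, ST_AsText(ST_Transform(geom, 4326))
FROM sensors
WHERE ST_DWithin(
    geom,                                                        -- stored in EPSG:3003
    ST_Transform(ST_SetSRID(ST_MakePoint(%s, %s), 4326), 3003),  -- query point (lon, lat)
    %s);                                                         -- radius in meters
"""


def within_radius_params(lon, lat, radius_m):
    """Validate and order the query parameters: lon, lat in degrees, radius in meters."""
    if not (-180 <= lon <= 180 and -90 <= lat <= 90):
        raise ValueError("expected (lon, lat) in degrees, longitude first")
    if radius_m < 0:
        raise ValueError("radius must be non-negative")
    return (lon, lat, radius_m)


params = within_radius_params(9.220304, 39.276140, 500)
```

It would be run as `cur.execute(WITHIN_RADIUS_SQL, params)`. `ST_DWithin` can use a GiST index on `geom`, which this notebook does not create.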

In [20]:
def create_sensors_table(conn):
    """
    The sensors table maintains information on specific sensors:

    <id(int)>,<sensor uuid>,<stype id>,<node id>,<geom>,<JSONB>
    
    """

    SQL = """
    CREATE TABLE sensors (
           id          SERIAL PRIMARY KEY,
           tag         UUID UNIQUE,
           stype       INT4 NOT NULL,
           node        INT4 NOT NULL,
           geom        GEOMETRY,
           description JSONB);
    """
    execute_in_context(conn, SQL)    


def destroy_sensors_table(conn):
    """
    Can be safely called even when the sensors table does not exist.
    """
    SQL = """
    DROP TABLE IF EXISTS sensors;
    """
    execute_in_context(conn, SQL)


def get_sensors_mapping(conn):
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id, tag from sensors;")
            uuid_to_id = {}
            for r in cur.fetchall():
                uuid_to_id[r[1]] = r[0]
            return uuid_to_id


def load_sensors(conn, data, validate=False, chunk_size=1000):
    """
    Load an iterable of sensor definitions data to the sensors table.
    Each element of data should be an iterable with 5 fields:

    (<uuid of sensor>, <sensor type id>, <node id>, <position WKT in EPSG:4326>, <desc>)
    
    """
    # FIXME it appears that I cannot trigger psycopg2 JSON support
    def fix_geom_and_json(t):
        # Reproject the EPSG:4326 position to EPSG:3003 and let psycopg2 quote the WKT.
        return sql.SQL("({})").format(sql.SQL(', ').join([
            sql.Literal(t[0]), sql.Literal(t[1]), sql.Literal(t[2]),
            sql.SQL("ST_GeomFromText({}, 3003)").format(sql.Literal(map_to_monte_mario(t[3]))),
            sql.Literal(json.dumps(t[4]))]))
    into = "INSERT INTO sensors (tag, stype, node, geom, description)"
    load_data_by_chunks(conn, data, chunk_size, into, fix_geom_and_json)


In [21]:
def simulate_sensors(n):
    return [(str(uuid.uuid4()), 1, 1, 'POINT(9.220304 39.276140)', {'key': 'value{}'.format(_)}) 
            for _ in range(n)]    

In [22]:
def check_sensors(conn, data):
    with conn:
        with conn.cursor() as cur:
            cur.execute('SELECT id, tag, stype, node, ST_AsText(ST_Transform(geom,4326)), description from sensors;')
            rdata = cur.fetchall()
            return rdata

In [23]:
destroy_sensors_table(con)

In [24]:
create_sensors_table(con)

In [25]:
data = simulate_sensors(100)

In [26]:
load_sensors(con, data)

In [27]:
rdata = check_sensors(con, data)

In [28]:
vdata = [(1 + _,) + d for _, d in enumerate(data)]

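We have a round-off problem. Positions are reprojected from EPSG:4326 to EPSG:3003 on load and back with `ST_Transform` on read, and this round trip is not exact. The error is of the order of 1e-8 degrees or less, well below a millimeter on the ground, so geometries have to be compared with a tolerance rather than for exact equality.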

In [29]:
rdata[0], vdata[0], rdata[50], vdata[50]

((1,
  '7065fd86-eac4-4029-8782-bbbe943296f6',
  1,
  1,
  'POINT(9.22030400716716 39.2761400015204)',
  {'key': 'value0'}),
 (1,
  '7065fd86-eac4-4029-8782-bbbe943296f6',
  1,
  1,
  'POINT(9.220304 39.276140)',
  {'key': 'value0'}),
 (51,
  'abc72b02-a6a9-4541-8095-b8706c13393c',
  1,
  1,
  'POINT(9.22030400716716 39.2761400015204)',
  {'key': 'value50'}),
 (51,
  'abc72b02-a6a9-4541-8095-b8706c13393c',
  1,
  1,
  'POINT(9.220304 39.276140)',
  {'key': 'value50'}))
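
A minimal sketch of a tolerance-based comparison for the rows above follows. It assumes 2D `POINT` geometries as returned by `check_sensors`, and the helper names are hypothetical. A tolerance of 1e-7 degrees corresponds to roughly a centimeter on the ground.

```python
import math
import re

# Matches a 2D WKT point such as 'POINT(9.220304 39.276140)'.
_POINT_RE = re.compile(r"^\s*POINT\s*\(\s*([-+0-9.eE]+)\s+([-+0-9.eE]+)\s*\)\s*$")


def parse_point(wkt):
    """Return (x, y) from a 2D WKT POINT string."""
    m = _POINT_RE.match(wkt)
    if m is None:
        raise ValueError("not a 2D WKT POINT: {!r}".format(wkt))
    return float(m.group(1)), float(m.group(2))


def points_close(wkt_a, wkt_b, abs_tol=1e-7):
    """True if both coordinates differ by at most abs_tol (degrees, for EPSG:4326)."""
    (xa, ya), (xb, yb) = parse_point(wkt_a), parse_point(wkt_b)
    return math.isclose(xa, xb, abs_tol=abs_tol) and math.isclose(ya, yb, abs_tol=abs_tol)


def sensor_rows_match(r, v, abs_tol=1e-7):
    """Compare a check_sensors row with the expected row; geometry within a tolerance."""
    return r[:4] == v[:4] and points_close(r[4], v[4], abs_tol) and r[5] == v[5]


print(points_close('POINT(9.22030400716716 39.2761400015204)', 'POINT(9.220304 39.276140)'))
```

In the notebook, the check would be `all(sensor_rows_match(r, v) for r, v in zip(rdata, vdata))`.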

### Measures table

This is the critical part of the exercise.

What we are trying to do is to use an *entity, attribute, value* (EAV) like formalism to uniformly manage all possible measure events. Specifically,

 * a point sensor (say, temperature) will generate a record where only the value field is assigned and url and indx are NULL;
 * a radar sensor, say, will generate a record with NULL in the value field, a url that points to the tiledb array and an indx that selects the relevant frame;
 * a value measured by a moving sensor will have the value and indx fields assigned and NULL in the url field.

Of course, the type of the sensor tells us which of these fields are meaningful.

**NOTE** we could denormalize everything but, for the time being, it is probably better to wait.
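
The three kinds of records described in the bullets above can be told apart by which of `value`, `url` and `indx` are set. The following is a sketch of such a check with a hypothetical name, `measure_kind`. It is one thing the currently unused `validate` flag of the loaders could switch on. The check follows the bullets. The `load_measures` docstring, however, describes trace data as `(<timestamp>, <sid>, None, None, <idx>)`, so one of the two conventions would need to be settled first.

```python
def measure_kind(record):
    """Classify a (time, sensor, value, url, indx) record following the bullets above.

    Returns 'scalar', 'array' or 'moving'; raises ValueError for any other combination.
    """
    if len(record) != 5:
        raise ValueError("expected 5 fields, got {}".format(len(record)))
    _time, _sensor, value, url, indx = record
    present = (value is not None, url is not None, indx is not None)
    kinds = {
        (True, False, False): "scalar",  # point sensor: value only
        (False, True, True): "array",    # e.g. radar: tiledb url + frame index
        (True, False, True): "moving",   # moving sensor: value + index
    }
    try:
        return kinds[present]
    except KeyError:
        raise ValueError("illegal (value, url, indx) combination: {}".format(present)) from None


print(measure_kind(("2019-02-21T11:32:08Z", 1, 2.76693, None, None)))
```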

In [30]:
def create_measures_table(conn):
    """
    The measures table maintains information on the acquired data.

    <timestamp>,<sid>,<floatvalue>|NULL,<ref>|NULL,<indx>|NULL
    """
    SQL = """
    CREATE TABLE measures (
           time   TIMESTAMPTZ NOT NULL,
           sensor INT4        NOT NULL,
           value  REAL,
           url    TEXT,
           indx   INT4);
    SELECT create_hypertable('measures', 'time');
    CREATE INDEX measures_sensor_index on measures(sensor);
    """
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL)


def destroy_measures_table(conn):
    """
    Can be safely called even when the measures table does not exist.
    """
    SQL = """
    DROP TABLE IF EXISTS measures;
    """
    execute_in_context(conn, SQL)


def load_measures(conn, data, validate=False, chunk_size=1000):
    """
    Load an iterable of measures data to the measures table.
    Each element of data should be an iterable with 5 fields:
    (<timestamp>,<sid>,<floatvalue>|NULL,<url>|NULL,<indx>|NULL)

    Legal combinations are:

      (<timestamp>, <sid>, <floatvalue>, None, None) # standard scalar
      (<timestamp>, <sid>, None, <reftoarrayseq>, <arrayidx>) #array seq data
      (<timestamp>, <sid>, None, None, <idx>) # trace data

    """
    into = "INSERT INTO measures (time, sensor, value, url, indx)"
    load_data_by_chunks(conn, data, chunk_size, into, format_to_sql_tuple)


In [31]:
def simulate_measures(n):
    now = datetime.now()
    return [(now + timedelta(seconds=5*_), 1, 0.02 * _, None, None) for _ in range(n)]  

In [32]:
def check_measures(conn, data):
    with conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * from measures;')
            rdata = cur.fetchall()
            return rdata

In [33]:
destroy_measures_table(con)

In [34]:
create_measures_table(con)

In [35]:
data = simulate_measures(100)

In [36]:
load_measures(con, data)

In [37]:
rdata = check_measures(con, data)

In [38]:
rdata[0], data[0], rdata[50], data[50]

((datetime.datetime(2019, 4, 23, 11, 16, 3, 379599, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=0, name=None)),
  1,
  0.0,
  None,
  None),
 (datetime.datetime(2019, 4, 23, 11, 16, 3, 379599), 1, 0.0, None, None),
 (datetime.datetime(2019, 4, 23, 11, 20, 13, 379599, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=0, name=None)),
  1,
  1.0,
  None,
  None),
 (datetime.datetime(2019, 4, 23, 11, 20, 13, 379599), 1, 1.0, None, None))
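
In the output above, the returned timestamps are timezone-aware (offset 0) while the simulated ones are naive. As a result, `rdata[0] == data[0]` would be `False` even though the instants match. PostgreSQL interprets a naive timestamp written to a `TIMESTAMPTZ` column in the session's `TimeZone`, which here evidently is UTC. A variant of `simulate_measures` that produces aware UTC timestamps avoids both issues. The following sketch uses a hypothetical name, `simulate_measures_utc`.

```python
from datetime import datetime, timedelta, timezone


def simulate_measures_utc(n, step_s=5):
    """Variant of simulate_measures producing timezone-aware UTC timestamps."""
    now = datetime.now(timezone.utc)
    return [(now + timedelta(seconds=step_s * k), 1, 0.02 * k, None, None)
            for k in range(n)]


data = simulate_measures_utc(3)

# Values read back from TIMESTAMPTZ are aware (here with a fixed offset of 0).
# Aware datetimes compare by instant, whatever their tzinfo object:
returned = data[0][0].astimezone(timezone(timedelta(0)))
print(returned == data[0][0])                         # True
# A naive datetime is never equal to an aware one:
print(data[0][0].replace(tzinfo=None) == data[0][0])  # False
```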

## Use cases and scenarios

We will consider the following scenarios:

 1. A network of traffic sensors distributed along major traffic routes
 2. A weather radar
 3. A collection of energy/weather sensors distributed across the city.

### A network of traffic sensors

The data stream comes from a single station that collects data from 1568 nodes. Each node refers to a specific oriented edge of the road network and, in turn, collects data from 4 specialized traffic sensors (*jam*, *speed*, *flux*, *length*). The specific data collected by each sensor is FIXME.
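
The following back-of-the-envelope computation estimates the sizes involved. It assumes every node carries exactly the four sensors listed and takes the row count of the measures file given below at face value. The variable names are ours.

```python
import math

NODES = 1568
SENSORS_PER_NODE = 4          # jam, speed, flux, length
MEASURE_ROWS = 84_511_921     # rows in data/traffic/measures.csv
CHUNK = 10_000                # chunk size used when loading the measures

n_sensors = NODES * SENSORS_PER_NODE
rows_per_sensor = MEASURE_ROWS / n_sensors
n_chunks = math.ceil(MEASURE_ROWS / CHUNK)
print(n_sensors, round(rows_per_sensor, 1), n_chunks)  # 6272 13474.5 8452
```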


#### Station

FIXME: we currently do not have support for stations, so we assume that we have a single station with id 1.

#### Node

Each node is identified by its *uuid* and has, as minimal attributes, an *osm_edge* and a reference FIXME position identified by a *lon* and a *lat*.

In [39]:
import csv
import io
import itertools as it
        
nodes = []

with io.open('./data/traffic/nodes.csv') as f:
    df = csv.DictReader(f)
    for r in df:
        nodes.append((r['uuid'], 1, 'POINT({} {})'.format(r['lon'], r['lat']), {'osm_edge': r['osm_edge']}))

In [40]:
load_nodes(con, nodes)

In [41]:
nodes_uuid_to_id = get_nodes_mapping(con)

#### Sensor types

We need to define 4 sensor types.


In [42]:
sensor_types = []

with io.open('./data/traffic/sensor_types.csv') as f:
    df = csv.DictReader(f)
    for r in df:
        sensor_types.append((r['uuid'], json.loads(r['description'])))

In [43]:
load_sensor_types(con, sensor_types)

In [44]:
sensor_types_uuid_to_id = get_sensor_types_mapping(con)   

#### Sensor definitions


In [45]:
sensors = []
with io.open('./data/traffic/sensors.csv') as f:
    df = csv.DictReader(f)
    for r in df:
        sensors.append((r['uuid'], 
                        sensor_types_uuid_to_id[r['type']], 
                        nodes_uuid_to_id[r['node']],
                        'POINT({} {})'.format(r['lon'], r['lat']), {'key': 'value'})
                       )

In [46]:
load_sensors(con, sensors)

In [47]:
sensors_uuid_to_id = get_sensors_mapping(con)

#### Actual measures

The measures file contains 84,511,921 rows, each with the following structure:

```
[zag@pflip traffic]$ head measures.csv 
uuid,time,measure
98359c6d-863a-4c94-a997-d0e5446a489f,2019-02-21T11:32:08Z,2.76693
77cb21dc-a2e3-4434-96f6-baa15748979d,2019-02-21T11:32:08Z,26.93
156ec27f-0642-45a4-8bbb-c1b31649e6e5,2019-02-21T11:32:08Z,39.5
2fad7fbe-5923-43d2-85cc-96a0dd782263,2019-02-21T11:32:08Z,4.6948
b376da04-d521-4c0e-98dc-c683bf714281,2019-02-21T11:32:08Z,3.03756
aa3e95cf-78f8-4a63-9e6e-61a794079ee0,2019-02-21T11:32:08Z,16.09
12e7b744-794d-482b-ac84-cf5daeb039ec,2019-02-21T11:32:08Z,26.0
5808faf1-1040-47d1-9cfb-6d0a7ebe458c,2019-02-21T11:32:08Z,0.01049
9b00d24a-6798-4a60-bc2d-c118acebd6ae,2019-02-21T11:32:38Z,1.49068
```
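
The loader cells below pass timestamps as naive `datetime` objects and values as strings. The following is a sketch of a streaming parser with a hypothetical name, `parse_measures`. It attaches UTC to the timestamps with a trailing `Z` and converts values to `float`. The example runs on three of the sample lines above, with a made-up uuid-to-id mapping standing in for `get_sensors_mapping(con)`.

```python
import csv
import io
from datetime import datetime, timezone

# Three lines taken from the head of measures.csv shown above.
SAMPLE = """uuid,time,measure
98359c6d-863a-4c94-a997-d0e5446a489f,2019-02-21T11:32:08Z,2.76693
77cb21dc-a2e3-4434-96f6-baa15748979d,2019-02-21T11:32:08Z,26.93
9b00d24a-6798-4a60-bc2d-c118acebd6ae,2019-02-21T11:32:38Z,1.49068
"""


def parse_measures(f, uuid_to_id):
    """Lazily yield (time, sensor_id, value, None, None) tuples from a measures CSV file."""
    for r in csv.DictReader(f):
        # The trailing 'Z' means UTC: make the timestamp timezone-aware.
        t = datetime.strptime(r['time'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
        yield (t, uuid_to_id[r['uuid']], float(r['measure']), None, None)


# Made-up mapping standing in for get_sensors_mapping(con).
mapping = {
    '98359c6d-863a-4c94-a997-d0e5446a489f': 1,
    '77cb21dc-a2e3-4434-96f6-baa15748979d': 2,
    '9b00d24a-6798-4a60-bc2d-c118acebd6ae': 3,
}
rows = list(parse_measures(io.StringIO(SAMPLE), mapping))
print(rows[0])
```

Because `parse_measures` is a generator, `take_by_n(parse_measures(f, sensors_uuid_to_id), 10000)` would stream the whole file in chunks without ever holding it all in memory.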



In [48]:
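# NOTE: this materializes all ~84.5M rows in memory at once; the chunked
# loader in the next cell is the one that writes to the measures table.
with io.open('./data/traffic/measures.csv') as f:
    df = csv.DictReader(f)
    measures = [(datetime.strptime(r['time'], '%Y-%m-%dT%H:%M:%SZ'),
                 sensors_uuid_to_id[r['uuid']],
                 r['measure'])
                for r in df]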

In [53]:
import sys

with io.open('./data/traffic/measures.csv') as f:
    df = csv.DictReader(f)
    data = take_by_n(df, n=10000)
    for chunk in data:
        measures = [(datetime.strptime(r['time'],'%Y-%m-%dT%H:%M:%SZ'),
                     sensors_uuid_to_id[r['uuid']],
                     r['measure'], None, None)
                    for r in chunk]
        sys.stdout.write('.')
        load_measures(con, measures)
