# Timestream

This notebook explores the Timestream API using a sample database that stores sensor data from Ruuvi sensors. If you do not own a Ruuvi sensor, you can simply simulate the data using the generator.


In [None]:
!pip install boto3 --upgrade

After importing `boto3` we set up a default session, specifying the profile and region we want to use.

In [None]:
import boto3
boto3.setup_default_session(profile_name='iot',region_name='us-east-1')

## Databases

Timestream databases and tables can be created using the write client.
We first create a client for Timestream, list the existing databases and then create a new database.

In [None]:
tsc = boto3.client('timestream-write')

In [None]:
tsc.list_databases()

In [None]:
DB_NAME='test4'

In [None]:
tsc.create_database(DatabaseName=DB_NAME)

In [None]:
tsc.describe_database(DatabaseName=DB_NAME)

## Tables

Tables belong to databases and represent the space where you write data. To create a new table you specify two parameters (two dials):
1. the memory retention period
2. the magnetic retention period

You can write data to Timestream only if its timestamp is not older than the memory retention period, counted back from the current time.
Let's say it is Jan 30th at 12h00 and you have configured a memory retention period of 2 hours: all data with a timestamp between Jan 30th 10h00 and Jan 30th 12h15 is then accepted (timestamps up to 15 minutes in the future are allowed). As data becomes older than the memory retention period, it is automatically moved to the magnetic store.
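
The acceptance window from the example above can be sketched as a small local check. This only illustrates the rule and is not how Timestream implements it; the function name `is_writable` and the 15-minute allowance for future timestamps (taken from the 12h15 upper bound in the example) are assumptions.

```python
from datetime import datetime, timedelta

# Assumed allowance for timestamps slightly ahead of the current time,
# matching the 12h15 upper bound in the example above.
FUTURE_TOLERANCE = timedelta(minutes=15)


def is_writable(record_time, now, memory_retention_hours):
    """Return True if record_time falls inside the memory store window."""
    oldest = now - timedelta(hours=memory_retention_hours)
    newest = now + FUTURE_TOLERANCE
    return oldest <= record_time <= newest


now = datetime(2021, 1, 30, 12, 0)
print(is_writable(datetime(2021, 1, 30, 10, 30), now, 2))  # inside the window
print(is_writable(datetime(2021, 1, 30, 9, 0), now, 2))    # older than 2 hours
```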


In [None]:
TABLE_NAME='historical2'
HOT_TIER_TTL_HOURS=12
COLD_TIER_TTL_DAYS=7

In [None]:
tsc.create_table(DatabaseName=DB_NAME, TableName=TABLE_NAME, RetentionProperties= {
    'MemoryStoreRetentionPeriodInHours': HOT_TIER_TTL_HOURS,
    'MagneticStoreRetentionPeriodInDays': COLD_TIER_TTL_DAYS
})

In [None]:
tsc.describe_table(DatabaseName=DB_NAME, TableName=TABLE_NAME)
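
When a notebook is re-run, the table usually exists already and `create_table` fails. A minimal sketch of a re-runnable variant, assuming the write client raises `ConflictException` for an existing table; `ensure_table` is a hypothetical helper, not part of boto3.

```python
def ensure_table(client, database, table, memory_hours, magnetic_days):
    """Create the table; return False instead of failing if it already exists."""
    try:
        client.create_table(
            DatabaseName=database,
            TableName=table,
            RetentionProperties={
                'MemoryStoreRetentionPeriodInHours': memory_hours,
                'MagneticStoreRetentionPeriodInDays': magnetic_days,
            },
        )
        return True
    except client.exceptions.ConflictException:
        # Assumed to signal that the table already exists.
        return False
```

With the names used in this notebook, the call would be `ensure_table(tsc, DB_NAME, TABLE_NAME, HOT_TIER_TTL_HOURS, COLD_TIER_TTL_DAYS)`.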

## Write data

The API for writing records accepts either records that each carry their own dimensions, or a batch of records whose shared dimensions (and other shared fields) are passed once as common attributes.
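
One way to picture the batch form: the shared attributes are sent once and conceptually applied to every record. The sketch below only illustrates that idea locally. `merge_common_attributes` is a hypothetical helper, and treating an attribute set in both places as an error is a choice made for this sketch, not a statement about the service.

```python
def merge_common_attributes(common, records):
    """Return full records with the shared attributes copied into each one."""
    merged = []
    for record in records:
        overlap = set(common) & set(record)
        if overlap:
            # Kept strict on purpose so the sketch stays unambiguous.
            raise ValueError(f"attributes set twice: {sorted(overlap)}")
        merged.append({**common, **record})
    return merged


common = {
    'Dimensions': [{'Name': 'site', 'Value': 'home'}],
    'MeasureValueType': 'DOUBLE',
    'Time': '1612008000000',
    'TimeUnit': 'MILLISECONDS',
}
records = [
    {'MeasureName': 'temperature', 'MeasureValue': '21.87'},
    {'MeasureName': 'humidity', 'MeasureValue': '30.1775'},
]
full_records = merge_common_attributes(common, records)
print(full_records[0])
```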

Let's say that we want to track temperature, humidity and pressure from sensors located at different sites. Each site also has more granular locations. So the dimensions we are going to define for our data will be:

* site
* location

Next, let's define the dimensions for a few of our sensors:

In [None]:
sensor1_dimensions = [
    { 'Name': 'location', 'Value': 'bedroom'},
    { 'Name': 'site', 'Value': 'home'}
]
sensor2_dimensions = [
    { 'Name': 'location', 'Value': 'bathroom'},
    { 'Name': 'site', 'Value': 'home'}
]
sensor3_dimensions = [
    { 'Name': 'location', 'Value': 'greenhouse'},
    { 'Name': 'site', 'Value': 'home'}
]

sensor4_dimensions = [
    { 'Name': 'site', 'Value': 'home'}
]

In our case we are using Ruuvi sensors. These sensors generate the following measures:

```json
{
  "dataFormat": 5,
  "rssi": -90,
  "temperature": 21.87,
  "humidity": 30.1775,
  "pressure": 101028,
  "accelerationX": 32,
  "accelerationY": -24,
  "accelerationZ": 1016,
  "battery": 2989,
  "txPower": 4,
  "movementCounter": 24,
  "measurementSequenceNumber": 45173
}
```
NOTE: Ruuvi sensors generate binary-encoded data. The field names in the above structure are not defined by the sensor.

Each value corresponds to a measure. For the input above we can then create the records as follows:

In [None]:
from time import time

data = {
  "dataFormat": 5,
  "rssi": -90,
  "temperature": 24.7,
  "humidity": 30.1775,
  "pressure": 101028,
  "accelerationX": 32,
  "accelerationY": -24,
  "accelerationZ": 1016,
  "battery": 2989,
  "txPower": 4,
  "movementCounter": 24,
  "measurementSequenceNumber": 45173
}

def ruuvi_data_to_records(data):
    records = []
    for k,v in data.items():
        if k in ['temperature', 'humidity', 'pressure', 'battery', 'txPower']:
            records.append({
                'MeasureName': k,
                'MeasureValue': str(v)
            })
    return records

print(ruuvi_data_to_records(data))

And finally we write the records:

In [None]:
from time import time
ts = time()

In [None]:
try:
    tsc.write_records(DatabaseName=DB_NAME,
                 TableName=TABLE_NAME,
                 CommonAttributes= {
                     'Dimensions': sensor1_dimensions,
                     'MeasureValueType': 'DOUBLE',
                     'Time': str(int(ts*1000)),
                     'TimeUnit': 'MILLISECONDS'
                 },
                 Records=ruuvi_data_to_records(data))
except Exception as err:
    print(err)
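
If you collect many readings before writing, they need to be split into batches, because a single `write_records` call accepts only a limited number of records. A minimal sketch, assuming a limit of 100 records per call (check the current service quotas); `chunked` is a hypothetical helper.

```python
def chunked(records, size=100):
    """Yield successive slices of at most `size` records."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(records), size):
        yield records[start:start + size]


# Dummy records for illustration; each batch could be passed as Records=batch.
dummy = [{'MeasureName': 'temperature', 'MeasureValue': str(20 + i / 100)}
         for i in range(250)]
batch_sizes = [len(batch) for batch in chunked(dummy)]
print(batch_sizes)
```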

We can do the same for another sensor, this time specifying the record directly with all its fields instead of using the common attributes.

In [None]:

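from botocore.exceptions import ClientError

e = None  # stays None when the write succeeds
try:
    tsc.write_records(DatabaseName=DB_NAME, TableName=TABLE_NAME, Records=[
    {
        'Dimensions': sensor2_dimensions,
        'MeasureValueType': 'DOUBLE',
        'Time': str(int(ts*1000)),
        'TimeUnit': 'MILLISECONDS',
        'MeasureName': 'temperature',
        'MeasureValue': '20.2'
    }
    ])
    print('Success')
except ClientError as err:
    e = err  # keep the error so the next cell can inspect it

In [None]:
# Show the error details if the write failed
e.response if e else 'No error'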
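
When some records are rejected, the error response says which ones and why. The sketch below assumes the response contains a `RejectedRecords` list whose entries have `RecordIndex` and `Reason` keys, as documented for `RejectedRecordsException`. The helper name and the sample response are made up for illustration.

```python
def summarize_rejections(response):
    """Return (index, reason) pairs from an error response dict."""
    return [
        (item.get('RecordIndex'), item.get('Reason', 'unknown'))
        for item in response.get('RejectedRecords', [])
    ]


# Hand-written sample shaped like an error response, not real service output.
sample_response = {
    'Error': {'Code': 'RejectedRecordsException'},
    'RejectedRecords': [
        {'RecordIndex': 0, 'Reason': 'example reason text'},
    ],
}
for index, reason in summarize_rejections(sample_response):
    print(f"record {index}: {reason}")
```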

### Changing retention periods

We can change the properties of the table at any time. If the memory retention period is extended, data already ingested will not be brought back from the magnetic store into the memory store.

Say it is 2021-01-01T12h00 and you had configured a memory retention period of 2h. You might then already have data in the magnetic store with a timestamp of 2021-01-01T09h59.
At this point you change the retention period to 4h and try to write data with a timestamp of 2021-01-01T09h00. The timestamp falls inside the new memory retention period, but since data with a more recent timestamp (09h59) might already be in the magnetic store, Timestream will not allow you to write it.

Time passes, and now it is 2021-01-01T13h00, and you try to write data that is 3 hours old, i.e. with a timestamp of 2021-01-01T10h00. This write is accepted, since no data newer than 09h59 has moved to the magnetic store since you increased the retention period to 4h.

In [None]:
tsc.update_table(DatabaseName=DB_NAME, TableName=TABLE_NAME, RetentionProperties= {
    'MemoryStoreRetentionPeriodInHours': 12,
    'MagneticStoreRetentionPeriodInDays': COLD_TIER_TTL_DAYS
})

In [None]:
try:
    tsc.write_records(DatabaseName=DB_NAME,
                 TableName=TABLE_NAME,
                 CommonAttributes= {
                     'Dimensions': sensor1_dimensions,
                     'MeasureValueType': 'DOUBLE',
                     'Time': str(int((time()-4000*12)*1000)),
                     'TimeUnit': 'MILLISECONDS'
                 },
                 Records=ruuvi_data_to_records(data))
except Exception as err:
    print(err)

## Queries

Queries are written in a SQL-like language with some time series extensions. To run queries we have to instantiate a query client.

In [None]:
tsq = boto3.client('timestream-query')

In [None]:
paginator=tsq.get_paginator('query')

In [None]:
query='''WITH interp_ts AS (
    SELECT location, INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, measure_value::double),
            SEQUENCE(ago(3h), ago(2h), 30m)) AS temp
        FROM ruuvi.sensors
        WHERE measure_name='temperature' and time>=ago(4h)
        GROUP BY location
)
SELECT location, avg(t.temp_unnest) FROM interp_ts
CROSS JOIN UNNEST(temp) AS t (time, temp_unnest)
GROUP BY location
'''

In [None]:
query='''SELECT location, measure_name, INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, measure_value::double),
            SEQUENCE(ago(4h), ago(3h), 10m)) AS v
        FROM ruuvi.sensors
        WHERE measure_name ='temperature' and time >= ago(5h)
                GROUP BY location, measure_name'''

As a query might retrieve lots of data, the answer is paginated. To easily access all the data, we can use a `paginator`.

In [None]:
p =paginator.paginate(QueryString=query)

In [None]:
for page in p:
    print(page)
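
Each page is a dictionary, with column metadata under `ColumnInfo` and the data under `Rows`. A small sketch for turning a page into plain dictionaries; `rows_to_dicts` is a hypothetical helper and the sample page is hand-written, not real query output.

```python
def rows_to_dicts(page):
    """Convert one query result page into a list of {column: value} dicts."""
    names = [col.get('Name') for col in page['ColumnInfo']]
    result = []
    for row in page['Rows']:
        # Cells without 'ScalarValue' (nulls, time series, ...) are kept as-is.
        values = [cell.get('ScalarValue', cell) for cell in row['Data']]
        result.append(dict(zip(names, values)))
    return result


sample_page = {
    'ColumnInfo': [
        {'Name': 'location', 'Type': {'ScalarType': 'VARCHAR'}},
        {'Name': 'avg_temp', 'Type': {'ScalarType': 'DOUBLE'}},
    ],
    'Rows': [
        {'Data': [{'ScalarValue': 'bedroom'}, {'ScalarValue': '24.7'}]},
        {'Data': [{'ScalarValue': 'bathroom'}, {'NullValue': True}]},
    ],
}
rows = rows_to_dicts(sample_page)
print(rows)
```

Scalar values arrive as strings, so numeric columns still need converting (for example with `float`) before plotting or aggregating.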

In [None]:
tsq.query(QueryString=query)

When writing code we strive to reuse and factor out as much as possible. For queries, we can use Python string templates (`str.format`) to write a generic query once and fill in specific values later.

In [None]:

QUERY_MULTI = """select bin(time, {bin}) as binned_time, avg(case when measure_name='temperature' then measure_value::double else null end) as avg_temp,
avg(case when measure_name='humidity' then measure_value::double else null end) as avg_humidity,
avg(case when measure_name='pressure' then measure_value::double else null end) as avg_pressure
from ruuvi.sensors
where time > ago({time}) and location = '{location}'
group by bin(time, {bin})
order by bin(time, {bin})"""

In [None]:
query = QUERY_MULTI.format(time='3h', bin='10m', location='e7428453ecb1')

In [None]:
query

In [None]:
tsq.query(QueryString=query)
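
Because the template is filled in with plain string formatting, the values end up verbatim in the query text. A cautious sketch that checks the values first, assuming durations look like `10m` or `3h` and locations are simple identifiers. Both patterns, the shortened template and the helper `build_query` are assumptions made for this example.

```python
import re

# Assumed shapes for the substituted values; adjust to your own data.
DURATION = re.compile(r'\d+(ms|s|m|h|d)')   # e.g. 10m, 3h
LOCATION = re.compile(r'[A-Za-z0-9_-]+')    # e.g. e7428453ecb1

# Shortened stand-in for QUERY_MULTI so the sketch runs on its own.
TEMPLATE = """select bin(time, {bin}) as binned_time, avg(measure_value::double) as avg_value
from ruuvi.sensors
where time > ago({time}) and location = '{location}'
group by bin(time, {bin})"""


def build_query(template, time_range, bin_size, location):
    """Fill the template only if every value has the expected shape."""
    if not DURATION.fullmatch(time_range) or not DURATION.fullmatch(bin_size):
        raise ValueError('durations must look like 10m or 3h')
    if not LOCATION.fullmatch(location):
        raise ValueError('location contains unexpected characters')
    return template.format(time=time_range, bin=bin_size, location=location)


print(build_query(TEMPLATE, '3h', '10m', 'e7428453ecb1'))
```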

## What's next

You can deploy a demo application using the code in the `./cdk` folder (follow the instructions in the README there). This deploys a small backend that allows client apps to query data in Timestream. In the `./timestream-explorer` folder you will find such an app, which lets you visualize arbitrary queries against your database. Think of it as a Timestream query console with graphing capabilities.

Have fun, work hard, and never stop learning!
