# ETL Timeseries Import

This notebook illustrates how bulk import data from a users datasource.

## References 
* This notebook uses an example [dataset](https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+) from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/index.php).
* The [Waylay api documentation](https://docs.waylay.io/api/)
* The [Waylay python SDK](https://docs.waylay.io/api/sdk/python/)
* [Setup instructions](https://github.com/waylayio/demo-general/tree/master/python-sdk) for a python notebook using the Waylay Python SDK.


## Parameters
Please review and adapt the following parameters for this demo

In [1]:
import tempfile
from datetime import datetime
from random import random

temp_dir_ctx = tempfile.TemporaryDirectory()

class ImportDemo:
    """parametrization for this demo"""
    
    # original location of the data set
    data_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00357/occupancy_data.zip'
    
    # the profile name under which waylay credentials are stored
    waylay_client_profile='demo'
    
    run_id = f"{datetime.now():%Y-%m-%d}-{1000 * random():04.0f}"
    # the id of the resource under which this demo is run
    resource_id = f'dataframe_import_{run_id}'
    
    # a temporary directory that we can easily cleanup
    temp_dir = temp_dir_ctx.__enter__()

    @classmethod
    def cleanup(cls):
        temp_dir_ctx.__exit__(None,None,None)

## Setup

In [2]:
import pandas as pd
import waylay
from datetime import datetime

waylay.__version__

'v0.2.0+81.g7f8a960'

## Data retrieval

### download the data set
We download the dataset (a zipped set of csv files), inspect its content, and read out the csv files into a pandas data structure.

In [3]:
import os
import os.path
import zipfile
from urllib.request import urlretrieve

os.makedirs('input', exist_ok=True)
os.makedirs('output', exist_ok=True)

# download the kaggle data set
if not os.path.isfile('input/occupancy.zip'):
    urlretrieve(ImportDemo.data_url, 'input/occupancy.zip')
    
with zipfile.ZipFile('input/occupancy.zip') as occ_zip:
    for file_name in occ_zip.namelist():
        print(file_name)

datatest.txt
datatest2.txt
datatraining.txt


In [4]:
with zipfile.ZipFile('input/occupancy.zip') as occ_zip:
    datatest = pd.read_csv(occ_zip.open('datatest.txt'))
    datatest2 = pd.read_csv(occ_zip.open('datatest2.txt'))
    datatraining = pd.read_csv(occ_zip.open('datatraining.txt'))
    


In [5]:
datatraining.describe()

Unnamed: 0,Temperature,Humidity,Light,CO2,HumidityRatio,Occupancy
count,8143.0,8143.0,8143.0,8143.0,8143.0,8143.0
mean,20.619084,25.731507,119.519375,606.546243,0.003863,0.21233
std,1.016916,5.531211,194.755805,314.320877,0.000852,0.408982
min,19.0,16.745,0.0,412.75,0.002674,0.0
25%,19.7,20.2,0.0,439.0,0.003078,0.0
50%,20.39,26.2225,0.0,453.5,0.003801,0.0
75%,21.39,30.533333,256.375,638.833333,0.004352,0.0
max,23.18,39.1175,1546.333333,2028.5,0.006476,1.0


In [6]:
datatraining.head()

Unnamed: 0,date,Temperature,Humidity,Light,CO2,HumidityRatio,Occupancy
1,2015-02-04 17:51:00,23.18,27.272,426.0,721.25,0.004793,1
2,2015-02-04 17:51:59,23.15,27.2675,429.5,714.0,0.004783,1
3,2015-02-04 17:53:00,23.15,27.245,426.0,713.5,0.004779,1
4,2015-02-04 17:54:00,23.15,27.2,426.0,708.25,0.004772,1
5,2015-02-04 17:55:00,23.1,27.2,426.0,704.5,0.004757,1


## ETL Timeseries Bulk Import 
To bulk upload data into the Waylay Platform, the python supports SDK the following process.

<img src="ETL File Import.png">

This demo describes the general upload process, using Pandas dataframes.

Alternatively, you directly use CSV files, e.g. when files are too large to fit in memory.

The <a href="ETL import CSV conversion.ipynb">ETL import CSV conversion</a> notebook illustrates the different supported CSV formats.

The `etl_tool` service tool in the waylay client support this workflow.

In [7]:
waylay_client = waylay.WaylayClient.from_profile()
etl_tool = waylay_client.timeseries.etl_tool
etl_tool.temp_dir = ImportDemo.temp_dir

### convert to etl format
To upload bulk data into waylay, `etl_tool.prepare_import()` will convert your input source into a local CSV file that
has the right format.

In this case, we provide the tool with additional information:
 * `timestamp_timezone='UTC'` as timestamps do not contain a timezone component
 * `resource=ImportDemo.resource_id` as the resource id is not provided in the input
 * `timestamp_key='date'`, as timestamps are in the `date` column. The default if not specified is `timestamp`, but `date` will be recognised too.

The first two instruction are required for this dataset. Try to omit them to see what errors are raised.

In [8]:
etl_import = etl_tool.prepare_import(
    datatraining, 
    timestamp_timezone='UTC',
    resource=ImportDemo.resource_id,
    timestamp_column='date'
)
etl_import

100%|██████████| 6.00/6.00 [00:00<00:00, 17.0series/s]


WaylayETLSeriesImport(import_file=ETLFile(directory='/tmp/tmpk7hdcerf', prefix='import-20210603.142206'), settings=SeriesSettings(metrics=['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio', 'Occupancy'], metric_column=None, metric=None, resources=['dataframe_import_2021-06-03-0176'], resource_column=None, resource='dataframe_import_2021-06-03-0176', value_column=None, timestamp_column='date', timestamp_offset=None, timestamp_first=None, timestamp_last=None, timestamp_interval=None, timestamp_constructor=None, timestamp_timezone='UTC'), storage_bucket='etl-import')

Because it is easer to work with recent data, we instruct the tool to shift timestamps
(with `timestamp_offset`, `timestamp_first` or `timestamp_last`)

In [9]:
etl_import = waylay_client.timeseries.etl_tool.prepare_import(
    datatraining, 
    timestamp_timezone='UTC',
    resource=ImportDemo.resource_id,
    timestamp_column='date',
    timestamp_last=datetime.utcnow(), # shift all timestamps so that last one is now
    temp_dir='output'
)
etl_import

100%|██████████| 6.00/6.00 [00:00<00:00, 19.4series/s]


WaylayETLSeriesImport(import_file=ETLFile(directory='output', prefix='import-20210603.142207'), settings=SeriesSettings(metrics=['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio', 'Occupancy'], metric_column=None, metric=None, resources=['dataframe_import_2021-06-03-0176'], resource_column=None, resource='dataframe_import_2021-06-03-0176', value_column=None, timestamp_column='date', timestamp_offset=None, timestamp_first=None, timestamp_last=datetime.datetime(2021, 6, 3, 14, 22, 7, 123953), timestamp_interval=None, timestamp_constructor=None, timestamp_timezone='UTC'), storage_bucket='etl-import')

The resulting file is a `gzip` compressed csv file in fully normalized _waylay timeseries ETL_ format

In [10]:
import gzip
with gzip.open(etl_import.import_file.path, 'rt') as csv_file:
     etl_series_df = pd.read_csv(csv_file)

etl_series_df.head()

Unnamed: 0,resource,metric,timestamp,value
0,dataframe_import_2021-06-03-0176,Temperature,2021-05-28T22:40:07.123953Z,23.18
1,dataframe_import_2021-06-03-0176,Temperature,2021-05-28T22:41:06.123953Z,23.15
2,dataframe_import_2021-06-03-0176,Temperature,2021-05-28T22:42:07.123953Z,23.15
3,dataframe_import_2021-06-03-0176,Temperature,2021-05-28T22:43:07.123953Z,23.15
4,dataframe_import_2021-06-03-0176,Temperature,2021-05-28T22:44:07.123953Z,23.1


### (optional) create or update waylay resource
Timeseries in waylay are normally associated with a _Waylay Resource_, an entity that represents the device or source of your timeseries.

The `etl_tool` can extract resource metadata from your dataset, but you can enhance this information by providing addtional `Resource` and `Metric` descriptors.

In this case we take over the metadata descriptions that were available on the dataset.

In [11]:
from waylay.service.timeseries import Resource, Metric
hvac_resource_info = Resource(
    id= ImportDemo.resource_id,
    name= ImportDemo.resource_id,
    description =(
        "Experimental data used for binary classification (room occupancy) "
        "from Temperature,Humidity,Light and CO2.\n"
        "Ground-truth occupancy was obtained from time stamped pictures that were taken every minute.\n"
        "See https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+#"
    ),
    metrics = [
        Metric(name="Temperature", value_type="float", metric_type="gauge", unit="°C"), 
        Metric(name="Humidity", value_type= "float",  metric_type="gauge",  unit="%", description= "Relative Humidity"), 
        Metric(name="Light",value_type="float", metric_type="gauge",  unit= "Lux"), 
        Metric(name="CO2", value_type="float",  metric_type="gauge",  unit= "ppm"), 
        Metric(name="HumidityRatio", value_type="float", metric_type="gauge", unit="kgwater-vapor/kg-air", description="Derived quantity from temperature and relative humidity."),
        Metric(name="Occupancy", value_type="integer",  metric_type="gauge",  unit="boolean", description="0 for not occupied, 1 for occupied status")
    ]
)
hvac_resource_info.to_dict()

{'id': 'dataframe_import_2021-06-03-0176',
 'description': 'Experimental data used for binary classification (room occupancy) from Temperature,Humidity,Light and CO2.\nGround-truth occupancy was obtained from time stamped pictures that were taken every minute.\nSee https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+#',
 'name': 'dataframe_import_2021-06-03-0176',
 'metrics': [{'name': 'Temperature',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': '°C'},
  {'name': 'Humidity',
   'description': 'Relative Humidity',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': '%'},
  {'name': 'Light',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': 'Lux'},
  {'name': 'CO2', 'valueType': 'float', 'metricType': 'gauge', 'unit': 'ppm'},
  {'name': 'HumidityRatio',
   'description': 'Derived quantity from temperature and relative humidity.',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': 'kgwater-vapor/kg-air'},
  {'name': 'Occupancy',
   

The following SDK calls upsert this resource into the Waylay system.

In [12]:
# use `update` (PATCH method) to upsert the resource
hvac_resource_resp = waylay_client.api.resource.update(ImportDemo.resource_id, body=hvac_resource_info.to_dict())

# validate it is stored correctly
waylay_client.api.resource.get(ImportDemo.resource_id)

{'id': 'dataframe_import_2021-06-03-0176',
 'name': 'dataframe_import_2021-06-03-0176',
 'metrics': [{'name': 'Temperature',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': '°C'},
  {'name': 'Humidity',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': '%',
   'description': 'Relative Humidity'},
  {'name': 'Light',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': 'Lux'},
  {'name': 'CO2', 'valueType': 'float', 'metricType': 'gauge', 'unit': 'ppm'},
  {'name': 'HumidityRatio',
   'valueType': 'float',
   'metricType': 'gauge',
   'unit': 'kgwater-vapor/kg-air',
   'description': 'Derived quantity from temperature and relative humidity.'},
  {'name': 'Occupancy',
   'valueType': 'integer',
   'metricType': 'gauge',
   'unit': 'boolean',
   'description': '0 for not occupied, 1 for occupied status'}],
 'description': 'Experimental data used for binary classification (room occupancy) from Temperature,Humidity,Light and CO2.\nGround-truth occupancy wa

You will now be able to see the Resource in the [Waylay Console](https://preview.waylay.io/)

### upload the etl-import data

The next step will upload the import file to the `etl-import/upload` storage folder.

Any upload in this folder will initiate the following etl process:

* the file is moved from `etl-import/upload` to an timestamped folder in `etl-import/busy`
* the etl process is kicked of, reading data from this _busy_ folder.
* on completion, the file (and a result statement) is moved to a folder in `etl-import/done`

If anything goes wrong, either:
* the files are moved to an `ignored` folder if they do no comply with the requirements for an import.
* the files are moved to a `failed` folder if the etl process raised a fatal error
* note that, even if moved to the `done` folder, the processings results might still contain errors. Typically this is caused by parsing errors, e.g. if timestamps are not of the correct format.

The import files that are created by  _timeseries.etl_tool.prepare_import()_ should not run into `ignored` or parsing errors ...


The following reporting utilities allow you to follow up this process:
* `etl_tool.check_import(etl_import)` checks the status of a specific import job.
* `etl_tool.list_import(name_filter, status_filter)` queries the status of all jobs, optionally filtering on name and/or status.

In [13]:
etl_import = waylay_client.timeseries.etl_tool.initiate_import(etl_import)

Uploading content to etl-import/upload/import-20210603.142207-timeseries.csv.gz ...
... done.


In [14]:
from IPython.core.display import HTML, Markdown
resp = waylay_client.timeseries.etl_tool.check_import(etl_import)

HTML(resp.to_html())


In [15]:
for item in waylay_client.timeseries.etl_tool.list_import():
    display(HTML(item.to_html()))
    

### query the timeseries data

In [16]:
ImportDemo.resource_id

'dataframe_import_2021-06-03-0176'

In [17]:
query = dict(
    resource=ImportDemo.resource_id,
    data=[
        dict(metric=metric) for metric in etl_import.settings.metrics
    ]
)
# test query
waylay_client.analytics.query.execute(
    body=query, 
    params=dict(until=datetime.utcnow().isoformat()
))

resource,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176
metric,Temperature,Humidity,Light,CO2,HumidityRatio,Occupancy
timestamp,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
2021-06-02 05:03:07.123000+00:00,19.50,27.033333,0.0,454.666667,0.003785,0.0
2021-06-02 05:04:07.123000+00:00,19.50,27.000000,0.0,456.000000,0.003781,0.0
2021-06-02 05:05:07.123000+00:00,19.50,27.000000,0.0,461.000000,0.003781,0.0
2021-06-02 05:06:06.123000+00:00,19.50,27.000000,0.0,458.000000,0.003781,0.0
2021-06-02 05:07:06.123000+00:00,19.50,27.000000,0.0,460.000000,0.003781,0.0
...,...,...,...,...,...,...
2021-06-03 14:18:07.123000+00:00,21.05,36.097500,433.0,787.250000,0.005579,1.0
2021-06-03 14:19:06.123000+00:00,21.05,35.995000,433.0,789.500000,0.005563,1.0
2021-06-03 14:20:06.123000+00:00,21.10,36.095000,433.0,798.500000,0.005596,1.0
2021-06-03 14:21:07.123000+00:00,21.10,36.260000,433.0,820.333333,0.005621,1.0


In [18]:
# save query
query_name = f'example_{ImportDemo.resource_id}'
waylay_client.analytics.query.create(body=dict(name=query_name, query=query))


{'data': [{'metric': 'Temperature'},
  {'metric': 'Humidity'},
  {'metric': 'Light'},
  {'metric': 'CO2'},
  {'metric': 'HumidityRatio'},
  {'metric': 'Occupancy'}],
 'resource': 'dataframe_import_2021-06-03-0176'}

In [19]:
# test saved query
display(Markdown(
"Use the query in the console on\n"
f"* https://console.waylay.io/analytics/queries?query={query_name}"
))

waylay_client.analytics.query.data(query_name)

Use the query in the console on
* https://console.waylay.io/analytics/queries?query=example_dataframe_import_2021-06-03-0176

resource,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176,dataframe_import_2021-06-03-0176
metric,Temperature,Humidity,Light,CO2,HumidityRatio,Occupancy
timestamp,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
2021-06-02 05:03:07.123000+00:00,19.50,27.033333,0.0,454.666667,0.003785,0.0
2021-06-02 05:04:07.123000+00:00,19.50,27.000000,0.0,456.000000,0.003781,0.0
2021-06-02 05:05:07.123000+00:00,19.50,27.000000,0.0,461.000000,0.003781,0.0
2021-06-02 05:06:06.123000+00:00,19.50,27.000000,0.0,458.000000,0.003781,0.0
2021-06-02 05:07:06.123000+00:00,19.50,27.000000,0.0,460.000000,0.003781,0.0
...,...,...,...,...,...,...
2021-06-03 14:18:07.123000+00:00,21.05,36.097500,433.0,787.250000,0.005579,1.0
2021-06-03 14:19:06.123000+00:00,21.05,35.995000,433.0,789.500000,0.005563,1.0
2021-06-03 14:20:06.123000+00:00,21.10,36.095000,433.0,798.500000,0.005596,1.0
2021-06-03 14:21:07.123000+00:00,21.10,36.260000,433.0,820.333333,0.005621,1.0














##### cleanup

In [20]:
display(waylay_client.data.events.remove(ImportDemo.resource_id))
display(waylay_client.api.resource.remove(ImportDemo.resource_id))
display(waylay_client.analytics.query.remove(query_name))


{'message': 'Deleted messages, series and all metrics for dataframe_import_2021-06-03-0176'}

None

Response(url='https://ts-analytics-io.waylay.io/config/query/example_dataframe_import_2021-06-03-0176?api_version=0.19', method='DELETE', body={'messages': []}, headers=Headers({'server': 'envoy', 'date': 'Thu, 03 Jun 2021 14:22:25 GMT', 'content-type': 'application/json', 'content-length': '16', 'server-timing': 'config; dur=12.224674224853516; env=on-demand; method=DELETE; tenant=dc3481e5-5149-445b-b8e9-ab518cc3ba34; domain=bouncy-turkey.waylay.io', 'access-control-allow-origin': '*', 'x-envoy-upstream-service-time': '16'}), status_code=200, client_response=<Response [200 OK]>)