# Trabajo Práctico Final - Cargar datos a InfluxDB
<br/>

## Curso de Especialización en Inteligencia Artificial
## Aprendizaje de Máquina II
<br/>

### Autor: Maximiliano Torti
### Fecha: 07/10/21
<br/>

En este Notebook se implementaran las funciones para cargar datos a la base de datos de influxDB. Los datos se cargan mediante el protocolo flux, que tiene el siguiente formato:

{medicion},{tag1}={valor_tag1},{tag2}={valor_tag2} {campo1}={valor_campo1},{campo2}={valor_campo2} {Timestamp}

Donde:

- Medicion es obligatorio y puede tener cualquier nombre, no tiene mayor trascendencia.
- Los tags son opcionales y permiten definir y diferenciar las series de tiempo.
- Los campos indican los valores numéricos de las curvas que componen una serie de tiempo.
- El timestamp es el tiempo del punto correspondiente.
- Tags, campos y timestamp se separan por un espacio.

Para mayor entendimiento, se agrega el siguiente ejemplo:

- system,signal=PS1,cycle=1 value=3.33 1633867200000

La query anterior, agregaría a la medición system, una serie de tiempo definidia por el campo señal igual a PS1 y ciclo igual a 1, y el valor 3.33 a su curva "value" en el tiempo 1633867200000 (equivalente a 10/10/2021 12:00:00). 
Existe una cierta flexibilidad en las definiciones ya que por ejemplo "cycle" podría ser un campo en lugar de un tag. Esto depende de si cycle nos interesa como metadato de la serie de tiempo o como "valor graficable".

Se utilizará el API HTTP de influxDB mediante el modulo request de python insertar los datos a la base. Como nota, python posee un modulo conector desarrollado para influxDB que facilita ciertas tareas, pero en la práctica se observó una muy pobre performance del último con alta cantidad de datos.

In [1]:
import pickle as pkl
import pandas as pd
import numpy as np

from datetime import datetime
import time

import requests

In [2]:
path = './dataset/'

with open(path+ 'df_data_structured.pkl','rb') as file:
    df_x=pkl.load(file)

with open(path + 'df_output_structured.pkl','rb') as file:
    df_y=pkl.load(file)

In [3]:
df_x.shape

(2205, 6000)

In [4]:
df_y.shape

(2205,)

In [5]:
INFLUX_URL='127.0.0.1'
ORG = "Maxi"
BUCKET_NAME = "HydraulicSystem"
QUERY_URI='http://{}:8086/api/v2/write?org={}&bucket={}&precision=ms'.format(INFLUX_URL,ORG,BUCKET_NAME)
INFLUX_TOKEN="BjUgOzZ3jhI6duhKPIOQiiNmxD7vHqrpsaPuQVSognmHfkTAyoKHtSQPiXQHz6vth2TRE922ZVb7WmeRzRJbrw=="
headers = {}
headers['Authorization'] = 'Token {}'.format(INFLUX_TOKEN)

In [6]:
# Generamos los datos de input en batch en protocolo flux line y realizamos la escritura con post HTTP.
# Calculamos el tiempo que demorado para todos los datos de input

http_write_start_time = time.perf_counter()

init_timestamp = 1633521600000 # 06/10/2021 12:00. Epoch in ms
time_between_cycles = 300000 # 5 min in ms
time_per_point = 10 # 100 Hz 

# Generate influxdb line protocol data in batch
for col in df_x.dtype.names:
    for cycle in range(df_x[col].shape[0]):
        data = []
        for point in range(df_x[col].shape[1]):
            if not(np.isnan(df_x[col][cycle,point])):
                timestamp = init_timestamp + time_between_cycles * cycle + time_per_point * point
                data.append("system,signal="+col+",cycle="+str(cycle+1)+" value="+str(df_x[col][cycle,point])+" "+
                            str(timestamp))
        batch_data = '\n'.join(data)
        response=requests.post(QUERY_URI, data=batch_data, headers=headers)

http_write_end_time = time.perf_counter()

print("Batch data generation in line protocol and http write total time: {time}s"
      .format(time=http_write_end_time - http_write_start_time))

Batch data generation in line protocol and http write total time: 2028.6867659999998s


**Escribir los datos de 17 señales que cubren aproximadamente 2205 ciclos de 1 minuto, es decir, aproximadamente 37 horas de operación, le demoró aproximadamente 30 minutos, por lo que el ratio de escritura es relativamente bueno. Bastante bueno en realidad, si quisieramos cargar 1 año de operación, extrapolando, demoraría aproximadamente unos 5 días lo cual es razonable.**

In [7]:
# En el caso del otuput, generaremos un punto al final de cada ciclo que resuma la condición deducida de cada elemento,
# simulando como si un modelo tomara el ciclo cuando finaliza, procesara y escribiese el resultado en la base de datos

http_write_start_time = time.perf_counter()

for col in df_y.dtype.names:
    data_y = []
    for cycle in range(df_y[col].shape[0]):
        timestamp = init_timestamp + time_between_cycles * cycle + time_per_point * df_x.shape[1]
        data_y.append("system,component="+col.replace(" ","")+",cycle="+str(cycle+1)+" value="+str(df_y[col][cycle])+" "
                      +str(timestamp))
    batch_data_y = '\n'.join(data_y)
    requests.post(QUERY_URI, data=batch_data_y, headers=headers)
        
http_write_end_time = time.perf_counter()

print("Batch output data generation in line protocol and http write total time: {time}s"
      .format(time=http_write_end_time - http_write_start_time))

Batch output data generation in line protocol and http write total time: 0.2928607999999713s


### Imagenes generadas desde el cliente por defecto de InfluxDB donde se observa la carga de las señales:

**Señales de entrada**

<br />

<div style="clear: both">
<img src="img/InfluxDB_input_data.jpg" style="width: 1000px;float:left">
</div>

<div style="clear: both">
</div>

<br />

**Señales de salida**

<br />

<div style="clear: both">
<img src="img/InfluxDB_output_data.jpg" style="width: 1000px;float:left">
</div>

<div style="clear: both">
</div>

<br />

**Ejemplo de dashboard operativo**

<br />

<div style="clear: both">
<img src="img/InfluxDB_dashboard_example.jpg" style="width: 1000px;float:left">
</div>

<div style="clear: both">
</div>

<br />