# DqR ETL

** - **
** INSTRUCCIONES **

Este proceso al ejecutarse completo extrae datos de las tablas transaccionales (device_events y device_event_sensors) para transformarlos y llevarlos a las tablas del DWH. Por defecto solo pasa los datos que NO existan en el DWH hasta el dia actual (ejecucion diferencial).

Si se desea realizar un **ETL completo** de cero se deben eliminar las tablas del DWH: **bt_events** y **agg_power_consumption**. De esa forma el proceso extrae todos los eventos de la transaccional y se llevan al DWH.
Tener cuidado con la configuración de las DBs (db_trx y db_hist).
** - **

El actual documento describe el proceso de extracción, transformación y carga de datos a un DataWarehouse para luego realizar un análisis exploratorio básico para detectar patrones en el consumo de energía de los módulos instalados en los dispositivos DqR del hogar.
El proceso completo puede asemejarse al proceso de DataMining aunque no se contemplarán todas las etapas del mismo por una cuestión de simplificación del problema. El primer paso entonces es construir y actualizar el DWH con los datos de la base de datos transaccional. Empezaremos definiendo el modelo de datos en esquema estrella.

## Modelo de datos del DWH
Utilizaremos la metodología de Kimball para definir primero los problemas particulares a resolver y luego extender el dominio del negocio. Es apropiada esta decisión ya que aún no se dispone de una visión completa del negocio debido a que el producto se encuentra en su fase de desarrollo.

### Proceso del negocio
Se considerarán 3 procesos de negocio para realizar el diseño del DWH inicial.
- Predicción de estado del módulo DqR Lux
    Este proceso buscará predecir el estado de un módulo que controla la luminaria según ciertas variables predictoras. Es necesario disponer de un set de datos de entrenamiento no menor a 3 meses.
- Predicción de consumo de energía mensual
    El proceso buscará responder cuánto se estima que se consumirá en energía por cada módulo que posee sensado de corriente, de forma que sea posible estimar el consumo mensual.
- Reporte de sensado
    Se permitirá al usuario explorar el sensado de los dispositivos DqR para analizar el comportamiento de los mismos a lo largo del tiempo.
    
### Nivel de granularidad del modelo
Ya que los procesos compartirán las dimensiones asociadas, se define el nivel de granularidad de detalle para cada dimensión y alguna aclaración dependiendo del proceso que la relacione. Las consideraciones a contemplar son:
- Se debe representar el tiempo en minutos para el proceso de predicción de estado y reporte de sensado.
- Se debe representar el tiempo en horas para el proceso de predicción de estado.

### Predicción de estado del DqR Lux

**Dimensiones**
- Time: timestamp, weekday, hour
- Module: module_id, module_type_id, device_id

**Hechos**
- lux_state
- movement
- light
- sound

### Predicción de Consumo de energía

**Dimensiones**
- Time: timestamp, year, month, month_hour (day*24 + hour), hour
- Module: module_id, module_name, module_type_id, module_type_name, device_id, device_name

**Hechos**
- power_consumption
- power_consumption_accumulated
- power_consumption_percentage

### Reporte de sensado

**Dimensiones**
- Time: timestamp, year, month, yearday, weekday, weekday_name, day, month_hour (day*24 + hour), hour, minute, minday (minute + hour)
- Module: module_id, module_name, module_type_id, module_type_name, device_id, device_name
- Sensor: sensor_type_id, sensor_type_name

**Hechos**
- module_state
- light
- movement
- power_consumption
- temperature
- sound

Los hechos son todos aditivos, por lo cual pueden acumularse en el tiempo. El único que es un tipo no aditivo es el module_state que indica el estado actual del módulo para cada evento registrado.

## Modelo de Datos Star-Schema
Definido el modelo de datos lógico, se procede a construir un modelo de datos físico que permita soportar los procesos previos. Por lo tanto dichos procesos terminarán consumiendo y compartiendo las tablas especificadas a continuación.

### Tablas de dimensiones (consulta)

**lk_module**
- MOD_ID: int (PK)
- DEV_ID: int
- DEV_NAME: varchar
- MOD_NAME: varchar
- MOD_TYPE_ID: int
- MOD_TYPE_NAME: varchar

**lk_time**
- TIMESTAMP: int (PK)
- MINUTE: int
- MINDAY: int
- HOUR: int
- DAY: int
- WEEKDAY: int
- WEEKDAY_NAME: varchar
- MONTH: int
- MONTH_HOUR: int
- YEARDAY: int
- YEAR: int

**lk_sensor**
- SENSOR_TYPE_ID: int (PK)
- SENSOR_TYPE_NAME: varchar
- UNIT: varchar

### Tablas de hechos

**agg_power_consumption**
- id_power_consumption: int (SK)
- TIMESTAMP: int (fk)
- MOD_ID: int (fk)
- WATT_HOUR: float
- WATT_HOUR_ACC: int
- TOTAL_WATT_PER: float

**bt_events**
- id_event: int (SK)
- TIMESTAMP: int (fk)
- MOD_ID: int (fk)
- SENSOR_TYPE_ID: int (fk)
- MODULE_STATE: int
- SENSED_VALUE: float

## Import de librerías

In [83]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math as math
from datetime import datetime, timedelta
import MySQLdb
from sqlalchemy import create_engine
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor

%matplotlib inline

pd.options.mode.chained_assignment = None  # default='warn'

## Configuración
### Conexión a bases de datos

In [84]:
# Base de datos transaccional
dbname_trx = "ratio_dwh"
dbhost_trx = "localhost"
dbport_trx = 3306
dbuser_trx = "root"
dbpass_trx = "root"

# Base de datos histórica
dbname_hist = "ratio_dwh"
dbhost_hist = "localhost"
dbport_hist = 3306
dbuser_hist = "root"
dbpass_hist = "root"

### Tablas de consulta (fuentes de datos)
Las fuentes de datos surgen de la base de datos transaccional, ya que se incorporan los eventos registrados en tiempo real para agregarlos a la base de datos histórica.

In [85]:
devices_tbl = "devices"
device_modules_tbl = "device_modules"
module_types_tbl = "module_types"
sensor_types_tbl = "sensor_types"
device_events_tbl = "device_events"
device_event_sensors_tbl = "device_event_sensors"

### Esquema del DWH

In [86]:
agg_power_consumption = "agg_power_consumption"
lk_module = "lk_module"
lk_time = "lk_time"
lk_sensor = "lk_sensor"
bt_events = "bt_events"

### Variables globales

In [87]:
# Voltaje de la red (en V)
voltage = 230
# Sensor_type_id de energia: 1
sensor_type_id = 1
# Fecha fin de ETL
etl_end_date = 'NOW()'

## Integración de datos Transaccionales a DWH (ETL)
### Extracción de datos de la base Transaccional

In [88]:
# Get Last inserted Event from DWH
conn = MySQLdb.connect(host=dbhost_hist, port=dbport_hist, user=dbuser_hist, passwd=dbpass_hist, db=dbname_hist)
cursor = conn.cursor()
try:
    cursor.execute('SELECT MAX(TIMESTAMP) FROM ' + bt_events);
    etl_start_date = cursor.fetchone()[0].strftime('%Y-%m-%d %H:%M:%S')
except MySQLdb.Error, e:
    print 'Error mysql {0}'.format(e)
    etl_start_date = '0000-00-00 00:00:00'
conn.close()
print 'ETL beggining date: {0}'.format(etl_start_date)

Error mysql (1146, "Table 'ratio_dwh.bt_events' doesn't exist")
ETL beggining date: 0000-00-00 00:00:00


In [89]:
conn = MySQLdb.connect(host=dbhost_trx, port=dbport_trx, user=dbuser_trx, passwd=dbpass_trx, db=dbname_trx)
# Dimensiones de devices y modulos
proy_sql = 'M.id as MOD_ID, M.name as MOD_NAME, module_type_id as MOD_TYPE_ID, MT.name as MOD_TYPE_NAME, D.id as DEV_ID, D.name as DEV_NAME'
tables_sql = devices_tbl + ' D JOIN ' + device_modules_tbl + ' M ON D.id=M.device_id JOIN ' + module_types_tbl + ' MT ON module_type_id=MT.id'
df_modules = pd.read_sql('SELECT ' + proy_sql + ' FROM ' + tables_sql, con=conn)
# Dimension sensor
proy_sql = 'id as SENSOR_TYPE_ID, name as SENSOR_TYPE_NAME, unit as UNIT'
tables_sql = sensor_types_tbl
df_sensors = pd.read_sql('SELECT ' + proy_sql + ' FROM ' + tables_sql, con=conn)
# Hechos (mediciones)
proy_sql = 'module_id as MOD_ID, ts as TIMESTAMP, state as MODULE_STATE, sensor_type_id as SENSOR_TYPE_ID, value as SENSED_VALUE'
tables_sql = device_events_tbl + ' DE JOIN ' + device_event_sensors_tbl + ' DES ON DE.id=DES.device_event_id'
where_sql = 'ts BETWEEN "' + etl_start_date + '" AND ' + etl_end_date
df_events = pd.read_sql('SELECT ' + proy_sql + ' FROM ' + tables_sql + ' WHERE ' + where_sql, con=conn)
conn.close()

El dataset df_modules representa la tabla lk_module:

In [90]:
df_modules

Unnamed: 0,MOD_ID,MOD_NAME,MOD_TYPE_ID,MOD_TYPE_NAME,DEV_ID,DEV_NAME
0,51,Lux Cocina mod,1,LUX,5,Lux Cocina
1,52,Pot Cocina,2,POTENTIA,5,Lux Cocina
2,61,Pot 1,2,POTENTIA,6,Potentia Heladera
3,71,Omni 1,3,OMNI,7,Omni Cocina
4,111,Lux Living,1,LUX,11,Living Comedor
5,112,Potentia Living,2,POTENTIA,11,Living Comedor
6,121,Omni General,3,OMNI,12,Living General
7,122,Potentia Living Pared,2,POTENTIA,12,Living General


El dataset df_sensors representa la tabla lk_sensor:

In [91]:
df_sensors

Unnamed: 0,SENSOR_TYPE_ID,SENSOR_TYPE_NAME,UNIT
0,1,CURRENT,mAmp
1,2,LUMINOSITY,lum
2,3,MOVEMENT,
3,4,SOUND,dB
4,5,TEMPERATURE,C


El dataset df_events se utilizará para generar las tablas de hechos

In [92]:
df_events.head(5)

Unnamed: 0,MOD_ID,TIMESTAMP,MODULE_STATE,SENSOR_TYPE_ID,SENSED_VALUE
0,51,2017-01-01,0,2,2.0
1,51,2017-01-01,0,4,34.952475
2,51,2017-01-01,0,3,0.0
3,51,2017-01-01,0,1,0.009875
4,51,2017-01-01,0,2,4.0


In [93]:
print 'Se extraen {0} eventos de sensado.'.format(df_events.shape[0])

Se extraen 21091995 eventos de sensado.


### Chequeo de valores null
Es importante verificar que el dataset de eventos no contenga valores nulos, ya que de ser así deberán completarse mediante padding o interpolación, o bien se dropean...

In [94]:
df_events.dropna(inplace=True)

### Limpieza del dataset
Se verifican algunas columnas de los datasets obtenidos

In [95]:
if df_events.duplicated().any():
    print 'Se eliminan los duplicados (se mantiene el primero)'
    df_events.drop_duplicates(inplace=True,keep='first')
else:
    print 'No existen eventos duplicados'

Se eliminan los duplicados (se mantiene el primero)


## Transformación de variables
Se trabajan las variables que se extraen para generar las tablas históricas en la db del DWH.
La dimensión Module no requiere trabajo adicional ya que se integró con la extracción de la base transaccional.
Por lo tanto resta generar la dimensión Tiempo con sus atributos y relacionar todas las dimensiones con la tabla de hechos.

In [96]:
df_events.dtypes

MOD_ID                     int64
TIMESTAMP         datetime64[ns]
MODULE_STATE               int64
SENSOR_TYPE_ID             int64
SENSED_VALUE             float64
dtype: object

In [97]:
df_time = pd.DataFrame(df_events.TIMESTAMP.unique(),columns=['TIMESTAMP'])
df_time['YEAR'] = df_time['TIMESTAMP'].dt.year
df_time['YEARDAY'] = df_time['TIMESTAMP'].dt.strftime('%j').astype(int)
df_time['MONTH'] = df_time['TIMESTAMP'].dt.month
df_time['YEARMONTH'] = df_time['TIMESTAMP'].dt.strftime('%Y%m').astype(int)
df_time['WEEKDAY'] = df_time['TIMESTAMP'].dt.weekday
df_time['WEEKDAY_NAME'] = df_time['TIMESTAMP'].dt.strftime('%A')
df_time['DAY'] = df_time['TIMESTAMP'].dt.day
df_time['HOUR'] = df_time['TIMESTAMP'].dt.hour
df_time['MONTH_HOUR'] = (df_time['DAY'] - 1) * 24 + df_time.HOUR
df_time['MINUTE'] = df_time['TIMESTAMP'].dt.minute
df_time['MINDAY'] = df_time['HOUR'] * 60 + df_time['MINUTE']

El dataset df_time es el que representa la tabla lk_time en el DWH

In [98]:
df_time.head(5)

Unnamed: 0,TIMESTAMP,YEAR,YEARDAY,MONTH,YEARMONTH,WEEKDAY,WEEKDAY_NAME,DAY,HOUR,MONTH_HOUR,MINUTE,MINDAY
0,2017-01-01 00:00:00,2017,1,1,201701,6,Sunday,1,0,0,0,0
1,2017-01-01 00:01:00,2017,1,1,201701,6,Sunday,1,0,0,1,1
2,2017-01-01 00:02:00,2017,1,1,201701,6,Sunday,1,0,0,2,2
3,2017-01-01 00:03:00,2017,1,1,201701,6,Sunday,1,0,0,3,3
4,2017-01-01 00:04:00,2017,1,1,201701,6,Sunday,1,0,0,4,4


#### Cálculo de consumo en energía (WATT_HOUR)
Se general la tabla de hechos (energía consumida) a partir de los eventos. Primero se calcula la energía consumida en Watts/hora. El cálculo se realiza obteniendo la media de las mediciones de corriente en cada hora de cada día del año.

En principio se genera un dataset DF_POWER que contiene el consumo de cada modulo en WATT/H

In [99]:
df_events_acs = df_events.query('SENSOR_TYPE_ID == ' + str(sensor_type_id))
df_power = pd.DataFrame(columns=['TIMESTAMP','MOD_ID','WATT_HOUR'])
for mod_id in df_modules.MOD_ID:
    # Se calcula el watt/hora para el modulo mod_id
    df_power_mod = df_events_acs.query('MOD_ID == ' + str(mod_id)).merge(df_time,on='TIMESTAMP').groupby(['YEAR','YEARDAY','HOUR']).SENSED_VALUE.mean() * voltage
    if df_power_mod.shape[0] > 0:
        # Eliminamos el indice multiple para aplanar y convertir de Serie a Dataframe
        df_power_mod = df_power_mod.reset_index()
        # Se agrega el TIMESTAMP para utilizarlo como FK de la dimension tiempo y se renombra la columna de consumo
        df_power_mod = df_power_mod.merge(df_time[['TIMESTAMP','YEAR','YEARDAY','HOUR']],left_on=['YEAR','YEARDAY','HOUR'], right_on=['YEAR','YEARDAY','HOUR'])
        df_power_mod.rename(columns={'SENSED_VALUE': 'WATT_HOUR'},inplace=True)
        # Eliminamos columnas que no se grabaran en la tabla de hechos
        df_power_mod.drop(['YEAR','YEARDAY','HOUR'],axis=1,inplace=True)
        # Se asigna el mod_id calculado
        df_power_mod = df_power_mod.assign(MOD_ID = mod_id)
    
        # Se agrega los consumos del mod_id a la tabla de hechos
        df_power = pd.concat([df_power,df_power_mod])

Para calcular los hechos restantes (consumo total mensual y consumo acumulado del mes a cada hora) se utiliza un dataset de la dimension tiempo modificado ya que la granularidad es en horas y no minutos

In [100]:
df_time_hours = df_time.groupby(['YEARDAY','HOUR']).min()
df_time_hours.reset_index(inplace=True,drop=True)
df_time_hours.head(3)

Unnamed: 0,TIMESTAMP,YEAR,MONTH,YEARMONTH,WEEKDAY,WEEKDAY_NAME,DAY,MONTH_HOUR,MINUTE,MINDAY
0,2017-01-01 00:00:00,2017,1,201701,6,Sunday,1,0,0,0
1,2017-01-01 01:00:00,2017,1,201701,6,Sunday,1,1,0,60
2,2017-01-01 02:00:00,2017,1,201701,6,Sunday,1,2,0,120


Finalmente se calculan las variables restantes para generar la tabla historica agg_power_consumption

In [101]:
# Se calcula el consumo final de Watt/h por mes
df_agg_power = df_power.merge(df_modules,on='MOD_ID').merge(df_time_hours,on='TIMESTAMP').groupby(['YEAR','MONTH','MOD_ID']).WATT_HOUR.sum().astype(int)
df_agg_power = df_agg_power.reset_index()
df_agg_power = df_agg_power.rename(columns={'WATT_HOUR': 'WATT_HOUR_MONTH'})
df_agg_power = df_agg_power.merge(df_power.merge(df_modules,on='MOD_ID').merge(df_time_hours,on='TIMESTAMP'),on=['YEAR','MONTH','MOD_ID'])

# Cálculo de Watt/hora acumulados al mes
for mod_id in df_modules.MOD_ID:
    mod_cumsum = df_agg_power.query('MOD_ID == ' + str(mod_id)).groupby(['YEAR','MONTH','MOD_ID']).WATT_HOUR.cumsum().astype(int)
    if mod_cumsum.shape[0] > 0:
        df_agg_power.loc[ df_agg_power.MOD_ID == mod_id, 'WATT_HOUR_ACC'] = mod_cumsum
        
# Cálculo del porcentaje de consumo del mes
df_agg_power['TOTAL_WATT_PER'] = df_agg_power.WATT_HOUR_ACC / df_agg_power.WATT_HOUR_MONTH

df_agg_power = df_agg_power[['TIMESTAMP','MOD_ID','WATT_HOUR','WATT_HOUR_ACC','TOTAL_WATT_PER']]

In [102]:
df_agg_power.head(3)

Unnamed: 0,TIMESTAMP,MOD_ID,WATT_HOUR,WATT_HOUR_ACC,TOTAL_WATT_PER
0,2017-01-01 00:00:00,51,5.435589,5.0,0.000753
1,2017-01-01 01:00:00,51,4.313271,9.0,0.001355
2,2017-01-01 02:00:00,51,5.063294,14.0,0.002107


## Carga de tablas en DWH
Finalizado el proceso de transformación, se procede a cargar las tablas con datos históricos en el DWH.

In [103]:
engine = create_engine('mysql://{user}:{pw}@{host}/{db}'.format(user=dbuser_hist, pw=dbpass_hist, host=dbhost_hist, db=dbname_hist),pool_recycle=300)

# Tabla lk_module
df_modules.to_sql(con=engine, name=lk_module, if_exists='replace', index=False, chunksize=10000)

# Tabla lk_sensor
df_sensors.to_sql(con=engine, name=lk_sensor, if_exists='replace', index=False, chunksize=10000)

# Tabla lk_time
df_time.to_sql(con=engine, name=lk_time, if_exists='replace', index=False, chunksize=10000)
#engine.execute('alter table ' + lk_time + ' add index ' + lk_time + '_ts_idx (TIMESTAMP);')

# Tabla bt_events
df_events.to_sql(con=engine, name=bt_events, if_exists='append', index=True, index_label='id', chunksize=10000)

# Tabla agg_power_consumption
df_agg_power.to_sql(con=engine, name=agg_power_consumption, if_exists='append', index=True, index_label='id', chunksize=10000)