### Normalización de datos desde SYBASE SAP IQ a Cassandra ###


Objetivo: obtener la data desde SAP IQ del total de medidores inteligentes, en este caso se extrajo un mes(25 millones de registros app), normalizar la data e insertarla en cassandra.

Este proceso se debe ejecutar diariamente en un script de python sobre una tarea programada.

Esta data se ocupará principalmente para realizar modelos de machine learning y BI.

Pasos de la normalización de data:

1.- Realizar consulta a SAPIQ para obtención de datos

2.- Se almacena en  dataframe de pandas

3- Se descartan medidores con pulsos negativos

4.- Se reconstruye data faltante(Lagunas)

5.- Se interpolan los datos creados

6.- Se Agrupan los valores de pulsos en una sola columna por id de medidor, año, mes y día.

7.- Se normalizan los pulsos para que la suma total de la lista de pulsos sume 96

8.- Se descartan los días con pulsos con promedio 0

9.- Se descartan medidores días con reconstrucción de pulsos mayor a 5

10.- Se descartan medidores días con cantidad de pulsos mayor a 5  

11.- Se descartan medidores días con lista de pulsos con largo menor a 96    

12.- Se realiza registro de resultados a tabla en base de datos Cassandra(Nosql)


In [134]:
#Se importan librerias para la normalizacion de datos
import numpy as np
import pandas as pd
import pyodbc

In [138]:
#extracción de datos desde SAPIQ
cursor.execute(x)
df = pd.read_sql_query(y,conn)
cursor.execute(z)
conn.close()

In [139]:
#Se visualiza el resultado de la consulta en SYABSE SAPIQ
#nr_app_dev_id ->id_medidor, fc_reading_date -> fecha lectura de pulso, 
#vl_reading_type_id-> tipo de medicion en este caso es energia en kw, nr_value -> valor del pulso medido cada 15 minutos
df.head()

Unnamed: 0,nr_app_dev_id,fc_reading_date,vl_reading_type_id,nr_value
0,202583.0,2019-01-01 00:00:00,2.0.4.1.0.12.0.0.0.0.72,1.626
1,202583.0,2019-01-01 00:15:00,2.0.4.1.0.12.0.0.0.0.72,1.394
2,202583.0,2019-01-01 00:30:00,2.0.4.1.0.12.0.0.0.0.72,1.191
3,202583.0,2019-01-01 00:45:00,2.0.4.1.0.12.0.0.0.0.72,1.12
4,202583.0,2019-01-01 01:00:00,2.0.4.1.0.12.0.0.0.0.72,1.395


In [141]:
#Se eliminarn los medidores con valores en negativos(fallas)

#se Obtiene el id del meididor 
ids = df.loc[df['nr_value'] < 0, 'nr_app_dev_id'].unique()
#se guardan en el dataframe los medidores que no esten en la variable ids
df = df.loc[~(df['nr_app_dev_id'].isin(ids))]

In [145]:
#proceso de reconstrucción de pulsos(lagunas)

#Se convierten en indices las columnas nr_app_dev_id(id del medidor) y fc_reading_date(fecha del pulso por medidor)
df.set_index(['nr_app_dev_id','fc_reading_date'], inplace=True)
df.sort_index(inplace=True)
df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,vl_reading_type_id,nr_value
nr_app_dev_id,fc_reading_date,Unnamed: 2_level_1,Unnamed: 3_level_1
202583.0,2019-01-01 00:00:00,2.0.4.1.0.12.0.0.0.0.72,1.626
202583.0,2019-01-01 00:15:00,2.0.4.1.0.12.0.0.0.0.72,1.394
202583.0,2019-01-01 00:30:00,2.0.4.1.0.12.0.0.0.0.72,1.191
202583.0,2019-01-01 00:45:00,2.0.4.1.0.12.0.0.0.0.72,1.12
202583.0,2019-01-01 01:00:00,2.0.4.1.0.12.0.0.0.0.72,1.395


In [146]:
#Se obtienen los valores de los indices para crear nuevos indices
(category_index,date_index) = df.index.levels
#Se crean nuevos indices
new_index = pd.MultiIndex.from_product([category_index,date_index],names =['nr_app_dev_id', 'fc_reading_date'])
#Se utiliza la función reindex para la creacion de los nuevos valores en el dataframe
new_df = df.reindex(new_index)
new_df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,vl_reading_type_id,nr_value
nr_app_dev_id,fc_reading_date,Unnamed: 2_level_1,Unnamed: 3_level_1
202583.0,2019-01-01 00:00:00,2.0.4.1.0.12.0.0.0.0.72,1.626
202583.0,2019-01-01 00:15:00,2.0.4.1.0.12.0.0.0.0.72,1.394
202583.0,2019-01-01 00:30:00,2.0.4.1.0.12.0.0.0.0.72,1.191
202583.0,2019-01-01 00:45:00,2.0.4.1.0.12.0.0.0.0.72,1.12
202583.0,2019-01-01 01:00:00,2.0.4.1.0.12.0.0.0.0.72,1.395


In [147]:
#Se suman la cantidad de nulos creados con los nuevos indices
new_df.isnull().sum()

vl_reading_type_id    2297845
nr_value              2297845
dtype: int64

In [149]:
#se muestran los medidores dias que poseen valores nuelos
nans = lambda new_df: new_df[new_df.isnull().any(axis=1)]
nans(new_df)

Unnamed: 0_level_0,Unnamed: 1_level_0,vl_reading_type_id,nr_value
nr_app_dev_id,fc_reading_date,Unnamed: 2_level_1,Unnamed: 3_level_1
202583.0,2019-01-03 03:00:00,,
202583.0,2019-01-03 03:15:00,,
202583.0,2019-01-03 03:30:00,,
202583.0,2019-01-12 03:00:00,,
202583.0,2019-01-12 03:15:00,,
...,...,...,...
691121.0,2019-01-16 15:30:00,,
691121.0,2019-01-16 15:45:00,,
691121.0,2019-01-16 16:00:00,,
691121.0,2019-01-16 16:15:00,,


In [150]:
#Se crea un dataframe con los valores nulos(su utilizará más adelenta)
df3 = nans(new_df)

In [151]:
#Se interpolan los pulsos de forma lineal
new_df["nr_value"] = round(new_df["nr_value"].interpolate(method= 'linear'),3)


vl_reading_type_id    2297845
nr_value                    0
dtype: int64

In [None]:
#se valida que ya no existen valores nulos
new_df.isnull().sum()

In [152]:
nans(new_df)

Unnamed: 0_level_0,Unnamed: 1_level_0,vl_reading_type_id,nr_value
nr_app_dev_id,fc_reading_date,Unnamed: 2_level_1,Unnamed: 3_level_1
202583.0,2019-01-03 03:00:00,,4.265
202583.0,2019-01-03 03:15:00,,3.746
202583.0,2019-01-03 03:30:00,,3.228
202583.0,2019-01-12 03:00:00,,3.443
202583.0,2019-01-12 03:15:00,,3.344
...,...,...,...
691121.0,2019-01-16 15:30:00,,0.056
691121.0,2019-01-16 15:45:00,,0.056
691121.0,2019-01-16 16:00:00,,0.056
691121.0,2019-01-16 16:15:00,,0.057


In [153]:
#Se deja solo el id del medidor como indice 
new_df.reset_index(level=-1, inplace=True)
new_df.head()

Unnamed: 0_level_0,fc_reading_date,vl_reading_type_id,nr_value
nr_app_dev_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
202583.0,2019-01-01 00:00:00,2.0.4.1.0.12.0.0.0.0.72,1.626
202583.0,2019-01-01 00:15:00,2.0.4.1.0.12.0.0.0.0.72,1.394
202583.0,2019-01-01 00:30:00,2.0.4.1.0.12.0.0.0.0.72,1.191
202583.0,2019-01-01 00:45:00,2.0.4.1.0.12.0.0.0.0.72,1.12
202583.0,2019-01-01 01:00:00,2.0.4.1.0.12.0.0.0.0.72,1.395


In [154]:
#Se separa la fecha del puslo en año mes dia
new_df.insert (2, "year", pd.to_datetime(new_df['fc_reading_date'], errors='coerce').dt.year)
new_df.insert (3, "month", pd.to_datetime(new_df['fc_reading_date'], errors='coerce').dt.month)
new_df.insert (4, "day", pd.to_datetime(new_df['fc_reading_date'], errors='coerce').dt.day)

In [155]:
#Se separa la fecha del puslo en año mes dia
df3.reset_index(level=-1, inplace=True)
df3.insert (2, "year", pd.to_datetime(df3['fc_reading_date'], errors='coerce').dt.year)
df3.insert (3, "month", pd.to_datetime(df3['fc_reading_date'], errors='coerce').dt.month)
df3.insert (4, "day", pd.to_datetime(df3['fc_reading_date'], errors='coerce').dt.day)

In [157]:
#Se agrupan los pulsos en una columna de tipo lista por id medidor año mes dia
df2 = new_df.groupby(['nr_app_dev_id','year','month','day'])['nr_value'].apply(list).reset_index(name='pulsos')

In [158]:
#Se agrupan los pulsos en una columna de tipo lista por id medidor año mes dia
df4 = df3.groupby(['nr_app_dev_id','year','month','day'])['nr_value'].apply(list).reset_index(name='pulsos')

In [159]:
df4.head()

Unnamed: 0,nr_app_dev_id,year,month,day,pulsos
0,202583.0,2019,1,3,"[nan, nan, nan]"
1,202583.0,2019,1,12,"[nan, nan, nan, nan, nan, nan, nan, nan, nan, ..."
2,202583.0,2019,1,14,"[nan, nan, nan, nan, nan, nan, nan]"
3,202583.0,2019,1,15,"[nan, nan, nan, nan, nan, nan, nan, nan, nan, ..."
4,202583.0,2019,1,16,"[nan, nan, nan, nan, nan, nan, nan, nan, nan, ..."


In [160]:
#se crea columna que cuenta la cantidad de pulsos diferentes a nulo
df2['nans'] = df4['pulsos'].apply(lambda x: len([a for a in x if a != 'nan']))

In [161]:
#si la cantidad es nula se deja en 96
df2['nans'].fillna(96, inplace=True)
df2

Unnamed: 0,nr_app_dev_id,year,month,day,pulsos,nans
0,202583.0,2019,1,1,"[1.626, 1.394, 1.191, 1.12, 1.395, 1.439, 1.20...",3.0
1,202583.0,2019,1,2,"[4.158, 3.687, 3.361, 4.01, 3.795, 3.764, 3.93...",19.0
2,202583.0,2019,1,3,"[4.293, 4.172, 3.941, 4.183, 4.43, 4.372, 4.21...",7.0
3,202583.0,2019,1,4,"[4.944, 4.562, 4.712, 5.004, 5.079, 4.978, 5.3...",79.0
4,202583.0,2019,1,5,"[4.927, 5.002, 4.9, 5.073, 5.281, 5.229, 5.465...",67.0
...,...,...,...,...,...,...
238165,691121.0,2019,1,27,"[0.031, 0.011, 0.005, 0.003, 0.003, 0.003, 0.0...",96.0
238166,691121.0,2019,1,28,"[0.006, 0.005, 0.004, 0.003, 0.003, 0.003, 0.0...",96.0
238167,691121.0,2019,1,29,"[0.085, 0.018, 0.007, 0.004, 0.003, 0.003, 0.0...",96.0
238168,691121.0,2019,1,30,"[0.055, 0.014, 0.004, 0.004, 0.004, 0.004, 0.0...",96.0


In [162]:
#se crea columna que cuenta la cantidad de pulsos diferentes a 0
df2['menor_96'] = df2['pulsos'].apply(lambda x: len([a for a in x if a != 0.0]))

In [164]:
#Se mantiene en el dataframe solo los medidores año-mes-dia donde los pulsos con 0 no superen los 6
#La cantidad de pulsos aceptables(90) sera un parametro de entrada que ingresará el usuario
df2 = df2.loc[(df2['menor_96'] > 90) ]

In [165]:
#Se mantiene en el dataframe solo los medidores año-mes-dia donde los pulsos reconstruidos(nulos) no superen los 6
#La cantidad de pulsos aceptables(90) sera un parametro de entrada que ingresará el usuario
df2 = df2.loc[(df2['nans'] > 90)]

Unnamed: 0,nr_app_dev_id,year,month,day,pulsos,nans,menor_96
9,202583.0,2019,1,10,"[3.924, 3.542, 2.996, 3.521, 3.809, 3.087, 3.1...",96.0,96
92,202592.0,2019,1,3,"[1.392, 1.957, 2.003, 1.981, 1.962, 1.95, 2.00...",96.0,96
93,202592.0,2019,1,4,"[1.572, 1.824, 2.165, 2.12, 1.965, 1.7, 1.641,...",96.0,96
96,202592.0,2019,1,7,"[1.257, 1.558, 1.697, 1.758, 1.681, 1.834, 1.8...",96.0,96
115,202592.0,2019,1,27,"[0.952, 1.325, 1.554, 1.635, 1.918, 1.6, 1.639...",92.0,96
...,...,...,...,...,...,...,...
238165,691121.0,2019,1,27,"[0.031, 0.011, 0.005, 0.003, 0.003, 0.003, 0.0...",96.0,96
238166,691121.0,2019,1,28,"[0.006, 0.005, 0.004, 0.003, 0.003, 0.003, 0.0...",96.0,96
238167,691121.0,2019,1,29,"[0.085, 0.018, 0.007, 0.004, 0.003, 0.003, 0.0...",96.0,96
238168,691121.0,2019,1,30,"[0.055, 0.014, 0.004, 0.004, 0.004, 0.004, 0.0...",96.0,96


In [166]:
#Se eliminan dias con media 0
df2 = df2.drop(df2[df2['pulsos'].map(np.mean) == 0].index)

In [167]:
#Se eliminan dias largo menor a 96
df2 = df2.drop(df2[df2['pulsos'].map(len) < 96].index)

In [168]:
#Se eliminan dias con suma menor o igual a 0
df2 = df2.drop(df2[df2['pulsos'].map(sum) <= 0].index)

In [170]:
#se normaliza los valores de los pulsos para que la suma total por dia de 96
df2['pulsos'] = df2['pulsos'].apply(lambda x: np.round(x / np.average(x),3) if sum(x)>=1 else x*1)

In [171]:
#from sklearn.preprocessing import StandardScaler

#df2['pulsos'] = df2['pulsos'].apply(lambda x: StandardScaler().fit_transform(x.values)

In [175]:
df2

Unnamed: 0,nr_app_dev_id,year,month,day,pulsos,nans,menor_96
9,202583.0,2019,1,10,"[1.434, 1.295, 1.095, 1.287, 1.392, 1.128, 1.1...",96.0,96
92,202592.0,2019,1,3,"[1.228, 1.726, 1.767, 1.747, 1.731, 1.72, 1.77...",96.0,96
93,202592.0,2019,1,4,"[1.458, 1.692, 2.008, 1.966, 1.822, 1.577, 1.5...",96.0,96
96,202592.0,2019,1,7,"[1.161, 1.439, 1.567, 1.623, 1.552, 1.694, 1.7...",96.0,96
115,202592.0,2019,1,27,"[0.864, 1.203, 1.411, 1.484, 1.741, 1.452, 1.4...",92.0,96
...,...,...,...,...,...,...,...
238165,691121.0,2019,1,27,"[0.031, 0.011, 0.005, 0.003, 0.003, 0.003, 0.0...",96.0,96
238166,691121.0,2019,1,28,"[0.12, 0.1, 0.08, 0.06, 0.06, 0.06, 0.08, 0.06...",96.0,96
238167,691121.0,2019,1,29,"[1.797, 0.38, 0.148, 0.085, 0.063, 0.063, 0.06...",96.0,96
238168,691121.0,2019,1,30,"[1.089, 0.277, 0.079, 0.079, 0.079, 0.079, 0.0...",96.0,96


In [179]:
##se guarda la data normalizada en Cassandra
%%time
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

session = cluster.connect('smarmeters_test')
query = "INSERT INTO test_data_parametrizada_tres(id_medidor,year,month,day,pulsos) VALUES (?,?,?,?,?)"
prepared = session.prepare(query)
for index,item in df2.iterrows():
    session.execute(prepared, (str(item.nr_app_dev_id),item.year,item.month,item.day,item.pulsos))

CPU times: user 3min 53s, sys: 45.2 s, total: 4min 38s
Wall time: 6min 20s


In [185]:
#df2.to_csv("../datasets/dat_normalizada_201901.csv",columns=['nr_app_dev_id','year','month','day','pulsos'], index=False)