# Cleaning data datalake - preprocessed - get raw data
En el datalake se obtiene la data con varios errores - se procede a correguirlos

Transformations:
- Set seconds to zero
- Aprox/round minutes to multiples of 5 minutes
- Drop hidden duplicates
- Inputer NaNs

--------
**DATA**:
- INPUT: "data_raw_bigquery.pkl"
- OUTPUT: "data_raw_preprocessed.pkl"

## Root folder and read env variables

In [1]:
import os
# fix root path to save outputs
actual_path = os.path.abspath(os.getcwd())
list_root_path = actual_path.split('\\')[:-1]
root_path = '\\'.join(list_root_path)
os.chdir(root_path)
print('root path: ', root_path)

root path:  D:\github-mi-repo\Optimization-Industrial-Process


In [2]:
import os
from dotenv import load_dotenv, find_dotenv # package used in jupyter notebook to read the variables in file .env

""" get env variable from .env """
load_dotenv(find_dotenv())

""" Read env variables and save it as python variable """
PROJECT_GCP = os.environ.get("PROJECT_GCP", "")

## RUN

In [3]:
import pandas as pd
import numpy as np
import datetime as dt
import json
import pickle
from sklearn.pipeline import Pipeline
import sys
import os
import warnings
import gcsfs

warnings.filterwarnings("ignore")

from sklearn.base import BaseEstimator, TransformerMixin

### 0. Read Raw data

In [4]:
path_data_datalake = 'artifacts/data/data_raw_bigquery.pkl'
data_datalake = pd.read_pickle(path_data_datalake)
data_datalake.head()

Unnamed: 0,Tag,PV,datetime
0,240FI020A_HRS_EOP.C,46.9992,2021-01-01
1,240FI020A_HRS_DO.C,11.48377,2021-01-01
2,230AIT446.PNT,11.55654,2021-01-01
3,240FIC440.MEAS,0.023512,2021-01-01
4,240FIC236.MEAS,0.000117,2021-01-01


### 1. Set seconds to zero

In [5]:
class set_seconds_zero(BaseEstimator,TransformerMixin):
    '''
    Fijar los segundos a cero.
    Todos los datos extraidos desde el datalake fallan en los segundos y no son extraidos exactamente en el segundo 00
    
    Example DataFrame
    1 S240ALDP022	90.600000	2022-03-20 23:55:03
    2 S240ALDP031	2.300000	2022-03-20 23:55:03
    '''
    
    def __init__(self):
        super(set_seconds_zero,self).__init__()

    def fit(self,DataFrame):
        return self

    def transform(self,DataFrame):
        print('\napplying set_seconds_zero...')
        print('Tamaño data raw: ', DataFrame.shape)
        
        
        # apply transformation
        DataFrame['datetime'] = DataFrame['datetime'].dt.strftime("%Y-%m-%d %H:%M:00")
        DataFrame['datetime'] = DataFrame['datetime'].astype('datetime64[ns]')
        return DataFrame

    
# instancia de la clase
seter_seconds = set_seconds_zero()

# transformar
basic_preprocessed_data = seter_seconds.transform(data_datalake)


applying set_seconds_zero...
Tamaño data raw:  (13352529, 3)


In [6]:
# print
basic_preprocessed_data.head()

Unnamed: 0,Tag,PV,datetime
0,240FI020A_HRS_EOP.C,46.9992,2021-01-01
1,240FI020A_HRS_DO.C,11.48377,2021-01-01
2,230AIT446.PNT,11.55654,2021-01-01
3,240FIC440.MEAS,0.023512,2021-01-01
4,240FIC236.MEAS,0.000117,2021-01-01


### 2. Aprox/round minutes to multiple of 5
The data is getting each 5 minutes, but some times it is not exact

In [7]:
class aprox_minutes_multiple_5(BaseEstimator,TransformerMixin):
    '''
    Aproximar los minutos a múltiplos de 5.
    
    Ejemplo:
    - A veces no se extrae justo cada 5 minutos, o se extrae de nuevo datos, ejemplo extraer datos en minuto 5, 12, 15, 18, 20
    - Luego hay que aproximar para que en los datos se vean como múltiplos de 5 minutos, por lo que se obtiene: 5, 10, 15, 15, 20
    '''
    
    def __init__(self):
        super(aprox_minutes_multiple_5,self).__init__()

    def fit(self,DataFrame):
        return self

    def transform(self,DataFrame):
        print('\napplying aprox_minutes_multiple_5...')
        print('Minutos que aparecen en el dataframe: ', DataFrame['datetime'].dt.minute.unique())
        
        
        # apply transformation
        DataFrame['datetime'] = DataFrame['datetime'].apply(lambda x: x.floor('5 min'))
        return DataFrame
    

# instancia
aprox_er = aprox_minutes_multiple_5()

# transformar
basic_preprocessed_data = aprox_er.transform(basic_preprocessed_data)


applying aprox_minutes_multiple_5...
Minutos que aparecen en el dataframe:  [ 0  4  5  9 10 14 15 19 20 24 25 29 30 34 35 39 40 44 45 49 50 54 55 59
 31 36 41 46 51 56  1  6 11 16 21 26  2  7 12 17 22 27 32 33 37 38 42 43
 47 48 52 53 57 58  3  8 13 18 23 28]


In [8]:
# print
basic_preprocessed_data.head(3)

Unnamed: 0,Tag,PV,datetime
0,240FI020A_HRS_EOP.C,46.9992,2021-01-01
1,240FI020A_HRS_DO.C,11.48377,2021-01-01
2,230AIT446.PNT,11.55654,2021-01-01


### 3. Eliminate duplicates that are generated with the same datetime (product of the approximation of the previous stage)

In [9]:
class drop_duplicates_time(BaseEstimator,TransformerMixin):
    '''
    Eliminar los duplicados que se podrian generar en el paso anterior redondeo de minutos:
    - dos o más observaciones del mismo tag tiene el MISMO valor y el mismo datetime
    - dos o más observaciones del mismo tag tiene DISTINTO valor y el mismo datetime
    
    Ex.
    TAG            VALUE    TIME
    240FI020A.PNT  3363.118 2021-09-01 00:00:00
    240FI020A.PNT  3362.869 2021-09-01 00:00:00
    '''
    def __init__(self):
        super(drop_duplicates_time,self).__init__()
    
    def fit(self,DataFrame):
        return self
    
    def transform(self,DataFrame):
        print('\napplying drop_duplicates_time...')
        print('Tamaño data: ', DataFrame.shape)
        
        
        # Drop duplicates. Same datetime, same tag, different value. Conservar el menor valor
        DataFrame=DataFrame.groupby(['datetime','Tag'],as_index=False).agg({'PV':['min']})
        DataFrame.columns=[x[0] if x[1]=='' else '_'.join(x) for x in DataFrame.columns]
        DataFrame=DataFrame.rename(columns={'PV_min': "PV"})
        print('Tamaño data luego de borrar duplicados time: ', DataFrame.shape)
        
        
        """ print de info de interés. Cantidad de datos que contiene la data raw correguido el datetime vs la cantidad de datos que debería de tener """
        # get idel number of observations
        days_to_query = (DataFrame.iloc[-1]['datetime'] - DataFrame.iloc[0]['datetime'])
        ideal_number_observations = (days_to_query.days + 1) * 288

        # get the real number of observations
        real_number_observations = DataFrame['datetime'].unique().shape[0]
        
        # calculate the % off loss data
        print('Num Obs ideal: Cantidad de datos que deberían haber si se subieran todos los datos de todos los días(cantidad días * 288): ', ideal_number_observations)
        print('Num Obs Real: Cantidad de obs realmente existen en los datos: ', real_number_observations)
        print('Cantidad de observaciones COMPLETAS no subidas al datalake: ', ideal_number_observations - real_number_observations)
        print('% observaciones COMPLETAS no subidas al datalake: ', 100 * (1 - (real_number_observations / ideal_number_observations)))
        
        return DataFrame
    

# instancia de la clase
droper_time = drop_duplicates_time()

# transformar
basic_preprocessed_data = droper_time.transform(basic_preprocessed_data)


applying drop_duplicates_time...
Tamaño data:  (13352529, 3)
Tamaño data luego de borrar duplicados time:  (12543679, 3)
Num Obs ideal: Cantidad de datos que deberían haber si se subieran todos los datos de todos los días(cantidad días * 288):  210528
Num Obs Real: Cantidad de obs realmente existen en los datos:  208865
Cantidad de observaciones COMPLETAS no subidas al datalake:  1663
% observaciones COMPLETAS no subidas al datalake:  0.7899186806505587


In [10]:
# print
basic_preprocessed_data.head(3)

Unnamed: 0,datetime,Tag,PV
0,2021-01-01,230AIT446.PNT,11.55654
1,2021-01-01,240AIC022.MEAS,3.008503
2,2021-01-01,240AIC126.MEAS,11.40464


### 4. Inputer NaNs
- Se asume que la cantidad de datos que no existen en el datalake fue por algún error en PI que lo clasificó como bad input, algún problema en la subida a BigQuery y no subió el dato
- La planta está siempre funcionando por lo que siempre tienen que haber datos
- Cuando hay PGP igual se guardan datos, lab se mantiene el último valor por ejemplo hasta que lo vuelven a actualizar
- Entonces, no habría razón para que la cantidad que datos en el datalake sea distinta al valor ideal
- Luego, estos valores faltantes se rellenan

In [11]:
class ImputerNaNs(BaseEstimator,TransformerMixin):
    '''
    Imputer NaN since data Raw (columns)
    Imputer NaN of each tags.
    Output: Data Raw with each tag have the same number of observations
    '''
    def __init__(self, method, start_date, end_date):
        super(ImputerNaNs,self).__init__()
        self.method = method
        self.start_date = start_date
        self.end_date = end_date
    
    def fit(self,DataFrame):
        return self
    
    def transform(self, DataFrame):
        print('\napplying ImputerNaNs...')
        
        
        df = pd.DataFrame(columns=['datetime', 'Tag', 'PV'])

        for tag_name in DataFrame['Tag'].unique().tolist():

            # create ALL indexes each 5 minutes since begin of data until end of data. "Indexes Full"
            raw= DataFrame[DataFrame['Tag']==tag_name]
            inicio =  self.start_date         
            fin =  self.end_date           
            idx = pd.date_range(dt.datetime.strptime(inicio, '%Y-%m-%d'), 
                                dt.datetime.strptime(fin, '%Y-%m-%d') + dt.timedelta(days = 1), # la query incluye el ultimo dia dentro de los datos que consulta
                                freq='5min'
                               )
            
            # set into data raw the "Indexes Full"
            raw=raw.set_index('datetime')
            raw=raw.reindex(idx)  
            raw = raw.reset_index()
            raw=raw.rename(columns={'index':'datetime'})

            #info: opcional para detener más detalles
            print('\nProcessing tag: ', tag_name)
            print('Amount of null values that will be filled: ', raw['Tag'].isnull().sum())
            print('Percent of null values that will be filled: {:.2f}'.format(100 * raw['Tag'].isnull().sum() / raw.shape[0]) + '%')


            # Fill NaN values with the previous value.
            if self.method == 'ffill':
                raw=raw.fillna(method='ffill',axis=0)  # interpolate with previos values
                raw=raw.fillna(method='bfill',axis=0)  # when the first value is null, this is not filled, so it is necessary fill it with the next value

            # Fill NaN values interpolated it
            if self.method == 'interpolate':
                raw = raw.interpolate()  # interpolate values
                raw=raw.fillna(method='ffill',axis=0)  # fill Tag

            #pinfo: opcional para mas detalles: revisar el largo de la data de cada uno de los tags
            print('revisar el largo de la data: ', raw.shape)

            # concat
            df=pd.concat([df, raw])
            
        print('\nNUEVO - Tamaño data raw: ', df.shape)
        print('NUEVO - Num Real Obs.: ', df['datetime'].unique().shape[0])
        return df

In [12]:
# obtener fechas de inicio y termino definido para el entrenamiento
json_params = 'config/params.json'
with open(json_params, 'r') as file:
    params = json.load(file)

# get start and end date train
start_date = params['blanqueo_santafe_all']['data_train']['start_date_train']
end_date = params['blanqueo_santafe_all']['data_train']['end_date_train']

print('start_date: ',start_date)
print('end_date: ', end_date)

start_date:  2021-01-01
end_date:  2023-01-01


In [13]:
# crear instancia de la clase
imputer = ImputerNaNs(method = 'ffill', 
                      start_date = start_date, 
                      end_date = end_date
                     )

# transform
basic_preprocessed_data = imputer.transform(basic_preprocessed_data)


applying ImputerNaNs...

Processing tag:  230AIT446.PNT
Amount of null values that will be filled:  5369
Percent of null values that will be filled: 2.55%
revisar el largo de la data:  (210529, 3)

Processing tag:  240AIC022.MEAS
Amount of null values that will be filled:  4943
Percent of null values that will be filled: 2.35%
revisar el largo de la data:  (210529, 3)

Processing tag:  240AIC126.MEAS
Amount of null values that will be filled:  6763
Percent of null values that will be filled: 3.21%
revisar el largo de la data:  (210529, 3)

Processing tag:  240AIC224.MEAS
Amount of null values that will be filled:  5176
Percent of null values that will be filled: 2.46%
revisar el largo de la data:  (210529, 3)

Processing tag:  240AIC286.MEAS
Amount of null values that will be filled:  3944
Percent of null values that will be filled: 1.87%
revisar el largo de la data:  (210529, 3)

Processing tag:  240AIC324.MEAS
Amount of null values that will be filled:  3942
Percent of null values t

In [14]:
basic_preprocessed_data

Unnamed: 0,datetime,Tag,PV
0,2021-01-01 00:00:00,230AIT446.PNT,11.55654
1,2021-01-01 00:05:00,230AIT446.PNT,11.55354
2,2021-01-01 00:10:00,230AIT446.PNT,11.55110
3,2021-01-01 00:15:00,230AIT446.PNT,11.54881
4,2021-01-01 00:20:00,230AIT446.PNT,11.54548
...,...,...,...
210524,2023-01-01 23:40:00,SSTRIPPING015,534.00000
210525,2023-01-01 23:45:00,SSTRIPPING015,534.00000
210526,2023-01-01 23:50:00,SSTRIPPING015,534.00000
210527,2023-01-01 23:55:00,SSTRIPPING015,534.00000


### 5. Save data raw

In [15]:
# save data pkl cloud
path_raw_data = 'artifacts/data/data_raw_preprocessed.pkl'
with open(path_raw_data, "wb") as output:
    pickle.dump(basic_preprocessed_data, output)
    output.close()