# Prepare

In [1]:
# import libraries
import json
import os
import os.path
import pandas as pd
from tqdm.notebook import tqdm

In [2]:
SOURCE = 'Y:\\ZBU\\_Datasets\\GHL\\GHL'  # dataset source dir
TARGET = 'E:\\Datasets\\GHL'  # output root dir; the 'train' and 'test' subfolders are created under it

# Convert data

In [3]:
# There are tags in the dataset that do not relate to sensor or control signals.
# For realism, let's drop them.

# There are duplicate and non-integer timestamps.
# For realism, let's fix them.

# It is impossible to get values every second for a process like this.
# For realism, let's resample to 1 min.

In [4]:
def rename_columns(df: pd.DataFrame) -> None:
    """Reduce `df` to the sensor/control tags, under short snake_case names.

    The frame is modified in place and nothing is returned:
      1. the known source tags are renamed (see `tag_names`);
      2. an all-zero 'anomaly' column is added when none exists after renaming;
      3. every column that is not one of the new names is dropped;
      4. rows whose 'time' repeats are dropped, keeping the first occurrence.
    """
    tag_names = {
        'Time': 'time',
        # temperatures
        'limiter.y': 'input_temp_gc',
        'RT_temperature.T': 'rt_temp_gc',
        'HT_temperature.T': 'ht_temp_gc',
        'C_temperature.T': 'ct_temp_gc',
        # levels
        'RT_level': 'rt_level_m',
        'HT_level': 'ht_level_m',
        'C_level': 'ct_level_m',
        # actuator states
        'inj_valve_act': 'input_flow_state',
        'dir_valve_act': 'supply_flow_state',
        'inv_valve_act': 'return_flow_state',
        'out_valve_act': 'output_flow_state',
        'heater_act': 'heater_state',  # device with its own regulator
        # label
        'ATTACK': 'anomaly',
    }
    # Tags deliberately left out of the mapping (and therefore dropped below):
    #   'RT_level_ini'       - level in RT at start of input
    #   'dT_rand'            - random fluctuations of input temperature
    #   'dt_rand'            - random fluctuations of relaxing time
    #   'dL_rand'            - random fluctuations of stop level in RT
    #   'limiter1.y'         - resulting relaxing time
    #   'Relaxing.active'    - relax state is control logic value
    #   'boundary.m_flow_in' - nominal input flow

    df.rename(columns=tag_names, inplace=True)

    # Files without an 'ATTACK' tag still get a label column, filled with 0.
    if 'anomaly' not in df.columns:
        df['anomaly'] = 0

    kept = set(tag_names.values())
    leftovers = [name for name in df.columns if name not in kept]
    df.drop(columns=leftovers, inplace=True)

    df.drop_duplicates(subset='time', keep='first', inplace=True)

In [5]:
def index_and_downsample(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` indexed by timestamp and downsampled to 1 minute.

    The 'time' column is read as seconds elapsed since 2016-10-28T00:00:00;
    fractional seconds are truncated by the integer cast. The resulting
    timestamps become an unnamed DatetimeIndex, and each 1-minute bin keeps
    the first value of every column.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with a numeric 'time' column. It is not modified.

    Returns
    -------
    pd.DataFrame
        New frame without the 'time' column, indexed by 1-minute timestamps.
    """
    # int64 is spelled out because plain 'int' is platform-dependent
    # (32-bit on Windows with NumPy < 2).
    timestamps = pd.to_datetime(
        df['time'].astype('int64'), unit='s', origin='2016-10-28T00:00:00'
    )
    # assign() works on a copy, so the caller's frame keeps its 'time' column.
    indexed = df.assign(time=timestamps).set_index('time')
    indexed.index.name = None
    return indexed.resample('1 min').first()

In [6]:
def trim_features(df: pd.DataFrame) -> None:
    """Subtract 273.0 from the four temperature columns of `df`, in place.

    Presumably this converts Kelvin readings to degrees Celsius (the columns
    carry a '_gc' suffix) - TODO confirm against the dataset description.
    NOTE(review): the offset is 273.0, not 273.15; kept as is so the produced
    values do not change.
    """
    offset = 273.0
    for column in ('input_temp_gc', 'rt_temp_gc', 'ht_temp_gc', 'ct_temp_gc'):
        df[column] -= offset

In [7]:
def optimize_dtypes(df: pd.DataFrame) -> None:
    """Downcast the columns of `df` to smaller numeric dtypes, in place.

    State and label columns go through `pd.to_numeric(downcast='unsigned')`,
    temperature and level columns through `pd.to_numeric(downcast='float')`.
    Returns None.
    """
    # Maps the `downcast` mode to the columns it applies to; the unsigned
    # group is processed first.
    downcast_plan = {
        'unsigned': [
            'output_flow_state',
            'return_flow_state',
            'input_flow_state',
            'supply_flow_state',
            'heater_state',
            'anomaly',
        ],
        'float': [
            'rt_temp_gc',
            'ct_temp_gc',
            'rt_level_m',
            'input_temp_gc',
            'ct_level_m',
            'ht_temp_gc',
            'ht_level_m',
        ],
    }
    for mode, columns in downcast_plan.items():
        df[columns] = df[columns].apply(pd.to_numeric, downcast=mode)

In [8]:
# Recordings whose 'anomaly' column is all zero go to TARGET/train,
# every other recording goes to TARGET/test.
train_dir = os.path.join(TARGET, 'train')
test_dir = os.path.join(TARGET, 'test')
for out_dir in (train_dir, test_dir):
    # makedirs also creates TARGET itself when it is missing, and exist_ok
    # makes re-running this cell safe.
    os.makedirs(out_dir, exist_ok=True)

# Sorted so the processing order (and the last `filename_`, which the
# self-check cell reads) does not depend on the OS directory order.
files = sorted(os.listdir(SOURCE))

for f in tqdm(files):
    filename = os.path.join(SOURCE, f)

    data = pd.read_csv(filename)

    rename_columns(data)
    data = index_and_downsample(data)
    trim_features(data)
    optimize_dtypes(data)

    # NOTE: the output is a snappy-compressed Parquet file despite the
    # '.snappy' suffix.
    f_ = os.path.splitext(f)[0] + '.snappy'
    out_dir = train_dir if data['anomaly'].max() == 0 else test_dir
    filename_ = os.path.join(out_dir, f_)
    data.to_parquet(filename_, compression='snappy')

  0%|          | 0/49 [00:00<?, ?it/s]

# Self-Check

In [9]:
# Reload the last file written by the conversion loop (`filename_` is left over
# from its final iteration) and list the column dtypes and non-null counts.
data = pd.read_parquet(filename_)
data.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 25001 entries, 2016-10-28 00:00:00 to 2016-11-14 08:40:00
Data columns (total 13 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   rt_temp_gc         25001 non-null  float32
 1   ht_temp_gc         25001 non-null  float32
 2   rt_level_m         25001 non-null  float32
 3   output_flow_state  25001 non-null  uint8  
 4   return_flow_state  25001 non-null  uint8  
 5   input_temp_gc      25001 non-null  float32
 6   input_flow_state   25001 non-null  uint8  
 7   supply_flow_state  25001 non-null  uint8  
 8   ct_level_m         25001 non-null  float32
 9   ct_temp_gc         25001 non-null  float32
 10  heater_state       25001 non-null  uint8  
 11  ht_level_m         25001 non-null  float32
 12  anomaly            25001 non-null  uint8  
dtypes: float32(7), uint8(6)
memory usage: 1.0 MB


In [25]:
# Show the DatetimeIndex of the reloaded frame (built by the 1-minute resampling).
data.index

DatetimeIndex(['2016-10-28 00:00:00', '2016-10-28 00:01:00',
               '2016-10-28 00:02:00', '2016-10-28 00:03:00',
               '2016-10-28 00:04:00', '2016-10-28 00:05:00',
               '2016-10-28 00:06:00', '2016-10-28 00:07:00',
               '2016-10-28 00:08:00', '2016-10-28 00:09:00',
               ...
               '2016-11-14 08:31:00', '2016-11-14 08:32:00',
               '2016-11-14 08:33:00', '2016-11-14 08:34:00',
               '2016-11-14 08:35:00', '2016-11-14 08:36:00',
               '2016-11-14 08:37:00', '2016-11-14 08:38:00',
               '2016-11-14 08:39:00', '2016-11-14 08:40:00'],
              dtype='datetime64[ns]', length=25001, freq='60S')