In [1]:
import pandas as pd
import glob

from sklearn.preprocessing import StandardScaler

def check_isnan(df):
    isnan = df.isna().any()
    for col in df.columns:
        if isnan[col]:
            return False
    return True


def get_selected_columns(integrated):
    selected_columns = set(integrated.columns.drop('timestamp'))
    mins, maxes = integrated.min(), integrated.max()
    for col in integrated.columns:
        if mins[col] == maxes[col]:
            selected_columns.remove(col)
    return sorted(list(selected_columns))

def load_dataset(data_folder_dir, selected_cols=None, scaler=None, ewm=True, validation=False):
    """
    :param data_folder_dir: directory of dataset folder. '.../folder/'
    :param selected_cols: selected columns to use. If None, automatically generates and returns
    :param scaler: fitted scaler to train_dataset. If None, automatically generates and returns
    :param ewm: whether to use Exponential smoothing.
    :return: if cols, scaler are None, gives list of np arrays. else, given with cols, scaler (train)
    """
    dirs = glob.glob(data_folder_dir + '*.csv')
    data_list = [pd.read_csv(path) for path in dirs]
    itc = [0]  # indices to cut
    if not selected_cols or not scaler:  # train
        integrated = pd.DataFrame(columns=data_list[0].columns)
        for i, data in enumerate(data_list):
            integrated = integrated.append(data)
            itc.append(integrated.shape[0])
        selected_cols = get_selected_columns(integrated)
        data_pd = integrated[selected_cols]
        assert check_isnan(data_pd), "there exist nan in dataset"
        scaler = StandardScaler()
        data_np = scaler.fit_transform(data_pd)
        if ewm:
            return [pd.DataFrame(data_np[itc[i]:itc[i+1]]).ewm(alpha=0.9).mean().values
                    for i in range(len(itc) - 1)], selected_cols, scaler
        else:
            return [data_np[itc[i]:itc[i + 1]] for i in range(len(itc) - 1)], selected_cols, scaler
    else:  # test
        if validation:
            attack = data_list[0]['attack']
        integrated = pd.DataFrame(columns=selected_cols)
        for i, data in enumerate(data_list):
            integrated = integrated.append(data[selected_cols])
            itc.append(integrated.shape[0])
        assert check_isnan(integrated), "there exist nan in dataset"
        data_np = scaler.transform(integrated)
        if ewm:
            if validation:
                return [pd.DataFrame(data_np[itc[i]:itc[i + 1]]).ewm(alpha=0.9).mean().values
                        for i in range(len(itc) - 1)], attack
            else:
                return [pd.DataFrame(data_np[itc[i]:itc[i + 1]]).ewm(alpha=0.9).mean().values
                        for i in range(len(itc) - 1)]
        else:
            if validation:
                return [data_np[itc[i]:itc[i + 1]] for i in range(len(itc) - 1)], attack
            else:
                return [data_np[itc[i]:itc[i + 1]] for i in range(len(itc) - 1)]

In [19]:
from abc import ABC

import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator


class HAIDataLoader(tf.data.Dataset, ABC):
    def _generator1(seq_data_list):
        for seq_data in seq_data_list:
            for idx in range(len(seq_data)):
                x, y = seq_data[idx][:-1], seq_data[idx][-1]
                yield x, y

    def _generator2(seq_data_list):
        for seq_data in seq_data_list:
            for idx in range(len(seq_data)):
                x, y = seq_data[idx][:-1], seq_data[idx][-1]
                yield x, y - x[0]

    def __new__(cls, np_data_list, length=50, stride=3, sampling_rate=2,
                batch_size=32, generator_option=1, train=True):
        # for np_data in np_data_list:
        #     assert len(np_data.shape) == 2, 'np_data must be 2D'
        n_features = np_data_list[0].shape[-1]
        if train:
            seq_data_list = [TimeseriesGenerator(data=elt, targets=elt, length=length,
                                                 sampling_rate=sampling_rate, stride=stride,
                                                 batch_size=batch_size, shuffle=True)
                             for elt in np_data_list]
        else:
            seq_data_list = [TimeseriesGenerator(data=elt, targets=elt, length=length,
                                                 sampling_rate=sampling_rate, stride=stride,
                                                 batch_size=batch_size)
                             for elt in np_data_list]
        if generator_option == 1:
            return tf.data.Dataset.from_generator(
                cls._generator1,
                output_signature=(
                    tf.TensorSpec(shape=(length - 1, n_features), dtype=tf.float32),
                    tf.TensorSpec(shape=(1, n_features), dtype=tf.float32)),
                args=(seq_data_list,)
            )
        if generator_option == 2:
            return tf.data.Dataset.from_generator(
                cls._generator2,
                output_signature=(
                    tf.TensorSpec(shape=(length - 1, n_features), dtype=tf.float32),
                    tf.TensorSpec(shape=(1, n_features), dtype=tf.float32)),
                args=(seq_data_list,)
            )
        else:
            raise ValueError("generator_option must be 1 or 2")

In [11]:
import time

data_loading_start = time.perf_counter()
np_data_list, _, _ = load_dataset('../datasets/train/')
dataset = HAIDataLoader(np_data_list).prefetch(4)
print("Data loading time:", time.perf_counter() - data_loading_start)
print("#"*50)

start_time = time.perf_counter()
idx = 0
for epoch_num in range(10):
    epoch_start_time = time.perf_counter()
    for sample in dataset:
        idx += 1
    print(f"epoch {epoch_num + 1} execution time:", time.perf_counter() - epoch_start_time)
print("#" * 50)
print("Total execution time:", time.perf_counter() - start_time)

ValueError: Attempt to convert a value (<BatchDataset shapes: (None, None, 68), types: tf.float64>) with an unsupported type (<class 'tensorflow.python.data.ops.dataset_ops.BatchDataset'>) to a Tensor.

In [91]:
def _generator1(seq_data_list):
    for seq_data in seq_data_list:
        for x, y in seq_data:
            yield x, y

In [92]:
n_features = np_data_list[0].shape[-1]
seq_data_list = [TimeseriesGenerator(data=elt, targets=elt, length=50,
                                                 sampling_rate=2, stride=3,
                                                 batch_size=32, shuffle=True)
                             for elt in np_data_list]

In [33]:
# std_x_shape, std_y_shape = seq_data_list[0][0][0].shape, seq_data_list[0][0][1].shape
# print(std_x_shape, std_y_shape)
# for seq_data in seq_data_list:
#     for elt in seq_data:
#         if elt[0].shape != std_x_shape or elt[1].shape != std_y_shape:
#             print(elt.shape)

(32, 25, 68) (32, 68)


In [93]:
temp = _generator1(seq_data_list)

In [94]:
x, y = next(temp)
x.shape, y.shape

((32, 25, 68), (32, 68))

In [89]:
x.shape

(32, 25, 68)

In [95]:
std_x_shape, std_y_shape = seq_data_list[0][0][0].shape, seq_data_list[0][0][1].shape

tf.data.Dataset.from_generator(
                _generator1,
                output_signature=(
                    tf.TensorSpec(shape=std_x_shape, dtype=tf.float64),
                    tf.TensorSpec(shape=std_y_shape, dtype=tf.float64)),
                args=(seq_data_list,)
            )

ValueError: Can't convert non-rectangular Python sequence to Tensor.