In [38]:
import logging
import os
import numpy as np
import pandas as pd
import toml
config_path = "../UserPrediction6DOF/tools/config.toml"
from sklearn.model_selection import train_test_split

In [43]:
def train_val_test_split(X, y, test_ratio):
    """Split X and y, in their original order, into train, validation and test parts.

    The test part is cut from the end first; the validation part is then cut
    from the end of what remains. Both splits use ``shuffle=False``.

    :param X: feature samples, handed to ``train_test_split`` unchanged
    :param y: target samples, handed to ``train_test_split`` unchanged
    :param test_ratio: fraction of the full dataset used for the test part; the
        validation part is sized to roughly the same fraction of the full dataset.
        Must satisfy ``0 < test_ratio < 0.5``.
    :return: ``X_train, X_val, X_test, y_train, y_val, y_test``
    :raises ValueError: if ``test_ratio`` is outside the open interval (0, 0.5)
    """
    # Outside this range the validation fraction computed below is >= 1,
    # non-positive, or a division by zero, so fail early and name the real cause.
    if not 0 < test_ratio < 0.5:
        raise ValueError(
            f"test_ratio must be in the open interval (0, 0.5), got {test_ratio!r}"
        )

    # The validation part is taken from what is left after removing the test
    # part, so the ratio is rescaled relative to that remainder.
    val_ratio = test_ratio / (1 - test_ratio)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_ratio, shuffle=False)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=val_ratio, shuffle=False)
    return X_train, X_val, X_test, y_train, y_val, y_test


def test_train_split(X, y, test_ratio):
    """Split X and y, in their original order, into a train part and a test part.

    :param X: feature samples, handed to ``train_test_split`` unchanged
    :param y: target samples, handed to ``train_test_split`` unchanged
    :param test_ratio: passed as ``test_size``; the test part is taken from the end
    :return: ``X_train, X_test, y_train, y_test``
    """
    # shuffle=False keeps the row order, so the test part is the tail of the data.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_ratio, shuffle=False)
    return X_train, X_test, y_train, y_test


def add_sliding_window(X, y, seq_length, pred_step):
    """Cut 2D feature/target arrays into overlapping windows, producing 3D arrays.

    For every row index ``i`` from ``seq_length`` up to ``len(X) - pred_step``
    (inclusive) one sample is produced: the ``seq_length`` rows of ``X`` that
    precede ``i`` as input, and the ``pred_step`` rows of ``y`` starting at
    ``i`` as target.

    :param X: 2D numpy array of features (rows x feature columns)
    :param y: 2D numpy array of outputs (rows x output columns)
    :param seq_length: number of past rows in each input window
    :param pred_step: number of future rows in each target window
    :return: ``(X_w, y_w)`` with shapes ``(n, seq_length, X.shape[1])`` and
        ``(n, pred_step, y.shape[1])``; ``n`` is 0 when there are too few rows
        to build a single window
    """
    # Sliding window looking into the past to predict `pred_step` rows into the future.
    window_ends = range(seq_length, len(X) - pred_step + 1)

    if len(window_ends) == 0:
        # np.array([]) would collapse to shape (0,); keep the window axes so the
        # result is still 3D (for 2D inputs) and shape-based code keeps working.
        X_arr, y_arr = np.asarray(X), np.asarray(y)
        X_w = np.empty((0, max(seq_length, 0)) + X_arr.shape[1:], dtype=X_arr.dtype)
        y_w = np.empty((0, max(pred_step, 0)) + y_arr.shape[1:], dtype=y_arr.dtype)
    else:
        X_w = np.array([X[i - seq_length:i, 0:X.shape[1]] for i in window_ends])
        y_w = np.array([y[i:i + pred_step, 0:y.shape[1]] for i in window_ends])

    logging.info("------------- Creating 3D datasets and adding sliding window ------------")
    logging.info(f'X_w.shape: {X_w.shape}')
    logging.info(f'y_w.shape: {y_w.shape}')
    logging.info(f"Sliding window of {seq_length} added and 3D datasets are created!")
    logging.info("--------")
    return X_w, y_w


def load_dataset(dataset_path):
    """Read ``dataset.csv`` from the given directory and log its shape and columns.

    :param dataset_path: directory that contains ``dataset.csv``
    :return: the CSV content as a pandas DataFrame
    """
    logging.info(f"---------------- Reading 2D dataset from {dataset_path} ----------------")
    csv_file = os.path.join(dataset_path, "dataset.csv")
    frame = pd.read_csv(csv_file)

    frame_shape, column_names = frame.shape, list(frame.columns)
    logging.info(f"Dataset shape: {frame_shape}")
    logging.info(f'Columns: {column_names}')
    logging.info("--------")
    return frame


def prepare_X_y(df, features, seq_length, pred_step, outputs):
    """Extract the feature and output columns of ``df`` as numpy arrays.

    :param df: DataFrame holding the full dataset
    :param features: column selection used for X
    :param seq_length: only reported in the log message
    :param pred_step: only reported in the log message
    :param outputs: column selection used for y
    :return: ``(X, y)`` as returned by ``DataFrame.to_numpy()``
    """
    feature_matrix = df[features].to_numpy()
    logging.info("------------------ Creating 2D X and y datasets  -----------------------")
    logging.info(f'X.shape: {feature_matrix.shape}')
    logging.info(f'Using past {seq_length} values for predict in {pred_step} in future')

    target_matrix = df[outputs].to_numpy()
    logging.info(f'y.shape: {target_matrix.shape}')
    logging.info('2D datasets X and y created')
    logging.info("--------")

    return feature_matrix, target_matrix


def prepare_loaders(X_train, y_train, X_test, y_test, batch_size=64):
    """Wrap train and test arrays in PyTorch ``DataLoader`` objects.

    Both loaders keep the sample order (``shuffle=False``) and drop the last
    batch when it would be smaller than ``batch_size`` (``drop_last=True``).

    :param X_train: training inputs, converted with ``torch.Tensor``
    :param y_train: training targets, converted with ``torch.Tensor``
    :param X_test: test inputs, converted with ``torch.Tensor``
    :param y_test: test targets, converted with ``torch.Tensor``
    :param batch_size: number of samples per batch
    :return: ``(train_loader, test_loader)``
    """
    # Imported here because the notebook's import cell does not import torch;
    # this also keeps the rest of the notebook usable without torch installed.
    import torch
    from torch.utils.data import DataLoader, TensorDataset

    train_features = torch.Tensor(X_train)
    train_targets = torch.Tensor(y_train)
    test_features = torch.Tensor(X_test)
    test_targets = torch.Tensor(y_test)

    train = TensorDataset(train_features, train_targets)
    test = TensorDataset(test_features, test_targets)

    train_loader = DataLoader(train, batch_size=batch_size, shuffle=False, drop_last=True)
    test_loader = DataLoader(test, batch_size=batch_size, shuffle=False, drop_last=True)

    return train_loader, test_loader


def load_data(X_train, X_val, X_test, y_train, y_val, y_test, batch_size=64):
    """Wrap train, validation and test arrays in PyTorch ``DataLoader`` objects.

    :param X_train: training inputs, converted with ``torch.Tensor``
    :param X_val: validation inputs, converted with ``torch.Tensor``
    :param X_test: test inputs, converted with ``torch.Tensor``
    :param y_train: training targets, converted with ``torch.Tensor``
    :param y_val: validation targets, converted with ``torch.Tensor``
    :param y_test: test targets, converted with ``torch.Tensor``
    :param batch_size: number of samples per batch for the train, validation
        and test loaders
    :return: ``(train_loader, val_loader, test_loader, test_loader_one)`` where
        ``test_loader_one`` iterates the test set with a batch size of 1

    The drop_last=True parameter ignores the last batch
    (when the number of examples in a dataset is not divisible
    by a batch_size) while drop_last=False will make the last batch
    smaller than a batch_size
    """
    # Imported here because the notebook's import cell does not import torch;
    # this also keeps the rest of the notebook usable without torch installed.
    import torch
    from torch.utils.data import DataLoader, TensorDataset

    train_features = torch.Tensor(X_train)
    train_targets = torch.Tensor(y_train)
    val_features = torch.Tensor(X_val)
    val_targets = torch.Tensor(y_val)
    test_features = torch.Tensor(X_test)
    test_targets = torch.Tensor(y_test)

    train = TensorDataset(train_features, train_targets)
    val = TensorDataset(val_features, val_targets)
    test = TensorDataset(test_features, test_targets)

    # No shuffling anywhere: batches follow the order of the input arrays.
    train_loader = DataLoader(train, batch_size=batch_size, shuffle=False, drop_last=True)
    val_loader = DataLoader(val, batch_size=batch_size, shuffle=False, drop_last=True)
    test_loader = DataLoader(test, batch_size=batch_size, shuffle=False, drop_last=True)
    # Same test set, one sample per batch.
    test_loader_one = DataLoader(test, batch_size=1, shuffle=False, drop_last=True)

    return train_loader, val_loader, test_loader, test_loader_one


def save_numpy_array(dataset_path, filename, np_array):
    """Store ``np_array`` as ``<filename>.npy`` inside ``dataset_path`` and log the write.

    :param dataset_path: directory the file is written to
    :param filename: file name without the ``.npy`` extension
    :param np_array: array handed to ``np.save``
    """
    target_file = os.path.join(dataset_path, '{}.npy'.format(filename))
    np.save(target_file, np_array)
    logging.info(f'WRITE: {filename} saved to {dataset_path}')


def load_numpy_array(dataset_path, filename):
    """Load ``<filename>.npy`` from ``dataset_path`` and log what was read.

    :param dataset_path: directory the file is read from
    :param filename: file name without the ``.npy`` extension
    :return: the array returned by ``np.load``
    """
    source_file = os.path.join(dataset_path, '{}.npy'.format(filename))
    loaded = np.load(source_file)
    logging.info(f'READ: {filename} loaded from {dataset_path}')
    logging.info(f'{filename}.shape: {loaded.shape}')
    return loaded

In [40]:
class Dataset:
    """Builds windowed train/validation/test arrays from ``dataset.csv`` and saves them.

    The steps are run in this order by the caller:
    ``_prepare_raw_dataset()``, ``_add_sliding_window()``, ``_split()``.
    """

    def __init__(self, features, outputs, dataset_path, results_path, pred_window=100):
        """Read the TOML config and store the dataset settings.

        :param features: column selection used as model inputs
        :param outputs: column selection used as prediction targets
        :param dataset_path: directory handed to ``load_dataset``
        :param results_path: directory under which ``train_val_test`` is created
        :param pred_window: prediction horizon in milliseconds
        """
        self.cfg = toml.load(config_path)
        self.dt = self.cfg['dt']  # presumably the sampling interval in seconds - TODO confirm
        self.pred_window = pred_window * 1e-3  # convert to seconds
        self.pred_step = self._steps_in_window(self.pred_window, self.dt)
        self.dataset_path = dataset_path
        self.results_path = results_path
        # Not assigned anywhere in this class (original note: set by select_model()).
        self.model = None
        self.params = None
        self.X, self.y = [], []
        self.X_w, self.y_w = [], []
        self.X_train, self.X_val, self.X_test = [], [], []
        self.y_train, self.y_val, self.y_test = [], [], []
        self.config = None
        self.seq_length_input = 20  # input length of timeseries from the past
        self.seq_length_output = self.pred_step  # output length of timeseries in the future
        # Columns read as model inputs.
        self.features = features
        # Columns whose future values will be predicted.
        self.outputs = outputs

    @staticmethod
    def _steps_in_window(window_s, dt):
        """Return how many whole steps of length ``dt`` fit into ``window_s``."""
        # Float division of an exact multiple can land just below the integer
        # (0.3 / 0.1 == 2.9999999999999996); the small offset keeps int() from
        # truncating such a quotient one step short.
        return int(window_s / dt + 1e-9)

    def _prepare_raw_dataset(self):
        """Load the CSV dataset and extract the 2D feature and output arrays."""
        # Read full dataset from CSV file
        df = load_dataset(self.dataset_path)
        # create 2D arrays of features and outputs
        self.X, self.y = prepare_X_y(df, self.features, self.seq_length_input, self.pred_step, self.outputs)

    def _add_sliding_window(self):
        """Turn the 2D arrays into windowed arrays ``X_w`` and ``y_w``."""
        # Features and outputs with sequence_len = sliding window
        self.X_w, self.y_w = add_sliding_window(self.X, self.y, self.seq_length_input, self.pred_step)

    def _split(self):
        """Split the windowed arrays and save each part as a ``.npy`` file.

        The files are written to ``<results_path>/train_val_test``.
        """
        # Splitting the data into train, validation, and test sets
        self.X_train, self.X_val, self.X_test, \
            self.y_train, self.y_val, self.y_test = train_val_test_split(self.X_w, self.y_w, 0.2)

        logging.info(f"X_train {self.X_train.shape}, X_val {self.X_val.shape}, "
                     f"X_test{self.X_test.shape}, y_train {self.y_train.shape}, "
                     f"y_val {self.y_val.shape}, y_test {self.y_test.shape}")

        path = os.path.join(self.results_path, 'train_val_test')
        # exist_ok=True already covers the case where the directory exists.
        os.makedirs(path, exist_ok=True)

        # File names match the attribute names, saved in the same order as before.
        for name in ('X_train', 'X_val', 'X_test', 'y_train', 'y_val', 'y_test'):
            save_numpy_array(path, name, getattr(self, name))


In [59]:
# Load the TOML configuration; every feature/output list below is read from it.
cfg = toml.load(config_path)

# Position: coordinates alone ("pure") or coordinates plus velocity as inputs.
features_pos_pure = cfg['pos_coords']
features_pos = features_pos_pure + cfg['velocity']
outputs_pos = cfg['pos_coords']

# Rotation: quaternion coordinates alone ("pure") or plus velocity as inputs.
features_rot_pure = cfg['quat_coords']
features_rot = features_rot_pure + cfg['velocity']
outputs_rot = cfg['quat_coords']


In [57]:
# Position dataset (position + velocity inputs): load the CSV, window it, split and save.
ds_pos = Dataset(
    features=features_pos,
    outputs=outputs_pos,
    dataset_path='../data/flipped',
    results_path='../data/flipped/position',
)
ds_pos._prepare_raw_dataset()
ds_pos._add_sliding_window()
ds_pos._split()

In [58]:
# Rotation dataset (quaternion + velocity inputs): load the CSV, window it, split and save.
ds_rot = Dataset(
    features=features_rot,
    outputs=outputs_rot,
    dataset_path='../data/flipped',
    results_path='../data/flipped/rotation',
)
ds_rot._prepare_raw_dataset()
ds_rot._add_sliding_window()
ds_rot._split()

In [60]:
# Position dataset with position-only inputs: load the CSV, window it, split and save.
ds_pos_pure = Dataset(
    features=features_pos_pure,
    outputs=outputs_pos,
    dataset_path='../data/flipped',
    results_path='../data/flipped/position_pure',
)
ds_pos_pure._prepare_raw_dataset()
ds_pos_pure._add_sliding_window()
ds_pos_pure._split()

In [61]:
# Rotation dataset with quaternion-only inputs: load the CSV, window it, split and save.
ds_rot_pure = Dataset(
    features=features_rot_pure,
    outputs=outputs_rot,
    dataset_path='../data/flipped',
    results_path='../data/flipped/rotation_pure',
)
ds_rot_pure._prepare_raw_dataset()
ds_rot_pure._add_sliding_window()
ds_rot_pure._split()