In [1]:
%load_ext autoreload
%autoreload 2
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
import sys
sys.path.append('../../')
from omegaconf import OmegaConf
from dacite import from_dict

from tsbench.tslib.loading.csv_loader import CSVTimeSeriesDataset
from tsbench.tslib.traindataset_generator import TimeSeriesTrainDatasetGeneratorConfig, TimeSeriesTrainDatasetGenerator

from tsbench.tslib.utils import benchmark_dataloading

  from .autonotebook import tqdm as notebook_tqdm


# Benchmark dataloading with tslib

In initial experiments we observed, very slow dataloading especially when the normalizer was enabled.
In this notebook we investigate this further and hopefully find a solution (e.g. caching the final dataset in memory).

In [2]:
TRAIN_FILE = '../../datafiles/har_with_smartphones/train.csv'
TRAIN_FILE = Path(TRAIN_FILE)

In [3]:
# Benchmark 1: raw dataset loading
raw_ds = CSVTimeSeriesDataset(data_file=TRAIN_FILE, meta_columns=['subject', 'Activity'])

In [4]:
# Benchmark 2: full train dataset without normalization
cfg = """ 
pipeline:
  dataset:
    name: csvloader
    kwargs:
      data_file: /iarai/home/maximilian.beck/repos/tsbench_dev/datafiles/har_with_smartphones/train.csv
      meta_columns: [subject, Activity]
  windowing:
    window_size: 20 # each time series for the model will have length 10
    stride: 5 # each time series will be shifted by 5
  # normalizer: #! Note: this slows down training by a factor of 10!! Must be fixed!
  #   normalizer_file: /iarai/home/maximilian.beck/repos/tsbench/datafiles/har_with_smartphones/normalizer.json
  target_generator:
    name: csv_classification
    kwargs:
      class_column: Activity
      class_labels: ['STANDING', 'SITTING', 'LAYING', 'WALKING', 'WALKING_DOWNSTAIRS', 'WALKING_UPSTAIRS']
split: 
  name: random_split
  kwargs:
    lengths: [1.0, 0.0] # train, val
"""
cfg = OmegaConf.create(cfg)

cfg = from_dict(data_class=TimeSeriesTrainDatasetGeneratorConfig, data=OmegaConf.to_container(cfg))

train_ds_no_norm = TimeSeriesTrainDatasetGenerator(cfg)
train_ds_no_norm.generate_dataset()

Sample: 100%|██████████| 126/126 [00:00<00:00, 516.69it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 519.21it/s]
Generating window index: 100%|██████████| 126/126 [00:00<00:00, 11070.01it/s]
Total number of dropped timesteps due to windowing: 272




In [5]:
# Benchmark 3: full train dataset with normalization
cfg = """ 
pipeline:
  dataset:
    name: csvloader
    kwargs:
      data_file: /iarai/home/maximilian.beck/repos/tsbench_dev/datafiles/har_with_smartphones/train.csv
      meta_columns: [subject, Activity]
  windowing:
    window_size: 20 # each time series for the model will have length 10
    stride: 5 # each time series will be shifted by 5
  normalizer: #! Note: this slows down training by a factor of 10!! Must be fixed!
    normalizer_file: /iarai/home/maximilian.beck/repos/tsbench/datafiles/har_with_smartphones/normalizer.json
  target_generator:
    name: csv_classification
    kwargs:
      class_column: Activity
      class_labels: ['STANDING', 'SITTING', 'LAYING', 'WALKING', 'WALKING_DOWNSTAIRS', 'WALKING_UPSTAIRS']
split: 
  name: random_split
  kwargs:
    lengths: [1.0, 0.0] # train, val
"""
cfg = OmegaConf.create(cfg)

cfg = from_dict(data_class=TimeSeriesTrainDatasetGeneratorConfig, data=OmegaConf.to_container(cfg))

train_ds_norm = TimeSeriesTrainDatasetGenerator(cfg)
train_ds_norm.generate_dataset()

Sample: 100%|██████████| 126/126 [00:00<00:00, 505.47it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 510.19it/s]
Generating window index: 100%|██████████| 126/126 [00:00<00:00, 11645.97it/s]
Total number of dropped timesteps due to windowing: 272




In [6]:
len(train_ds_norm.train_split)

1038

In this benchmark we see, that normalization of each timeseries upon loading as implemented in the baseline slows down loading by a factor > 20!

In [7]:
NUM_EPOCHS = 2
print('\nBenchmark 1: raw dataset loading')
benchmark_dataloading(raw_ds, NUM_EPOCHS)

print('\nBenchmark 2: full train dataset without normalization')
benchmark_dataloading(train_ds_no_norm.train_split, NUM_EPOCHS)

print('\nBenchmark 3: full train dataset with normalization')
benchmark_dataloading(train_ds_norm.train_split, NUM_EPOCHS)


Benchmark 1: raw dataset loading
Sample: 100%|██████████| 126/126 [00:00<00:00, 620.02it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 646.11it/s]
Epoch: 100%|██████████| 2/2 [00:00<00:00,  4.91it/s]
Dataloading benchmark for CSVTimeSeriesDataset
Time in seconds
Time per epoch (num_epochs=2): [0.20908284187316895, 0.20009446144104004]
Average time per epoch: 0.2045886516571045
Total time: 0.41030216217041016
Time first epoch: 0.20908284187316895 / Time last epoch: 0.20009446144104004

Benchmark 2: full train dataset without normalization
Sample: 100%|██████████| 1038/1038 [00:04<00:00, 248.97it/s]
Sample: 100%|██████████| 1038/1038 [00:04<00:00, 251.58it/s]
Epoch: 100%|██████████| 2/2 [00:08<00:00,  4.15s/it]
Dataloading benchmark for TimeSeriesTrainDatasetSubset
Time in seconds
Time per epoch (num_epochs=2): [4.174413442611694, 4.129810094833374]
Average time per epoch: 4.152111768722534
Total time: 8.305387020111084
Time first epoch: 4.174413442611694 / Time last epoch: 4.129810

In [8]:
train_ds_norm.train_split.dataset.dataset

<tsbench.tslib.target.target_dataset.TimeSeriesTargetDataset at 0x7ff5380e65d0>

Solution Approach: Cache the final pipeline result of the in the TimeSeriesTrainDataset and load the full dataset upon creation once.

### Numpy Array vs dictionary for caching
https://chat.openai.com/share/72e8de07-ac42-4f87-9ea2-b4a4effca906

conclusion: we use numpy arrays as the access for the target dataset is always via integers indices and we want to save memory and optimize for speed.

In [9]:
import numpy as np
import sys

# Example dataset size
dataset_size = 1000000

# NumPy array with integers
numpy_array = np.zeros(dataset_size, dtype=object)

# Dictionary with integers
dictionary = {i: 0 for i in range(dataset_size)}

# Memory usage in bytes
numpy_memory_usage = sys.getsizeof(numpy_array)
dictionary_memory_usage = sys.getsizeof(dictionary)

print(f"NumPy array memory usage: {numpy_memory_usage} bytes")
print(f"Dictionary memory usage: {dictionary_memory_usage} bytes")

NumPy array memory usage: 8000112 bytes
Dictionary memory usage: 41943128 bytes


In [10]:
import numpy as np
import timeit

# Example dataset size
dataset_size = 1000000

# Creating a NumPy array with integers
numpy_array = np.arange(dataset_size)

# Creating a dictionary with integers
dictionary = {i: i for i in range(dataset_size)}

# Function to measure access time for NumPy array
def access_numpy_array():
    index = np.random.randint(0, dataset_size)
    value = numpy_array[index]

# Function to measure access time for dictionary
def access_dictionary():
    index = np.random.randint(0, dataset_size)
    value = dictionary[index]

# Measure access time for NumPy array
numpy_access_time = timeit.timeit(access_numpy_array, number=1000)

# Measure access time for dictionary
dictionary_access_time = timeit.timeit(access_dictionary, number=1000)

print(f"NumPy array access time: {numpy_access_time} seconds")
print(f"Dictionary access time: {dictionary_access_time} seconds")

NumPy array access time: 0.002144481986761093 seconds
Dictionary access time: 0.0022666417062282562 seconds


### Results: Added Caching to TargetDataset

In [11]:
# Benchmark 2: full train dataset without normalization
cfg = """ 
pipeline:
  dataset:
    name: csvloader
    kwargs:
      data_file: /iarai/home/maximilian.beck/repos/tsbench_dev/datafiles/har_with_smartphones/train.csv
      meta_columns: [subject, Activity]
  windowing:
    window_size: 20 # each time series for the model will have length 10
    stride: 5 # each time series will be shifted by 5
  # normalizer: #! Note: this slows down training by a factor of 10!! Must be fixed!
  #   normalizer_file: /iarai/home/maximilian.beck/repos/tsbench/datafiles/har_with_smartphones/normalizer.json
  target_generator:
    name: csv_classification
    kwargs:
      class_column: Activity
      class_labels: ['STANDING', 'SITTING', 'LAYING', 'WALKING', 'WALKING_DOWNSTAIRS', 'WALKING_UPSTAIRS']
  cache_processed_dataset: True #! This is the change!
split: 
  name: random_split
  kwargs:
    lengths: [1.0, 0.0] # train, val
"""
cfg = OmegaConf.create(cfg)

cfg = from_dict(data_class=TimeSeriesTrainDatasetGeneratorConfig, data=OmegaConf.to_container(cfg))

train_ds_no_norm = TimeSeriesTrainDatasetGenerator(cfg)
train_ds_no_norm.generate_dataset()

Sample: 100%|██████████| 126/126 [00:00<00:00, 504.40it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 506.60it/s]
Generating window index: 100%|██████████| 126/126 [00:00<00:00, 11659.33it/s]
Total number of dropped timesteps due to windowing: 272
Fill Processed Items Cache: 100%|██████████| 1038/1038 [00:04<00:00, 256.53it/s]




In [12]:
# Benchmark 3: full train dataset with normalization
cfg = """ 
pipeline:
  dataset:
    name: csvloader
    kwargs:
      data_file: /iarai/home/maximilian.beck/repos/tsbench_dev/datafiles/har_with_smartphones/train.csv
      meta_columns: [subject, Activity]
  windowing:
    window_size: 20 # each time series for the model will have length 10
    stride: 5 # each time series will be shifted by 5
  normalizer: #! Note: this slows down training by a factor of 10!! Must be fixed!
    normalizer_file: /iarai/home/maximilian.beck/repos/tsbench/datafiles/har_with_smartphones/normalizer.json
  target_generator:
    name: csv_classification
    kwargs:
      class_column: Activity
      class_labels: ['STANDING', 'SITTING', 'LAYING', 'WALKING', 'WALKING_DOWNSTAIRS', 'WALKING_UPSTAIRS']
  cache_processed_dataset: True #! This is the change!

split: 
  name: random_split
  kwargs:
    lengths: [1.0, 0.0] # train, val
"""
cfg = OmegaConf.create(cfg)

cfg = from_dict(data_class=TimeSeriesTrainDatasetGeneratorConfig, data=OmegaConf.to_container(cfg))

train_ds_norm = TimeSeriesTrainDatasetGenerator(cfg)
train_ds_norm.generate_dataset()

Sample: 100%|██████████| 126/126 [00:00<00:00, 497.51it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 510.23it/s]
Generating window index: 100%|██████████| 126/126 [00:00<00:00, 12564.96it/s]
Total number of dropped timesteps due to windowing: 272
Fill Processed Items Cache: 100%|██████████| 1038/1038 [01:33<00:00, 11.12it/s]




Repeat Benchmark with caching enabled:
Result: Lightning fast dataloading during training and validation, at the cost of iterating just once over the full dataset at the beginning of training. ;-)

In [13]:
NUM_EPOCHS = 10
print('\nBenchmark 1: raw dataset loading')
benchmark_dataloading(raw_ds, NUM_EPOCHS)

print('\nBenchmark 2: full train dataset without normalization')
benchmark_dataloading(train_ds_no_norm.train_split, NUM_EPOCHS)

print('\nBenchmark 3: full train dataset with normalization')
benchmark_dataloading(train_ds_norm.train_split, NUM_EPOCHS)


Benchmark 1: raw dataset loading
Epoch:   0%|          | 0/10 [00:00<?, ?it/s]Sample: 100%|██████████| 126/126 [00:00<00:00, 629.32it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 644.77it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 645.19it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 646.75it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 647.58it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 661.37it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 656.72it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 656.65it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 655.21it/s]
Sample: 100%|██████████| 126/126 [00:00<00:00, 657.21it/s]
Epoch: 100%|██████████| 10/10 [00:01<00:00,  5.06it/s]
Dataloading benchmark for CSVTimeSeriesDataset
Time in seconds
Time per epoch (num_epochs=10): [0.20562458038330078, 0.1992647647857666, 0.1990680694580078, 0.19869756698608398, 0.19789576530456543, 0.1941518783569336, 0.19552016258239746, 0.19568467140197754, 0.1958253383636