In [1]:
# Standard Libraries
import os
import json
import datetime
import numpy as np
import tensorflow as tf

import time
import nvtx

import argparse

import math
from tensorflow.keras.utils import Progbar

# ------------------------------- CUSTOM FUNCTIONS ------------------------------------------------
# Custom Library
import sys
sys.path.append('../')
    
from proxy_apps.apps.timeseries_prediction import deepDMD, proxyDeepDMD, proxyDeepDMDMGPU

from proxy_apps.utils.tf import TimingCallback
from proxy_apps.utils.data.main import NpEncoder
from proxy_apps.utils import file_reader, path_handler
from proxy_apps.utils.data.grid import GridNetworkDataHandler, GridNetworkTFDataHandler, GridNetworkNewGen, ReadFileAsDataset, TransientDataset

In [2]:
import pathlib

In [3]:
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator

In [4]:
# System Setup
config = file_reader.read_config()

# Run parameters: epochs and batch size are fixed here, the data/model
# settings are read from the config file.
_N_EPOCHS = 2
_BATCH_SIZE = 32
_APP_NAME = config["info"]["app_name"]
_NROWS = int(config["data"]["n_rows"])
_NCOLS = int(config["data"]["n_cols"])
_REPEAT_COLS = int(config["data"]["repeat_cols"])
_DTYPE = config["model"]["dtype"]

# Selects which data path the cells below exercise; the values tested later
# in this notebook are "Baseline", "TFDataOptPrev" and "TFDataOpt".
_LABEL = "TFDataOpt"

# Run identifier of the form "gpu_a100_ng1_nc-1_e<epochs>_b<batch>_r<repeat>_<label>".
_SUFFIX = "_".join([
    "gpu",
    "a100",
    "ng" + str(1),
    "nc" + str(-1),
    "e" + str(_N_EPOCHS),
    "b" + str(_BATCH_SIZE),
    "r" + str(_REPEAT_COLS),
    _LABEL,
])

performance_dict = dict()

# Wall-clock reference taken at the end of the configuration step.
tic = time.time()

# current directory
curr_dir = "./"

# output directory: <output_dir><name>/<app_name>/<dtype>/R<repeat_cols>/
output_dir = path_handler.get_absolute_path(
    curr_dir,
    config["info"]["output_dir"] + config["info"]["name"] + "/" + _APP_NAME + "/" + _DTYPE + "/R" + str(_REPEAT_COLS) + "/",
)
# exist_ok avoids the check-then-create race of `if not exists: makedirs`.
os.makedirs(output_dir, exist_ok=True)

# TensorFlow Setup
print("[INFO] Tensorflow version: ", tf.__version__)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    print("Name:", gpu.name, "  Type:", gpu.device_type)

[INFO] Tensorflow version:  2.4.0


In [5]:
def benchmark(dataset, num_epochs=2, steps=10):
    """Time a simulated training loop over ``dataset`` and print the result.

    Each consumed sample is followed by a 10 ms sleep standing in for a
    training step. Within an epoch, iteration stops once ``steps`` samples
    have been consumed, or earlier if ``dataset`` runs out. When ``steps`` is
    zero or negative the stop condition never matches, so the whole dataset
    is consumed every epoch.

    Args:
        dataset: Any re-iterable object; it is iterated once per epoch.
        num_epochs: Number of passes over ``dataset``.
        steps: Number of samples to consume per epoch.

    Returns:
        None. The elapsed time is printed as ``Execution time: <seconds>``.
    """
    began_at = time.perf_counter()
    for _epoch in range(num_epochs):
        consumed = 0
        for _sample in dataset:
            # Performing a training step
            time.sleep(0.01)
            if consumed == steps - 1:
                break
            consumed += 1

    print("Execution time:", time.perf_counter() - began_at)

In [6]:
# Eager execution is left at its default; the call that would disable it is
# kept commented out below.
# tf.compat.v1.disable_eager_execution()
print("[INFO] Eager mode: ", tf.executing_eagerly())

# For easy reset of notebook state.
tf.keras.backend.clear_session()
# Keras' default float type is hard-coded to float64 here, independently of
# the `_DTYPE` value read from the config.
# NOTE(review): confirm the two are meant to agree.
tf.keras.backend.set_floatx('float64')

[INFO] Eager mode:  True


In [9]:
# ------------------------------- DATA LOADING ------------------------------------------------
# Labels that still load their data through a handler object, and the handler
# class each of them uses. Both handlers take the same constructor arguments.
_HANDLER_BY_LABEL = {
    "Baseline": GridNetworkDataHandler,
    "TFDataOptPrev": GridNetworkTFDataHandler,
}

l_start = time.time()
if _LABEL in _HANDLER_BY_LABEL:
    data_handler = _HANDLER_BY_LABEL[_LABEL](
        scenario_dir=path_handler.get_absolute_path(curr_dir, config["info"]["input_dir"]),
        n_rows=_NROWS,
        n_cols=_NCOLS,
        repeat_cols=_REPEAT_COLS,
        dtype=_DTYPE,
    )
    scenario_data = data_handler.load_grid_data()
elif _LABEL == "TFDataOpt":
    # The GridNetworkNewGen-based loader for this label is disabled, so
    # neither `data_handler` nor `scenario_data` is defined on this path;
    # only a placeholder message is printed.
    print("hello")
l_stop = time.time()

print('[INFO]: Time taken for loading datasets:', l_stop - l_start, 'seconds')

hello
[INFO]: Time taken for loading datasets: 0.00025916099548339844 seconds


In [12]:
# window_size = 800
# shift_size = 10
# stride = 1
# N = 3

# training_data = data_handler.get_training_data(scenario_data, window_size, shift_size, stride, N, 1)
# benchmark(training_data)

In [13]:
class TrainingDataGenerator(tf.data.Dataset):
    """Dataset factory producing one (X, Y) pair of flattened windows per scenario.

    Calling the class does not create a ``TrainingDataGenerator`` instance:
    ``__new__`` returns the dataset built by ``tf.data.Dataset.from_generator``
    around ``_generator``. That dataset yields a single element, a tuple of
    two 2-D arrays of shape ``(flat_n_rows, n_cols * n_repeat)``.
    """

    @staticmethod
    def _generator(file_name, n_rows, n_cols, n_repeat, x_indexer, y_indexer, norm, scale_factor):
        """Load one scenario and yield its windowed (and optionally normalised) X/Y data.

        ``from_generator`` hands its ``args`` to this function as NumPy
        values, which is why ``file_name`` arrives as bytes and is decoded.

        Args:
            file_name: UTF-8 encoded location passed to ``TransientDataset``.
            n_rows: Number of leading rows of the scenario to keep.
            n_cols: Column count before repetition; used with ``n_repeat`` to
                locate the split between the two halves of the columns.
            n_repeat: How many times every column is repeated along axis 1.
            x_indexer: Integer index array selecting the rows of the X windows.
            y_indexer: Integer index array selecting the rows of the Y windows.
            norm: When true, normalise both halves of the columns in place.
            scale_factor: Multiplier applied to the first half of the columns.

        Yields:
            A single ``(flat_X_data, flat_Y_data)`` tuple of 2-D arrays.
        """
        # Opening the file
        dataset = TransientDataset(file_name.decode('utf-8'))
        raw_data = np.repeat(np.concatenate([dataset.F, dataset.Vm], axis=1), n_repeat, axis=1)[:n_rows, :]
        split_index = (n_cols * n_repeat) // 2

        # Flatten (window, row, column) into (row, column) using the real
        # column count. Reshaping to `n_cols` only matched the declared
        # output signature (n_cols * n_repeat columns) when n_repeat == 1.
        n_features = raw_data.shape[1]
        flat_X_data = raw_data[x_indexer].reshape(-1, n_features)
        flat_Y_data = raw_data[y_indexer].reshape(-1, n_features)

        if norm:
            # Fancy indexing above produced copies, so in-place edits do not
            # touch `raw_data`.
            for flat in (flat_X_data, flat_Y_data):
                # First half (the `F` columns): presumably centred on a 60 Hz
                # nominal frequency -- TODO confirm.
                flat[:, :split_index] = scale_factor * (flat[:, :split_index] - 60)
                # Second half (the `Vm` columns): presumably centred on 1.0
                # per-unit voltage -- TODO confirm. This must read the second
                # half itself; reading the first half would replace `Vm` with
                # a function of the already-normalised `F` values.
                flat[:, split_index:] = 10 * (flat[:, split_index:] - 1)

        yield (flat_X_data, flat_Y_data)

    def __new__(cls, file_name, n_rows, flat_n_rows, n_cols, n_repeat, x_indexer, y_indexer, d_type, norm=True, scale_factor=2*np.pi):
        """Build the generator-backed dataset for one scenario.

        Args:
            file_name: Scenario location forwarded to ``_generator``.
            n_rows: Number of leading rows of the scenario to keep.
            flat_n_rows: Row count declared for each yielded array.
            n_cols: Column count before repetition.
            n_repeat: Column repetition factor.
            x_indexer: Row-index array for the X windows.
            y_indexer: Row-index array for the Y windows.
            d_type: dtype declared for both yielded arrays.
            norm: Whether ``_generator`` normalises the data.
            scale_factor: Multiplier used for the first half of the columns.

        Returns:
            The dataset created by ``tf.data.Dataset.from_generator``.
        """
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = (tf.TensorSpec(shape=(flat_n_rows, n_cols*n_repeat), dtype=d_type),
                                tf.TensorSpec(shape=(flat_n_rows, n_cols*n_repeat), dtype=d_type)),
            args=(file_name, n_rows, n_cols, n_repeat, x_indexer, y_indexer, norm, scale_factor,)
        )

In [14]:
def get_indexer(n_rows, window_size, shift_size, start_point, leave_last):
    """Build a 2-D array of row indices describing sliding windows.

    Row ``k`` of the result holds the ``window_size`` consecutive indices
    starting at ``start_point + k * shift_size``. Enough windows are produced
    that the last one ends no later than row ``n_rows - leave_last - 1``.

    Args:
        n_rows: Total number of rows available.
        window_size: Number of consecutive rows in each window.
        shift_size: Offset between the starts of successive windows.
        start_point: Row index at which the first window starts.
        leave_last: Number of trailing rows no window may reach.

    Returns:
        Integer array of shape ``(n_windows, window_size)``; it has zero rows
        when no complete window fits.
    """
    slack = n_rows - window_size - leave_last - start_point
    n_windows = slack // shift_size + 1
    offsets_in_window = np.arange(window_size)
    window_offsets = shift_size * np.arange(n_windows)
    return offsets_in_window[None, :] + start_point + window_offsets[:, None]

In [15]:
# Windowing parameters. `stride` and `N` are not used by this cell.
window_size = 800
shift_size = 10
stride = 1
N = 3

scenario_dir = path_handler.get_absolute_path(curr_dir, config["info"]["input_dir"])
# os.listdir returns entries in arbitrary order; sorting keeps the scenario
# order, and therefore the interleaved sample order, reproducible across runs.
dir_list = [scenario_dir + "/" + f + "/" for f in sorted(os.listdir(scenario_dir))]
list_files = tf.data.Dataset.from_tensor_slices(dir_list)

# X windows start at row 0 and stay clear of the last 3 rows; Y windows start
# one row later and may reach the final row.
# NOTE(review): the two indexers have the same number of windows only when
# (_NROWS - window_size - 3) % shift_size < 8; otherwise Y has one more window
# than X while the row count passed below is derived from X alone -- confirm.
x_indexer = get_indexer(_NROWS, window_size, shift_size, 0, 3)
y_indexer = get_indexer(_NROWS, window_size, shift_size, 1, 0)

# One generator-backed dataset per scenario, read concurrently.
trimmed_scenarios = list_files.interleave(lambda x: ReadFileAsDataset(x, _NROWS, x_indexer.shape[0] * x_indexer.shape[1], 
                                                                      _NCOLS, _REPEAT_COLS, 
                                                                      x_indexer, y_indexer,
                                                                      _DTYPE
                                                                     ).prefetch(tf.data.AUTOTUNE),
                                          num_parallel_calls=tf.data.AUTOTUNE,
                                          cycle_length=30, 
                                          block_length=1 
                                         )

# Split every per-scenario (X, Y) block into individual (x, y) rows and cache them.
flat_data = trimmed_scenarios.flat_map(lambda x, y: tf.data.Dataset.from_tensor_slices((x, y))).cache()
benchmark(flat_data.batch(65536))

Execution time: 8.604403546079993


In [16]:
# Sanity check: print the first (x, y) element of the flattened dataset.
for x, y in flat_data.take(1):
    print(x, y)

tf.Tensor(
[  0.08234169   0.08398943   0.08258069   0.08214684   0.08244791
   0.08239397   0.08254649   0.08293292   0.08758576   0.08190964
   0.08206848   0.08195436   0.08184469   0.08165109   0.08036628
   0.0798736    0.08542501   0.08431926   0.04962437   0.03824485
   0.08420936   0.09290615   0.09679139   0.08237581   0.08692641
   0.11023702   0.09832751   0.14486176   0.15468657   0.08604158
   0.08542665   0.09650718   0.09626098   0.09627406   0.09861495
   0.09202704   0.09158971   0.09184748   0.09850423   0.03992778
  -0.03307551  -0.0259497    0.09590663   0.09609054   0.10489532
   0.10175143   0.06886592   0.05863196   0.1120856    0.12620923
   0.1111895    0.14195852   0.08204857   0.08240602   0.08138131
   0.03664793   0.02118596   0.09608962   0.12064654   0.07192819
   0.17851272   0.07751199   0.10213777   0.09318556   0.09092183
  -0.03392203  -0.02954703   0.15240775  -9.176583    -9.160106
  -9.174193    -9.178532    -9.175521    -9.176061    -9.174535
  -

In [None]:
# benchmark(flat_data.batch(65536))

In [None]:
# benchmark(trimmed_scenarios.batch(65536))

In [None]:
# Handler-based windowing and scaling. This only runs for the two labels
# handled here; for any other label nothing below executes, so names such as
# `X_array` / `Y_array` are not defined by this cell.
if _LABEL in ["Baseline", "TFDataOptPrev"]:
    # ------------------------------- DATA PREPROCESSING ------------------------------------------------
    # Timed: turning the loaded scenario data into the six window collections.
    i_start = time.time()
    X_data, Y_data, U_data, V_data, Yp, Yf = data_handler.create_windows(scenario_data)
    i_stop = time.time()
    print('[INFO]: Time taken for creating X datasets:', i_stop - i_start, 'seconds')

    # ------------------------------- DATA NORMALIZATION ------------------------------------------------
    # Timed: scaling of all six collections by the handler.
    n_start = time.time()
    X_array, Y_array, U_array, V_array, Yp_array, Yf_array = data_handler.scale_data(X_data, Y_data, U_data, V_data, Yp, Yf)
    n_stop = time.time()
    print('[INFO]: Time taken for normalization:', n_stop - n_start, 'seconds')

In [None]:
# timing callback
timing_cb = TimingCallback()

model = tf.keras.Sequential([
    tf.keras.layers.Dense(128),
    tf.keras.layers.Dense(128),
    tf.keras.layers.Dense(64),
    tf.keras.layers.Dense(1)
])

In [None]:
# Adam optimiser with mean-squared-error loss.
model.compile(optimizer='adam', loss='mse')
# The input width is hard-coded to 136.
# NOTE(review): presumably this must equal the feature width of the training
# tensors (n_cols * repeat_cols) -- confirm against the config.
model.build(input_shape=(None,136))
model.summary()

In [None]:
# Train the model on the data path selected by `_LABEL`. In every branch the
# interval [m_start, m_stop] covers exactly the `model.fit` call.
if _LABEL == "Baseline":
    # `m_start` must be set here too: the report below subtracts it from
    # `m_stop`, and it was previously undefined on this path.
    m_start = time.time()
    model.fit(X_array, Y_array, epochs=_N_EPOCHS, batch_size=_BATCH_SIZE, callbacks=[timing_cb])
    m_stop = time.time()
elif _LABEL == "TFDataOptPrev":
    # tensorflow dataset conversion
    tf_conv_start = time.time()
    training_dataset = tf.data.Dataset.zip((X_array, Y_array)).batch(_BATCH_SIZE)
    training_dataset = training_dataset.cache()
    training_dataset = training_dataset.shuffle(buffer_size=22)
    training_dataset = training_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    tf_conv_stop = time.time()
    print('[INFO]: Tensorflow Conversion Time:', tf_conv_stop - tf_conv_start, 'seconds')

    m_start = time.time()
    model.fit(training_dataset, epochs=_N_EPOCHS, callbacks=[timing_cb])
    m_stop = time.time()
elif _LABEL == "TFDataOpt":
    # For this label the loading cell creates neither `data_handler` nor
    # `scenario_data`, so training uses `flat_data`, the (x, y) dataset built
    # by the tf.data pipeline cell. `flat_data` is already cached there, so
    # no second cache is added after batching.
    training_dataset = flat_data.batch(_BATCH_SIZE)
    training_dataset = training_dataset.shuffle(buffer_size=22)
    training_dataset = training_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    m_start = time.time()
    # `workers` / `use_multiprocessing` only apply to generator or
    # keras.utils.Sequence inputs, so they are not passed for a tf.data input.
    model.fit(training_dataset, epochs=_N_EPOCHS, callbacks=[timing_cb])
    m_stop = time.time()
else:
    # Without this guard an unknown label surfaced as a NameError on `m_stop`.
    raise ValueError("Unsupported _LABEL for training: " + repr(_LABEL))

# print info
print('[INFO]: Time taken for model training (time module):', m_stop - m_start, 'seconds')
print('[INFO]: Time taken for model training (Keras):', sum(timing_cb.logs), 'seconds')

In [None]:
# Leftover inspection cell: evaluating the bare name only displays the class;
# nothing is assigned or called here.
tf.keras.preprocessing.sequence.TimeseriesGenerator

In [None]:
# window_size = 800
# shift_size = 10
# stride = 1
# N = 3

# data_handler_seqgen = GridNetworkSequentialGen(scenario_dir=path_handler.get_absolute_path(curr_dir, config["info"]["input_dir"]),
#                                             n_rows=_NROWS,
#                                             n_cols=_NCOLS,
#                                             repeat_cols=_REPEAT_COLS,
#                                             d_type=_DTYPE
#                                          )
# trimmed_scenarios_seqgen = data_handler_seqgen.load_grid_data()
# training_data_seqgen = data_handler_seqgen.get_training_data(trimmed_scenarios_seqgen, window_size, shift_size, stride, N, 1)
# benchmark(training_data_seqgen)