## Data Preprocessing Notebook

In [1]:
import os, sys
import pandas as pd
import numpy as np
import torch
from multiprocessing import Process, Manager
import numba
import shutil
from datetime import datetime
import glob
from sklearn.model_selection import train_test_split
import h5py

sys.path.insert(0, 'src')
from fs_utils import remove_and_create_dir
from dqn_src.config import BaseConfig
from dqn_src.traffic_indicator import TrafficIndicatorFeature
from dqn_src import builtseq
from dqn_src.min_max_scaler import cScaler, load_scaler

In [2]:
config = BaseConfig()
target = ['time_in_sys']  # label column(s) the model predicts
np.random.seed(config.seed)  # seeds NumPy's global RNG (used when subsampling windows for the .h5 files)

# Dataset root; expected layout: <data_root>/<config.modelname>/_traces/_{train,test}/*.csv.
# Set the DQN_DATA_ROOT environment variable to point elsewhere instead of editing this cell.
data_root = os.environ.get(
    'DQN_DATA_ROOT', '/scratch/achen/COS561/deepqueuenet/dqn_data/data/')



In [None]:
def buildfolder(data_root, model_name):
    """Create the per-model output folder skeleton under ``data_root``.

    Creates ``<data_root>/<model_name>/<name>`` for each output sub-folder used by
    the pipeline (``_hdf`` and its splits, ``_scaler``, ``_tfrecords``, ``_error``).
    Folders that already exist are left untouched.

    Args:
        data_root: dataset root directory (the one that holds ``<model_name>/_traces``).
        model_name: model/topology sub-path, e.g. ``config.modelname``.
    """
    # Paths hang directly off data_root (no extra 'data/' level), matching every other
    # cell, which builds os.path.join(data_root, <model_name>, ...).
    for name in ('_hdf', '_hdf/train', '_hdf/test1', '_hdf/test2', '_scaler',
                 '_tfrecords', '_error'):
        os.makedirs(os.path.join(data_root, model_name, name), exist_ok=True)
        
# Make sure the output folder skeleton for this model exists before later stages write into it.
buildfolder(data_root, config.modelname)

In [3]:
def multi_process(func, FILES, args=None, no_process=None):
    """Run ``func(*args, file)`` for every file, in batches of parallel processes.

    Files are handled in consecutive batches of ``no_process`` files. Each file gets
    its own ``multiprocessing.Process``, and the whole batch is joined before the
    next batch starts.

    Args:
        func: callable run in a child process as ``func(*args, file)``.
        FILES: sequence of file paths to process.
        args: extra leading positional arguments for ``func``; ``None`` means none.
        no_process: batch size (processes alive at once); defaults to
            ``config.no_process`` from the notebook configuration.

    Returns:
        List of files whose worker exited with a non-zero exit code (empty when
        every worker succeeded).

    Raises:
        ValueError: if ``no_process`` is smaller than 1.
    """
    print("in multi_process()")
    print("Length of FILES: {}".format(len(FILES)))
    if no_process is None:
        no_process = config.no_process
    if no_process < 1:
        raise ValueError("no_process must be >= 1, got {}".format(no_process))

    lead_args = tuple(args) if args is not None else ()
    failed = []
    for it, start in enumerate(range(0, len(FILES), no_process), start=1):
        batch = FILES[start:start + no_process]
        print("[{}] Itr {} processing {} files".format(
            datetime.now().strftime(r'%m%d_%H%M%S'), it, len(batch)))
        workers = []
        for file in batch:
            proc = Process(target=func, args=lead_args + (file,))
            proc.start()
            workers.append((file, proc))
        for file, proc in workers:
            proc.join()
            # An exception in a child only prints a traceback in that child; surface it here.
            if proc.exitcode != 0:
                failed.append(file)

    if failed:
        print("WARNING: {} of {} files failed: {}".format(len(failed), len(FILES), failed))
    return failed

In [4]:


def feature_extraction(config: BaseConfig,
                       target: list[str],
                       data_root: str):
    """Feature extraction module: derive per-packet features from the raw trace CSVs.

    For each mode in ``('train', 'test')``, every ``*.csv`` found recursively under
    ``<data_root>/<config.modelname>/_traces/_<mode>`` is processed by
    ``gettraffic`` in a child process. Files whose name contains ``checkpoint`` are
    skipped. The feature CSVs are written to
    ``<data_root>/<config.modelname>/_traces/<mode>``, which is deleted and
    recreated first.

    Args:
        config: experiment configuration (ports, buffers, window, service rate, ...).
        target: label column name(s); kept in the output CSVs but excluded from the features.
        data_root: dataset root directory.

    Returns:
        List of feature column names. It is also saved with ``torch.save`` to
        ``<data_root>/<config.modelname>/feature_columns.pt``.

    Raises:
        RuntimeError: if no worker reported its feature columns (no input CSVs were
            found, or every worker failed).
    """

    def gettraffic(dst_folder, my_fet, file):
        """Compute traffic-load and inter-arrival features for one trace CSV and save them."""
        df = pd.read_csv(file).fillna(config.sp_wgt)

        # Traffic-load features: one TI column per port, one load column per (port, buffer).
        ins = TrafficIndicatorFeature(df, config.no_of_port, config.no_of_buffer,
                                      config.window, config.ser_rate)
        C_dst_SET, LOAD = ins.getCount()
        for i in range(config.no_of_port):
            df['TI%i' % i] = LOAD[i]
        for i in range(config.no_of_port):
            for j in range(config.no_of_buffer):
                df['load_dst{}_{}'.format(i, j)] = C_dst_SET[(i, j)]

        # Arrival patterns: system-wide inter-arrival time, plus one per priority
        # when the switch has several buffers.
        df['inter_arr_sys'] = df['timestamp (sec)'].diff()
        if config.no_of_buffer > 1:
            for i in range(config.no_of_buffer):
                t = df.loc[df['priority'] == i, 'timestamp (sec)'].diff().rename(
                    'inter_arr{}'.format(i)).to_frame()
                df = df.join(t)

        # Save features first and target(s) last; csv_2hdf treats the last column as the label.
        filename = os.path.basename(file)
        drop_cols = ['timestamp (sec)'] + list(target)
        if config.no_of_buffer == 1:
            drop_cols += ['priority']
        fet_cols = list(df.columns.drop(drop_cols))
        # Each worker overwrites this key; assumes every trace yields the same columns.
        my_fet['fet_cols'] = fet_cols
        # fillna(method='ffill') is deprecated since pandas 2.1; .ffill() is the equivalent.
        df[fet_cols + list(target)].ffill().dropna().to_csv(
            os.path.join(dst_folder, filename), index=False)

    model_dir = os.path.join(data_root, config.modelname)
    with Manager() as MG:
        my_fet = MG.dict()
        for mode in ['train', 'test']:
            file_dir = os.path.join(model_dir, '_traces', '_{}'.format(mode))
            print("Looking in {} for files.".format(file_dir))
            FILES = [
                os.path.join(dirpath, file)
                for dirpath, _, filenames in os.walk(file_dir)
                for file in filenames
                if os.path.splitext(file)[1] == '.csv' and 'checkpoint' not in file
            ]

            dst_folder = os.path.join(model_dir, '_traces', mode)
            if os.path.exists(dst_folder):
                shutil.rmtree(dst_folder)
            os.makedirs(dst_folder)
            multi_process(gettraffic, FILES, args=(dst_folder, my_fet))

        fet_cols = my_fet.get('fet_cols')

    if fet_cols is None:
        raise RuntimeError(
            "No feature columns were extracted; check that {} contains trace CSVs "
            "and that the worker processes succeeded.".format(
                os.path.join(model_dir, '_traces')))
    fet_cols = list(fet_cols)

    feature_columns_save_path = os.path.join(model_dir, 'feature_columns.pt')
    torch.save(fet_cols, feature_columns_save_path)
    return fet_cols

In [None]:
train_features_path = os.path.join(data_root, config.modelname, '_traces', 'train')
test_features_path = os.path.join(data_root, config.modelname, '_traces', 'test')
if os.path.exists(train_features_path) and os.path.exists(test_features_path):
    # feature_extraction() always deletes and rebuilds these folders, so say so
    # instead of implying that the extraction is skipped.
    print("Extracted features already exist at {} and {}; they will be regenerated".format(
        train_features_path, test_features_path))

feature_columns = feature_extraction(
    config=config,
    target=target,
    data_root=data_root)



/scratch/achen/COS561/deepqueuenet/dqn_data/data/
/scratch/achen/COS561/deepqueuenet/dqn_data/data/
Looking in /scratch/achen/COS561/deepqueuenet/dqn_data/data/4-port switch/FIFO/_traces/_train for files.
in multi_process()
Length of FILES: 1260
[0321_180027] Itr 1 processing 15 files
[0321_180047] Itr 2 processing 15 files
[0321_180111] Itr 3 processing 15 files
[0321_180138] Itr 4 processing 15 files
[0321_180205] Itr 5 processing 15 files
[0321_180229] Itr 6 processing 15 files
[0321_180250] Itr 7 processing 15 files
[0321_180314] Itr 8 processing 15 files
[0321_180335] Itr 9 processing 15 files
[0321_180401] Itr 10 processing 15 files
[0321_180423] Itr 11 processing 15 files
[0321_180446] Itr 12 processing 15 files
[0321_180512] Itr 13 processing 15 files
[0321_180537] Itr 14 processing 15 files
[0321_180604] Itr 15 processing 15 files
[0321_180624] Itr 16 processing 15 files
[0321_180652] Itr 17 processing 15 files
[0321_180716] Itr 18 processing 15 files
[0321_180735] Itr 19 proc

In [None]:
# Fallback feature list for a 4-port, single-buffer switch. The TI%i / load_dst{i}_{j} /
# inter_arr_sys names follow the pattern gettraffic() emits. Presumably added because the
# extraction run above was interrupted before feature_columns.pt was written — TODO confirm,
# and update this list if the port/buffer configuration changes.
feature_columns_save_path = os.path.join(data_root, config.modelname, 'feature_columns.pt')
feature_columns = [
    'pkt len (byte)', 'src', 'dst',
    'TI0', 'TI1', 'TI2', 'TI3',
    'load_dst0_0', 'load_dst1_0', 'load_dst2_0', 'load_dst3_0',
    'inter_arr_sys',
]
if not os.path.exists(feature_columns_save_path):
    # Never overwrite a list that was actually saved by feature_extraction().
    torch.save(feature_columns, feature_columns_save_path)

### Convert feature CSVs to HDF5 (.h5) files

In [11]:
def load_hdf(file):
    """Read the ``x`` (inputs) and ``y`` (labels) datasets of an .h5 file into memory.

    Returns:
        Tuple ``(x, y)`` of arrays, fully read before the file is closed.
    """
    with h5py.File(file, 'r') as handle:
        return tuple(handle[name][:] for name in ('x', 'y'))

def write_hdf(file, x, y):
    """Create (or truncate) the .h5 file ``file`` and store ``x`` and ``y`` as datasets of those names."""
    with h5py.File(file, 'w') as handle:
        for name, data in (('x', x), ('y', y)):
            handle[name] = data

def write_hdf2(h, key, x, y):
    """Store ``x`` and ``y`` in the already-open container ``h`` under ``<key>_x`` and ``<key>_y``."""
    for suffix, data in (('x', x), ('y', y)):
        h['{}_{}'.format(key, suffix)] = data

def csv_2hdf(data_root: str,
             config: BaseConfig,
             remove_traces: bool = True,
             max_samples: int = 14000,
             min_fraction: float = 0.15):
    """
    Build timeseries batches for bLSTM and save them in .hdf files
      - split train mode files for train and in-sample testing (test1);
      - save test mode files for out-of-sample testing (test2).

    Reads the feature CSVs in ``<data_root>/<config.modelname>/_traces/{train,test}``
    and writes one ``<name>.h5`` per CSV into
    ``<data_root>/<config.modelname>/_hdf/{train,test1,test2}``. Those split folders
    are deleted and recreated first. If no feature CSVs are found at all, nothing is
    deleted and the function returns early; this protects the HDF output of a
    previous run whose traces were already removed.

    Args:
        data_root: dataset root directory.
        config: experiment configuration (uses ``modelname``, ``TIME_STEPS``, ``test_size``).
        remove_traces: delete each ``_traces/<mode>`` folder after converting it
            (the original behavior). Pass ``False`` to keep the feature CSVs for re-runs.
        max_samples: a file with ``n`` windows keeps
            ``max(min(max_samples, n), int(n * min_fraction))`` randomly chosen windows.
        min_fraction: see ``max_samples``.
    """

    def split_2hdf(mode, file):
        """Window one feature CSV, subsample it and write the .h5 file(s) for ``mode``."""
        key = os.path.splitext(os.path.basename(file))[0]
        t = pd.read_csv(file)

        # The last column is the label (feature_extraction writes the target last).
        ins = builtseq.build_timeseries(t.values, target_col_index=[-1])
        x, y = ins.timeseries(config.TIME_STEPS)
        # Keep a random subset of the windows to represent this configuration.
        # NOTE(review): this draws from NumPy's global RNG. When workers are forked they
        # presumably all inherit the parent's RNG state, so files with the same number
        # of windows would get identical indices — confirm whether that is acceptable.
        n_keep = max(min(max_samples, len(y)), int(len(y) * min_fraction))
        loc = np.random.choice(np.arange(len(y)), n_keep, replace=False)
        x = x[loc]
        y = y[loc]
        if mode == 'train':
            x_train, x_test, y_train, y_test = train_test_split(
                x, y, test_size=config.test_size, shuffle=True)
            write_hdf(os.path.join(hdf_folder, 'train', '{}.h5'.format(key)),
                      x_train, y_train)
            write_hdf(os.path.join(hdf_folder, 'test1', '{}.h5'.format(key)),
                      x_test, y_test)
        else:
            write_hdf(os.path.join(hdf_folder, 'test2', '{}.h5'.format(key)), x, y)

    hdf_folder = os.path.join(data_root, config.modelname, '_hdf')
    trace_folders = {
        mode: os.path.join(data_root, config.modelname, '_traces', mode)
        for mode in ('train', 'test')
    }
    # glob.escape: model names may contain glob metacharacters such as '['.
    trace_files = {
        mode: glob.glob(os.path.join(glob.escape(folder), '*.csv'))
        for mode, folder in trace_folders.items()
    }
    # With remove_traces=True a successful run deletes its inputs, so a blind re-run would
    # wipe the existing HDF splits below and write nothing back.
    if not any(trace_files.values()):
        print("No feature CSVs found under {}; keeping existing HDF files in {}".format(
            ', '.join(trace_folders.values()), hdf_folder))
        return

    # Create (empty) directories for each split
    for split in ('train', 'test1', 'test2'):
        split_folder = os.path.join(hdf_folder, split)
        if os.path.exists(split_folder):
            shutil.rmtree(split_folder)
        os.makedirs(split_folder)
        print("Created directory for {}".format(split_folder))

    for mode, folder in trace_folders.items():
        print("FILES folder: {}".format(folder))
        multi_process(split_2hdf, trace_files[mode], args=(mode, ))
        if remove_traces and os.path.isdir(folder):
            shutil.rmtree(folder)

In [12]:
train_hdf_path = os.path.join(data_root, config.modelname, '_hdf', 'train')
test1_hdf_path = os.path.join(data_root, config.modelname, '_hdf', 'test1')
test2_hdf_path = os.path.join(data_root, config.modelname, '_hdf', 'test2')
# Informational only: the conversion below still runs.
if all(os.path.exists(path) for path in (train_hdf_path, test1_hdf_path, test2_hdf_path)):
    print("HDF5 files already exist at {}, {}, and {}".format(
        train_hdf_path, test1_hdf_path, test2_hdf_path))
csv_2hdf(
    config=config,
    data_root=data_root)

Created directory for /scratch/achen/COS561/deepqueuenet/dqn_data/data/4-port switch/FIFO/_hdf/train
Created directory for /scratch/achen/COS561/deepqueuenet/dqn_data/data/4-port switch/FIFO/_hdf/test1
Created directory for /scratch/achen/COS561/deepqueuenet/dqn_data/data/4-port switch/FIFO/_hdf/test2
FILES folder: /scratch/achen/COS561/deepqueuenet/dqn_data/data/4-port switch/FIFO/_traces/train
in multi_process()
Length of FILES: 1260
[0321_201045] Itr 1 processing 15 files
[0321_201047] Itr 2 processing 15 files
[0321_201049] Itr 3 processing 15 files
[0321_201050] Itr 4 processing 15 files
[0321_201052] Itr 5 processing 15 files
[0321_201054] Itr 6 processing 15 files
[0321_201055] Itr 7 processing 15 files
[0321_201057] Itr 8 processing 15 files
[0321_201058] Itr 9 processing 15 files
[0321_201100] Itr 10 processing 15 files
[0321_201101] Itr 11 processing 15 files
[0321_201103] Itr 12 processing 15 files
[0321_201104] Itr 13 processing 15 files
[0321_201106] Itr 14 processing 15 f

### Perform min/max scaling and save scalers

In [19]:


def load_hdf(file):
    """Return the ``(x, y)`` arrays stored in the .h5 file ``file``.

    NOTE: this is an identical redefinition of ``load_hdf`` from the CSV -> .h5
    section, presumably repeated so this section can be run on its own.
    """
    with h5py.File(file, 'r') as f:
        inputs, labels = f['x'][:], f['y'][:]
    return inputs, labels
# feature_columns = ['pkt len (byte)', 'src', 'dst', 'TI0', 'TI1', 'TI2', 'TI3', 'load_dst0_0', 'load_dst1_0', 'load_dst2_0', 'load_dst3_0', 'inter_arr_sys']
def min_max(data_root,
            feature_columns,
            config,
            target_columns=None):
    """Calculate the data range from the training dataset and save min/max scalers.

    Scans every ``<data_root>/<config.modelname>/_hdf/train/*.h5`` file. It takes the
    per-feature min/max of ``x`` over all windows and time steps, and the per-target
    min/max of ``y``. It then saves an ``'x'`` scaler (clustered via ``cScaler.cluster``)
    and a ``'y'`` scaler into ``<data_root>/<config.modelname>/_scaler``.

    Args:
        data_root: dataset root directory.
        feature_columns: names for the last axis of ``x`` (one per feature).
        config: experiment configuration (uses ``modelname``, ``no_of_port``, ``no_of_buffer``).
        target_columns: label column name(s); defaults to the notebook-level ``target``.

    Raises:
        FileNotFoundError: if no training .h5 files match.
    """
    if target_columns is None:
        target_columns = target
    src = os.path.join(data_root, '{}/_hdf/train/*.h5'.format(config.modelname))
    print("path: {}".format(src))
    files = glob.glob(src)
    if not files:
        raise FileNotFoundError("No training .h5 files match {}".format(src))

    x_mins, x_maxs, y_mins, y_maxs = [], [], [], []
    for file in files:
        x, y = load_hdf(file)
        # Reduce over every axis but the last (features), e.g. (windows, time_steps, features) -> (features,).
        x_flat = x.reshape(-1, x.shape[-1])
        # One value per target column, whether y is stored as (windows,) or (windows, n_targets).
        y_flat = y.reshape(len(y), -1)
        x_mins.append(x_flat.min(axis=0))
        x_maxs.append(x_flat.max(axis=0))
        y_mins.append(y_flat.min(axis=0))
        y_maxs.append(y_flat.max(axis=0))

    # Element-wise min/max across files.
    x_MIN = pd.Series(np.minimum.reduce(x_mins), index=feature_columns)
    x_MAX = pd.Series(np.maximum.reduce(x_maxs), index=feature_columns)
    y_MIN = pd.Series(np.minimum.reduce(y_mins), index=target_columns)
    y_MAX = pd.Series(np.maximum.reduce(y_maxs), index=target_columns)

    scaler_dir = os.path.join(data_root, '{}/_scaler'.format(config.modelname))
    x_scaler = cScaler(x_MIN, x_MAX, 'x', feature_columns,
                       config.no_of_port, config.no_of_buffer)
    x_scaler.cluster()
    x_scaler.save(scaler_dir)
    cScaler(y_MIN, y_MAX, 'y').save(scaler_dir)
                       




In [21]:
# Start from an empty scaler folder, then fit the scalers using the saved feature-column list.
scaler_dir = os.path.join(data_root, config.modelname, '_scaler')
if os.path.exists(scaler_dir):
    shutil.rmtree(scaler_dir)
os.makedirs(scaler_dir)

feature_columns_save_path = os.path.join(data_root, config.modelname, 'feature_columns.pt')
feature_columns = torch.load(feature_columns_save_path)
min_max(data_root=data_root, feature_columns=feature_columns, config=config)


path: /scratch/achen/COS561/deepqueuenet/dqn_data/data/4-port switch/FIFO/_hdf/train/*.h5


In [26]:
# Inspect one converted training file: print each root-level entry and its shape/dtype.
# Built from data_root so it points at the same tree the cells above wrote to.
hd5_file = os.path.join(data_root, config.modelname, '_hdf', 'train',
                        '4port10link1_0.1_trace1.h5')

with h5py.File(hd5_file, "r") as f:
    # Root-level names may be groups or datasets.
    print("Keys: %s" % f.keys())
    for name in f.keys():
        print(name)
        print(f[name])

Keys: <KeysViewHDF5 ['x', 'y']>
x
<HDF5 dataset "x": shape (11200, 42, 12), type "<f8">
<HDF5 dataset "y": shape (11200, 1), type "<f8">


In [None]:
# Important functions in featureET (not imported or used in this notebook):
# - featureET.model_input() appears to save the training data as TFRecords and to merge all
#   train files into one .h5 file and all test files into another.
# - featureET.merge_sample() appears to read samples back from those merged .h5 files.
# - featureET.load_sample() loads individual .h5 files.
