In [1]:
import pickle
import os
import random
import numpy as np
from pathlib import Path
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras

2022-02-18 00:07:34.615092: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


# data pre-processing

In [7]:
def wh_ETL(dir_path, idx, mode='train', feature_range=(0, 1), pickle_save_dir='model_save/test1/pickles'):
    """Load the target arrays, pick one split, then min-max scale and PCA-reduce them.

    Every ``.npy`` file directly inside `dir_path` is loaded as one sample and
    all samples are stacked along a new first axis. The stack is permuted with
    `idx` and sliced into a split, axis 1 is moved to the end and used as the
    feature axis (its first entry is dropped), and the resulting 2-D table is
    scaled and projected with PCA.

    Parameters
    ----------
    dir_path : str or Path
        Folder containing the per-sample ``.npy`` files.
    idx : array-like of int
        Index permutation applied to the stacked samples before splitting.
    mode : str
        ``'train'`` keeps the samples after the first two 20 % chunks,
        ``'val'`` keeps the first 20 % chunk, and any other value (e.g.
        ``'test'``) keeps the second 20 % chunk.
    feature_range : tuple
        Passed to ``MinMaxScaler`` when a new scaler is fitted.
    pickle_save_dir : str or Path
        Where ``scaler.pkl`` / ``pca.pkl`` are written in train mode and looked
        up in the other modes.

    Returns
    -------
    out : ndarray
        PCA scores of the selected split, one row per (sample, position).
    pca : PCA
        The PCA object used to produce `out`.

    Raises
    ------
    ValueError
        If `dir_path` contains no ``.npy`` file.
    """
    import warnings  # local import: only needed for the fallback warning below

    # Load from folder. Sorted so the sample order (and therefore the meaning
    # of `idx`) does not depend on the filesystem's listing order.
    paths = sorted(Path(dir_path).glob('*.npy'))
    if not paths:
        raise ValueError(f'no .npy files found in {dir_path}')
    whs = np.stack([np.load(path) for path in paths])
    print(whs.shape)
    nb_test_samples = int(0.2 * whs.shape[0])
    nb_levels = whs.shape[1]
    whs = whs[idx]

    # Strings must be compared with ==; `is` tests object identity and only
    # appeared to work because CPython interns short literals.
    if mode == 'train':
        whs = whs[2*nb_test_samples:]
    elif mode == 'val':
        whs = whs[0:nb_test_samples]
    else:
        whs = whs[nb_test_samples:2*nb_test_samples]

    # move axis 1 to the last position, flatten everything else into rows,
    # and remove the first layer
    whs = np.moveaxis(whs, 1, 3).reshape(-1, nb_levels)[:, 1:]

    scaler_path = Path(pickle_save_dir) / 'scaler.pkl'
    pca_path = Path(pickle_save_dir) / 'pca.pkl'

    if mode != 'train' and scaler_path.exists() and pca_path.exists():
        # Reuse the transformers fitted on the training split so that val/test
        # targets live in the same scaled/PCA space as the training targets
        # (`feature_range` is not used on this path).
        # NOTE: pickle.load executes arbitrary code; only load files written
        # by a trusted run of this function.
        with open(scaler_path, 'rb') as f:
            scaler = pickle.load(f)
        with open(pca_path, 'rb') as f:
            pca = pickle.load(f)
        out = pca.transform(scaler.transform(whs))
        return out, pca

    if mode != 'train':
        warnings.warn(
            f'no train-fitted scaler/pca found in {pickle_save_dir}; '
            f'fitting new ones on the {mode!r} split',
            stacklevel=2,
        )

    # Min Max normalization
    scaler = MinMaxScaler(feature_range=feature_range)
    scaled_whs = scaler.fit_transform(whs)

    # PCA: keep as many components as needed to explain 96 % of the variance
    pca = PCA(n_components=0.96, svd_solver='full')
    out = pca.fit_transform(scaled_whs)

    # save scaler and pca as pickles
    if mode == 'train':
        Path(pickle_save_dir).mkdir(parents=True, exist_ok=True)
        with open(scaler_path, 'wb') as f:
            pickle.dump(scaler, f)
        with open(pca_path, 'wb') as f:
            pickle.dump(pca, f)
    return out, pca


def load_X(dir_path, nb_levels=34):
    """Load every input variable found under `dir_path` into one float32 array.

    Each sub-directory of `dir_path` is treated as one variable and each
    ``.npy`` file inside it as one sample. Samples are stacked on a new first
    axis and variables are concatenated on a new last axis. The variable named
    ``'cape'`` has no axis 1 of its own, so it is repeated `nb_levels` times
    along a new axis 1 to match the other variables. Finally the first entry
    of axis 1 is dropped and axis 1 is moved to the second-to-last position.

    Parameters
    ----------
    dir_path : str or Path
        Root folder containing one sub-directory per variable.
    nb_levels : int
        Number of copies of ``'cape'`` along axis 1; must equal the size of
        axis 1 of the other variables.

    Returns
    -------
    ndarray of float32
        Array with samples first and variables last; the variables are in
        alphabetical order of their folder names.
    """
    root = Path(dir_path)
    # Sorted so the order of the variables on the last axis (and of the
    # samples on the first axis) is stable across filesystems. The is_dir()
    # filter keeps stray files out on Python versions where '*/' matched them.
    names = sorted(p.name for p in root.glob('*') if p.is_dir())

    Vars = []
    for name in names:
        files = sorted((root / name).glob('*.npy'))
        data = np.stack([np.load(p) for p in files])
        if name == 'cape':
            # add the missing axis 1 and a trailing variable axis, then tile
            data = data[:, np.newaxis, ..., np.newaxis]
            data = np.repeat(data, nb_levels, axis=1)
        else:
            data = data[..., np.newaxis]
        Vars.append(data)

    # drop the first entry of axis 1, then move that axis next to the variables
    X = np.concatenate(Vars, axis=-1)[:, 1:, ...]
    X = np.moveaxis(X, 1, -2)

    return X.astype(np.float32)


def pad_boundary(arr, kernel_size):
    '''
    Wrap-pad axes 1 and 2 of a 5-D array so that a kernel_size window can be
    centred on every original position along those two axes.

    arr : input array, must have exactly 5 dimensions
    kernel_size : training sample size (window width); (kernel_size - 1) / 2,
                  truncated to an int, entries are added on each side

    Returns a new array; axes 0, 3 and 4 keep their original length.
    Raises ValueError if arr is not 5-D.
    '''

    pad_size = int((kernel_size - 1) / 2)

    if len(arr.shape) != 5:
        raise ValueError('len(arr.shape) should be 5')

    # Pad only the two axes that need it. Padding every axis and slicing the
    # extra borders off again allocates a much larger temporary, and the
    # `pad_size:-pad_size` slice collapses to an empty array when pad_size == 0.
    pad_width = [(0, 0), (pad_size, pad_size), (pad_size, pad_size), (0, 0), (0, 0)]
    return np.pad(arr, pad_width, mode='wrap')


def X_ETL(path, idx, mode='train'):
    """Load the input variables and return the samples of one split.

    Parameters
    ----------
    path : str or Path
        Forwarded to ``load_X``.
    idx : array-like of int
        Index permutation applied to the samples before splitting.
    mode : str
        ``'train'`` keeps the samples after the first two 20 % chunks,
        ``'val'`` keeps the first 20 % chunk, and any other value (e.g.
        ``'test'``) keeps the second 20 % chunk.

    Returns
    -------
    ndarray
        The selected samples, with the layout produced by ``load_X``.
    """
    # load_X and select train/val/test set due to lack of memory
    X = load_X(path)
    nb_test_samples = int(0.2 * X.shape[0])

    print(X.shape)
    X = X[idx]
    # Strings must be compared with ==; `is` tests object identity and only
    # appeared to work because CPython interns short literals.
    if mode == 'train':
        X = X[2*nb_test_samples:]
    elif mode == 'val':
        X = X[0:nb_test_samples]
    else:
        X = X[nb_test_samples:2*nb_test_samples]

    # Disabled 7x7 patch extraction, kept for reference:
    #X = pad_boundary(X, 7)
    #X = np.lib.stride_tricks.sliding_window_view(X, (7, 7), axis=(1, 2))
    #X = np.moveaxis(X, 4, -1)
    # return X.reshape(-1, 33, 7, 7, 9)
    return X

In [3]:
# split data: build one reproducible, shuffled index over all samples
seed = 777
random.seed(seed)
idx = np.arange(1314)
random.shuffle(idx)  # in-place permutation of the 1-D index array
# size of one 20 % chunk (used for the validation and the test split)
nb_test_samples = int(idx.shape[0] * 0.2)

In [18]:
# y preprocess
# Build the PCA-reduced targets for the chosen split. In 'train' mode wh_ETL
# also writes scaler.pkl / pca.pkl into pickle_save_dir.
wh_path = 'data/target/wh/'
pickle_save_dir='model_save/test1/pickles'
mode = 'train'
y, pca = wh_ETL(wh_path, idx, mode=mode, pickle_save_dir=pickle_save_dir)

(1314, 34, 32, 32)


In [21]:
# Reload the PCA object that wh_ETL saved in 'train' mode. The context manager
# closes the file handle, which a bare open() inside the call never did.
# NOTE: pickle.load executes arbitrary code; only load files you produced yourself.
with open('model_save/test1/pickles/pca.pkl', 'rb') as f:
    pca = pickle.load(f)

In [22]:
# Map the PCA scores back to the scaled feature space and check the resulting shape.
pca.inverse_transform(y).shape

(808960, 33)

In [9]:
# X preprocess
# NOTE(review): mode is 'test' here while the y cell uses mode='train' —
# confirm that X and y come from the same split before pairing them.
mode = 'test'
X_path = 'data/vars/'
X = X_ETL(X_path, idx, mode=mode)

(1314, 32, 32, 33, 9)


In [12]:
# Show the names of the entries found under X_path (one folder per input variable).
names = [p.name for p in Path(X_path).glob('*/')]
names

['cape', 'mfd', 'mse', 'q', 't', 'u', 'v', 'w', 'z']

In [13]:
# Persist the current X array to disk as a .npy file.
np.save('X_test_ori.npy', X)

In [12]:
# The first dimension of X and y must match before they are written as (feature, label) pairs.
X.shape, y.shape

((268288, 33, 7, 7, 9), (268288, 5))

# Save as a TFRecord dataset

In [74]:
def _bytes_feature(value):
    """Wrap a string / bytes value in a ``tf.train.Feature`` holding a bytes_list.

    A value of the same tensor type as ``tf.constant(0)`` is first converted
    with ``.numpy()``.
    """
    eager_tensor_type = type(tf.constant(0))
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    bytes_list = tf.train.BytesList(value=[raw])
    return tf.train.Feature(bytes_list=bytes_list)

def _float_feature(value):
    """Wrap a single float / double in a ``tf.train.Feature`` holding a float_list."""
    float_list = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=float_list)

def _int64_feature(value):
    """Wrap a single bool / enum / int / uint in a ``tf.train.Feature`` holding an int64_list."""
    int64_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int64_list)

def serialize_array(array):
    """Return `array` serialized with ``tf.io.serialize_tensor``."""
    return tf.io.serialize_tensor(array)

def parse_single_data(feature, label):
    """Build one ``tf.train.Example`` from a (feature, label) pair.

    Both inputs are serialized with ``tf.io.serialize_tensor`` and stored as
    bytes features under the keys ``'feature'`` and ``'label'``.
    """
    # serialize each tensor and wrap it; the dict is the structure of one example
    fields = {
        key: _bytes_feature(tf.io.serialize_tensor(tensor).numpy())
        for key, tensor in (('feature', feature), ('label', label))
    }
    example_features = tf.train.Features(feature=fields)
    return tf.train.Example(features=example_features)

def write_data_to_tfr_short(datas, labels, filename:str="data"):
    """Write paired samples to ``<filename>.tfrecords``, one Example per pair.

    Parameters
    ----------
    datas : sequence
        Features; element ``i`` is paired with ``labels[i]``.
    labels : sequence
        Labels, same length as `datas`.
    filename : str
        Output path without the ``.tfrecords`` extension.

    Returns
    -------
    int
        Number of records written.

    Raises
    ------
    ValueError
        If `datas` and `labels` differ in length; writing would otherwise
        silently drop labels or stop half-way through the file.
    """
    if len(datas) != len(labels):
        raise ValueError(
            f"datas and labels must have the same length, got {len(datas)} and {len(labels)}"
        )

    filename = filename + ".tfrecords"
    count = 0

    # The context manager closes (and flushes) the writer even if serializing
    # one of the samples raises.
    with tf.io.TFRecordWriter(filename) as writer:
        for current_data, current_label in zip(datas, labels):
            out = parse_single_data(feature=current_data, label=current_label)
            writer.write(out.SerializeToString())
            count += 1

    print(f"Wrote {count} elements to TFRecord")
    return count

def _parse_data_function(example_proto):
    """Decode one serialized Example back into its 'feature' and 'label' tensors.

    Both entries are stored as scalar strings; 'feature' is parsed as 'float'
    and 'label' as 'double'.
    """
    schema = {
        key: tf.io.FixedLenFeature([], tf.string)
        for key in ('feature', 'label')
    }

    # Parse the input tf.train.Example proto using the schema above,
    # then turn each serialized tensor back into a typed tensor.
    parsed = tf.io.parse_single_example(example_proto, schema)
    for key, out_type in (('feature', 'float'), ('label', 'double')):
        parsed[key] = tf.io.parse_tensor(parsed[key], out_type)
    return parsed


In [75]:
# write data to TFRecord (disabled; 'test_dataset.tfrecords' must already exist for the read below)
# write_data_to_tfr_short(X, y, filename='test_dataset')

# read TFRecord and decode every record lazily with _parse_data_function
raw_dataset = tf.data.TFRecordDataset('test_dataset.tfrecords')
parsed_dataset = raw_dataset.map(_parse_data_function)

In [79]:
# Check the label dtype; _parse_data_function decodes labels as 'double'.
y.dtype

dtype('float64')

In [83]:
# Sanity check: decode the first 10 records and print the feature / label shapes.
for data in parsed_dataset.take(10):
    print(data['feature'].shape, data['label'].shape)

(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
(33, 7, 7, 9) (5,)
