In [3]:
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
# Relative path of the directory that holds the .npy input files.
data_folder = '../data'

In [4]:
# create a data loader
def get_data_pipeline(path_to_trajectories, path_to_observations, test_size=0.2, random_state=None):
    """Load paired arrays from disk and split them into train/test datasets.

    Args:
        path_to_trajectories: Path to a ``.npy`` file holding the targets
            (second element of every dataset item).
        path_to_observations: Path to a ``.npy`` file holding the inputs
            (first element of every dataset item).
        test_size: Forwarded to ``train_test_split``; share of the samples
            held out for the test dataset.
        random_state: Optional seed forwarded to ``train_test_split``. Pass
            an integer to get the same split on every run; the default
            ``None`` keeps the split random.

    Returns:
        A ``(train_dataset, test_dataset)`` tuple of ``tf.data.Dataset``
        objects, each yielding ``(observation, trajectory)`` pairs.
    """
    trajectories = np.load(path_to_trajectories)
    observations = np.load(path_to_observations)
    # Both arrays go through one call so the pairs stay aligned after shuffling.
    train_obs, test_obs, train_true, test_true = train_test_split(
        observations,
        trajectories,
        test_size=test_size,
        random_state=random_state,
    )
    train_dataset = tf.data.Dataset.from_tensor_slices((train_obs, train_true))
    test_dataset = tf.data.Dataset.from_tensor_slices((test_obs, test_true))
    return train_dataset, test_dataset

In [33]:
# Build the train/test datasets from the saved trajectory and observation arrays.
train, test = get_data_pipeline(
    path_to_trajectories=data_folder + '/trajectories.npy',
    path_to_observations=data_folder + '/observations.npy',
    test_size=0.2,
)

In [34]:
# Show the element shapes and dtypes of the training dataset.
print(train)

<TensorSliceDataset shapes: ((32, 2), (32, 3)), types: (tf.float64, tf.float64)>


In [46]:
def downsample(filters, size, apply_batchnorm=True):
    """Build a strided 1-D convolution block.

    The block applies, in order: a bias-free ``Conv1D`` with stride 2 and
    'same' padding, an optional ``BatchNormalization``, and a ``LeakyReLU``.

    Args:
        filters: Number of output channels of the convolution.
        size: Kernel size of the convolution.
        apply_batchnorm: Whether to insert the batch-normalization layer.

    Returns:
        A ``tf.keras.Sequential`` containing the layers above.
    """
    layers = [
        tf.keras.layers.Conv1D(
            filters,
            size,
            strides=2,
            padding='same',
            kernel_initializer=tf.random_normal_initializer(0., 0.02),
            use_bias=False,
        ),
    ]
    if apply_batchnorm:
        layers.append(tf.keras.layers.BatchNormalization())
    layers.append(tf.keras.layers.LeakyReLU())
    return tf.keras.Sequential(layers)

def upsample(filters, size, apply_dropout=False):
    """Build a strided 1-D transposed-convolution block.

    The block applies, in order: a bias-free ``Conv1DTranspose`` with stride 2
    and 'same' padding, ``BatchNormalization``, an optional ``Dropout(0.5)``,
    and a ``ReLU``.

    Args:
        filters: Number of output channels of the transposed convolution.
        size: Kernel size of the transposed convolution.
        apply_dropout: Whether to insert the dropout layer.

    Returns:
        A ``tf.keras.Sequential`` containing the layers above.
    """
    layers = [
        tf.keras.layers.Conv1DTranspose(
            filters,
            size,
            strides=2,
            padding='same',
            kernel_initializer=tf.random_normal_initializer(0., 0.02),
            use_bias=False,
        ),
        tf.keras.layers.BatchNormalization(),
    ]
    if apply_dropout:
        layers.append(tf.keras.layers.Dropout(0.5))
    layers.append(tf.keras.layers.ReLU())
    return tf.keras.Sequential(layers)

def generator():
    """Build the encoder/decoder generator with skip connections.

    The model takes a (batch_size, 32, 2) input and produces a
    (batch_size, 32, 3) output squashed by ``tanh``.

    Returns:
        A ``tf.keras.Model`` mapping the input signal to the generated signal.
    """
    inputs = tf.keras.layers.Input(shape=[32, 2])

    # Encoder: every block halves the sequence length.
    encoder_blocks = [
        downsample(32, 4, apply_batchnorm=False),  # (batch_size, 16, 32)
        downsample(64, 4),  # (batch_size, 8, 64)
    ]

    # Decoder: one block fewer than the encoder, because the final transposed
    # convolution below performs the last doubling of the sequence length.
    decoder_blocks = [
        upsample(64, 4),  # (batch_size, 16, 64); 96 channels after the skip concat
    ]

    output_layer = tf.keras.layers.Conv1DTranspose(
        3,
        4,
        strides=2,
        padding='same',
        kernel_initializer=tf.random_normal_initializer(0., 0.02),
        activation='tanh',
    )  # (batch_size, 32, 3)

    # Run the encoder and remember every intermediate activation.
    features = inputs
    encoder_outputs = []
    for block in encoder_blocks:
        features = block(features)
        encoder_outputs.append(features)

    # The deepest activation is the decoder's input, not a skip connection;
    # the remaining ones are consumed deepest-first.
    skip_connections = encoder_outputs[:-1][::-1]

    # Run the decoder, concatenating the matching encoder activation each time.
    for block, skip in zip(decoder_blocks, skip_connections):
        features = block(features)
        features = tf.keras.layers.Concatenate()([features, skip])

    return tf.keras.Model(inputs=inputs, outputs=output_layer(features))

def Discriminator():
    """Build the conditional discriminator for 1-D signals.

    The model takes two (batch_size, 32, 3) inputs named ``input_signal`` and
    ``target_signal``, concatenates them along the channel axis and reduces
    them to a (batch_size, 6, 1) map of scores. The last convolution has no
    activation, so the scores are unbounded.

    NOTE(review): ``generator()`` consumes a (32, 2) signal, while
    ``input_signal`` is declared here with 3 channels -- confirm which signal
    is meant to be passed as the conditioning input.

    Returns:
        A ``tf.keras.Model`` with inputs ``[input_signal, target_signal]``.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    inp = tf.keras.layers.Input(shape=[32, 3], name='input_signal')
    tar = tf.keras.layers.Input(shape=[32, 3], name='target_signal')

    x = tf.keras.layers.concatenate([inp, tar])  # (batch_size, 32, 6)

    down1 = downsample(32, 4, False)(x)  # (batch_size, 16, 32)
    down2 = downsample(64, 4)(down1)  # (batch_size, 8, 64)

    # The tensors here are rank-3 (batch, steps, channels), so the 1-D padding
    # layer is required; ZeroPadding2D only accepts rank-4 input.
    zero_pad1 = tf.keras.layers.ZeroPadding1D()(down2)  # (batch_size, 10, 64)
    conv = tf.keras.layers.Conv1D(128, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)  # (batch_size, 7, 128)

    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)
    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)
    zero_pad2 = tf.keras.layers.ZeroPadding1D()(leaky_relu)  # (batch_size, 9, 128)

    last = tf.keras.layers.Conv1D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2)  # (batch_size, 6, 1)

    return tf.keras.Model(inputs=[inp, tar], outputs=last)

def get_simple_model():
    """Unimplemented stub; currently does nothing and returns ``None``.

    TODO: the name suggests a simpler model builder -- confirm the intent
    and implement it.
    """
    return None

In [41]:
# Smoke test: push a single observation through one downsample block.
down_model = downsample(32, 4)
# Fetch only the first (observation, trajectory) pair; list(train) would
# materialize the entire dataset just to read one element.
x = next(iter(train))[0]
x = x[np.newaxis, :, :]  # add a leading batch axis -> (1, 32, 2)
down_result = down_model(x)
print(down_result.shape)

(1, 16, 32)


In [36]:
# Smoke test: feed the downsampled activation through a single upsample block.
up_model = upsample(filters=30, size=4)
up_result = up_model(down_result)
print(up_result.shape)

(1, 32, 30)


In [47]:
# Build the full generator and run it on the single-sample batch `x`
# prepared in an earlier cell.
nn = generator()
y = nn(x)
print(y.shape)

(1, 32, 3)


In [48]:
# Print the layer-by-layer summary of the generator.
nn.summary()

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_6 (InputLayer)            [(None, 32, 2)]      0                                            
__________________________________________________________________________________________________
sequential_60 (Sequential)      (None, 16, 32)       256         input_6[0][0]                    
__________________________________________________________________________________________________
sequential_61 (Sequential)      (None, 8, 64)        8448        sequential_60[0][0]              
__________________________________________________________________________________________________
sequential_62 (Sequential)      (None, 16, 64)       16640       sequential_61[0][0]              
____________________________________________________________________________________________