## Generating Synthetic Financial Data using GANs

Implementation of the TimeGAN architecture in TensorFlow 2.0

In [None]:
import numpy as np
import tensorflow as tf

In [None]:
class Autoencoder(tf.keras.Model):
    """Sequence autoencoder made of an LSTM encoder and an LSTM decoder.

    The encoder reduces a ``(seq_len, n_features)`` sequence to a vector of
    size ``latent_dim``; the decoder repeats that vector ``seq_len`` times and
    maps it back to ``n_features`` values per time step.

    Args:
        seq_len: Number of time steps in each sequence.
        n_features: Number of features per time step.
        latent_dim: Size of the latent vector produced by the encoder.
    """

    def __init__(self, seq_len, n_features, latent_dim):
        super().__init__()
        self.seq_len, self.n_features, self.latent_dim = seq_len, n_features, latent_dim

        keras_layers = tf.keras.layers

        # Encoder: the LSTM emits only its final output, which is then
        # projected down to the latent dimension.
        encoder_stack = [
            keras_layers.LSTM(64, input_shape=(seq_len, n_features)),
            keras_layers.Dense(latent_dim),
        ]
        self.encoder = tf.keras.Sequential(encoder_stack)

        # Decoder: expand the latent vector, tile it across time, and run an
        # LSTM whose per-step outputs are projected to the feature dimension.
        decoder_stack = [
            keras_layers.Dense(64, input_shape=(latent_dim,)),
            keras_layers.RepeatVector(seq_len),
            keras_layers.LSTM(64, return_sequences=True),
            keras_layers.TimeDistributed(keras_layers.Dense(n_features)),
        ]
        self.decoder = tf.keras.Sequential(decoder_stack)

    def call(self, x):
        """Return the decoder's reconstruction of the encoded input ``x``."""
        return self.decoder(self.encoder(x))


class Discriminator(tf.keras.Model):
    """Bidirectional-LSTM classifier that scores every time step of a sequence.

    Args:
        seq_len: Number of time steps in each sequence (stored, not used by
            the layers themselves).
        n_features: Number of features per time step (stored, not used by
            the layers themselves).
    """

    def __init__(self, seq_len, n_features):
        super().__init__()
        self.seq_len, self.n_features = seq_len, n_features

        keras_layers = tf.keras.layers

        # Recurrent body: an LSTM read in both directions, keeping the output
        # of every time step.
        recurrent_core = keras_layers.LSTM(64, return_sequences=True)
        self.rnn = keras_layers.Bidirectional(recurrent_core)

        # Head: one unit per time step, squashed through a sigmoid in call().
        score_head = keras_layers.Dense(1)
        self.fc = keras_layers.TimeDistributed(score_head)
        self.activation = tf.keras.activations.sigmoid

    def call(self, x):
        """Return the sigmoid score of each time step of ``x``."""
        hidden_states = self.rnn(x)
        step_scores = self.fc(hidden_states)
        return self.activation(step_scores)


class Generator(tf.keras.Model):
    """LSTM generator that turns a noise sequence into a feature sequence.

    Args:
        seq_len: Number of time steps in each sequence.
        latent_dim: Size of the noise vector at each time step.
        n_features: Number of features emitted per time step.
    """

    def __init__(self, seq_len, latent_dim, n_features):
        super().__init__()
        self.seq_len, self.latent_dim, self.n_features = seq_len, latent_dim, n_features

        keras_layers = tf.keras.layers

        # Recurrent body keeps the output of every time step so that the
        # dense projection below can be applied step by step.
        self.rnn = keras_layers.LSTM(
            64,
            input_shape=(seq_len, latent_dim),
            return_sequences=True,
        )
        feature_head = keras_layers.Dense(n_features)
        self.fc = keras_layers.TimeDistributed(feature_head)

    def call(self, x):
        """Return the generated sequence for the noise input ``x``."""
        return self.fc(self.rnn(x))

The **TimeGAN model** is a generative model that can be used to generate synthetic time series data.

The code above defines three component models: **Autoencoder**, **Discriminator**, and **Generator**. The Autoencoder reconstructs its input sequence, the Discriminator distinguishes real from fake data, and the Generator produces new synthetic sequences from random noise.



In [None]:
class TimeGAN(tf.keras.Model):
    """GAN for time series combining a generator, a discriminator and an autoencoder.

    The model overrides ``train_step`` so that ``fit()`` alternates one
    generator update and one discriminator update per batch.

    Args:
        seq_len: Number of time steps in each sequence.
        n_features: Number of features per time step.
        latent_dim: Size of the noise vector fed to the generator at each
            time step (also the autoencoder's latent size).
    """

    def __init__(self, seq_len, n_features, latent_dim):
        super().__init__()
        self.seq_len = seq_len
        self.n_features = n_features
        self.latent_dim = latent_dim

        # Instantiate models
        self.generator = Generator(seq_len, latent_dim, n_features)
        self.discriminator = Discriminator(seq_len, n_features)
        self.autoencoder = Autoencoder(seq_len, n_features, latent_dim)

        # Define optimizers
        self.generator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

        # Define loss functions
        self.reconstruction_loss_fn = tf.keras.losses.MeanSquaredError()
        self.adversarial_loss_fn = tf.keras.losses.BinaryCrossentropy()

        # Default loss weights (same values as compile()'s defaults) so that
        # train_step() does not fail with AttributeError when it is called
        # before compile().
        self.reconstruction_weight = 1.0
        self.adversarial_weight = 1.0

    def compile(self, reconstruction_weight=1.0, adversarial_weight=1.0):
        """Configure the model for training and store the loss weights.

        Args:
            reconstruction_weight: Multiplier applied to the reconstruction
                (mean squared error) term of the generator loss.
            adversarial_weight: Multiplier applied to the adversarial
                (binary cross-entropy) terms of both losses.
        """
        super().compile()
        self.reconstruction_weight = reconstruction_weight
        self.adversarial_weight = adversarial_weight

    def train_step(self, x):
        """Run one generator update followed by one discriminator update.

        Args:
            x: A batch of real sequences, or a tuple/list whose first element
                is that batch (the form ``fit()`` passes when the data also
                carries targets or sample weights).

        Returns:
            A dict with the scalar losses under ``"gen_loss"`` and
            ``"disc_loss"``.
        """
        # fit() hands over (x, y[, sample_weight]) when targets are supplied;
        # only the sequences themselves are used here.
        real_x = x[0] if isinstance(x, (tuple, list)) else x

        # Train generator
        with tf.GradientTape() as gen_tape:
            z = tf.random.normal((tf.shape(real_x)[0], self.seq_len, self.latent_dim))
            fake_x = self.generator(z)
            fake_discriminator_output = self.discriminator(fake_x)
            # The generator is rewarded when the discriminator labels its
            # samples as real (target = 1).
            gen_loss = self.adversarial_weight * self.adversarial_loss_fn(
                tf.ones_like(fake_discriminator_output), fake_discriminator_output
            )
            # NOTE(review): the autoencoder's own variables are never updated
            # in this method, and its reconstruction of *fake* data is compared
            # against the *real* batch - confirm this is the intended objective.
            reconstructed_x = self.autoencoder(fake_x)
            recon_loss = self.reconstruction_weight * self.reconstruction_loss_fn(real_x, reconstructed_x)
            gen_loss += recon_loss
        gen_gradients = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
        self.generator_optimizer.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))

        # Train discriminator
        # fake_x is reused from the generator step above; gradients are taken
        # only with respect to the discriminator's variables.
        with tf.GradientTape() as disc_tape:
            real_discriminator_output = self.discriminator(real_x)
            fake_discriminator_output = self.discriminator(fake_x)
            disc_loss = self.adversarial_weight * (
                self.adversarial_loss_fn(tf.ones_like(real_discriminator_output), real_discriminator_output) +
                self.adversarial_loss_fn(tf.zeros_like(fake_discriminator_output), fake_discriminator_output)
            )
        disc_gradients = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
        self.discriminator_optimizer.apply_gradients(zip(disc_gradients, self.discriminator.trainable_variables))

        return {"gen_loss": gen_loss, "disc_loss": disc_loss}

The **compile** method simply stores the given values in the `reconstruction_weight` and `adversarial_weight` attributes of the class.

In the **train_step** method, we first define real_x as the input data x. We then train the generator: we pass random noise z through the generator to produce fake data and compute the discriminator's output on that fake data. The generator loss is a weighted sum of the adversarial loss and the reconstruction loss, where the reconstruction loss is the mean squared error between the real data and the autoencoder's reconstruction of the fake data. We then compute the gradients of the generator loss with respect to the generator's trainable variables and apply them using the generator optimizer.

We then train the discriminator: we compute its output on the real and fake data, and take the discriminator loss to be the adversarial weight times the sum of the binary cross-entropy losses on the real data (labelled 1) and on the fake data (labelled 0). We then compute the gradients of the discriminator loss with respect to the discriminator's trainable variables and apply them using the discriminator optimizer.

Finally, we return a dictionary containing the generator loss and discriminator loss.

In [None]:
# Settings that describe the synthetic data set:
#   seq_len    - sequence length
#   n_features - number of features in the time series
#   latent_dim - dimension of the latent space
#   n_samples  - number of samples to generate
seq_len, n_features, latent_dim, n_samples = 50, 1, 10, 1000

# Build the TimeGAN model and configure its loss weights
model = TimeGAN(seq_len, n_features, latent_dim)
model.compile(adversarial_weight=1.0, reconstruction_weight=1.0)

# Draw one noise sequence per sample and pass the batch through the generator
z = tf.random.normal(shape=(n_samples, seq_len, latent_dim))
generated_data = model.generator(z).numpy()

# Make sure the result has the layout (n_samples, seq_len, n_features)
generated_data = generated_data.reshape((n_samples, seq_len, n_features))

To generate synthetic data, we first define its parameters: the sequence length, number of features, latent dimension, and number of samples. We then initialize a TimeGAN model with these parameters, compile it, and generate synthetic data by passing random noise through the generator component of the model. Note that this cell does not train the model, so the samples come from a generator with freshly initialized weights; call `model.fit` on real data first to obtain meaningful output.

Finally, we reshape the generated data to the desired shape, i.e., a 3D array with shape (n_samples, seq_len, n_features).