In [32]:
# Cell 1 - Imports
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import layers

# Cell 2 - Data Processing Functions
def load_svc_file(file_path):
    """Load and process a single SVC file."""
    # Skip the first row (count) and read the data
    df = pd.read_csv(file_path, skiprows=1, header=None, 
                     delim_whitespace=True)
    
    # Assign column names
    df.columns = ['x', 'y', 'timestamp', 'pen_status', 
                 'pressure', 'azimuth', 'altitude']
    
    # Normalize timestamp to start from 0
    df['timestamp'] = (df['timestamp'] - df['timestamp'].min())
    
    return df

def process_dataset(directory, num_files=None, sequence_length=100):
    """Process all SVC files in directory and prepare sequences for VRNN."""
    # Get list of SVC files
    svc_files = [f for f in os.listdir(directory) if f.endswith('.svc')]
    if num_files:
        svc_files = svc_files[:num_files]
    
    all_sequences = []
    scalers = {}
    
    for file in svc_files:
        file_path = os.path.join(directory, file)
        df = load_svc_file(file_path)
        
        # Create scalers for each feature
        scaler = MinMaxScaler()
        normalized_data = scaler.fit_transform(df[['x', 'y', 'timestamp', 
                                                 'pressure', 'azimuth', 'altitude']])
        
        # Keep pen_status as binary
        normalized_data = np.column_stack((normalized_data, df['pen_status'].values))
        
        # Create sequences
        for i in range(0, len(normalized_data) - sequence_length, sequence_length // 2):
            sequence = normalized_data[i:i + sequence_length]
            if len(sequence) == sequence_length:
                all_sequences.append(sequence)
        
        scalers[file] = scaler
    
    return np.array(all_sequences), scalers

# Cell 3 - Modified VRNN Cell for Clock Drawing
class ClockDrawingVRNNCell(tf.keras.layers.Layer):
    def __init__(self, latent_dim, feature_dim, **kwargs):
        super(ClockDrawingVRNNCell, self).__init__(**kwargs)
        self.latent_dim = latent_dim
        self.feature_dim = 7
        self.rnn = layers.GRUCell(latent_dim)
        
        # Required attributes for RNN cells
        self.state_size = self.rnn.state_size
        self.output_size = (feature_dim, latent_dim, latent_dim, 
                          latent_dim, latent_dim, latent_dim)
        
        # Feature extraction
        self.feature_extractor = keras.Sequential([
            layers.Dense(32, activation='relu'),
            layers.Dense(latent_dim)
        ])
        
        # Prior network
        self.prior_net = keras.Sequential([
            layers.Dense(32, activation='relu'),
            layers.Dense(latent_dim * 2)  # mu and logvar
        ])
        
        # Encoder network
        self.encoder_net = keras.Sequential([
            layers.Dense(32, activation='relu'),
            layers.Dense(latent_dim * 2)  # mu and logvar
        ])
        
        # Decoder network
        self.decoder_net = keras.Sequential([
            layers.Dense(32, activation='relu'),
            layers.Dense(feature_dim * 2)  # mu and logvar for features
        ])

    def reparameterize(self, mu, logvar, feature_dim=None):
        std = tf.exp(0.5 * logvar)
        batch_size = tf.shape(mu)[0]
        # Use feature_dim if provided (for decoder output), otherwise use latent_dim
        dim = feature_dim if feature_dim is not None else self.latent_dim
        eps = tf.random.normal(shape=(batch_size, dim))
        return mu + eps * std
    
    def call(self, inputs, states, training=True):
        h_prev = states[0]  # Get previous hidden state
        
        # Extract features
        x_features = self.feature_extractor(inputs)
        
        # Calculate prior
        prior_params = self.prior_net(h_prev)
        prior_mu, prior_logvar = tf.split(prior_params, 2, axis=-1)
        
        # Calculate posterior
        encoder_input = tf.concat([x_features, h_prev], axis=-1)
        posterior_params = self.encoder_net(encoder_input)
        posterior_mu, posterior_logvar = tf.split(posterior_params, 2, axis=-1)
        
        # Sample latent vector
        if training:
            z = self.reparameterize(posterior_mu, posterior_logvar)
        else:
            z = self.reparameterize(prior_mu, prior_logvar)
        
        # Update RNN state
        rnn_input = tf.concat([x_features, z], axis=-1)
        h_next_state, _ = self.rnn(rnn_input, [h_prev])
        
        # Decode
        decoder_input = tf.concat([z, h_next_state], axis=-1)
        output_params = self.decoder_net(decoder_input)
        output_mu, output_logvar = tf.split(output_params, 2, axis=-1)
        
        # Generate output using the feature dimension
        output = self.reparameterize(output_mu, output_logvar, self.feature_dim)
        
        return (output, z, posterior_mu, prior_mu, 
                posterior_logvar, prior_logvar), [h_next_state]

    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        if batch_size is None:
            batch_size = tf.shape(inputs)[0]
        return [tf.zeros((batch_size, self.latent_dim))]

class ClockDrawingVRNNGAN(tf.keras.Model):
    def __init__(self, feature_dim, latent_dim, sequence_length, **kwargs):
        super(ClockDrawingVRNNGAN, self).__init__(**kwargs)
        self.feature_dim = feature_dim
        self.latent_dim = latent_dim
        self.sequence_length = sequence_length
        
        # Generator (VRNN)
        self.vrnn_cell = ClockDrawingVRNNCell(latent_dim, feature_dim)
        self.generator = layers.RNN(self.vrnn_cell, return_sequences=True)
        
        # Discriminator
        self.discriminator = keras.Sequential([
            layers.LSTM(32, return_sequences=True),
            layers.LSTM(16),
            layers.Dense(1, activation='sigmoid')
        ])
        
        # Tracking metrics
        self.gen_loss_tracker = keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = keras.metrics.Mean(name="discriminator_loss")
        self.recon_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
    
    @property
    def metrics(self):
        return [
            self.gen_loss_tracker,
            self.disc_loss_tracker,
            self.recon_loss_tracker,
            self.kl_loss_tracker
        ]
    
    def compile(self, gen_optimizer, disc_optimizer):
        super().compile()
        self.gen_optimizer = gen_optimizer
        self.disc_optimizer = disc_optimizer
    
    @tf.function
    def train_step(self, data):
        # Unpack data
        real_sequences = data
        batch_size = tf.shape(real_sequences)[0]
        
        # Train discriminator
        with tf.GradientTape() as disc_tape:
            # Generate fake sequences
            gen_outputs = self.generator(real_sequences[:, :-1, :], training=True)
            fake_sequences = gen_outputs[0]
            
            # Get discriminator predictions
            real_preds = self.discriminator(real_sequences, training=True)
            fake_preds = self.discriminator(fake_sequences, training=True)
            
            # Calculate discriminator loss
            disc_real_loss = tf.reduce_mean(
                tf.keras.losses.binary_crossentropy(tf.ones_like(real_preds), real_preds)
            )
            disc_fake_loss = tf.reduce_mean(
                tf.keras.losses.binary_crossentropy(tf.zeros_like(fake_preds), fake_preds)
            )
            disc_loss = (disc_real_loss + disc_fake_loss) / 2
        
        # Apply discriminator gradients
        disc_gradients = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
        self.disc_optimizer.apply_gradients(zip(disc_gradients, self.discriminator.trainable_variables))
        
        # Train generator
        with tf.GradientTape() as gen_tape:
            # Generate fake sequences
            gen_outputs = self.generator(real_sequences, training=True)
            fake_sequences = gen_outputs[0]
            fake_preds = self.discriminator(fake_sequences, training=True)
            
            # Calculate losses
            posterior_mu = gen_outputs[2]
            prior_mu = gen_outputs[3]
            posterior_logvar = gen_outputs[4]
            prior_logvar = gen_outputs[5]
            
            # KL divergence loss
            kl_loss = 0.5 * tf.reduce_mean(
                prior_logvar - posterior_logvar + 
                (tf.exp(posterior_logvar) + tf.square(posterior_mu - prior_mu)) / 
                tf.exp(prior_logvar) - 1
            )
            
            # Reconstruction loss
            recon_loss = tf.reduce_mean(
                tf.reduce_sum(tf.square(real_sequences - fake_sequences), axis=-1)
            )
            
            # Generator adversarial loss
            gen_loss = tf.reduce_mean(
                tf.keras.losses.binary_crossentropy(tf.ones_like(fake_preds), fake_preds)
            )
            
            # Total generator loss
            total_gen_loss = gen_loss + recon_loss + 0.1 * kl_loss
        
        # Apply generator gradients
        gen_gradients = gen_tape.gradient(total_gen_loss, self.generator.trainable_variables)
        self.gen_optimizer.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))
        
        # Update metrics
        self.gen_loss_tracker.update_state(gen_loss)
        self.disc_loss_tracker.update_state(disc_loss)
        self.recon_loss_tracker.update_state(recon_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        return {
            "gen_loss": self.gen_loss_tracker.result(),
            "disc_loss": self.disc_loss_tracker.result(),
            "recon_loss": self.recon_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
    
    def generate(self, num_samples):
        # Generate random initial input
        initial_input = tf.random.normal([num_samples, 1, self.feature_dim])
        
        # Generate sequence
        generated = []
        current_input = initial_input
        state = [tf.zeros((num_samples, self.latent_dim))]
        
        for _ in range(self.sequence_length):
            outputs, state = self.vrnn_cell(current_input[:, 0], state, training=False)
            generated.append(outputs[0])
            current_input = tf.expand_dims(outputs[0], 1)
        
        return tf.concat(generated, axis=1)


# Cell 5 - Training Setup and Execution
def train_model(data_dir, num_files=None, sequence_length=100, 
                latent_dim=32, epochs=100, batch_size=32):
    # Process data
    sequences, scalers = process_dataset(data_dir, num_files, sequence_length)
    feature_dim = sequences.shape[-1]
    
    # Create and compile model
    model = ClockDrawingVRNNGAN(
        feature_dim=feature_dim,
        latent_dim=latent_dim,
        sequence_length=sequence_length
    )
    
    model.compile(
        gen_optimizer=tf.keras.optimizers.Adam(1e-4),
        disc_optimizer=tf.keras.optimizers.Adam(1e-4)
    )
    
    # Train model
    history = model.fit(
        sequences, epochs=epochs, batch_size=batch_size,
        callbacks=[
            tf.keras.callbacks.EarlyStopping(
                monitor='recon_loss',
                patience=10,
                restore_best_weights=True
            )
        ]
    )
    
    return model, scalers, history

# Cell 6 - Generation and Visualization
def generate_and_visualize(model, scalers, num_samples=1):
    # Generate sequences
    generated_sequences = model.generate(num_samples)
    generated_sequences = tf.reshape(generated_sequences, [num_samples, -1, model.feature_dim]).numpy()

    
    # Inverse transform the sequences using the first scaler
    scaler = list(scalers.values())[0]
    
    plt.figure(figsize=(15, 5))
    for i in range(num_samples):
        sequence = generated_sequences[i]
        
        # Separate pen_status
        pen_status = sequence[:, -1] > 0.5
        
        # Inverse transform coordinates
        coords = sequence[:, :6]  # x, y, timestamp, pressure, azimuth, altitude
        coords = scaler.inverse_transform(coords)
        
        plt.subplot(1, num_samples, i + 1)
        plt.scatter(coords[pen_status, 0], -coords[pen_status, 1], 
                   c='blue', s=1, alpha=0.7, label='On Paper')
        plt.scatter(coords[~pen_status, 0], -coords[~pen_status, 1], 
                   c='red', s=1, alpha=0.7, label='In Air')
        plt.title(f'Generated Drawing {i + 1}')
        plt.xlabel('x')
        plt.ylabel('-y')
        plt.legend()
        plt.axis('equal')
    
    plt.tight_layout()
    plt.show()
    
    return generated_sequences

In [33]:
# Training the model

model, scalers, history = train_model(
    data_dir='Task2',
    num_files=2,
    sequence_length=100,
    epochs=10,
    batch_size=32
)

# Generate synthetic drawings
generated_sequences = generate_and_visualize(model, scalers, num_samples=3)

  df = pd.read_csv(file_path, skiprows=1, header=None,
  df = pd.read_csv(file_path, skiprows=1, header=None,


Epoch 1/10




ValueError: in user code:

    File "C:\Users\annek\AppData\Local\Temp\ipykernel_10452\2944670965.py", line 192, in train_step  *
        gen_outputs = self.generator(real_sequences[:, :-1, :], training=True)
    File "C:\Users\annek\AppData\Roaming\Python\Python312\site-packages\keras\src\utils\traceback_utils.py", line 122, in error_handler  **
        raise e.with_traceback(filtered_tb) from None

    ValueError: Exception encountered when calling RNN.call().
    
    [1mDimension 1 in both shapes must be equal, but are 7 and 32. Shapes are [?,7] and [?,32].
    	From merging shape 0 with other shapes. for '{{node rnn_10_1/Cast/x}} = Pack[N=6, T=DT_FLOAT, axis=0](rnn_10_1/strided_slice_3, rnn_10_1/strided_slice_4, rnn_10_1/strided_slice_5, rnn_10_1/strided_slice_6, rnn_10_1/strided_slice_7, rnn_10_1/strided_slice_8)' with input shapes: [?,7], [?,32], [?,32], [?,32], [?,32], [?,32].[0m
    
    Arguments received by RNN.call():
      • sequences=tf.Tensor(shape=(None, 99, 7), dtype=float32)
      • initial_state=None
      • mask=None
      • training=True
