In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers

class VAE(keras.Model):
    """Fully connected variational autoencoder for flat feature vectors.

    The encoder maps an ``input_dim`` vector through one ReLU hidden layer to
    the mean and log-variance of the latent distribution; the decoder maps a
    latent vector through one ReLU hidden layer to a sigmoid output of size
    ``input_dim``.
    """

    def __init__(self, input_dim=9, hidden_dim=7, latent_dim=2):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim

        # The two halves are separate functional models so they can be
        # called on their own (encoding only / decoding only).
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()

    def build_encoder(self):
        """Return the encoder model: input -> hidden -> (z_mean, z_log_var)."""
        features = keras.Input(shape=(self.input_dim,))
        encoded = layers.Dense(self.hidden_dim, activation="relu")(features)

        # Two linear heads on the same hidden activations, created in the
        # order mean, log-variance.
        latent_heads = [
            layers.Dense(self.latent_dim, name=head_name)(encoded)
            for head_name in ("z_mean", "z_log_var")
        ]
        return keras.Model(features, latent_heads, name="encoder")

    def build_decoder(self):
        """Return the decoder model: latent -> hidden -> sigmoid output."""
        latent_inputs = keras.Input(shape=(self.latent_dim,))
        decoded = layers.Dense(self.hidden_dim, activation="relu")(latent_inputs)
        # Sigmoid keeps every output component inside (0, 1).
        decoded = layers.Dense(self.input_dim, activation="sigmoid")(decoded)
        return keras.Model(latent_inputs, decoded, name="decoder")

    def reparameterize(self, z_mean, z_log_var):
        """Draw z = mean + std * eps with eps ~ N(0, 1) (reparameterization trick)."""
        n_rows = tf.shape(z_mean)[0]
        noise = tf.random.normal(shape=(n_rows, self.latent_dim))
        std = tf.exp(0.5 * z_log_var)
        return z_mean + std * noise

    def call(self, inputs):
        """Encode, sample and decode; return (reconstruction, z_mean, z_log_var)."""
        mean, log_var = self.encoder(inputs)
        sample = self.reparameterize(mean, log_var)
        return self.decoder(sample), mean, log_var

    def encode(self, x):
        """Return a random latent sample for ``x`` (not the posterior mean)."""
        mean, log_var = self.encoder(x)
        return self.reparameterize(mean, log_var)

    def decode(self, z):
        """Run the decoder on latent vectors ``z``."""
        return self.decoder(z)

class VAETrainer:
    """Custom training loop for a VAE.

    Holds the optimizer and three running-mean trackers (total,
    reconstruction and KL loss) that are reset at the start of every epoch.
    """

    def __init__(self, vae, optimizer=None):
        """Store the model; use Adam with learning rate 1e-3 when no optimizer is given."""
        self.vae = vae
        self.optimizer = optimizer if optimizer is not None else keras.optimizers.Adam(1e-3)

        # Metrics
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    def compute_loss(self, x):
        """Return (total, reconstruction, KL) losses for the batch ``x``.

        The reconstruction term is the squared error summed over features
        and averaged over the batch; the KL term is summed over latent
        dimensions and averaged over the batch.
        """
        reconstructed, z_mean, z_log_var = self.vae(x)

        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(tf.square(x - reconstructed), axis=1)
        )

        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        total_loss = reconstruction_loss + kl_loss
        return total_loss, reconstruction_loss, kl_loss

    @tf.function
    def train_step(self, x):
        """Apply one optimizer update on ``x`` and return the running metric means."""
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self.compute_loss(x)

        gradients = tape.gradient(total_loss, self.vae.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.vae.trainable_variables))

        # Update metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def _reset_trackers(self):
        """Clear all three running means."""
        for tracker in (
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ):
            # `reset_state` is the current metric API; `reset_states` is the
            # deprecated spelling and is only used when the new one is absent.
            reset = getattr(tracker, "reset_state", None) or tracker.reset_states
            reset()

    def train(self, dataset, epochs=100, verbose=1):
        """Train for ``epochs`` passes over ``dataset``.

        Args:
            dataset: Iterable yielding input batches.
            epochs: Number of passes over ``dataset``.
            verbose: When truthy, print the running losses every 10th epoch.
        """
        for epoch in range(epochs):
            self._reset_trackers()

            # Stays None when the dataset yields nothing, so reporting below
            # cannot hit an unbound name.
            metrics = None
            for x_batch in dataset:
                metrics = self.train_step(x_batch)

            if not verbose or (epoch + 1) % 10 != 0:
                continue

            print(f"Epoch {epoch + 1}/{epochs}")
            if metrics is None:
                print("No batches were produced by the dataset; nothing to report.")
                continue
            print(f"Loss: {float(metrics['loss']):.4f}, "
                  f"Reconstruction: {float(metrics['reconstruction_loss']):.4f}, "
                  f"KL: {float(metrics['kl_loss']):.4f}")

def generate_synthetic_data(n_samples=1000):
    """Return two Gaussian clusters in 9-D, clipped to [0, 1], with labels.

    Seeds NumPy's global RNG with 42 on every call, so the output is
    deterministic. Each cluster gets ``n_samples // 2`` rows; rows of
    cluster 0 come first, followed by the rows of cluster 1.

    Returns:
        ``(data, labels)``: float32 array of shape ``(2 * (n_samples // 2), 9)``
        and an integer array of matching length holding 0 or 1.
    """
    np.random.seed(42)

    per_cluster = n_samples // 2
    shared_cov = np.eye(9) * 0.05
    centers = (
        [0.3, 0.7, 0.2, 0.8, 0.1, 0.9, 0.4, 0.6, 0.5],
        [0.8, 0.2, 0.9, 0.1, 0.7, 0.3, 0.6, 0.4, 0.5],
    )

    # Draw the clusters in order so the RNG stream is consumed identically
    # on every call.
    clusters = [
        np.random.multivariate_normal(mean=center, cov=shared_cov, size=per_cluster)
        for center in centers
    ]

    # Stack, then clamp stray draws back into the unit interval.
    points = np.clip(np.vstack(clusters), 0, 1).astype(np.float32)

    # Cluster ids for visualization: zeros for the first half, ones for the second.
    cluster_ids = np.repeat(np.arange(2), per_cluster).astype(int)

    return points, cluster_ids

def plot_latent_space(vae, data, labels=None, title="Latent Space Representation"):
    """Scatter the first two components of the encoder's latent means for ``data``.

    When ``labels`` is given the points are coloured by label and a colour
    bar is added.
    """
    latent_means, _ = vae.encoder(data)
    xs, ys = latent_means[:, 0], latent_means[:, 1]

    plt.figure(figsize=(8, 6))
    if labels is None:
        plt.scatter(xs, ys, alpha=0.6)
    else:
        points = plt.scatter(xs, ys, c=labels, cmap='viridis', alpha=0.6)
        plt.colorbar(points, label='Cluster')

    plt.xlabel("Latent Dimension 1")
    plt.ylabel("Latent Dimension 2")
    plt.title(title)
    plt.grid(True, alpha=0.3)
    plt.show()

def plot_reconstructions(vae, data, n_samples=5):
    """Plot randomly chosen rows of ``data`` above their VAE reconstructions.

    Args:
        vae: Model whose call returns ``(reconstruction, z_mean, z_log_var)``.
        data: Array of samples, one row per sample, supporting NumPy
            index-array selection.
        n_samples: Number of rows drawn without replacement; NumPy raises
            ``ValueError`` when this exceeds ``len(data)``.
    """
    indices = np.random.choice(len(data), n_samples, replace=False)
    x_sample = data[indices]
    reconstructions, _, _ = vae(x_sample)
    reconstructions = np.asarray(reconstructions)

    # Take the bar positions from the data instead of assuming 9 features.
    positions = range(x_sample.shape[1])

    # squeeze=False keeps `axes` two-dimensional, so axes[row, col] also
    # works when n_samples == 1.
    _, axes = plt.subplots(2, n_samples, figsize=(15, 6), squeeze=False)

    for i in range(n_samples):
        panels = (("Original", x_sample[i]), ("Reconstructed", reconstructions[i]))
        for row, (caption, values) in enumerate(panels):
            ax = axes[row, i]
            ax.bar(positions, values)
            ax.set_title(f"{caption} {i+1}")
            ax.set_ylim(0, 1)
            ax.set_xticks(positions)

    plt.tight_layout()
    plt.show()

def plot_generated_samples(vae, n_samples=5):
    """Decode standard-normal latent vectors and plot each result as a bar chart.

    Args:
        vae: Model exposing ``latent_dim`` and ``decode(z)``.
        n_samples: Number of latent vectors to draw and plot.
    """
    # Sample random points in latent space
    random_latent = tf.random.normal(shape=(n_samples, vae.latent_dim))
    generated_samples = np.asarray(vae.decode(random_latent))

    # Size the x axis from the decoder output rather than assuming 9 features.
    positions = range(generated_samples.shape[-1])

    plt.figure(figsize=(15, 3))
    for i in range(n_samples):
        plt.subplot(1, n_samples, i + 1)
        plt.bar(positions, generated_samples[i])
        plt.title(f"Generated {i+1}")
        plt.ylim(0, 1)
        plt.xticks(positions)

    plt.suptitle("Generated Samples from Random Latent Vectors")
    plt.tight_layout()
    plt.show()

def interpolate_in_latent_space(vae, data, n_steps=5):
    """Decode evenly spaced points on the line between two latent means.

    Two distinct rows of ``data`` are picked at random, encoded to their
    latent means, and ``n_steps`` linear blends between those means are
    decoded and plotted side by side.

    Args:
        vae: Model exposing ``encoder`` (returning mean and log-variance)
            and ``decode(z)``.
        data: Array of samples, one row per sample; needs at least two rows.
        n_steps: Number of interpolation points, end points included.
    """
    # Get two random samples (slices keep the leading batch axis)
    first, second = np.random.choice(len(data), 2, replace=False)
    x1, x2 = data[first:first + 1], data[second:second + 1]

    # Encode to latent space
    z1_mean, _ = vae.encoder(x1)
    z2_mean, _ = vae.encoder(x2)
    z1_mean, z2_mean = np.asarray(z1_mean), np.asarray(z2_mean)

    # Blend all steps at once: a column of weights broadcasts against the
    # (1, latent_dim) means. float32 matches the encoder output.
    alphas = np.linspace(0, 1, n_steps, dtype=np.float32)
    weights = alphas[:, np.newaxis]
    interpolated_z = (1 - weights) * z1_mean + weights * z2_mean
    interpolated_samples = np.asarray(vae.decode(interpolated_z))

    # Size the x axis from the decoder output rather than assuming 9 features.
    positions = range(interpolated_samples.shape[-1])

    # Plot interpolation
    plt.figure(figsize=(15, 3))
    for i in range(n_steps):
        plt.subplot(1, n_steps, i + 1)
        plt.bar(positions, interpolated_samples[i])
        plt.title(f"α = {alphas[i]:.2f}")
        plt.ylim(0, 1)
        plt.xticks(positions)

    plt.suptitle("Latent Space Interpolation")
    plt.tight_layout()
    plt.show()

def print_model_summary(vae):
    """Print the VAE's layer sizes followed by the Keras summaries of both halves."""
    print("VAE Architecture:")
    print("=" * 50)

    size_fields = (
        ("Input dimension", "input_dim"),
        ("Hidden dimension", "hidden_dim"),
        ("Latent dimension", "latent_dim"),
    )
    for caption, attribute in size_fields:
        print(f"{caption}: {getattr(vae, attribute)}")

    for heading, part in (("\nEncoder:", "encoder"), ("\nDecoder:", "decoder")):
        print(heading)
        getattr(vae, part).summary()

# Example usage
if __name__ == "__main__":
    # Generate synthetic data
    print("Generating synthetic 9D data...")
    data, labels = generate_synthetic_data(n_samples=2000)

    # The generator returns every cluster-0 row followed by every cluster-1
    # row, so an unshuffled 80/20 split would leave only cluster 1 in the
    # test set. Shuffle rows and labels together before splitting.
    shuffled_order = np.random.permutation(len(data))
    data, labels = data[shuffled_order], labels[shuffled_order]

    # Split into train/test
    train_size = int(0.8 * len(data))
    x_train, x_test = data[:train_size], data[train_size:]
    y_train, y_test = labels[:train_size], labels[train_size:]

    # Create dataset; a buffer covering the whole training set gives a
    # uniform shuffle each epoch.
    batch_size = 64
    train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
    train_dataset = train_dataset.shuffle(buffer_size=len(x_train)).batch(batch_size)

    # Create VAE
    vae = VAE(input_dim=9, hidden_dim=7, latent_dim=2)
    print_model_summary(vae)

    # Train VAE
    trainer = VAETrainer(vae)
    print("\nTraining VAE...")
    trainer.train(train_dataset, epochs=100)

    # Visualizations
    print("\nPlotting reconstructions...")
    plot_reconstructions(vae, x_test)

    print("Plotting generated samples...")
    plot_generated_samples(vae)

    print("Plotting latent space...")
    plot_latent_space(vae, x_test, y_test)

    print("Plotting latent space interpolation...")
    interpolate_in_latent_space(vae, x_test)

ModuleNotFoundError: No module named 'tensorflow.python'

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers

class VAE(keras.Model):
    """Convolutional variational autoencoder for image tensors.

    The encoder applies two stride-2 convolutions and a dense layer before
    producing the latent mean and log-variance. The decoder starts from a
    quarter-resolution feature map and upsamples it with two stride-2
    transposed convolutions back to the input resolution.

    Args:
        latent_dim: Size of the latent vector.
        input_shape: ``(height, width, channels)`` of one input image.
            Height and width must be multiples of 4 so the decoder output
            has the same size as the input.

    Raises:
        ValueError: If ``input_shape`` does not have three entries or its
            height/width is not a multiple of 4.
    """

    def __init__(self, latent_dim=32, input_shape=(28, 28, 1)):
        super().__init__()
        height, width, channels = (int(size) for size in input_shape)
        if height % 4 or width % 4:
            raise ValueError(
                "input_shape height and width must be multiples of 4, "
                f"got {tuple(input_shape)}"
            )

        self.latent_dim = latent_dim
        # Stored under private names: tf.keras 2.x exposes `input_shape` as a
        # read-only property on layers and models, so `self.input_shape = ...`
        # raises AttributeError there.
        self._image_height = height
        self._image_width = width
        self._image_channels = channels

        # Build encoder and decoder
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()

    @property
    def image_shape(self):
        """``(height, width, channels)`` this model was built for."""
        return (self._image_height, self._image_width, self._image_channels)

    def build_encoder(self):
        """Return the encoder model: image -> (z_mean, z_log_var)."""
        inputs = keras.Input(shape=self.image_shape)

        # Convolutional layers; each stride-2 layer halves height and width.
        x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(inputs)
        x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Flatten()(x)
        x = layers.Dense(16, activation="relu")(x)

        # Mean and log variance for latent space
        z_mean = layers.Dense(self.latent_dim, name="z_mean")(x)
        z_log_var = layers.Dense(self.latent_dim, name="z_log_var")(x)

        return keras.Model(inputs, [z_mean, z_log_var], name="encoder")

    def build_decoder(self):
        """Return the decoder model: latent vector -> image with sigmoid pixels."""
        latent_inputs = keras.Input(shape=(self.latent_dim,))

        # The two stride-2 transposed convolutions below upsample by 4 in
        # total, so start from a quarter-resolution map (7x7 for 28x28 input).
        base_height = self._image_height // 4
        base_width = self._image_width // 4
        x = layers.Dense(base_height * base_width * 64, activation="relu")(latent_inputs)
        x = layers.Reshape((base_height, base_width, 64))(x)

        # Transpose convolutional layers
        x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)

        # Output layer: one filter per input channel.
        decoder_outputs = layers.Conv2DTranspose(
            self._image_channels, 3, activation="sigmoid", padding="same"
        )(x)

        return keras.Model(latent_inputs, decoder_outputs, name="decoder")

    def reparameterize(self, z_mean, z_log_var):
        """Draw z = mean + std * eps with eps ~ N(0, 1) (reparameterization trick)."""
        batch_size = tf.shape(z_mean)[0]
        epsilon = tf.random.normal(shape=(batch_size, self.latent_dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

    def call(self, inputs):
        """Encode, sample and decode; return (reconstruction, z_mean, z_log_var)."""
        z_mean, z_log_var = self.encoder(inputs)
        z = self.reparameterize(z_mean, z_log_var)
        reconstructed = self.decoder(z)
        return reconstructed, z_mean, z_log_var

    def encode(self, x):
        """Return a random latent sample for ``x`` (not the posterior mean)."""
        z_mean, z_log_var = self.encoder(x)
        return self.reparameterize(z_mean, z_log_var)

    def decode(self, z):
        """Run the decoder on latent vectors ``z``."""
        return self.decoder(z)

class VAETrainer:
    """Custom training loop for the convolutional VAE.

    Holds the optimizer and three running-mean trackers (total,
    reconstruction and KL loss) that are reset at the start of every epoch.
    """

    def __init__(self, vae, optimizer=None):
        """Store the model; use Adam with learning rate 1e-4 when no optimizer is given."""
        self.vae = vae
        self.optimizer = optimizer if optimizer is not None else keras.optimizers.Adam(1e-4)

        # Metrics
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    def compute_loss(self, x):
        """Return (total, reconstruction, KL) losses for the image batch ``x``.

        The reconstruction term is the binary cross-entropy summed over
        axes 1 and 2 and averaged over the batch; the KL term is summed
        over latent dimensions and averaged over the batch.
        """
        reconstructed, z_mean, z_log_var = self.vae(x)

        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(
                keras.losses.binary_crossentropy(x, reconstructed), axis=(1, 2)
            )
        )

        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        total_loss = reconstruction_loss + kl_loss
        return total_loss, reconstruction_loss, kl_loss

    @tf.function
    def train_step(self, x):
        """Apply one optimizer update on ``x`` and return the running metric means."""
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self.compute_loss(x)

        gradients = tape.gradient(total_loss, self.vae.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.vae.trainable_variables))

        # Update metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def _reset_trackers(self):
        """Clear all three running means."""
        for tracker in (
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ):
            # `reset_state` is the current metric API; `reset_states` is the
            # deprecated spelling and is only used when the new one is absent.
            reset = getattr(tracker, "reset_state", None) or tracker.reset_states
            reset()

    def train(self, dataset, epochs=10, verbose=1):
        """Train for ``epochs`` passes over ``dataset``.

        Args:
            dataset: Iterable yielding input batches.
            epochs: Number of passes over ``dataset``.
            verbose: When truthy, print the running losses after every epoch.
        """
        for epoch in range(epochs):
            self._reset_trackers()

            # Stays None when the dataset yields nothing, so reporting below
            # cannot hit an unbound name.
            metrics = None
            for x_batch in dataset:
                metrics = self.train_step(x_batch)

            if not verbose:
                continue

            print(f"Epoch {epoch + 1}/{epochs}")
            if metrics is None:
                print("No batches were produced by the dataset; nothing to report.")
                continue
            print(f"Loss: {float(metrics['loss']):.4f}, "
                  f"Reconstruction: {float(metrics['reconstruction_loss']):.4f}, "
                  f"KL: {float(metrics['kl_loss']):.4f}")

def load_and_preprocess_data():
    """Load MNIST images, scale them to [0, 1] and append a channel axis.

    Returns:
        ``(x_train, x_test)`` as float32 arrays; the labels are discarded.
    """
    (train_images, _), (test_images, _) = keras.datasets.mnist.load_data()

    def _prepare(images):
        # Pixel values arrive as 0..255; divide after the float cast.
        scaled = images.astype("float32") / 255.0
        return scaled[..., np.newaxis]

    return _prepare(train_images), _prepare(test_images)

def plot_latent_space(vae, x_test, y_test=None, n_samples=1000):
    """Scatter the first two latent-mean components of a random test subset.

    Args:
        vae: Model exposing ``encoder`` (returning mean and log-variance).
        x_test: Array of inputs, one sample per leading index.
        y_test: Optional array of labels aligned with ``x_test``; used to
            colour the points.
        n_samples: Maximum number of samples to plot. Capped at
            ``len(x_test)`` so small test sets do not make the
            without-replacement draw fail.
    """
    # Sample subset of test data
    n_samples = min(n_samples, len(x_test))
    indices = np.random.choice(len(x_test), n_samples, replace=False)
    x_sample = x_test[indices]

    # Encode to latent space
    z_mean, _ = vae.encoder(x_sample)
    z_mean = np.asarray(z_mean)

    plt.figure(figsize=(8, 6))
    if y_test is not None:
        y_sample = y_test[indices]
        scatter = plt.scatter(z_mean[:, 0], z_mean[:, 1], c=y_sample, cmap='tab10', alpha=0.6)
        plt.colorbar(scatter)
    else:
        plt.scatter(z_mean[:, 0], z_mean[:, 1], alpha=0.6)

    plt.xlabel("Latent Dimension 1")
    plt.ylabel("Latent Dimension 2")
    plt.title("Latent Space Representation")
    plt.show()

def plot_generated_images(vae, n_samples=10):
    """Decode standard-normal latent vectors and show the images in one row."""
    latent_draws = tf.random.normal(shape=(n_samples, vae.latent_dim))
    decoded = vae.decode(latent_draws)

    plt.figure(figsize=(20, 4))
    for position in range(1, n_samples + 1):
        plt.subplot(1, n_samples, position)
        # Channel 0 of each decoded image is shown in grayscale.
        plt.imshow(decoded[position - 1, :, :, 0], cmap='gray')
        plt.axis('off')
    plt.suptitle("Generated Images from Random Latent Vectors")
    plt.show()

def plot_reconstructions(vae, x_test, n_samples=10):
    """Show random test images (top row) above their reconstructions (bottom row)."""
    chosen = np.random.choice(len(x_test), n_samples, replace=False)
    originals = x_test[chosen]
    rebuilt, _, _ = vae(originals)

    plt.figure(figsize=(20, 4))
    for column in range(n_samples):
        # Row 1 uses slots 1..n_samples, row 2 the slots after that.
        rows = (
            (column + 1, originals, "Original"),
            (column + 1 + n_samples, rebuilt, "Reconstructed"),
        )
        for slot, images, caption in rows:
            plt.subplot(2, n_samples, slot)
            plt.imshow(images[column, :, :, 0], cmap='gray')
            plt.title(caption)
            plt.axis('off')
    plt.show()

# Example usage
if __name__ == "__main__":
    # Load data
    x_train, x_test = load_and_preprocess_data()

    # Create dataset
    batch_size = 128
    train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
    train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

    # Create and train VAE
    vae = VAE(latent_dim=2)  # 2D latent space for visualization
    trainer = VAETrainer(vae)

    print("Training VAE...")
    trainer.train(train_dataset, epochs=10)

    # Visualizations
    print("Plotting reconstructions...")
    plot_reconstructions(vae, x_test)

    print("Plotting generated images...")
    plot_generated_images(vae)

    # Load labels for latent space visualization
    # (load_and_preprocess_data discards them, so MNIST is read again here).
    (_, y_train), (_, y_test) = keras.datasets.mnist.load_data()
    print("Plotting latent space...")
    # A stray trailing character on this call used to make the cell a SyntaxError.
    plot_latent_space(vae, x_test, y_test)

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers

class CVAE(keras.Model):
    """Conditional variational autoencoder for flat feature vectors.

    Both halves receive a condition vector of length ``num_classes``
    alongside their main input: the encoder concatenates it with the data,
    the decoder concatenates it with the latent vector.
    """

    def __init__(self, input_dim=9, hidden_dim=7, latent_dim=2, num_classes=3):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.num_classes = num_classes

        # Separate functional models so each half can be called on its own.
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()

    def build_encoder(self):
        """Return the encoder model: [data, condition] -> hidden -> (z_mean, z_log_var)."""
        data_in = keras.Input(shape=(self.input_dim,), name="data_input")
        condition_in = keras.Input(shape=(self.num_classes,), name="condition_input")

        # The condition is appended to the data before the hidden layer.
        joined = layers.Concatenate()([data_in, condition_in])
        encoded = layers.Dense(self.hidden_dim, activation="relu")(joined)

        # Two linear heads on the same hidden activations, created in the
        # order mean, log-variance.
        latent_heads = [
            layers.Dense(self.latent_dim, name=head_name)(encoded)
            for head_name in ("z_mean", "z_log_var")
        ]
        return keras.Model([data_in, condition_in], latent_heads, name="encoder")

    def build_decoder(self):
        """Return the decoder model: [latent, condition] -> hidden -> sigmoid output."""
        latent_in = keras.Input(shape=(self.latent_dim,), name="latent_input")
        condition_in = keras.Input(shape=(self.num_classes,), name="condition_input")

        # The condition is appended to the latent vector before the hidden layer.
        joined = layers.Concatenate()([latent_in, condition_in])
        decoded = layers.Dense(self.hidden_dim, activation="relu")(joined)
        # Sigmoid keeps every output component inside (0, 1).
        decoded = layers.Dense(self.input_dim, activation="sigmoid")(decoded)

        return keras.Model([latent_in, condition_in], decoded, name="decoder")

    def reparameterize(self, z_mean, z_log_var):
        """Draw z = mean + std * eps with eps ~ N(0, 1) (reparameterization trick)."""
        n_rows = tf.shape(z_mean)[0]
        noise = tf.random.normal(shape=(n_rows, self.latent_dim))
        std = tf.exp(0.5 * z_log_var)
        return z_mean + std * noise

    def call(self, inputs):
        """Run ``(data, conditions)`` through the CVAE.

        Returns ``(reconstruction, z_mean, z_log_var)``.
        """
        data, conditions = inputs
        mean, log_var = self.encoder([data, conditions])
        sample = self.reparameterize(mean, log_var)
        return self.decoder([sample, conditions]), mean, log_var

    def encode(self, data, conditions):
        """Return a random latent sample for ``data`` under ``conditions``."""
        mean, log_var = self.encoder([data, conditions])
        return self.reparameterize(mean, log_var)

    def decode(self, z, conditions):
        """Run the decoder on latent vectors ``z`` under ``conditions``."""
        return self.decoder([z, conditions])

class CVAETrainer:
    """Custom training loop for a CVAE.

    Holds the optimizer and three running-mean trackers (total,
    reconstruction and KL loss) that are reset at the start of every epoch.
    """

    def __init__(self, cvae, optimizer=None):
        """Store the model; use Adam with learning rate 1e-3 when no optimizer is given."""
        self.cvae = cvae
        self.optimizer = optimizer if optimizer is not None else keras.optimizers.Adam(1e-3)

        # Metrics
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    def compute_loss(self, data, conditions):
        """Return (total, reconstruction, KL) losses for one conditioned batch.

        The reconstruction term is the squared error summed over features
        and averaged over the batch; the KL term is summed over latent
        dimensions and averaged over the batch.
        """
        reconstructed, z_mean, z_log_var = self.cvae([data, conditions])

        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(tf.square(data - reconstructed), axis=1)
        )

        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        total_loss = reconstruction_loss + kl_loss
        return total_loss, reconstruction_loss, kl_loss

    @tf.function
    def train_step(self, data, conditions):
        """Apply one optimizer update and return the running metric means."""
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self.compute_loss(data, conditions)

        gradients = tape.gradient(total_loss, self.cvae.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.cvae.trainable_variables))

        # Update metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def _reset_trackers(self):
        """Clear all three running means."""
        for tracker in (
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ):
            # `reset_state` is the current metric API; `reset_states` is the
            # deprecated spelling and is only used when the new one is absent.
            reset = getattr(tracker, "reset_state", None) or tracker.reset_states
            reset()

    def train(self, dataset, epochs=100, verbose=1):
        """Train for ``epochs`` passes over ``dataset``.

        Args:
            dataset: Iterable yielding ``(data_batch, condition_batch)`` pairs.
            epochs: Number of passes over ``dataset``.
            verbose: When truthy, print the running losses every 10th epoch.
        """
        for epoch in range(epochs):
            self._reset_trackers()

            # Stays None when the dataset yields nothing, so reporting below
            # cannot hit an unbound name.
            metrics = None
            for data_batch, condition_batch in dataset:
                metrics = self.train_step(data_batch, condition_batch)

            if not verbose or (epoch + 1) % 10 != 0:
                continue

            print(f"Epoch {epoch + 1}/{epochs}")
            if metrics is None:
                print("No batches were produced by the dataset; nothing to report.")
                continue
            print(f"Loss: {float(metrics['loss']):.4f}, "
                  f"Reconstruction: {float(metrics['reconstruction_loss']):.4f}, "
                  f"KL: {float(metrics['kl_loss']):.4f}")

def generate_conditional_synthetic_data(n_samples=1500, num_classes=3):
    """Generate shuffled, labelled 9-D samples from up to three Gaussian classes.

    Seeds NumPy's global RNG with 42 on every call, so the output is
    deterministic. Each class contributes ``n_samples // num_classes`` rows
    drawn around its own mean, clipped to [0, 1].

    Args:
        n_samples: Total number of rows requested (rounded down to a
            multiple of ``num_classes``).
        num_classes: Number of classes to generate; between 1 and the
            number of built-in patterns (3).

    Returns:
        ``(data, labels)``: float32 array of shape ``(rows, 9)`` and an
        integer array of class ids, shuffled with the same permutation.

    Raises:
        ValueError: If ``num_classes`` is outside the supported range.
    """
    # Define different patterns for each class
    class_patterns = [
        # Class 0: Low values pattern
        {"mean": [0.2, 0.3, 0.1, 0.4, 0.2, 0.3, 0.1, 0.2, 0.3], "cov": 0.03},
        # Class 1: Medium values pattern
        {"mean": [0.5, 0.6, 0.4, 0.7, 0.5, 0.6, 0.4, 0.5, 0.6], "cov": 0.03},
        # Class 2: High values pattern
        {"mean": [0.8, 0.7, 0.9, 0.6, 0.8, 0.7, 0.9, 0.8, 0.7], "cov": 0.03},
    ]

    # Fail early with a clear message instead of an IndexError halfway
    # through generation or a ZeroDivisionError below.
    if not 1 <= num_classes <= len(class_patterns):
        raise ValueError(
            f"num_classes must be between 1 and {len(class_patterns)}, got {num_classes}"
        )

    np.random.seed(42)

    samples_per_class = n_samples // num_classes

    data_list = []
    labels_list = []
    for class_id, pattern in enumerate(class_patterns[:num_classes]):
        class_data = np.random.multivariate_normal(
            mean=pattern["mean"],
            cov=np.eye(9) * pattern["cov"],
            size=samples_per_class,
        )

        # Clip to [0, 1] range
        data_list.append(np.clip(class_data, 0, 1))
        labels_list.append(np.full(samples_per_class, class_id))

    # Combine all classes
    data = np.vstack(data_list).astype(np.float32)
    labels = np.concatenate(labels_list).astype(int)

    # Shuffle rows and labels with the same permutation
    indices = np.random.permutation(len(data))
    return data[indices], labels[indices]

def labels_to_onehot(labels, num_classes):
    """Return the one-hot encoding of integer ``labels`` with ``num_classes`` columns."""
    encoded = tf.one_hot(indices=labels, depth=num_classes)
    return encoded

def plot_conditional_latent_space(cvae, data, labels, title="Conditional Latent Space"):
    """Scatter the first two latent-mean components, one colour per class.

    Args:
        cvae: Model exposing ``num_classes`` and ``encoder`` (taking
            ``[data, conditions]`` and returning mean and log-variance).
        data: Array of samples, one row per sample.
        labels: Integer class id per row of ``data``.
        title: Figure title.
    """
    labels = np.asarray(labels)
    conditions_onehot = labels_to_onehot(labels, cvae.num_classes)

    # Encode to latent space. Convert to NumPy because the boolean-mask
    # indexing below (`z_mean[mask, 0]`) is not supported on a tf.Tensor.
    z_mean, _ = cvae.encoder([data, conditions_onehot])
    z_mean = np.asarray(z_mean)

    plt.figure(figsize=(10, 8))
    colors = ['red', 'blue', 'green', 'orange', 'purple']

    for class_id in range(cvae.num_classes):
        mask = labels == class_id
        # Wrap around the palette so more than five classes do not overflow it.
        plt.scatter(z_mean[mask, 0], z_mean[mask, 1],
                    c=colors[class_id % len(colors)],
                    label=f'Class {class_id}', alpha=0.6)

    plt.xlabel("Latent Dimension 1")
    plt.ylabel("Latent Dimension 2")
    plt.title(title)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

def plot_conditional_reconstructions(cvae, data, labels, n_samples=5):
    """Plot, per class, random samples above their CVAE reconstructions.

    Each class occupies two rows of the figure (originals, then
    reconstructions). A class with fewer than ``n_samples`` rows is skipped
    and its two rows stay empty.

    Args:
        cvae: Model exposing ``num_classes`` whose call on
            ``[data, conditions]`` returns ``(reconstruction, z_mean, z_log_var)``.
        data: Array of samples, one row per sample.
        labels: Integer class id per row of ``data``.
        n_samples: Number of samples drawn per class.
    """
    labels = np.asarray(labels)

    # Take the bar positions from the data instead of assuming 9 features.
    positions = range(data.shape[1])

    # squeeze=False keeps `axes` two-dimensional even when n_samples == 1.
    _, axes = plt.subplots(2 * cvae.num_classes, n_samples,
                           figsize=(15, 4 * cvae.num_classes), squeeze=False)

    for class_id in range(cvae.num_classes):
        # Get samples from this class
        class_mask = labels == class_id
        class_data = data[class_mask]
        class_labels = labels[class_mask]

        if len(class_data) < n_samples:
            continue

        indices = np.random.choice(len(class_data), n_samples, replace=False)
        x_sample = class_data[indices]
        y_sample = class_labels[indices]

        conditions_onehot = labels_to_onehot(y_sample, cvae.num_classes)
        reconstructions, _, _ = cvae([x_sample, conditions_onehot])
        reconstructions = np.asarray(reconstructions)

        panels = (
            (class_id * 2, "Original", x_sample),
            (class_id * 2 + 1, "Reconstructed", reconstructions),
        )
        for i in range(n_samples):
            for row, caption, values in panels:
                ax = axes[row, i]
                ax.bar(positions, values[i])
                ax.set_title(f"Class {class_id} - {caption} {i+1}")
                ax.set_ylim(0, 1)
                ax.set_xticks(positions)

    plt.tight_layout()
    plt.show()

def generate_conditional_samples(cvae, n_samples_per_class=3):
    """Decode random latent vectors under each class condition and plot them.

    One figure row per class, one column per generated sample.

    Args:
        cvae: Model exposing ``num_classes``, ``latent_dim`` and
            ``decode(z, conditions)``.
        n_samples_per_class: Number of samples generated for every class.
    """
    # squeeze=False keeps `axes` two-dimensional for every grid size, so a
    # single class or a single sample per class needs no special casing.
    _, axes = plt.subplots(cvae.num_classes, n_samples_per_class,
                           figsize=(12, 3 * cvae.num_classes), squeeze=False)

    for class_id in range(cvae.num_classes):
        # Create condition vector for this class
        conditions = np.zeros((n_samples_per_class, cvae.num_classes))
        conditions[:, class_id] = 1  # One-hot encoding
        conditions = tf.constant(conditions, dtype=tf.float32)

        # Sample random latent vectors
        random_latent = tf.random.normal(shape=(n_samples_per_class, cvae.latent_dim))

        # Generate samples
        generated_samples = np.asarray(cvae.decode(random_latent, conditions))

        # Size the x axis from the decoder output rather than assuming 9 features.
        positions = range(generated_samples.shape[-1])

        for i in range(n_samples_per_class):
            ax = axes[class_id, i]
            ax.bar(positions, generated_samples[i])
            ax.set_title(f"Class {class_id} - Generated {i+1}")
            ax.set_ylim(0, 1)
            ax.set_xticks(positions)

    plt.suptitle("Generated Samples by Condition")
    plt.tight_layout()
    plt.show()

def interpolate_between_conditions(cvae, data, labels):
    """Interpolate latent code and condition between two classes and plot the results.

    The first sample of each of the two lowest-numbered classes present in
    ``labels`` is encoded to its latent mean. Five evenly spaced blends of
    both the latent means and the one-hot conditions are decoded and
    plotted. Prints a message and returns when fewer than two classes have
    samples.

    Args:
        cvae: Model exposing ``num_classes``, ``encoder`` and
            ``decode(z, conditions)``.
        data: Array of samples, one row per sample.
        labels: Integer class id per row of ``data``.
    """
    labels = np.asarray(labels)

    # Keep the class id next to each sample so the condition always matches
    # the sample's real class, even when a lower-numbered class is absent.
    samples_by_class = []
    for class_id in range(cvae.num_classes):
        class_data = data[labels == class_id]
        if len(class_data) > 0:
            samples_by_class.append((class_id, class_data[0:1]))  # Take first sample

    if len(samples_by_class) < 2:
        print("Need at least 2 classes for interpolation")
        return

    (class_a, sample_a), (class_b, sample_b) = samples_by_class[:2]

    # One-hot condition rows for the two end points.
    one_hot = np.eye(cvae.num_classes, dtype=np.float32)
    condition_a = one_hot[class_a:class_a + 1]
    condition_b = one_hot[class_b:class_b + 1]

    # Encode both end points to their latent means.
    z1, _ = cvae.encoder([sample_a, tf.constant(condition_a)])
    z2, _ = cvae.encoder([sample_b, tf.constant(condition_b)])
    z1, z2 = np.asarray(z1), np.asarray(z2)

    n_steps = 5
    alphas = np.linspace(0, 1, n_steps, dtype=np.float32)

    plt.figure(figsize=(15, 6))

    for i, alpha in enumerate(alphas):
        # Interpolate latent code
        z_interp = (1 - alpha) * z1 + alpha * z2

        # Interpolate condition (gradually change from the first class to the second)
        condition = tf.constant((1 - alpha) * condition_a + alpha * condition_b,
                                dtype=tf.float32)

        # Generate sample
        generated = np.asarray(cvae.decode(z_interp, condition))

        # Size the x axis from the decoder output rather than assuming 9 features.
        positions = range(generated.shape[-1])

        plt.subplot(1, n_steps, i + 1)
        plt.bar(positions, generated[0])
        plt.title(f"α = {alpha:.2f}")
        plt.ylim(0, 1)
        plt.xticks(positions)

    plt.suptitle("Interpolation Between Conditions")
    plt.tight_layout()
    plt.show()

def print_cvae_summary(cvae):
    """Print the CVAE's hyperparameters followed by both sub-model summaries.

    Args:
        cvae: Object exposing ``input_dim``, ``hidden_dim``, ``latent_dim``,
            ``num_classes`` and ``encoder`` / ``decoder`` attributes that each
            provide a ``summary()`` method.
    """
    hyperparameters = (
        ("Input dimension", "input_dim"),
        ("Hidden dimension", "hidden_dim"),
        ("Latent dimension", "latent_dim"),
        ("Number of classes", "num_classes"),
    )
    sub_models = (
        ("\nEncoder (takes data + condition):", "encoder"),
        ("\nDecoder (takes latent + condition):", "decoder"),
    )

    print("CVAE Architecture:")
    print("=" * 50)
    for label, attr_name in hyperparameters:
        print(f"{label}: {getattr(cvae, attr_name)}")
    for heading, attr_name in sub_models:
        print(heading)
        getattr(cvae, attr_name).summary()

# Example usage: end-to-end CVAE demo (data generation, training, plots).
# Runs only when the module is executed as a script / notebook cell.
if __name__ == "__main__":
    # Generate conditional synthetic data
    print("Generating synthetic conditional 9D data...")
    num_classes = 3
    data, labels = generate_conditional_synthetic_data(n_samples=1800, num_classes=num_classes)

    # Split into train/test: first 80% of rows for training, the rest for testing.
    # NOTE(review): the split is positional, so it assumes the generator returns
    # samples in shuffled class order — confirm against
    # generate_conditional_synthetic_data.
    train_size = int(0.8 * len(data))
    x_train, x_test = data[:train_size], data[train_size:]
    y_train, y_test = labels[:train_size], labels[train_size:]

    # Convert labels to one-hot
    y_train_onehot = labels_to_onehot(y_train, num_classes)
    y_test_onehot = labels_to_onehot(y_test, num_classes)

    # Create dataset of (sample, one-hot condition) pairs, shuffled and batched
    batch_size = 64
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train_onehot))
    train_dataset = train_dataset.shuffle(buffer_size=1000).batch(batch_size)

    # Create CVAE
    cvae = CVAE(input_dim=9, hidden_dim=7, latent_dim=2, num_classes=num_classes)
    print_cvae_summary(cvae)

    # Train CVAE
    trainer = CVAETrainer(cvae)
    print("\nTraining CVAE...")
    trainer.train(train_dataset, epochs=100)

    # Visualizations (all on the held-out test split, using integer labels)
    print("\nPlotting conditional reconstructions...")
    plot_conditional_reconstructions(cvae, x_test, y_test)

    print("Generating conditional samples...")
    generate_conditional_samples(cvae)

    print("Plotting conditional latent space...")
    plot_conditional_latent_space(cvae, x_test, y_test)

    print("Interpolating between conditions...")
    interpolate_between_conditions(cvae, x_test, y_test)

In [None]:
"""
LSTM Autoencoder for anomaly detection in multidimensional time series
Base case: 7 input dimensions, latent space = 3
TensorFlow 2 / Keras implementation.

Features:
- Model definition (encoder, decoder)
- Synthetic data generator with injected anomalies
- Training pipeline with early stopping
- Reconstruction error-based anomaly scoring and thresholding
- Simple evaluation and plotting

Run as a script or import functions for experimentation.
"""

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks, optimizers
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Reproducibility: seed both the NumPy global RNG and the TensorFlow RNG.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# -------------------------
# Config / Hyperparameters
# -------------------------
TIMESTEPS = 50            # length of each sequence window
N_FEATURES = 7            # dimensionality of the input (base case)
LATENT_DIM = 3            # size of the bottleneck (base case)
LSTM_UNITS = 64           # units in each LSTM layer
BATCH_SIZE = 64           # mini-batch size passed to Model.fit
EPOCHS = 100              # upper bound on training epochs
LEARNING_RATE = 1e-3      # Adam learning rate

# -------------------------
# Model: LSTM Autoencoder
# -------------------------

def build_lstm_autoencoder(timesteps=TIMESTEPS, n_features=N_FEATURES,
                           latent_dim=LATENT_DIM, lstm_units=LSTM_UNITS,
                           learning_rate=LEARNING_RATE):
    """Build a sequence-to-sequence LSTM autoencoder.

    Encoder:
      - LSTM (last output only) -> Dense latent vector

    Decoder:
      - RepeatVector -> LSTM (return sequences) -> TimeDistributed(Dense(n_features))

    Args:
        timesteps: Length of each input window.
        n_features: Number of features per timestep.
        latent_dim: Size of the bottleneck vector.
        lstm_units: Units in the encoder and decoder LSTM layers.
        learning_rate: Learning rate for the Adam optimizer. Defaults to the
            module-level ``LEARNING_RATE``, so existing callers are unaffected.

    Returns:
        tuple: ``(autoencoder, encoder_model)``. ``autoencoder`` is compiled
        with Adam and MSE loss; ``encoder_model`` maps the same input to the
        latent vector and shares the encoder layers.
    """
    # Encoder
    encoder_inputs = layers.Input(shape=(timesteps, n_features), name="encoder_input")
    x = layers.LSTM(lstm_units, return_sequences=False, name="enc_lstm_1")(encoder_inputs)
    latent = layers.Dense(latent_dim, activation="linear", name="latent_vector")(x)

    # Decoder: repeat the latent vector once per timestep so the decoder LSTM
    # receives a sequence of the original length.
    x = layers.RepeatVector(timesteps, name="repeat_vector")(latent)
    x = layers.LSTM(lstm_units, return_sequences=True, name="dec_lstm_1")(x)
    decoder_outputs = layers.TimeDistributed(layers.Dense(n_features), name="decoder_output")(x)

    autoencoder = models.Model(encoder_inputs, decoder_outputs, name="lstm_autoencoder")

    # also create encoder model for latent extraction
    encoder_model = models.Model(encoder_inputs, latent, name="encoder_model")

    optimizer = optimizers.Adam(learning_rate=learning_rate)
    autoencoder.compile(optimizer=optimizer, loss="mse")

    return autoencoder, encoder_model


# -------------------------
# Synthetic Data Generator
# -------------------------

def generate_synthetic_data(n_samples=5000, timesteps=TIMESTEPS, n_features=N_FEATURES,
                            anomaly_fraction=0.02, anomaly_magnitude=5.0):
    """Generate multivariate time series windows with some injected anomalies.

    Normal windows are one sinusoid per feature (frequency and amplitude grow
    with the feature index, phase is random per window and feature) plus
    Gaussian noise. A fraction of windows is corrupted by adding a large
    positive offset with noise to a random time segment of some features.

    Phases come from a dedicated ``RandomState(SEED)``, so they are the same
    on every call; noise and anomaly placement use the global NumPy RNG.

    Args:
        n_samples: Number of windows to generate.
        timesteps: Length of each window.
        n_features: Number of features per timestep.
        anomaly_fraction: Fraction of windows that receive an anomaly.
        anomaly_magnitude: Scale of the injected offset and noise.

    Returns:
        tuple: ``X`` of shape ``(n_samples, timesteps, n_features)`` as
        float32, and ``y`` of shape ``(n_samples,)`` with binary labels per
        window (0 normal, 1 anomaly).
    """
    # base normal signals: sinusoid + gaussian noise per feature
    t = np.linspace(0, 2 * np.pi, timesteps)
    feature_idx = np.arange(n_features)
    freqs = 0.5 + 0.5 * (feature_idx + 1) / n_features
    amps = 1.0 + 0.1 * feature_idx

    # One independent phase per (window, feature), drawn from a single
    # generator. Seeding a fresh generator with SEED + i + f made
    # (i, f) and (i + 1, f - 1) share the same phase.
    phases = np.random.RandomState(SEED).rand(n_samples, n_features) * 2 * np.pi

    # Broadcast to (n_samples, timesteps, n_features)
    signal = amps * np.sin(freqs * t[np.newaxis, :, np.newaxis] + phases[:, np.newaxis, :])
    noise = 0.1 * np.random.normal(size=(n_samples, timesteps, n_features))
    X = (signal + noise).astype(np.float32)

    # labels
    y = np.zeros((n_samples,), dtype=int)

    # Bounds for the random draws, clamped so that small n_features/timesteps
    # neither raise in randint nor yield an empty corrupted segment. For the
    # default sizes these equal the unclamped values.
    max_corrupt = max(1, n_features // 2)
    start_hi = max(1, timesteps // 2)
    length_lo = max(1, timesteps // 8)
    length_hi = max(length_lo + 1, timesteps // 2)

    # inject anomalies into a fraction of windows
    n_anom = int(n_samples * anomaly_fraction)
    anomaly_indices = np.random.choice(n_samples, size=n_anom, replace=False)
    for idx in anomaly_indices:
        # choose random feature(s) and a random time segment to corrupt
        n_corrupt_features = np.random.randint(1, max_corrupt + 1)
        corrupt_features = np.random.choice(n_features, size=n_corrupt_features, replace=False)
        start = np.random.randint(0, start_hi)
        length = np.random.randint(length_lo, length_hi)
        end = min(timesteps, start + length)
        for f in corrupt_features:
            # add a large offset and scaled noise
            X[idx, start:end, f] += anomaly_magnitude * (np.random.randn(end - start) + 3.0)
        y[idx] = 1

    return X, y


# -------------------------
# Utility: compute reconstruction errors and threshold
# -------------------------

def reconstruction_errors(model, X):
    """Compute the per-window mean squared reconstruction error.

    Args:
        model: Object with a Keras-style ``predict(X, verbose=0)`` method that
            returns an array shaped like ``X``.
        X: Array of windows with three axes; the last two are averaged over.

    Returns:
        1-D array with one error value per window.
    """
    residual = X - model.predict(X, verbose=0)
    per_window_mse = (residual ** 2).mean(axis=(1, 2))
    return per_window_mse


def choose_threshold(errors, method="percentile", percentile=99.0, k=3.0):
    """Choose an anomaly threshold from reconstruction errors.

    Args:
        errors: Reconstruction errors (array or any sequence of numbers).
        method: ``"percentile"`` uses the given percentile of ``errors``;
            ``"std"`` uses ``mean + k * std``.
        percentile: Percentile in [0, 100]; only used by ``"percentile"``.
        k: Number of standard deviations above the mean; only used by
            ``"std"``. The default of 3.0 matches the previous fixed value.

    Returns:
        The threshold as a NumPy float scalar.

    Raises:
        ValueError: If ``method`` is unknown or ``errors`` is empty.
    """
    if method not in ("percentile", "std"):
        raise ValueError("Unknown method")

    # Accept plain lists as well as arrays (lists have no .mean()/.std()).
    errors = np.asarray(errors)
    if errors.size == 0:
        raise ValueError("Cannot choose a threshold from an empty error array")

    if method == "percentile":
        return np.percentile(errors, percentile)
    return errors.mean() + k * errors.std()


# -------------------------
# Example training + detection
# -------------------------

def example_run():
    """Train the LSTM autoencoder on synthetic data and run anomaly detection.

    Generates synthetic windows, trains on the normal training windows only
    (with early stopping on a validation split), sets the threshold at the
    99.5th percentile of the training reconstruction errors, prints test-set
    metrics, and plots the test errors plus a few reconstructed windows.

    Returns:
        tuple: ``(autoencoder, encoder, thresh)`` — the trained model, the
        encoder sub-model, and the chosen threshold.
    """
    from sklearn.metrics import classification_report, confusion_matrix

    # Generate data
    X, y = generate_synthetic_data(n_samples=5000)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=SEED)

    # We'll train only on *normal* windows (y == 0)
    X_train_norm = X_train[y_train == 0]
    print(f"Training on {len(X_train_norm)} normal windows (out of {len(X_train)})")

    # Build model
    autoencoder, encoder = build_lstm_autoencoder()
    autoencoder.summary()

    # Callbacks
    es = callbacks.EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

    # The returned History object is not needed here.
    autoencoder.fit(
        X_train_norm, X_train_norm,
        validation_split=0.1,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=[es],
        verbose=2
    )

    # Compute reconstruction errors on train-normal (used to set threshold)
    train_errors = reconstruction_errors(autoencoder, X_train_norm)
    thresh = choose_threshold(train_errors, method="percentile", percentile=99.5)
    print(f"Chosen threshold (99.5 percentile of train normal errors): {thresh:.6f}")

    # Evaluate on test set
    test_errors = reconstruction_errors(autoencoder, X_test)
    y_pred = (test_errors >= thresh).astype(int)

    # Basic metrics
    print("Confusion matrix (test set):")
    print(confusion_matrix(y_test, y_pred))
    print(classification_report(y_test, y_pred, digits=4))

    # Plot example errors and threshold
    plt.figure(figsize=(10, 4))
    plt.plot(test_errors, label="reconstruction_error")
    plt.hlines(thresh, xmin=0, xmax=len(test_errors)-1, colors="r", linestyles="dashed", label="threshold")
    plt.legend()
    plt.title("Reconstruction errors on test set")
    plt.xlabel("window index")
    plt.ylabel("MSE")
    plt.show()

    # Show some examples of reconstructed sequences. Sample at most as many
    # windows as exist, since choice(replace=False) raises otherwise.
    n_examples = min(3, len(X_test))
    idxs = np.random.choice(len(X_test), size=n_examples, replace=False)
    X_pred = autoencoder.predict(X_test[idxs])

    # Take the feature count from the data rather than the module constant.
    n_features = X_test.shape[-1]
    for i, idx in enumerate(idxs):
        # squeeze=False keeps `axes` two-dimensional even for a single feature.
        fig, axes = plt.subplots(n_features, 1, figsize=(10, 2 * n_features),
                                 sharex=True, squeeze=False)
        fig.suptitle(f"Window idx {idx} - label={y_test[idx]} - error={test_errors[idx]:.6f}")
        for f in range(n_features):
            ax = axes[f, 0]
            ax.plot(X_test[idx][:, f], label="orig")
            ax.plot(X_pred[i][:, f], label="recon", linestyle="--")
            ax.legend(loc="upper right")
        plt.tight_layout()
        plt.show()

    return autoencoder, encoder, thresh


# Run the demo only when this module is executed directly, not when imported.
if __name__ == "__main__":
    example_run()

In [None]:
"""
LSTM Autoencoder for anomaly detection in multidimensional time series
Base case: 7 input dimensions, latent space = 3
TensorFlow 2 / Keras implementation with clean subclassed model API.
"""

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks, optimizers
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Reproducibility: seed both the NumPy global RNG and the TensorFlow RNG.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# -------------------------
# Config / Hyperparameters
# -------------------------
TIMESTEPS = 50  # length of each sequence window
N_FEATURES = 7  # number of features per timestep
LATENT_DIM = 3  # size of the bottleneck vector
LSTM_UNITS = 64  # units in each LSTM layer
BATCH_SIZE = 64  # mini-batch size passed to Model.fit
EPOCHS = 100  # upper bound on training epochs
LEARNING_RATE = 1e-3  # Adam learning rate

# -------------------------
# Model: Subclassed LSTM Autoencoder
# -------------------------
class LSTMAutoencoder(tf.keras.Model):
    """Subclassed Keras model: LSTM encoder -> dense bottleneck -> LSTM decoder.

    Args:
        timesteps: Number of times the latent vector is repeated for decoding.
        n_features: Output size of the per-timestep dense layer.
        latent_dim: Size of the bottleneck vector.
        lstm_units: Units in both the encoder and the decoder LSTM.
    """

    def __init__(self, timesteps, n_features, latent_dim, lstm_units):
        super().__init__()
        # Encoder: the LSTM emits only its last output, which is then
        # projected down to the latent vector.
        self.encoder_lstm = layers.LSTM(lstm_units, return_sequences=False, name="enc_lstm")
        self.latent_dense = layers.Dense(latent_dim, activation="linear", name="latent")
        # Decoder: repeat the latent vector per timestep, run an LSTM over it,
        # and map every timestep back to n_features values.
        self.repeat_vector = layers.RepeatVector(timesteps, name="repeat")
        self.decoder_lstm = layers.LSTM(lstm_units, return_sequences=True, name="dec_lstm")
        per_step_projection = layers.Dense(n_features)
        self.decoder_output = layers.TimeDistributed(per_step_projection, name="output")

    def call(self, inputs):
        """Encode ``inputs`` to the latent vector and decode it back to a sequence."""
        stages = (
            self.encoder_lstm,
            self.latent_dense,
            self.repeat_vector,
            self.decoder_lstm,
            self.decoder_output,
        )
        tensor = inputs
        for stage in stages:
            tensor = stage(tensor)
        return tensor

# -------------------------
# Synthetic Data Generator
# -------------------------

def generate_synthetic_data(n_samples=5000, timesteps=TIMESTEPS, n_features=N_FEATURES,
                            anomaly_fraction=0.02, anomaly_magnitude=5.0):
    """Generate multivariate time series windows with some injected anomalies.

    Normal windows are one sinusoid per feature (frequency and amplitude grow
    with the feature index, phase is random per window and feature) plus
    Gaussian noise. A fraction of windows is corrupted by adding a large
    positive offset with noise to a random time segment of some features.

    Phases come from a dedicated ``RandomState(SEED)``, so they are the same
    on every call; noise and anomaly placement use the global NumPy RNG.

    Args:
        n_samples: Number of windows to generate.
        timesteps: Length of each window.
        n_features: Number of features per timestep.
        anomaly_fraction: Fraction of windows that receive an anomaly.
        anomaly_magnitude: Scale of the injected offset and noise.

    Returns:
        tuple: ``X`` of shape ``(n_samples, timesteps, n_features)`` as
        float32, and ``y`` of shape ``(n_samples,)`` with binary labels per
        window (0 normal, 1 anomaly).
    """
    t = np.linspace(0, 2 * np.pi, timesteps)
    feature_idx = np.arange(n_features)
    freqs = 0.5 + 0.5 * (feature_idx + 1) / n_features
    amps = 1.0 + 0.1 * feature_idx

    # One independent phase per (window, feature), drawn from a single
    # generator. Seeding a fresh generator with SEED + i + f made
    # (i, f) and (i + 1, f - 1) share the same phase.
    phases = np.random.RandomState(SEED).rand(n_samples, n_features) * 2 * np.pi

    # Broadcast to (n_samples, timesteps, n_features)
    signal = amps * np.sin(freqs * t[np.newaxis, :, np.newaxis] + phases[:, np.newaxis, :])
    noise = 0.1 * np.random.normal(size=(n_samples, timesteps, n_features))
    X = (signal + noise).astype(np.float32)

    y = np.zeros((n_samples,), dtype=int)

    # Bounds for the random draws, clamped so that small n_features/timesteps
    # neither raise in randint nor yield an empty corrupted segment. For the
    # default sizes these equal the unclamped values.
    max_corrupt = max(1, n_features // 2)
    start_hi = max(1, timesteps // 2)
    length_lo = max(1, timesteps // 8)
    length_hi = max(length_lo + 1, timesteps // 2)

    n_anom = int(n_samples * anomaly_fraction)
    anomaly_indices = np.random.choice(n_samples, size=n_anom, replace=False)
    for idx in anomaly_indices:
        # Corrupt a random time segment of a random subset of features.
        n_corrupt_features = np.random.randint(1, max_corrupt + 1)
        corrupt_features = np.random.choice(n_features, size=n_corrupt_features, replace=False)
        start = np.random.randint(0, start_hi)
        length = np.random.randint(length_lo, length_hi)
        end = min(timesteps, start + length)
        for f in corrupt_features:
            # Large positive offset plus scaled noise.
            X[idx, start:end, f] += anomaly_magnitude * (np.random.randn(end - start) + 3.0)
        y[idx] = 1
    return X, y

# -------------------------
# Utility
# -------------------------

def reconstruction_errors(model, X):
    """Return the mean squared reconstruction error of each window in ``X``.

    ``model`` must offer a Keras-style ``predict(X, verbose=0)``; the squared
    difference is averaged over axes 1 and 2.
    """
    residual = X - model.predict(X, verbose=0)
    return (residual ** 2).mean(axis=(1, 2))

def choose_threshold(errors, percentile=99.0):
    """Return the given percentile of ``errors`` as the anomaly threshold."""
    threshold = np.percentile(errors, q=percentile)
    return threshold

# -------------------------
# Example run
# -------------------------

def example_run():
    """Train the subclassed LSTM autoencoder on synthetic data and evaluate it.

    Trains on the normal training windows only, takes the 99.5th percentile
    of their reconstruction errors as the threshold, prints the confusion
    matrix and classification report for the test split, and plots the test
    errors against the threshold.
    """
    X, y = generate_synthetic_data(n_samples=5000)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=SEED
    )
    # Only windows labelled normal are used for training.
    normal_mask = y_train == 0
    X_train_norm = X_train[normal_mask]

    autoencoder = LSTMAutoencoder(TIMESTEPS, N_FEATURES, LATENT_DIM, LSTM_UNITS)
    autoencoder.compile(
        optimizer=optimizers.Adam(learning_rate=LEARNING_RATE),
        loss="mse",
    )

    early_stopping = callbacks.EarlyStopping(
        monitor="val_loss",
        patience=10,
        restore_best_weights=True,
    )
    autoencoder.fit(
        X_train_norm,
        X_train_norm,
        validation_split=0.1,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=[early_stopping],
        verbose=2,
    )

    # Threshold from the errors on the normal training windows.
    thresh = choose_threshold(
        reconstruction_errors(autoencoder, X_train_norm), percentile=99.5
    )
    print(f"Chosen threshold: {thresh:.6f}")

    # A test window is flagged when its error reaches the threshold.
    test_errors = reconstruction_errors(autoencoder, X_test)
    y_pred = (test_errors >= thresh).astype(int)

    from sklearn.metrics import classification_report, confusion_matrix
    print(confusion_matrix(y_test, y_pred))
    print(classification_report(y_test, y_pred, digits=4))

    last_index = len(test_errors) - 1
    plt.figure(figsize=(10, 4))
    plt.plot(test_errors, label="reconstruction_error")
    plt.hlines(thresh, xmin=0, xmax=last_index, colors="r",
               linestyles="dashed", label="threshold")
    plt.legend()
    plt.show()

# Run the demo only when this module is executed directly, not when imported.
if __name__ == "__main__":
    example_run()


In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

class LSTMAutoencoder:
    """
    LSTM Autoencoder for multivariate time series anomaly detection

    Wraps a Keras model together with a MinMaxScaler and a reconstruction
    error threshold. Raw data of shape (samples, features) is scaled, cut
    into overlapping windows of ``sequence_length`` rows, and reconstructed;
    windows whose error exceeds the threshold are reported as anomalies.
    """

    def __init__(self, input_dim=7, latent_dim=3, sequence_length=10,
                 lstm_units=64, learning_rate=0.001):
        """
        Initialize LSTM Autoencoder

        Args:
            input_dim: Number of features in input data
            latent_dim: Dimension of latent/encoded representation
            sequence_length: Length of input sequences
            lstm_units: Number of LSTM units in encoder/decoder
            learning_rate: Learning rate for optimizer
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.sequence_length = sequence_length
        self.lstm_units = lstm_units
        self.learning_rate = learning_rate

        self.model = None
        self.encoder = None
        self.decoder = None  # never built; only the full model and encoder exist
        self.scaler = MinMaxScaler()
        self.threshold = None  # set by set_threshold()

        self._build_model()

    def _build_model(self):
        """Build the LSTM Autoencoder architecture"""

        # Input layer
        input_layer = tf.keras.Input(shape=(self.sequence_length, self.input_dim))

        # Encoder
        encoded = tf.keras.layers.LSTM(self.lstm_units, return_sequences=True,
                                       name='encoder_lstm1')(input_layer)
        encoded = tf.keras.layers.Dropout(0.2)(encoded)
        encoded = tf.keras.layers.LSTM(self.lstm_units // 2, return_sequences=False,
                                       name='encoder_lstm2')(encoded)

        # Bottleneck (latent representation)
        latent = tf.keras.layers.Dense(self.latent_dim, activation='tanh',
                                       name='latent_layer')(encoded)

        # Decoder (mirrors the encoder's layer sizes in reverse)
        decoded = tf.keras.layers.RepeatVector(self.sequence_length)(latent)
        decoded = tf.keras.layers.LSTM(self.lstm_units // 2, return_sequences=True,
                                       name='decoder_lstm1')(decoded)
        decoded = tf.keras.layers.Dropout(0.2)(decoded)
        decoded = tf.keras.layers.LSTM(self.lstm_units, return_sequences=True,
                                       name='decoder_lstm2')(decoded)

        # Output layer
        output_layer = tf.keras.layers.TimeDistributed(
            tf.keras.layers.Dense(self.input_dim, activation='linear'),
            name='output_layer'
        )(decoded)

        # Create models
        self.model = tf.keras.Model(inputs=input_layer, outputs=output_layer)
        self.encoder = tf.keras.Model(inputs=input_layer, outputs=latent)

        # Compile model
        self.model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
            loss='mse',
            metrics=['mae']
        )

    def prepare_sequences(self, data):
        """
        Prepare sequences for LSTM input

        Args:
            data: Input data of shape (samples, features)

        Returns:
            sequences: Array of shape (num_sequences, sequence_length, features).
                When ``data`` has fewer than ``sequence_length`` rows the
                result has zero sequences but keeps the trailing dimensions.
        """
        data = np.asarray(data)
        n_windows = len(data) - self.sequence_length + 1
        if n_windows <= 0:
            # np.array([]) would collapse to shape (0,) and lose the window
            # and feature dimensions.
            return np.empty((0, self.sequence_length) + data.shape[1:], dtype=data.dtype)
        return np.array([data[i:i + self.sequence_length] for i in range(n_windows)])

    def preprocess_data(self, X, fit_scaler=True):
        """
        Preprocess data: normalize and create sequences

        Args:
            X: Input data
            fit_scaler: Whether to fit the scaler (True for training data)

        Returns:
            X_processed: Preprocessed sequences

        Raises:
            ValueError: If X has fewer rows than ``sequence_length``, so that
                not even one window can be formed.
        """
        # Checked before touching the scaler so a rejected input never refits it.
        if len(X) < self.sequence_length:
            raise ValueError(
                f"Need at least sequence_length={self.sequence_length} rows "
                f"to build one sequence, got {len(X)}"
            )

        if fit_scaler:
            X_scaled = self.scaler.fit_transform(X)
        else:
            X_scaled = self.scaler.transform(X)

        return self.prepare_sequences(X_scaled)

    @tf.function
    def train_step(self, x_batch):
        """
        Custom training step for the autoencoder

        Not used by fit(), which delegates to ``self.model.fit``.

        Args:
            x_batch: Batch of input sequences

        Returns:
            loss: Training loss for the batch
        """
        with tf.GradientTape() as tape:
            # Forward pass
            reconstructed = self.model(x_batch, training=True)
            # Compute loss
            loss = tf.keras.losses.mse(x_batch, reconstructed)
            loss = tf.reduce_mean(loss)

        # Compute gradients
        gradients = tape.gradient(loss, self.model.trainable_variables)
        # Apply gradients
        self.model.optimizer.apply_gradients(
            zip(gradients, self.model.trainable_variables)
        )

        return loss

    def fit(self, X_train, epochs=100, batch_size=32, validation_split=0.2,
            verbose=1, early_stopping_patience=10):
        """
        Train the LSTM Autoencoder

        Args:
            X_train: Training data
            epochs: Number of training epochs
            batch_size: Batch size for training
            validation_split: Fraction of data to use for validation
            verbose: Verbosity level
            early_stopping_patience: Patience for early stopping

        Returns:
            history: Training history

        Raises:
            ValueError: If X_train has fewer rows than ``sequence_length``.
        """
        print("Preprocessing training data...")
        X_train_processed = self.preprocess_data(X_train, fit_scaler=True)
        print(f"Training sequences shape: {X_train_processed.shape}")

        # Callbacks: stop when val_loss stalls, halve the LR on shorter plateaus
        training_callbacks = [
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=early_stopping_patience,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-7
            )
        ]

        # Train the model (the input is also the reconstruction target)
        history = self.model.fit(
            X_train_processed, X_train_processed,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            callbacks=training_callbacks,
            verbose=verbose
        )

        return history

    def calculate_reconstruction_error(self, X):
        """
        Calculate reconstruction error for anomaly detection

        Args:
            X: Input data

        Returns:
            errors: Reconstruction errors for each sequence

        Raises:
            ValueError: If X has fewer rows than ``sequence_length``.
        """
        X_processed = self.preprocess_data(X, fit_scaler=False)
        reconstructed = self.model.predict(X_processed, verbose=0)

        # Calculate MSE for each sequence
        errors = np.mean(np.square(X_processed - reconstructed), axis=(1, 2))
        return errors

    def set_threshold(self, X_normal, percentile=95):
        """
        Set anomaly detection threshold based on normal data

        Args:
            X_normal: Normal (non-anomalous) data
            percentile: Percentile to use as threshold
        """
        errors = self.calculate_reconstruction_error(X_normal)
        self.threshold = np.percentile(errors, percentile)
        print(f"Anomaly threshold set to: {self.threshold:.6f}")

    def predict_anomalies(self, X):
        """
        Predict anomalies in the input data

        Args:
            X: Input data to check for anomalies

        Returns:
            predictions: Binary predictions (1 for anomaly, 0 for normal),
                one per sequence
            errors: Reconstruction errors

        Raises:
            ValueError: If the threshold has not been set, or X has fewer
                rows than ``sequence_length``.
        """
        if self.threshold is None:
            raise ValueError("Threshold not set. Call set_threshold() first.")

        errors = self.calculate_reconstruction_error(X)
        predictions = (errors > self.threshold).astype(int)

        return predictions, errors

    def encode(self, X):
        """
        Encode input data to latent representation

        Args:
            X: Input data

        Returns:
            latent_repr: Encoded latent representations, one per sequence

        Raises:
            ValueError: If X has fewer rows than ``sequence_length``.
        """
        X_processed = self.preprocess_data(X, fit_scaler=False)
        latent_repr = self.encoder.predict(X_processed, verbose=0)
        return latent_repr

    def plot_training_history(self, history):
        """Plot training/validation loss and MAE from a Keras History object"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

        # Loss plot
        ax1.plot(history.history['loss'], label='Training Loss')
        ax1.plot(history.history['val_loss'], label='Validation Loss')
        ax1.set_title('Model Loss')
        ax1.set_xlabel('Epoch')
        ax1.set_ylabel('Loss')
        ax1.legend()
        ax1.grid(True)

        # MAE plot
        ax2.plot(history.history['mae'], label='Training MAE')
        ax2.plot(history.history['val_mae'], label='Validation MAE')
        ax2.set_title('Model MAE')
        ax2.set_xlabel('Epoch')
        ax2.set_ylabel('MAE')
        ax2.legend()
        ax2.grid(True)

        plt.tight_layout()
        plt.show()

    def plot_reconstruction_error(self, errors_normal, errors_anomaly=None):
        """Plot reconstruction error distribution, with the threshold if set"""
        plt.figure(figsize=(10, 6))

        plt.hist(errors_normal, bins=50, alpha=0.7, label='Normal', density=True)
        if errors_anomaly is not None:
            plt.hist(errors_anomaly, bins=50, alpha=0.7, label='Anomaly', density=True)

        if self.threshold is not None:
            plt.axvline(x=self.threshold, color='red', linestyle='--',
                        label=f'Threshold: {self.threshold:.4f}')

        plt.xlabel('Reconstruction Error')
        plt.ylabel('Density')
        plt.title('Distribution of Reconstruction Errors')
        plt.legend()
        plt.grid(True)
        plt.show()


# Example usage and testing
def generate_sample_data(n_samples=1000, n_anomalies=100, n_features=7, seed=42):
    """Generate sample multivariate time series data with anomalies

    Normal data is one noisy sinusoid per feature, with frequency and phase
    depending on the feature index. Anomalous data is pure Gaussian noise
    for the first three features and a faster, larger, noisier sinusoid for
    the rest. With the default arguments the output is the same as before
    the parameters were introduced.

    Args:
        n_samples: Number of rows of normal data.
        n_anomalies: Number of rows of anomalous data.
        n_features: Number of feature columns in both arrays.
        seed: Seed applied to the global NumPy RNG before sampling. Pass
            ``None`` to leave the global RNG state untouched.

    Returns:
        tuple: ``(normal_data, anomaly_data)`` with shapes
        ``(n_samples, n_features)`` and ``(n_anomalies, n_features)``.
    """
    if seed is not None:
        np.random.seed(seed)

    # Normal data: sinusoidal patterns with some noise
    t = np.linspace(0, 50, n_samples)

    normal_data = np.zeros((n_samples, n_features))
    for i in range(n_features):
        freq = 0.1 + i * 0.05
        phase = i * np.pi / 4
        normal_data[:, i] = np.sin(2 * np.pi * freq * t + phase) + \
                            0.1 * np.random.randn(n_samples)

    # Anomalous data: sudden spikes and different patterns
    n_spike_features = 3
    anomaly_data = np.zeros((n_anomalies, n_features))
    t_anom = np.linspace(0, 5, n_anomalies)

    for i in range(n_features):
        if i < n_spike_features:
            # Spikes
            anomaly_data[:, i] = 3 * np.random.randn(n_anomalies)
        else:
            # Different frequency patterns
            anomaly_data[:, i] = 2 * np.sin(10 * t_anom + i) + \
                                 0.5 * np.random.randn(n_anomalies)

    return normal_data, anomaly_data

def main():
    """Main function demonstrating LSTM-AE usage

    Generates sample data, trains the autoencoder on the first 80% of the
    normal data, calibrates the threshold on that training data, evaluates
    on the held-out normal data and on the anomalous data, and finally
    prints a few latent encodings.
    """
    print("LSTM Autoencoder for Anomaly Detection")
    print("=" * 50)

    # Generate sample data
    print("1. Generating sample data...")
    normal_data, anomaly_data = generate_sample_data()
    print(f"Normal data shape: {normal_data.shape}")
    print(f"Anomaly data shape: {anomaly_data.shape}")

    # Split normal data for training and testing
    split_idx = int(0.8 * len(normal_data))
    X_train = normal_data[:split_idx]
    X_test_normal = normal_data[split_idx:]

    # Initialize LSTM Autoencoder
    print("\n2. Initializing LSTM Autoencoder...")
    lstm_ae = LSTMAutoencoder(
        input_dim=7,
        latent_dim=3,
        sequence_length=10,
        lstm_units=64,
        learning_rate=0.001
    )

    print("Model architecture:")
    lstm_ae.model.summary()

    # Train the model
    print("\n3. Training the model...")
    history = lstm_ae.fit(
        X_train,
        epochs=50,
        batch_size=32,
        validation_split=0.2,
        verbose=1,
        early_stopping_patience=10
    )

    # Plot training history
    print("\n4. Plotting training history...")
    lstm_ae.plot_training_history(history)

    # Set anomaly detection threshold.
    # Calibrate on the training data, not on X_test_normal: a percentile
    # taken from the test split fixes that split's false-positive rate by
    # construction, so the evaluation below would not be independent.
    print("\n5. Setting anomaly detection threshold...")
    lstm_ae.set_threshold(X_train, percentile=95)

    # Test on normal data
    print("\n6. Testing on normal data...")
    pred_normal, errors_normal = lstm_ae.predict_anomalies(X_test_normal)
    print(f"Normal data - Anomalies detected: {np.sum(pred_normal)}/{len(pred_normal)}")

    # Test on anomalous data
    print("\n7. Testing on anomalous data...")
    pred_anomaly, errors_anomaly = lstm_ae.predict_anomalies(anomaly_data)
    print(f"Anomalous data - Anomalies detected: {np.sum(pred_anomaly)}/{len(pred_anomaly)}")

    # Plot reconstruction error distributions
    print("\n8. Plotting reconstruction error distributions...")
    lstm_ae.plot_reconstruction_error(errors_normal, errors_anomaly)

    # Evaluate performance (labels are per sequence, matching the predictions)
    print("\n9. Performance Evaluation:")
    y_true = np.concatenate([np.zeros(len(pred_normal)), np.ones(len(pred_anomaly))])
    y_pred = np.concatenate([pred_normal, pred_anomaly])

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred))

    print("\nConfusion Matrix:")
    print(confusion_matrix(y_true, y_pred))

    # Test encoding functionality.
    # A window needs sequence_length rows, so 5 rows would produce no
    # sequence at all; sequence_length + 4 rows give exactly 5 windows.
    print("\n10. Testing latent encoding...")
    encode_input = X_test_normal[:lstm_ae.sequence_length + 4]
    latent_repr = lstm_ae.encode(encode_input)
    print(f"Original data shape: {encode_input.shape}")
    print(f"Latent representation shape: {latent_repr.shape}")
    print(f"Sample latent vectors:\n{latent_repr}")

# Run the demo only when this module is executed directly, not when imported.
if __name__ == "__main__":
    main()