In [26]:
pip install tensorflow-addons

[31mERROR: Could not find a version that satisfies the requirement tensorflow-addons (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for tensorflow-addons[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

In [2]:
# =========================
# Enable Mixed Precision (if GPU is available)
# =========================
# Query the visible GPUs once; the mixed_float16 policy is only installed when
# at least one is present, otherwise the default policy is left untouched.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU detected. Training will run on CPU.")
else:
    print("GPU detected. Enabling mixed precision training.")
    from tensorflow.keras import mixed_precision
    policy = mixed_precision.Policy('mixed_float16')
    mixed_precision.set_global_policy(policy)

No GPU detected. Training will run on CPU.


In [3]:
# =========================
# 1. Preprocessing Functions
# =========================
def load_and_preprocess_video(video_path, target_size=(256, 256), max_frames=500):
    """Decode a video into a stack of resized, normalized RGB frames.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        target_size: ``(width, height)`` tuple passed to ``cv2.resize``.
        max_frames: Maximum number of frames to read from the video.

    Returns:
        A float32 array of shape ``(num_frames, height, width, 3)``. Pixel
        values are mapped with ``value / 127.5 - 1.0``. When no frame can be
        read the array still has four dimensions, with ``num_frames == 0``,
        so it can be concatenated with the frames of other videos.
    """
    print(f"Loading video: {video_path}")
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened() and len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, target_size)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Cast before scaling: dividing the integer frame directly would
            # promote it to float64 and double the memory used per frame.
            frames.append(frame.astype(np.float32) / 127.5 - 1.0)
    finally:
        # Release the capture even if resizing or conversion raises.
        cap.release()
    print(f"Loaded {len(frames)} frames from {video_path}")
    if not frames:
        width, height = target_size
        return np.empty((0, height, width, 3), dtype=np.float32)
    return np.array(frames, dtype=np.float32)

def load_videos_from_folder_subset(folder_path, target_size=(256, 256), max_frames=500, num_videos=10):
    """Load frames from the first ``num_videos`` ``.avi`` files of a folder.

    Files are taken in sorted filename order and each one is decoded with
    ``load_and_preprocess_video``.

    Args:
        folder_path: Directory listed with ``os.listdir``.
        target_size: Forwarded to ``load_and_preprocess_video``.
        max_frames: Per-video frame limit, forwarded to the loader.
        num_videos: Number of videos (after sorting) to load.

    Returns:
        All loaded frames concatenated along axis 0. If nothing could be
        loaded, an empty float32 array of shape ``(0, height, width, 3)``.
    """
    all_videos = sorted(f for f in os.listdir(folder_path) if f.endswith('.avi'))
    videos = all_videos[:num_videos]
    frames = []
    for video_file in videos:
        video_path = os.path.join(folder_path, video_file)
        video_frames = load_and_preprocess_video(video_path, target_size, max_frames)
        if len(video_frames) == 0:
            # An unreadable video can yield an array whose rank differs from
            # the others; concatenating it would raise, so leave it out.
            print(f"Skipping {video_path}: no frames could be read")
            continue
        frames.append(video_frames)
    if frames:
        total_frames = np.concatenate(frames, axis=0)
    else:
        width, height = target_size
        total_frames = np.empty((0, height, width, 3), dtype=np.float32)
    print(f"Total frames loaded from folder {folder_path}: {total_frames.shape[0]}")
    return total_frames


In [4]:
# =========================
# 2. Residual Block (Simplified)
# =========================
def residual_block(x, filters, kernel_size=3):
    """Apply a two-convolution residual block to ``x``.

    The main path is Conv2D -> BatchNormalization -> LeakyReLU -> Conv2D ->
    BatchNormalization. Its result is added to the shortcut and passed through
    a final LeakyReLU.

    Args:
        x: Input Keras tensor with channels on the last axis.
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size of both convolutions.

    Returns:
        The output tensor of the block, with ``filters`` channels.
    """
    shortcut = x
    if x.shape[-1] != filters:
        # Add() needs identical shapes; when the channel count changes, bring
        # the shortcut to `filters` channels with a 1x1 convolution.
        shortcut = layers.Conv2D(filters, 1, padding='same', use_bias=False)(shortcut)

    y = layers.Conv2D(filters, kernel_size, padding='same', use_bias=False)(x)
    y = layers.BatchNormalization()(y)
    y = layers.LeakyReLU(negative_slope=0.2)(y)
    y = layers.Conv2D(filters, kernel_size, padding='same', use_bias=False)(y)
    y = layers.BatchNormalization()(y)
    y = layers.Add()([shortcut, y])
    return layers.LeakyReLU(negative_slope=0.2)(y)

In [5]:
# =========================
# 3. Simplified GAN Architectures
# =========================
class TemporalGAN:
    """Bundle of a frame generator, a frame discriminator, their optimizers and losses.

    Both networks defined here work on single frames: the generator maps one
    latent vector to one image and the discriminator scores one image, so
    nothing in this class relates consecutive frames to each other.

    Args:
        frame_shape: ``(height, width, channels)`` of the frames. Height and
            width must be positive multiples of 16 because the generator
            doubles its spatial size four times.
        latent_dim: Length of the noise vector fed to the generator.

    Raises:
        ValueError: If ``frame_shape`` cannot be produced by the generator.
    """

    # Number of stride-2 transposed convolutions in the generator.
    _NUM_UPSAMPLES = 4
    # Target for real samples in the discriminator loss (one-sided label smoothing).
    _REAL_LABEL = 0.9

    def __init__(self, frame_shape=(256, 256, 3), latent_dim=128):
        print("Initializing TemporalGAN...")
        height, width, _channels = frame_shape
        scale = 2 ** self._NUM_UPSAMPLES
        if height <= 0 or width <= 0 or height % scale or width % scale:
            raise ValueError(
                f"frame_shape height and width must be positive multiples of {scale}, "
                f"got {tuple(frame_shape)}"
            )
        self.frame_shape = frame_shape
        self.latent_dim = latent_dim

        self.generator = self.build_generator()
        self.discriminator = self.build_discriminator()

        # Use a slightly lower learning rate for discriminator to slow its convergence
        self.gen_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
        self.disc_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001, beta_1=0.5)
        # The discriminator ends in a Dense(1) without activation, hence from_logits=True.
        self.cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
        print("TemporalGAN initialized.")

    def build_generator(self):
        """Build the generator: latent vector -> image of shape ``frame_shape``.

        The network starts from a feature map 16 times smaller than the target
        frame in each spatial dimension and doubles it four times. With the
        default ``frame_shape`` this is 16x16 -> 32 -> 64 -> 128 -> 256.

        Returns:
            A Keras ``Model`` whose output uses a tanh activation.
        """
        print("Building generator with residual blocks...")
        height, width, channels = self.frame_shape
        scale = 2 ** self._NUM_UPSAMPLES
        base_h, base_w = height // scale, width // scale

        inputs = layers.Input(shape=(self.latent_dim,))
        x = layers.Dense(base_h * base_w * 256, use_bias=False)(inputs)
        x = layers.BatchNormalization()(x)  # Use BatchNorm instead of GroupNorm
        x = layers.LeakyReLU(negative_slope=0.2)(x)
        x = layers.Reshape((base_h, base_w, 256))(x)

        # First 2x upsample, followed by a residual block at that resolution.
        x = layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(negative_slope=0.2)(x)
        x = residual_block(x, 128)

        # Second 2x upsample, followed by a residual block at that resolution.
        x = layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(negative_slope=0.2)(x)
        x = residual_block(x, 64)

        # Third 2x upsample.
        x = layers.Conv2DTranspose(32, (5, 5), strides=(2, 2), padding='same', use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(negative_slope=0.2)(x)

        # Final 2x upsample to the full frame size. The layer is pinned to
        # float32 so the image stays full precision even if a mixed_float16
        # global policy is active.
        outputs = layers.Conv2DTranspose(
            channels, (5, 5), strides=(2, 2), padding='same', activation='tanh', dtype='float32'
        )(x)

        model = models.Model(inputs, outputs)
        print("Generator built.")
        return model

    def build_discriminator(self):
        """Build the discriminator: image of shape ``frame_shape`` -> one logit.

        Returns:
            A Keras ``Sequential`` model ending in ``Dense(1)`` with no activation.
        """
        print("Building simplified discriminator...")
        model = models.Sequential([
            layers.InputLayer(shape=self.frame_shape),
            layers.GaussianNoise(0.1),
            layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same'),  # No SpectralNorm
            layers.LeakyReLU(negative_slope=0.2),
            layers.Dropout(0.4),
            layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'),
            layers.LeakyReLU(negative_slope=0.2),
            layers.Dropout(0.4),
            layers.Conv2D(256, (5, 5), strides=(2, 2), padding='same'),
            layers.LeakyReLU(negative_slope=0.2),
            layers.Dropout(0.4),
            layers.Flatten(),
            # Logits are kept in float32 so the loss is computed at full
            # precision even if a mixed_float16 global policy is active.
            layers.Dense(1, dtype='float32')
        ])
        print("Discriminator built.")
        return model

    def generator_loss(self, fake_output):
        """Cross-entropy between the discriminator's logits on fakes and all-ones targets."""
        return self.cross_entropy(tf.ones_like(fake_output), fake_output)

    def discriminator_loss(self, real_output, fake_output):
        """Sum of the real-sample loss (targets 0.9) and fake-sample loss (targets 0)."""
        real_loss = self.cross_entropy(tf.ones_like(real_output) * self._REAL_LABEL, real_output)
        fake_loss = self.cross_entropy(tf.zeros_like(fake_output), fake_output)
        return real_loss + fake_loss

In [6]:
# =========================
# 4. Create an Optimized Data Pipeline
# =========================
def create_dataset(frames, batch_size):
    """Build a shuffled, batched, endlessly repeating ``tf.data`` pipeline.

    Args:
        frames: Array-like of frames; the first axis indexes individual frames.
        batch_size: Number of frames per batch.

    Returns:
        A ``tf.data.Dataset`` yielding batches of frames indefinitely.

    Raises:
        ValueError: If ``frames`` contains no frames.
    """
    frames = np.asarray(frames)
    num_frames = len(frames)
    if num_frames == 0:
        raise ValueError("create_dataset received no frames; check that the videos were loaded.")
    if frames.dtype == np.float64:
        # Double precision only doubles the memory held by the pipeline.
        frames = frames.astype(np.float32)

    dataset = tf.data.Dataset.from_tensor_slices(frames)
    # A buffer as large as the dataset gives a uniform shuffle; a smaller one
    # only mixes neighbouring frames.
    dataset = dataset.shuffle(buffer_size=num_frames)
    dataset = dataset.batch(batch_size).repeat().prefetch(tf.data.AUTOTUNE)
    return dataset

In [13]:
# =========================
# 5. Modified Training Function
# =========================
def train_temporal_gan_with_monitoring(frames, epochs=500, batch_size=8, latent_dim=128, output_dir="synthetic_frames_enhanced"):
    """Train a ``TemporalGAN`` on ``frames``, saving samples and a loss curve.

    Each step performs two generator updates followed by one discriminator
    update. Every 10 epochs four generated images are written to
    ``output_dir``; after training a loss curve is saved there as
    ``training_curve.png``.

    Note: a later cell in this notebook defines a function with the same
    name, which replaces this one when both cells are run.

    Args:
        frames: Array of training frames; the first axis indexes frames.
        epochs: Number of passes over ``len(frames) // batch_size`` batches.
        batch_size: Frames per batch and noise vectors per update.
        latent_dim: Length of the generator's noise vector.
        output_dir: Directory for sample images and the loss curve.

    Returns:
        The trained ``TemporalGAN`` instance.

    Raises:
        ValueError: If ``frames`` holds fewer frames than one batch.
    """
    steps_per_epoch = len(frames) // batch_size  # Calculate steps per epoch
    if steps_per_epoch == 0:
        # Without this check every epoch would run zero steps and the
        # per-epoch averages would be NaN.
        raise ValueError(
            f"Need at least batch_size={batch_size} frames to train, got {len(frames)}."
        )

    gen_losses = []
    disc_losses = []
    os.makedirs(output_dir, exist_ok=True)

    dataset = create_dataset(frames, batch_size)
    tgan = TemporalGAN(frame_shape=(256, 256, 3), latent_dim=latent_dim)

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")
        batch_gen_losses = []
        batch_disc_losses = []

        for step, real_frames in enumerate(dataset):
            if step >= steps_per_epoch:  # Stop after one epoch
                break

            # Train generator twice
            for _ in range(2):
                noise = tf.random.normal([batch_size, latent_dim])
                with tf.GradientTape() as gen_tape:
                    generated_frames = tgan.generator(noise, training=True)
                    fake_output = tgan.discriminator(generated_frames, training=True)
                    gen_loss = tgan.generator_loss(fake_output)
                gradients_gen = gen_tape.gradient(gen_loss, tgan.generator.trainable_variables)
                tgan.gen_optimizer.apply_gradients(zip(gradients_gen, tgan.generator.trainable_variables))

            # Train discriminator once
            noise = tf.random.normal([batch_size, latent_dim])
            with tf.GradientTape() as disc_tape:
                generated_frames = tgan.generator(noise, training=True)
                real_output = tgan.discriminator(real_frames, training=True)
                fake_output = tgan.discriminator(generated_frames, training=True)
                disc_loss = tgan.discriminator_loss(real_output, fake_output)
            gradients_disc = disc_tape.gradient(disc_loss, tgan.discriminator.trainable_variables)
            tgan.disc_optimizer.apply_gradients(zip(gradients_disc, tgan.discriminator.trainable_variables))

            # gen_loss is the loss of the second generator update of this step.
            batch_gen_losses.append(gen_loss.numpy())
            batch_disc_losses.append(disc_loss.numpy())

        # Monitoring and saving
        avg_gen_loss = np.mean(batch_gen_losses)
        avg_disc_loss = np.mean(batch_disc_losses)
        # Record the epoch averages; without this the final plot is empty.
        gen_losses.append(avg_gen_loss)
        disc_losses.append(avg_disc_loss)
        print(f"Epoch {epoch+1}: gen_loss={avg_gen_loss:.4f}, disc_loss={avg_disc_loss:.4f}")

        if (epoch+1) % 10 == 0:  # Save samples every 10 epochs
            noise = tf.random.normal([4, latent_dim])  # Generate 4 samples
            generated_samples = tgan.generator(noise, training=False).numpy()
            generated_samples = (generated_samples + 1) * 127.5  # tanh range [-1, 1] -> [0, 255]
            for idx, frame in enumerate(generated_samples.astype(np.uint8)):
                output_path = os.path.join(output_dir, f"epoch{epoch+1}_sample{idx}.png")
                cv2.imwrite(output_path, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

    # Plot final training statistics
    plt.figure(figsize=(12, 6))
    plt.plot(gen_losses, label='Generator Loss')
    plt.plot(disc_losses, label='Discriminator Loss')
    plt.title("Training Progress")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.savefig(os.path.join(output_dir, "training_curve.png"))
    plt.close()

    # Hand the trained networks back so they are not lost when training ends.
    return tgan

In [17]:
# =========================
# 5. Modified Training Function with Loss Monitoring (edited)
# =========================
def _run_gan_train_step(tgan, real_frames, batch_size, latent_dim):
    """Run two generator updates and one discriminator update; return both losses as NumPy values."""
    # Train generator twice
    for _ in range(2):
        noise = tf.random.normal([batch_size, latent_dim])
        with tf.GradientTape() as gen_tape:
            generated_frames = tgan.generator(noise, training=True)
            fake_output = tgan.discriminator(generated_frames, training=True)
            gen_loss = tgan.generator_loss(fake_output)
        gen_vars = tgan.generator.trainable_variables
        gradients_gen = gen_tape.gradient(gen_loss, gen_vars)
        tgan.gen_optimizer.apply_gradients(zip(gradients_gen, gen_vars))

    # Train discriminator once
    noise = tf.random.normal([batch_size, latent_dim])
    with tf.GradientTape() as disc_tape:
        generated_frames = tgan.generator(noise, training=True)
        real_output = tgan.discriminator(real_frames, training=True)
        fake_output = tgan.discriminator(generated_frames, training=True)
        disc_loss = tgan.discriminator_loss(real_output, fake_output)
    disc_vars = tgan.discriminator.trainable_variables
    gradients_disc = disc_tape.gradient(disc_loss, disc_vars)
    tgan.disc_optimizer.apply_gradients(zip(gradients_disc, disc_vars))

    # gen_loss is the loss of the second generator update.
    return gen_loss.numpy(), disc_loss.numpy()


def _save_generator_samples(tgan, latent_dim, output_dir, epoch_number, num_samples=4):
    """Generate ``num_samples`` images and write them to ``output_dir`` as PNG files."""
    noise = tf.random.normal([num_samples, latent_dim])
    generated_samples = tgan.generator(noise, training=False).numpy()
    generated_samples = (generated_samples + 1) * 127.5  # Denormalize back to [0, 255]
    for idx, frame in enumerate(generated_samples.astype(np.uint8)):
        output_path = os.path.join(output_dir, f"epoch{epoch_number}_sample{idx}.png")
        cv2.imwrite(output_path, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))


def _plot_loss_history(gen_losses, disc_losses, output_dir):
    """Save the per-epoch generator and discriminator losses as ``training_curve.png``."""
    plt.figure(figsize=(12, 6))
    plt.plot(gen_losses, label='Generator Loss')
    plt.plot(disc_losses, label='Discriminator Loss')
    plt.title("Training Progress")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.savefig(os.path.join(output_dir, "training_curve.png"))
    plt.close()


def train_temporal_gan_with_monitoring(frames, epochs=500, batch_size=8, latent_dim=128, output_dir="synthetic_frames_enhanced"):
    """Train a ``TemporalGAN`` on ``frames`` while tracking per-epoch losses.

    Each step performs two generator updates followed by one discriminator
    update. Epoch-average losses are printed and collected; every 10 epochs
    four generated images are written to ``output_dir``; after training the
    loss curve is saved there as ``training_curve.png``.

    NOTE(review): the optimizers are plain Adam instances, so if the
    mixed_float16 policy from the setup cell is active no loss scaling is
    applied in this loop — confirm whether a LossScaleOptimizer is needed.

    Args:
        frames: Array of training frames; the first axis indexes frames.
        epochs: Number of passes over ``len(frames) // batch_size`` batches.
        batch_size: Frames per batch and noise vectors per update.
        latent_dim: Length of the generator's noise vector.
        output_dir: Directory for sample images and the loss curve.

    Returns:
        The trained ``TemporalGAN`` instance.

    Raises:
        ValueError: If ``frames`` holds fewer frames than one batch.
    """
    steps_per_epoch = len(frames) // batch_size
    if steps_per_epoch == 0:
        # Without this check every epoch would run zero steps and the
        # per-epoch averages would be NaN.
        raise ValueError(
            f"Need at least batch_size={batch_size} frames to train, got {len(frames)}."
        )

    os.makedirs(output_dir, exist_ok=True)

    # Loss lists for tracking per epoch
    gen_losses = []
    disc_losses = []

    dataset = create_dataset(frames, batch_size)
    tgan = TemporalGAN(frame_shape=(256, 256, 3), latent_dim=latent_dim)

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")
        batch_gen_losses = []
        batch_disc_losses = []

        # The dataset repeats forever, so the epoch is ended explicitly.
        for step, real_frames in enumerate(dataset):
            if step >= steps_per_epoch:
                break
            gen_loss, disc_loss = _run_gan_train_step(tgan, real_frames, batch_size, latent_dim)
            batch_gen_losses.append(gen_loss)
            batch_disc_losses.append(disc_loss)

        # Calculate average losses per epoch and store them for plotting.
        avg_gen_loss = np.mean(batch_gen_losses)
        avg_disc_loss = np.mean(batch_disc_losses)
        gen_losses.append(avg_gen_loss)
        disc_losses.append(avg_disc_loss)
        print(f"Epoch {epoch+1}: gen_loss={avg_gen_loss:.4f}, disc_loss={avg_disc_loss:.4f}")

        # Monitoring and saving samples every 10 epochs.
        if (epoch+1) % 10 == 0:
            _save_generator_samples(tgan, latent_dim, output_dir, epoch + 1)

    # Plot final training statistics
    _plot_loss_history(gen_losses, disc_losses, output_dir)

    # Hand the trained networks back so they are not lost when training ends.
    return tgan

In [15]:
# =========================
# 6. Data Loading and Running Training
# =========================
# Root folder that contains the "train/BenchPress" videos. Set the
# TGAN_DATA_ROOT environment variable to point somewhere else without editing
# this cell; the original hard-coded location remains the fallback.
root_folder = os.environ.get("TGAN_DATA_ROOT", '/Users/omvishal/Desktop/B')  # <<<--- Change this to your folder path
train_videos_folder = os.path.join(root_folder, "train", "BenchPress")
if not os.path.isdir(train_videos_folder):
    raise FileNotFoundError(f"Training video folder not found: {train_videos_folder}")
print(f"Loading training videos from: {train_videos_folder}")
train_frames = load_videos_from_folder_subset(train_videos_folder, target_size=(256, 256), max_frames=500, num_videos=10)
print(f"Total training frames loaded: {train_frames.shape[0]}")
# Fail here rather than partway into a long training run.
if train_frames.shape[0] == 0:
    raise RuntimeError(f"No frames were loaded from {train_videos_folder}")

Loading training videos from: /Users/omvishal/Desktop/B/train/BenchPress
Loading video: /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c01.avi
Loaded 151 frames from /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c01.avi
Loading video: /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c03.avi
Loaded 104 frames from /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c03.avi
Loading video: /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c04.avi
Loaded 105 frames from /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c04.avi
Loading video: /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c05.avi
Loaded 88 frames from /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c05.avi
Loading video: /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c06.avi
Loaded 111 frames from /Users/omvishal/Desktop/B/train/BenchPress/v_BenchPress_g01_c06.avi
Loading video: /Users/omvishal/Desktop/B/train/BenchPress/

In [72]:
# Hyperparameters for the long training run.
epochs, batch_size, latent_dim = 250, 8, 128
output_dir = "synthetic_frames_enhanced"

# Launch training with the settings above.
train_temporal_gan_with_monitoring(
    train_frames,
    epochs=epochs,
    batch_size=batch_size,
    latent_dim=latent_dim,
    output_dir=output_dir,
)

Initializing TemporalGAN...
Building generator with residual blocks...
Generator built.
Building simplified discriminator...
Discriminator built.
TemporalGAN initialized.

Epoch 1/250
Epoch 1: gen_loss=1.0633, disc_loss=1.1632

Epoch 2/250
Epoch 2: gen_loss=0.9949, disc_loss=1.2423

Epoch 3/250
Epoch 3: gen_loss=1.0336, disc_loss=1.1981

Epoch 4/250
Epoch 4: gen_loss=1.0921, disc_loss=1.1730

Epoch 5/250
Epoch 5: gen_loss=1.2033, disc_loss=1.1132

Epoch 6/250
Epoch 6: gen_loss=1.3057, disc_loss=1.0509

Epoch 7/250
Epoch 7: gen_loss=1.4033, disc_loss=1.0017

Epoch 8/250
Epoch 8: gen_loss=1.4966, disc_loss=0.9921

Epoch 9/250
Epoch 9: gen_loss=1.5935, disc_loss=0.9118

Epoch 10/250
Epoch 10: gen_loss=1.6455, disc_loss=0.9267

Epoch 11/250
Epoch 11: gen_loss=1.7555, disc_loss=0.8918

Epoch 12/250
Epoch 12: gen_loss=1.8419, disc_loss=0.8579

Epoch 13/250
Epoch 13: gen_loss=1.8886, disc_loss=0.8406

Epoch 14/250
Epoch 14: gen_loss=1.9999, disc_loss=0.8120

Epoch 15/250
Epoch 15: gen_loss=2.

In [None]:
# Hyperparameters for a short 10-epoch run, written to its own output folder.
epochs, batch_size, latent_dim = 10, 8, 128
output_dir = "synthetic_frames_test3(10epoch)"

# Launch training with the settings above.
train_temporal_gan_with_monitoring(
    train_frames,
    epochs=epochs,
    batch_size=batch_size,
    latent_dim=latent_dim,
    output_dir=output_dir,
)