In [6]:
# Mount Google Drive at /content/drive; the dataset path used later in this
# notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
#Environment setup
# NOTE(review): package versions are unpinned, so a re-run may pull different
# releases than the ones this notebook was developed with — consider pinning.
!pip install torch torchvision torchaudio
!pip install opencv-python



In [8]:
#Data preparation
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import cv2
import numpy as np

class VideoDataset(Dataset):
    """Dataset that yields one fixed-length clip of RGB frames per video file.

    Args:
        video_dir: Directory scanned (non-recursively) for ``.mp4`` files, or
            the path of a single video file.
        transform: Optional callable applied to every frame. It receives an
            RGB ``H x W x 3`` NumPy array and must return a tensor so the
            frames can be stacked. When omitted, each frame is converted to a
            float tensor of shape ``(3, H, W)`` scaled to ``[0, 1]``.
        seq_len: Number of frames per clip (must be at least 1).
        frame_size: ``(width, height)`` every frame is resized to.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """

    def __init__(self, video_dir, transform=None, seq_len=16, frame_size=(64, 64)):
        if seq_len < 1:
            raise ValueError(f"seq_len must be >= 1, got {seq_len}")
        self.video_dir = video_dir
        self.transform = transform
        self.seq_len = seq_len
        self.frame_size = tuple(frame_size)
        # Check if the provided path is a directory or a file
        if os.path.isdir(video_dir):
            # os.listdir order is arbitrary; sort so index -> file is reproducible.
            self.video_files = sorted(
                os.path.join(video_dir, f)
                for f in os.listdir(video_dir)
                if f.lower().endswith('.mp4')
            )
        else:
            self.video_files = [video_dir]  # If it's a file, create a list with the single file path

    def __len__(self):
        """Return the number of video files in the dataset."""
        return len(self.video_files)

    def __getitem__(self, idx):
        """Return the first ``seq_len`` frames of video ``idx`` as one stacked tensor.

        Videos shorter than ``seq_len`` are padded by repeating their last frame.

        Raises:
            RuntimeError: If no frame at all could be decoded from the file.
        """
        video_path = self.video_files[idx]
        cap = cv2.VideoCapture(video_path)
        frames = []

        try:
            while len(frames) < self.seq_len:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = cv2.resize(frame, self.frame_size)

                if self.transform is not None:
                    frame = self.transform(frame)
                else:
                    # Without a transform the frame is still a NumPy array, which
                    # torch.stack cannot handle; convert HWC uint8 -> CHW float.
                    frame = torch.from_numpy(frame).permute(2, 0, 1).float().div(255.0)

                frames.append(frame)
        finally:
            # Release the decoder even if a transform raised.
            cap.release()

        if not frames:
            # Previously surfaced as an opaque IndexError from frames[-1].
            raise RuntimeError(f"Could not read any frames from video: {video_path}")

        if len(frames) < self.seq_len:
            frames.extend([frames[-1]] * (self.seq_len - len(frames)))

        # Stack along a new leading (time) dimension.
        return torch.stack(frames, dim=0)

# Per-frame preprocessing: convert to a tensor, then normalize with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=(0.5,), std=(0.5,))]
)

# Build the dataset from the Drive folder that holds the clips, then wrap it in
# a shuffling loader that serves four clips per batch using four workers.
video_dataset = VideoDataset(
    video_dir='/content/drive/MyDrive/Gen_DL Project @Sushant/Merged_Trim',
    transform=transform,
)
video_loader = DataLoader(
    video_dataset,
    batch_size=4,
    shuffle=True,
    num_workers=4,
)

In [9]:
#Defining the generator and discriminator
class Generator(nn.Module):
    """Convolutional generator that maps a latent vector to a single image.

    A linear layer projects the latent vector to a ``128 x (img_size/4)^2``
    feature map, which is upsampled by 2 twice and squashed to ``[-1, 1]`` by
    the final ``Tanh``.

    Args:
        latent_dim: Length of the input noise vector.
        channels: Number of channels in the generated image.
        img_size: Side length of the square output image. Must be a positive
            multiple of 4 because the network upsamples by 2 twice.

    Raises:
        ValueError: If ``img_size`` is not a positive multiple of 4.
    """

    def __init__(self, latent_dim=100, channels=3, img_size=64):
        super(Generator, self).__init__()
        if img_size <= 0 or img_size % 4 != 0:
            raise ValueError(f"img_size must be a positive multiple of 4, got {img_size}")
        self.init_size = img_size // 4
        self.l1 = nn.Sequential(nn.Linear(latent_dim, 128 * self.init_size ** 2))

        # NOTE(review): the original passed 0.8 positionally to BatchNorm2d, which
        # PyTorch binds to `eps` (not `momentum`). The value is kept, now spelled
        # out, so numerics are unchanged — confirm whether momentum was intended.
        self.conv_blocks = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, 3, stride=1, padding=1),
            nn.BatchNorm2d(128, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, channels, 3, stride=1, padding=1),
            nn.Tanh()
        )

    def forward(self, z):
        """Generate images from latent vectors ``z`` of shape ``(batch, latent_dim)``.

        Returns:
            Tensor of shape ``(batch, channels, img_size, img_size)`` in ``[-1, 1]``.
        """
        out = self.l1(z)
        # Reshape the flat projection into a (128, init_size, init_size) feature map.
        out = out.view(out.shape[0], 128, self.init_size, self.init_size)
        img = self.conv_blocks(out)
        return img

class Discriminator(nn.Module):
    """Convolutional discriminator scoring how likely an image is to be real.

    Four stride-2 convolution blocks shrink the image, after which a linear
    layer plus ``Sigmoid`` produces one probability per image.

    Args:
        channels: Number of channels in the input image.
        img_size: Side length of the square input image (default 64).

    Raises:
        ValueError: If ``img_size`` is smaller than 1.
    """

    def __init__(self, channels=3, img_size=64):
        super(Discriminator, self).__init__()
        if img_size < 1:
            raise ValueError(f"img_size must be >= 1, got {img_size}")

        def discriminator_block(in_filters, out_filters, bn=True):
            """Return the layers of one downsampling block (conv, activation, dropout[, BN])."""
            block = [nn.Conv2d(in_filters, out_filters, 3, 2, 1), nn.LeakyReLU(0.2, inplace=True), nn.Dropout2d(0.25)]
            if bn:
                # NOTE(review): 0.8 was passed positionally, which PyTorch binds to
                # `eps` (not `momentum`). Kept as-is, now explicit — confirm intent.
                block.append(nn.BatchNorm2d(out_filters, eps=0.8))
            return block

        self.model = nn.Sequential(
            *discriminator_block(channels, 16, bn=False),
            *discriminator_block(16, 32),
            *discriminator_block(32, 64),
            *discriminator_block(64, 128),
        )

        # Each 3x3 / stride-2 / padding-1 conv maps a side of n to ceil(n / 2);
        # apply that four times to get the final feature-map side (4 for 64).
        ds_size = img_size
        for _ in range(4):
            ds_size = (ds_size + 1) // 2
        self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, img):
        """Score a batch of images.

        Args:
            img: Tensor of shape ``(batch, channels, img_size, img_size)``.

        Returns:
            Tensor of shape ``(batch, 1)`` with values in ``[0, 1]``.
        """
        out = self.model(img)
        out = out.view(out.shape[0], -1)  # flatten the feature maps per image
        validity = self.adv_layer(out)
        return validity

In [10]:
#Training loop
import torchvision.utils as vutils
import os

# Hyperparameters
latent_dim = 100
lr = 0.0002
b1 = 0.5
b2 = 0.999
n_epochs = 100
sample_interval = 400

# Models and every tensor fed to them must live on the same device; previously
# the inputs were CUDA tensors on a GPU runtime while the models stayed on CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize generator and discriminator
generator = Generator(latent_dim).to(device)
discriminator = Discriminator().to(device)

# Loss function
adversarial_loss = torch.nn.BCELoss()

# Optimizers
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(b1, b2))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(b1, b2))

# Retained only so code elsewhere that still references `Tensor` keeps working;
# the loop below creates tensors directly on `device` instead.
Tensor = torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
# Create the 'images' directory if it doesn't exist
os.makedirs("images", exist_ok=True)

for epoch in range(n_epochs):
    for i, videos in enumerate(video_loader):
        batch_size = videos.size(0)

        # Adversarial ground truths
        valid = torch.ones(batch_size, 1, device=device)
        fake = torch.zeros(batch_size, 1, device=device)

        # Configure input: (batch, seq, C, H, W) on the training device
        real_videos = videos.to(device=device, dtype=torch.float32)

        # -----------------
        #  Train Generator
        # -----------------

        optimizer_G.zero_grad()

        # Sample noise as generator input
        z = torch.randn(batch_size, latent_dim, device=device)

        # Generate a batch of frames (one frame per latent vector)
        gen_videos = generator(z)

        # Loss measures generator's ability to fool the discriminator
        g_loss = adversarial_loss(discriminator(gen_videos), valid)

        g_loss.backward()
        optimizer_G.step()

        # ---------------------
        #  Train Discriminator
        # ---------------------

        # Also clears the discriminator gradients accumulated by g_loss.backward().
        optimizer_D.zero_grad()

        # Average the real loss over every frame position. The previous loop
        # overwrote d_loss on each iteration, so only the last frame was trained on.
        real_loss = torch.stack([
            adversarial_loss(discriminator(frame), valid)
            for frame in real_videos.transpose(0, 1)  # iterate over time steps
        ]).mean()
        fake_loss = adversarial_loss(discriminator(gen_videos.detach()), fake)
        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

        print(f"[Epoch {epoch}/{n_epochs}] [Batch {i}/{len(video_loader)}] [D loss: {d_loss.item()}] [G loss: {g_loss.item()}]")

        batches_done = epoch * len(video_loader) + i
        if batches_done % sample_interval == 0:
            vutils.save_image(gen_videos.detach()[:25], f"images/{batches_done}.png", nrow=5, normalize=True)


[Epoch 0/100] [Batch 0/31] [D loss: 0.6901648044586182] [G loss: 0.695833683013916]
[Epoch 0/100] [Batch 1/31] [D loss: 0.6876206398010254] [G loss: 0.6954149007797241]
[Epoch 0/100] [Batch 2/31] [D loss: 0.6876992583274841] [G loss: 0.6958491206169128]
[Epoch 0/100] [Batch 3/31] [D loss: 0.6836988925933838] [G loss: 0.6966986656188965]
[Epoch 0/100] [Batch 4/31] [D loss: 0.6808103919029236] [G loss: 0.6968688368797302]
[Epoch 0/100] [Batch 5/31] [D loss: 0.6827840209007263] [G loss: 0.6991901993751526]
[Epoch 0/100] [Batch 6/31] [D loss: 0.6767371892929077] [G loss: 0.6992899775505066]
[Epoch 0/100] [Batch 7/31] [D loss: 0.6669030785560608] [G loss: 0.6990793347358704]
[Epoch 0/100] [Batch 8/31] [D loss: 0.6699700355529785] [G loss: 0.6977201700210571]
[Epoch 0/100] [Batch 9/31] [D loss: 0.6534278988838196] [G loss: 0.7007429599761963]
[Epoch 0/100] [Batch 10/31] [D loss: 0.6610182523727417] [G loss: 0.702817976474762]
[Epoch 0/100] [Batch 11/31] [D loss: 0.6454674601554871] [G loss: 

In [12]:
import os
import cv2

# Ensure the output directory exists; exist_ok makes this a no-op when it is
# already there and avoids the check-then-create race.
os.makedirs('generated_frames', exist_ok=True)

# Function to save generated frames
def save_generated_frame(frame, frame_number, output_dir='generated_frames'):
    """Write one frame to ``<output_dir>/frame_<frame_number>.png``.

    Args:
        frame: RGB image tensor laid out as ``(3, H, W)``. Values are scaled
            by 255, so they are expected to lie in ``[0, 1]``; anything outside
            that range is clipped.
        frame_number: Index used in the output filename.
        output_dir: Directory the PNG is written to (created if missing).

    Raises:
        IOError: If OpenCV reports that the image could not be written.
    """
    # detach + cpu so tensors straight from a model (grad-tracking and/or on
    # the GPU) can be converted; plain CPU tensors pass through unchanged.
    frame_np = frame.detach().cpu().permute(1, 2, 0).numpy() * 255
    # Clip before the uint8 cast: out-of-range values would otherwise wrap around.
    frame_u8 = np.clip(frame_np, 0, 255).astype(np.uint8)
    frame_bgr = cv2.cvtColor(frame_u8, cv2.COLOR_RGB2BGR)

    os.makedirs(output_dir, exist_ok=True)
    filename = os.path.join(output_dir, f'frame_{frame_number}.png')
    # cv2.imwrite signals failure through its return value rather than raising.
    if not cv2.imwrite(filename, frame_bgr):
        raise IOError(f"Failed to write frame to {filename}")

# # Example usage within a loop (e.g., generating frames in a GAN)
# for frame_number in range(100):  # Assuming 100 frames to generate
#     # Replace this with your actual frame generation code
#     # Example: generate a random frame for demonstration
#     frame = torch.rand(3, 64, 64)
#     # Save the generated frame
#     save_generated_frame(frame, frame_number)

import torch
import numpy as np
import torchvision.utils as vutils

# Assume generator is a pre-trained Video GAN generator
# Define the Generator class (placeholder, replace with your actual generator class)
class Generator(nn.Module):
    """Convolutional generator that maps a latent vector to a single image.

    This repeats the architecture of the ``Generator`` defined in the model
    cell earlier in the notebook; the layer layout must stay identical so that
    saved state dicts load into either definition.

    Args:
        latent_dim: Length of the input noise vector.
        channels: Number of channels in the generated image.
        img_size: Side length of the square output image. Must be a positive
            multiple of 4 because the network upsamples by 2 twice.

    Raises:
        ValueError: If ``img_size`` is not a positive multiple of 4.
    """

    def __init__(self, latent_dim=100, channels=3, img_size=64):
        super(Generator, self).__init__()
        if img_size <= 0 or img_size % 4 != 0:
            raise ValueError(f"img_size must be a positive multiple of 4, got {img_size}")
        self.init_size = img_size // 4
        self.l1 = nn.Sequential(nn.Linear(latent_dim, 128 * self.init_size ** 2))

        # NOTE(review): the original passed 0.8 positionally to BatchNorm2d, which
        # PyTorch binds to `eps` (not `momentum`). The value is kept, now spelled
        # out, so numerics are unchanged — confirm whether momentum was intended.
        self.conv_blocks = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, 3, stride=1, padding=1),
            nn.BatchNorm2d(128, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, channels, 3, stride=1, padding=1),
            nn.Tanh()
        )

    def forward(self, z):
        """Generate images from latent vectors ``z`` of shape ``(batch, latent_dim)``.

        Returns:
            Tensor of shape ``(batch, channels, img_size, img_size)`` in ``[-1, 1]``.
        """
        out = self.l1(z)
        # Reshape the flat projection into a (128, init_size, init_size) feature map.
        out = out.view(out.shape[0], 128, self.init_size, self.init_size)
        img = self.conv_blocks(out)
        return img
# Save the weights of the generator trained in the training cell. Creating a
# new Generator() here, as before, replaced the trained model with randomly
# initialised weights, so the saved file and every generated frame were untrained.
torch.save(generator.state_dict(), 'generator.pth')
print("Generator model saved as generator.pth")
# Load the pre-trained generator model
latent_dim = 100  # Replace with the actual latent dimension of your model
generator = Generator(latent_dim)
# map_location lets weights saved from a GPU model load into this CPU copy.
generator.load_state_dict(torch.load('generator.pth', map_location='cpu'))
generator.eval()  # Set the generator to evaluation mode

# Generate a batch of latent vectors
num_frames = 100  # Number of frames to generate
z = torch.randn(num_frames, latent_dim)  # Generate random latent vectors

# Generate frames using the generator model; no autograd graph is needed for inference
with torch.no_grad():
    generated_frames = generator(z)

# Save the generated frames as images (the video below is assembled from these files)
os.makedirs('generated_frames', exist_ok=True)
for frame_number in range(num_frames):
    frame = generated_frames[frame_number]
    vutils.save_image(frame, f"generated_frames/frame_{frame_number:03d}.png", normalize=True)

# To generate a video from frames, you can use tools like OpenCV
import cv2

# Set up video writer; the frame size is taken from the generated tensor
# (VideoWriter expects (width, height)) instead of being hard-coded.
fourcc = cv2.VideoWriter_fourcc(*'XVID')
frame_height, frame_width = generated_frames.shape[-2:]
out = cv2.VideoWriter('generated_video.avi', fourcc, 20.0, (frame_width, frame_height))

try:
    for frame_number in range(num_frames):
        frame_path = f"generated_frames/frame_{frame_number:03d}.png"
        frame = cv2.imread(frame_path)
        if frame is None:
            # cv2.imread returns None instead of raising when a file is unreadable.
            raise FileNotFoundError(f"Could not read frame image: {frame_path}")
        out.write(frame)
finally:
    # Finalise the file even if a frame failed. No GUI windows are opened in
    # this cell, so cv2.destroyAllWindows() is not needed (it raises on
    # headless OpenCV builds).
    out.release()

Generator model saved as generator.pth


In [13]:
import os

# Sanity check: report how many files the frame directory holds and show a few names.
frame_dir = 'generated_frames'
if not os.path.exists(frame_dir):
    print(f"Directory '{frame_dir}' does not exist.")
else:
    # os.listdir returns names in arbitrary order.
    files = os.listdir(frame_dir)
    print(f"Found {len(files)} frames in '{frame_dir}':")
    print(files[:10])  # Only the first 10 filenames


Found 100 frames in 'generated_frames':
['frame_088.png', 'frame_014.png', 'frame_050.png', 'frame_035.png', 'frame_028.png', 'frame_082.png', 'frame_093.png', 'frame_005.png', 'frame_048.png', 'frame_017.png']


In [14]:
import cv2
import os

def frames_to_video(frame_dir, output_video_path, fps=24):
    """
    Converts a directory of image frames into a video file.

    Frames are written in natural filename order (``frame_2`` before
    ``frame_10``), and the video size is taken from the first frame.

    Args:
        frame_dir (str): Path to the directory containing image frames.
        output_video_path (str): Path to save the output video file.
        fps (int, optional): Frames per second for the output video. Defaults to 24.

    Raises:
        ValueError: If the directory has no ``.png`` files or a frame cannot be read.
    """
    import re  # local import: only needed for the natural-sort key below

    def natural_key(name):
        # re.split with a capture group alternates text / digit runs, so odd
        # positions are always digit runs and can be compared numerically.
        return [int(tok) if pos % 2 else tok for pos, tok in enumerate(re.split(r'(\d+)', name))]

    # os.listdir returns names in arbitrary order, which previously scrambled the video.
    images = sorted((img for img in os.listdir(frame_dir) if img.endswith(".png")), key=natural_key)
    if not images:
        raise ValueError(f"No .png frames found in '{frame_dir}'")

    first_frame = cv2.imread(os.path.join(frame_dir, images[0]))
    if first_frame is None:
        raise ValueError(f"Could not read frame image: {os.path.join(frame_dir, images[0])}")
    height, width = first_frame.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can use other codecs like 'XVID'
    video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    try:
        for image in images:
            image_path = os.path.join(frame_dir, image)
            frame = cv2.imread(image_path)
            if frame is None:
                # cv2.imread returns None instead of raising on unreadable files.
                raise ValueError(f"Could not read frame image: {image_path}")
            video.write(frame)
    finally:
        # Always finalise the file. No GUI windows are opened here, so
        # cv2.destroyAllWindows() is not needed (it raises on headless builds).
        video.release()
    print(f"Video saved to {output_video_path}")

In [15]:
# Assemble the saved PNG frames into a video file in the working directory.
# NOTE(review): frames_to_video above encodes with the 'mp4v' FourCC while this
# output path uses an .avi extension — confirm the combination plays as expected.
video_frame_dir = 'generated_frames'  # This should match the directory where frames are saved
frames_to_video(video_frame_dir, 'generated_video.avi')

Video saved to generated_video.avi


In [16]:
# NOTE(review): opencv-python is already installed by the environment-setup cell;
# opencv-python-headless provides the same `cv2` module, so installing both may
# conflict — confirm which one is actually needed.
!pip install opencv-python-headless ipython

Collecting jedi>=0.16 (from ipython)
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi
Successfully installed jedi-0.19.1


In [17]:
import cv2
from IPython.display import Video, display

# Combine saved frames into video
def frames_to_video(frame_dir, output_video, fps=15):
    """Combine the ``.png`` frames in a directory into a DIVX-encoded video.

    Frames are written in natural filename order (``frame_2`` before
    ``frame_10``), and the video size is taken from the first frame.

    Args:
        frame_dir: Directory containing the frame images.
        output_video: Path of the video file to write.
        fps: Frames per second of the output video. Defaults to 15.

    Raises:
        ValueError: If the directory has no ``.png`` files or a frame cannot be read.
    """
    import re  # local import: only needed for the natural-sort key below

    def natural_key(name):
        # re.split with a capture group alternates text / digit runs, so odd
        # positions are always digit runs and can be compared numerically.
        # Unlike int(name.split('_')[1]...), this cannot fail on names that
        # carry no numeric suffix.
        return [int(tok) if pos % 2 else tok for pos, tok in enumerate(re.split(r'(\d+)', name))]

    images = sorted((img for img in os.listdir(frame_dir) if img.endswith(".png")), key=natural_key)
    if not images:
        raise ValueError(f"No .png frames found in '{frame_dir}'")

    first_frame = cv2.imread(os.path.join(frame_dir, images[0]))
    if first_frame is None:
        raise ValueError(f"Could not read frame image: {os.path.join(frame_dir, images[0])}")
    height, width = first_frame.shape[:2]

    video = cv2.VideoWriter(output_video, cv2.VideoWriter_fourcc(*'DIVX'), fps, (width, height))

    try:
        for image in images:
            image_path = os.path.join(frame_dir, image)
            frame = cv2.imread(image_path)
            if frame is None:
                # cv2.imread returns None instead of raising on unreadable files.
                raise ValueError(f"Could not read frame image: {image_path}")
            video.write(frame)
    finally:
        # Always finalise the file. No GUI windows are opened here, so
        # cv2.destroyAllWindows() is not needed (it raises on headless builds).
        video.release()

# Input frame folder and output file for the final assembly step.
video_frame_dir = 'generated_frames'  # Replace with your directory
output_video_path = 'generated_video.avi'

# Stitch the frames together into a single video file.
frames_to_video(video_frame_dir, output_video_path)

# Embed the resulting video in the notebook output.
clip = Video(output_video_path, embed=True)
display(clip)