# Video GAN

This is a replication exercise for learning, an experiment in novel artistic outputs, and (hopefully, eventually) a research contribution to GANs as applied to video synthesis. The goal is to generate abstracted videos evoking the subjective affect of certain objects, actions, and scenes in motion, through generative adversarial networks trained on input videos of the desired subjects.

Generative models are based on ["Generating Videos with Scene Dynamics" (2016)](http://www.cs.columbia.edu/~vondrick/tinyvideo/paper.pdf) and ["Improving Video Generation for Multi-functional Applications" (2017)](https://arxiv.org/pdf/1711.11453.pdf). Code is based on the latter's [GitHub repo](https://github.com/bernhard2202/improved-video-gan/) with updates for deprecated functions and eventually adjustments for intended outputs.

The creative part of this project is more nebulous for now, but it will require manipulating the generated videos so that they can be projected in a live setting, paired with musical compositions. At minimum this will require either interpolating the outputs, which will be pretty small, or figuring out a way to generate larger outputs without significant runtime cost.

Using Google Colaboratory for TPU access. Will refactor into Python module once validated.

In [0]:
import cv2
import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf
import time

# Ask TensorFlow which local devices it can see; as the last expression in
# the cell, the returned list is shown as the cell output (presumably to
# confirm the accelerator is attached before training).
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

In [0]:
# Mount Google Drive inside the Colab runtime so that VIDEO_DIR (which lives
# under /content/drive/) is reachable from the cells below.
from google.colab import drive
drive.mount('/content/drive/')

## Settings

In [0]:
# Video read settings
# Directory searched for source '*.avi' clips and for the '*.jpg' frame
# strips that feed the dataset.
VIDEO_DIR = '/content/drive/My Drive/Colab Data/video-gan'
# Size frames are resized to (passed to tf.image.resize_images); the first
# entry is also handed to the model as crop_size.
VIDEO_SIZE = [64, 64]
# NOTE(review): not referenced by the visible code -- parse_video uses the
# `input_size` value computed during frame extraction instead. Presumably the
# expected raw frame size; confirm before relying on it.
INPUT_SIZE = [240, 320]
# Step added to the capture position after every grabbed frame.
FRAME_INT = 1
# Maximum number of frames kept per video; also passed to the model as
# frame_size.
FRAME_CAP = 32

# Training parameters
# Shuffle buffer size for the tf.data pipeline.
BUFFER_SIZE = 100000
BATCH_SIZE = 1
EPOCHS = 50
# Length of the noise vector fed to the generator.
Z_DIM = 100
# Passed to VideoGAN as critical_iterations.
CRIT_ITERATIONS = 5

# Adam optimizer
LEARNING_RATE = 0.0001
BETA1 = 0.5

# Output frequency
# NOTE(review): SAMPLE_RATE and SAVE_RATE are not referenced by the visible
# code.
SAMPLE_RATE = 100
SAVE_RATE = 100
# Number of noise vectors used to generate preview samples.
NUM_OUT = 1

# Use eager execution
# Must run before any other TensorFlow op is created.
tf.enable_eager_execution()

## Video Processing

### Extract frames

In [0]:
videos = glob.glob(os.path.join(VIDEO_DIR, '*.avi'))

# For each video in the directory, grab up to FRAME_CAP frames, FRAME_INT
# frames apart, and store them in one array in which the frames are stacked
# vertically (along the height axis). The strip is written next to the source
# video as '<video name><index>.jpg'.
for vnum, video in enumerate(videos):
    # glob already returns paths that include VIDEO_DIR, so they are used
    # as-is instead of being joined with the directory a second time.
    description = os.path.splitext(video)[0]
    vidcap = cv2.VideoCapture(video)
    try:
        success, image = vidcap.read()
        if not success:
            # Unreadable or empty file: there is no frame to size the strip.
            print('Skipping unreadable video: {}'.format(video))
            continue

        frame_shape = image.shape
        frame_height = frame_shape[0]
        # Slots for frames that cannot be read stay zero (black), so every
        # strip has the same height regardless of the clip length.
        output = np.zeros((FRAME_CAP * frame_height, frame_shape[1], frame_shape[2]))
        loc, frames = 0, 0
        while success and frames < FRAME_CAP:
            output[frames * frame_height:(frames + 1) * frame_height] = image
            loc += FRAME_INT
            frames += 1
            # Seek by frame index: FRAME_INT counts frames, not milliseconds.
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, loc)
            success, image = vidcap.read()
    finally:
        # Release every capture, including skipped ones.
        vidcap.release()

    # parse_video reads this module-level value to split the strip back into
    # frames. Taken from the first frame because `image` is None once a read
    # has failed. It reflects the last processed video, so all clips are
    # assumed to share one frame size.
    input_size = frame_shape[:2]
    cv2.imwrite(description + str(vnum) + '.jpg', np.float32(output))

### Read frames into tf data object

In [0]:
def parse_video(filename):
    """Load one frame-strip JPEG and return it as a normalised frame stack.

    The file is decoded as a 3-channel image, cast to float32, cut into
    frames of the module-level `input_size` (height, width), resized to
    VIDEO_SIZE and finally mapped through x / 127.5 - 1.

    Args:
        filename: Path of the JPEG strip to read.

    Returns:
        A float32 tensor of shape [frames, VIDEO_SIZE[0], VIDEO_SIZE[1], 3].
    """
    raw_bytes = tf.read_file(filename)
    strip = tf.image.decode_jpeg(raw_bytes, channels=3)
    strip = tf.cast(strip, tf.float32)

    # The strip holds the frames stacked along its first axis; -1 lets
    # TensorFlow work out how many there are.
    frame_height, frame_width = input_size[0], input_size[1]
    clip = tf.reshape(strip, [-1, frame_height, frame_width, 3])

    clip_resized = tf.image.resize_images(clip, VIDEO_SIZE)
    scaled = tf.math.divide(clip_resized, 127.5)
    return tf.subtract(scaled, 1.0)

# Paths of every frame-strip JPEG in the video directory, as plain strings.
all_image_paths = list(map(str, glob.glob(os.path.join(VIDEO_DIR, '*.jpg'))))

# Build the input pipeline: shuffle the file names, decode each strip into a
# frame stack, then group the stacks into batches.
# TODO: Try shuffling after mapping?
dataset = (
    tf.data.Dataset.from_tensor_slices(all_image_paths)
    .shuffle(BUFFER_SIZE)
    .map(parse_video)
    .batch(BATCH_SIZE)
)

## Utilities

### Custom Layers

In [0]:
class DiscriminatorLayer(tf.keras.Model):
    """One discriminator block: 3-D convolution, optional layer norm, leaky ReLU.

    Args:
        input_dim: Number of input channels. Kept for interface compatibility;
            Conv3D infers the input channels from the tensor it is first
            called on, so the value is not needed to build the layer.
        output_dim: Number of output channels (convolution filters).
        kernel_size: Edge length of the cubic convolution kernel.
        strides: Stride applied along every convolved axis.
        normalize: When True, apply layer normalisation before the activation.
    """

    def __init__(self, input_dim, output_dim, kernel_size, strides, normalize=False):
        super(DiscriminatorLayer, self).__init__(name='')
        # `filters` is the number of output channels, not the number of
        # kernel parameters (input_dim * output_dim * kernel_size ** 3).
        self.filters = output_dim
        self.k = kernel_size
        self.strides = strides
        self.normalize = normalize
        # Create the convolution once so its weights are tracked by Keras and
        # reused on every call; building it inside call() would produce a
        # fresh, untrained layer for each forward pass.
        self.conv = tf.keras.layers.Conv3D(filters=self.filters,
                                           kernel_size=self.k,
                                           strides=self.strides,
                                           padding='same',
                                           kernel_initializer='he_normal')

    def call(self, input_tensor):
        """Apply the block to `input_tensor` and return the activated output."""
        out = self.conv(input_tensor)
        if self.normalize:
            # NOTE(review): tf.contrib.layers.layer_norm creates its own
            # variables when invoked, so under eager execution they are
            # probably neither reused nor tracked by this model -- confirm
            # before enabling normalize=True.
            out = tf.contrib.layers.layer_norm(out)

        return tf.nn.leaky_relu(out)
    
class LinearLayer(tf.keras.Model):
    """Bias-free linear projection with optional reshapes before and after.

    Args:
        output_dim: Size of the projected feature axis.
        resize_in: Optional shape the input is reshaped to before the
            projection (expected to yield a 2-D [batch, features] tensor).
        resize_out: Optional shape the projected tensor is reshaped to.
    """

    def __init__(self, output_dim, resize_in=None, resize_out=None):
        super(LinearLayer, self).__init__(name='')
        self.out_dim = output_dim
        self.resize_in = resize_in
        self.resize_out = resize_out
        # Hold the projection matrix in a Dense layer so it is a tracked,
        # trainable variable that is created once (on first call, when the
        # input width is known). Sampling a new tf.random.normal matrix on
        # every call would make the layer a different random projection each
        # time and leave nothing for the optimizer to update. The initializer
        # keeps the N(0, 0.01) starting distribution.
        self.dense = tf.keras.layers.Dense(
            units=self.out_dim,
            use_bias=False,
            kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01))

    def call(self, input_tensor):
        """Project `input_tensor` and return the (optionally reshaped) result."""
        if self.resize_in is not None:
            input_tensor = tf.reshape(input_tensor, self.resize_in)
        out = self.dense(input_tensor)
        if self.resize_out is not None:
            out = tf.reshape(out, self.resize_out)

        return out
        

### Output Utilities

In [0]:
def write_avi(batch, directory, name='', frate=25):
    """Write a sequence of frames to an XVID-encoded .avi file.

    Args:
        batch: Iterable of frames, each an array of shape (height, width, ...).
            Frames are cast with np.uint8 without rescaling, so values are
            assumed to already be in the 0-255 range -- TODO confirm against
            the caller.
        directory: Directory the video file is written to.
        name: File name without extension; 'output' is used when empty.
        frate: Frame rate of the written video, in frames per second.

    Returns:
        None. Nothing is written when `batch` contains no frames.
    """
    frames = [np.uint8(frame) for frame in batch]
    if not frames:
        # No frame means no frame size to open the writer with.
        return

    height, width = frames[0].shape[:2]
    out = os.path.join(directory, (name or 'output') + '.avi')
    # OpenCV expects the frame size as (width, height).
    writer = cv2.VideoWriter(out, cv2.VideoWriter_fourcc('X', 'V', 'I', 'D'),
                             frate, (width, height))
    try:
        for frame in frames:
            writer.write(frame)
    finally:
        # Always finalise the file, even if a frame fails to encode.
        writer.release()

    
def convert_image(images, batch_size, col=5, row=5):
    """Tile a batch of images into one grid and encode the grid as JPEG.

    The batch is shifted and scaled with (x + 1) / 2 (presumably mapping
    [-1, 1] values to [0, 1]), converted to uint8, split into `batch_size`
    pieces along axis 0, and laid out in `row` rows of `col` pieces each.

    Args:
        images: Batch tensor of images.
        batch_size: Number of pieces the batch is split into along axis 0.
        col: Number of images per grid row.
        row: Number of grid rows.

    Returns:
        A string tensor holding the JPEG-encoded grid.
    """
    unit_range = tf.div(tf.add(images, 1.0), 2.0)
    as_uint8 = tf.image.convert_image_dtype(unit_range, tf.uint8)
    tiles = list(tf.split(as_uint8, batch_size, axis=0))

    # Join `col` consecutive tiles side by side (axis 2) to form each row,
    # then stack the rows on top of each other (axis 1).
    grid_rows = [tf.concat(tiles[col * r:col * (r + 1)], 2) for r in range(row)]
    grid = tf.concat(grid_rows, 1)

    # Drop the leading batch axis of size one before encoding.
    return tf.image.encode_jpeg(tf.squeeze(grid, [0]))

## Model

In [0]:
class VideoGAN():
    """Video GAN trained with a Wasserstein loss and gradient penalty.

    The generator maps a noise vector to a video through a linear block and
    four transposed 3-D convolutions; the discriminator (critic) scores a
    video through five strided 3-D convolutions and a linear block.

    Args:
        input: Iterable of real video batches (e.g. a tf.data.Dataset).
            Each batch is assumed to be a float tensor of shape
            [batch_size, frame_size, crop_size, crop_size, 3] -- TODO confirm
            against the input pipeline.
        batch_size: Number of videos per batch. Every batch must contain
            exactly this many videos; partial final batches are not supported.
        frame_size: Number of frames per video.
        crop_size: Height and width of each frame; also the base channel
            count of the discriminator.
        learning_rate: Adam learning rate for both optimizers.
        z_dim: Length of the generator's noise vector.
        beta1: Adam beta1 for both optimizers.
        critical_iterations: Stored on the instance.
            NOTE(review): not used by the training loop yet; generator and
            critic are both updated once per batch.
        num_out: Number of preview videos generated after each epoch.
        epochs: Number of passes over `input`.
        save_int: Save a checkpoint every `save_int` epochs; a falsy value
            disables checkpointing.
        checkpoint_dir: Directory checkpoints are written to.
    """

    def __init__(self,
                 input,
                 batch_size,
                 frame_size,
                 crop_size,
                 learning_rate,
                 z_dim,
                 beta1,
                 critical_iterations,
                 num_out,
                 epochs,
                 save_int=100,
                 checkpoint_dir='./training_checkpoints'):
        self.critical_iterations = critical_iterations
        self.crop_size = crop_size
        self.beta1 = beta1
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.z_dim = z_dim
        self.frame_size = frame_size
        self.videos = input
        self.num_out = num_out
        self.epochs = epochs
        self.save_int = save_int

        self.generator = self.generator_model()
        self.discriminator = self.discriminator_model()

        self.gen_optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate, beta1=self.beta1, beta2=0.999)
        self.disc_optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate, beta1=self.beta1, beta2=0.999)

        # Checkpoint object used by train(); it tracks both networks and
        # both optimizers.
        self.checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
        self.checkpoint = tf.train.Checkpoint(gen_optimizer=self.gen_optimizer,
                                              disc_optimizer=self.disc_optimizer,
                                              generator=self.generator,
                                              discriminator=self.discriminator)

    def train_step(self, videos):
        """Run one generator update and one critic update on a real batch.

        Args:
            videos: One batch of real videos.
        """
        # Generate noise from normal distribution
        noise = tf.random_normal([self.batch_size, self.z_dim])

        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            generated_videos = self.generator(noise, training=True)

            real_output = self.discriminator(videos, training=True)
            generated_output = self.discriminator(generated_videos, training=True)

            gen_loss = self.generator_loss(generated_output)
            # The gradient penalty is evaluated inside the tapes so that the
            # critic update can differentiate through it.
            disc_loss = self.discriminator_loss(real_output,
                                                generated_output,
                                                real_videos=videos,
                                                generated_videos=generated_videos)

        # Only trainable variables receive gradients; batch-norm moving
        # statistics are updated by the layers themselves.
        gen_variables = self.generator.trainable_variables
        disc_variables = self.discriminator.trainable_variables

        gradients_of_generator = gen_tape.gradient(gen_loss, gen_variables)
        gradients_of_discriminator = disc_tape.gradient(disc_loss, disc_variables)

        self.gen_optimizer.apply_gradients(zip(gradients_of_generator, gen_variables))
        self.disc_optimizer.apply_gradients(zip(gradients_of_discriminator, disc_variables))

    def train(self):
        """Train for `self.epochs` epochs, saving previews and checkpoints.

        NOTE: notebook output is not cleared between epochs. To restore that,
        import IPython's `display` module in the notebook and call
        `display.clear_output(wait=True)` before each preview.
        """
        # Fixed noise so the previews of successive epochs are comparable.
        random_vector_for_generation = tf.random_normal([self.num_out, self.z_dim])
        for epoch in range(self.epochs):
            start = time.time()

            for video in self.videos:
                self.train_step(video)

            self.generate_and_save(self.generator,
                                   epoch + 1,
                                   random_vector_for_generation)

            # Save every n intervals
            if self.save_int and (epoch + 1) % self.save_int == 0:
                self.checkpoint.save(file_prefix=self.checkpoint_prefix)

            print ('Time taken for epoch {} is {} sec'.format(epoch + 1,
                                                              time.time()-start))
        # Generate samples after final epoch
        self.generate_and_save(self.generator,
                               self.epochs,
                               random_vector_for_generation)

    def generator_model(self):
        """Build the generator: noise vector -> video with values in [-1, 1].

        The linear block produces a [2, 4, 4, 512] volume per sample; each of
        the four stride-2 transposed convolutions doubles the three leading
        axes, giving [32, 64, 64, 3] per sample.
        """
        model = tf.keras.Sequential()

        # Linear block
        model.add(LinearLayer(resize_out=[-1, 2, 4, 4, 512], output_dim=512 * 4 * 4 * 2))
        model.add(tf.keras.layers.BatchNormalization())
        model.add(tf.keras.layers.LeakyReLU())

        # Convolution blocks 1-3
        for filters in (512, 256, 128):
            model.add(tf.keras.layers.Conv3DTranspose(filters=filters, kernel_size=4, strides=2, padding='same',
                                                      kernel_initializer='he_normal', use_bias=True))
            model.add(tf.keras.layers.BatchNormalization())
            model.add(tf.keras.layers.LeakyReLU())

        # Convolution block 4: three output channels, matching the
        # three-channel real videos the discriminator is also fed.
        model.add(tf.keras.layers.Conv3DTranspose(filters=3, kernel_size=4, strides=2, padding='same',
                                                  kernel_initializer='he_normal', use_bias=True, activation='tanh'))

        return model

    def discriminator_model(self):
        """Build the critic: video -> one unbounded score per sample."""
        initial_dim = self.crop_size
        model = tf.keras.Sequential()

        # Convolution block 1
        model.add(DiscriminatorLayer(input_dim=3, output_dim=initial_dim, kernel_size=4, strides=2))

        # Convolution block 2
        model.add(DiscriminatorLayer(input_dim=initial_dim, output_dim=initial_dim * 2, kernel_size=4, strides=2))

        # Convolution block 3
        model.add(DiscriminatorLayer(input_dim=initial_dim * 2, output_dim=initial_dim * 4, kernel_size=4, strides=2))

        # Convolution block 4
        model.add(DiscriminatorLayer(input_dim=initial_dim * 4, output_dim=initial_dim * 8, kernel_size=4, strides=2))

        # Convolution block 5
        model.add(DiscriminatorLayer(input_dim=initial_dim * 8, output_dim=1, kernel_size=4, strides=2, normalize=False))

        # Linear block
        model.add(LinearLayer(resize_in=[self.batch_size, -1], output_dim=1))

        return model

    def generator_loss(self, generated_output):
        """Return the generator loss: the negated mean critic score of fakes."""
        return -tf.reduce_mean(generated_output)

    def discriminator_loss(self, real_output, generated_output,
                           real_videos=None, generated_videos=None):
        """Return the critic loss: Wasserstein cost plus gradient penalty.

        Args:
            real_output: Critic scores for the real batch.
            generated_output: Critic scores for the generated batch.
            real_videos: The real batch that produced `real_output`.
            generated_videos: The generated batch that produced
                `generated_output`.

        Returns:
            mean(generated_output) - mean(real_output) + 10 * penalty, where
            the penalty is the mean of (gradient norm - 1) ** 2 at random
            points between real and generated videos. When either video
            batch is omitted the penalty cannot be computed and only the
            Wasserstein cost is returned.
        """
        # Discriminator uses Wasserstein earth-mover loss function
        d_cost = tf.reduce_mean(generated_output) - tf.reduce_mean(real_output)
        if real_videos is None or generated_videos is None:
            return d_cost

        # One mixing coefficient per sample, broadcast over frames, height,
        # width and channels.
        alpha = tf.random_uniform(
            shape=[self.batch_size, 1, 1, 1, 1],
            minval=0.,
            maxval=1.
        )
        # The penalty is evaluated on the videos themselves, not on critic
        # scores.
        diff = generated_videos - real_videos
        interpolates = real_videos + (alpha * diff)

        with tf.GradientTape() as tape:
            # `interpolates` is a plain tensor, so it must be watched
            # explicitly for the tape to differentiate with respect to it.
            tape.watch(interpolates)
            d_hat = self.discriminator(interpolates, training=True)
        gradients = tape.gradient(d_hat, interpolates)

        # Per-sample L2 norm of the gradient over all video elements.
        gradients = tf.reshape(gradients, [self.batch_size, -1])
        slopes = tf.sqrt(tf.reduce_sum(tf.square(gradients), axis=1))
        gradient_penalty = tf.reduce_mean((slopes - 1.) ** 2)

        return d_cost + 10 * gradient_penalty

    def generate_and_save(self, model, epoch, test_input):
        """Generate previews from `test_input` and save them as one PNG.

        Only the first frame of each generated video is drawn.
        TODO: Write the full videos instead of a single still per sample.

        Args:
            model: Generator to sample from.
            epoch: Epoch number used in the output file name.
            test_input: Noise batch fed to `model`.
        """
        # Make sure the training parameter is set to False because we
        # don't want to train the batchnorm layer when doing inference.
        predictions = model(test_input, training=False)

        count = int(predictions.shape[0])
        # Keep the 4x4 layout for up to 16 samples and grow it beyond that.
        grid = max(4, int(np.ceil(np.sqrt(count))))

        fig = plt.figure(figsize=(4, 4))

        for i in range(count):
            plt.subplot(grid, grid, i + 1)
            # First frame of video i, mapped from the tanh range [-1, 1] to
            # the [0, 1] range imshow expects for float RGB data.
            first_frame = (predictions[i, 0] + 1.0) / 2.0
            plt.imshow(np.clip(first_frame.numpy(), 0.0, 1.0))
            plt.axis('off')

        plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
        plt.show()
        # Release the figure so one is not leaked per epoch.
        plt.close(fig)

## Train

In [0]:
# Collect the hyperparameters from the Settings cell, then build the GAN on
# the frame dataset and train it.
gan_settings = dict(
    batch_size=BATCH_SIZE,
    frame_size=FRAME_CAP,
    crop_size=VIDEO_SIZE[0],
    learning_rate=LEARNING_RATE,
    z_dim=Z_DIM,
    beta1=BETA1,
    critical_iterations=CRIT_ITERATIONS,
    epochs=EPOCHS,
    num_out=NUM_OUT,
)
model = VideoGAN(dataset, **gan_settings)
model.train()