In [None]:
# Pipeline
# 1. Get the downstream upmixing model with its pre-trained weights
# 2. Fine-tune that model on 90 unseen adversarial videos (randomly black-masked videos)
# 3. Compare that model with the original pre-trained model on 160 unseen adversarial test videos
# 4. Compare that model with the original pre-trained model on the 48 unseen adversarial test videos


# Model Set Up and Evaluation
## COSC 89.30 Final Project
## Authors: Tai Wan Kim, Phuc Tran, Mark Lekina Rorat
Modified https://github.com/karreny/telling-left-from-right to run on Google Colab and resolved errors and dependency issues. Additional adversarial training was done on randomly black-masked videos. More details are given in our paper.

Pipeline:
1. Get the downstream upmixing model with its pre-trained weights
2. Fine-tune that model on 90 unseen adversarial videos (randomly black-masked videos)
3. Compare that model with the original pre-trained model on 160 unseen adversarial test videos
4. Compare that model with the original pre-trained model on the 48 unseen adversarial test videos

## Additional methods implemented to add noise to test set.

## Reference: K. Yang, B. Russell and J. Salamon, "Telling Left from Right: Learning Spatial Correspondence between Sight and Sound", IEEE Conference on Computer Vision and Pattern Recognition (CVPR), Virtual Conference, June 2020.

### Basics

In [None]:
!git clone https://github.com/karreny/telling-left-from-right.git

fatal: destination path 'telling-left-from-right' already exists and is not an empty directory.


In [None]:
cd telling-left-from-right/

/content/telling-left-from-right


In [None]:
# Check out the `upmixing-demo` branch of the cloned repository.
!git switch upmixing-demo

Already on 'upmixing-demo'
Your branch is up to date with 'origin/upmixing-demo'.


Import Dependencies

In [None]:
import sys
sys.path.append("/content/telling-left-from-right/upmixing-final")
from unet import UpmixResnet18Scratch
import numpy as np
import os
import cv2
import torch
import torch.nn as nn
import librosa
from PIL import Image
from IPython.display import Video
from IPython.display import Audio
import subprocess
import random
import cv2
### upmix audio and compute loss
import torch.nn as nn
import soundfile as sf
from scipy.io import wavfile

Mount Google Drive

In [None]:
# Mount Google Drive at /content/drive/ so the archives and the model checkpoint
# stored under MyDrive can be read by the cells below.
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


### Unzip the 300 Videos

These 300 videos are used to compare the adversarially fine-tuned weights with the original weights in terms of standard test loss.

In [None]:
import zipfile

# Archive on Google Drive and the folder it is unpacked into.
zip_path = '/content/drive/MyDrive/COSC89/test_300.zip'
destination_folder = '/content/'

# Extract every member of the archive into destination_folder.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(destination_folder)

### Process the fine tune videos

In [None]:
# Archive with the raw fine-tune videos; relies on `zipfile` imported in an earlier cell.
zip_path = '/content/drive/MyDrive/COSC89/fine_tune.zip'
destination_folder = '/content/'

# Extract every member of the archive into destination_folder.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(destination_folder)

### IMPORTANT: ONLY NECESSARY THE FIRST TIME
Preprocess the fine-tune videos. The test_300 videos are already preprocessed and stored on Google Drive.

In [None]:
def preprocess_video(VIDEO_PATH, SAVE_DIR):
    """Re-encode a video at 30 fps and extract its audio track with ffmpeg.

    Two files are written into SAVE_DIR, both named after the base name of
    VIDEO_PATH: ``<name>.mp4`` (the input re-encoded at 30 fps) and
    ``<name>.mp3`` (the audio of that re-encoded file).

    Args:
        VIDEO_PATH: path of the source video.
        SAVE_DIR: directory that receives the two output files.

    Returns:
        None. If an ffmpeg call exits with a non-zero status, a message is
        printed and the remaining steps for this video are skipped.
    """
    name = os.path.splitext(os.path.basename(VIDEO_PATH))[0]
    video_input_path = os.path.join(SAVE_DIR, name + ".mp4")
    audio_input_path = os.path.join(SAVE_DIR, name + ".mp3")

    # Arguments are passed as lists (no shell), so paths containing spaces or
    # shell metacharacters are handed to ffmpeg unchanged.
    commands = (
        # preprocess video using ffmpeg
        ["ffmpeg", "-i", VIDEO_PATH, "-filter:v", "fps=fps=30", "-strict", "-2", video_input_path],
        # extract audio using ffmpeg
        ["ffmpeg", "-i", video_input_path, "-f", "mp3", "-ab", "192000", "-vn", audio_input_path],
    )

    for cmd in commands:
        print("Running:", " ".join(cmd))
        returncode = subprocess.call(cmd)
        if returncode != 0:
            # The audio step reads the output of the video step, so stop here.
            print(f"ffmpeg exited with status {returncode}; skipping remaining steps for {VIDEO_PATH}")
            return

In [None]:
%%capture
folder_path = '/content/fine_tune/video/'
video_file_names = []

SAVE_DIR = "fine_tune_processed"
os.makedirs(SAVE_DIR + str("_train"))
os.makedirs(SAVE_DIR + str("_test"))

# if not os.path.isdir(SAVE_DIR):
#     os.makedirs(SAVE_DIR)
    
for file_name in os.listdir(folder_path):
    file_path = os.path.join(folder_path, file_name)
    if os.path.isfile(file_path):
        video_file_names.append(os.path.splitext(os.path.basename(file_name))[0])

print(f"{len(video_file_names)} videos appended to list.")

# Uncomment to save for the video file to process
for i, file_name in enumerate(video_file_names):
  video_path = folder_path + file_name + ".mp4"
  cur_save_dir = ""
  if i < 80: 
    cur_save_dir = SAVE_DIR + str("_train")
  else:
    cur_save_dir = SAVE_DIR + str("_test")
  preprocess_video(video_path, cur_save_dir)

#### Download for fine_tune zip folder for later

#### Download the Zip File

In [None]:
# create a zip file of downloaded videos
!zip -r /content/fine_tune_test.zip /content/telling-left-from-right/fine_tune_processed_test

In [None]:
# create a zip file of the preprocessed train split
!zip -r /content/fine_tune_train.zip /content/telling-left-from-right/fine_tune_processed_train

In [None]:
# Download both archives created above so they can be stored for later sessions.
from google.colab import files
files.download("/content/fine_tune_train.zip") # download the zipped train split
files.download("/content/fine_tune_test.zip") # download the zipped test split

### Extract the fine_tune_train and fine_tune_test dataset

In [None]:
import zipfile

# Both preprocessed splits are unpacked into the same folder.
destination_folder = '/content/'

for zip_path in (
    '/content/drive/MyDrive/COSC89/fine_tune_train.zip',
    '/content/drive/MyDrive/COSC89/fine_tune_test.zip',
):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(destination_folder)

### Loading Two Copies of the Pre-Trained Model (One Will Be Fine-Tuned on Black-Masked Videos)

In [None]:
# The model-loading cells below call .cuda(), so this is expected to print True.
print(torch.cuda.is_available()) # check GPU status

True


In [None]:
%%capture
# NOTE: %%capture suppresses this cell's output, including the print below.
local_path = '/content/drive/MyDrive/COSC89/upmixing-final-exp-1-flip-checkpoint-best.pth.tar' # model checkpoint stored on Google Drive

# build the network that will receive the checkpoint weights
model = UpmixResnet18Scratch()

pretrained = (local_path)
# The state dict is loaded through an nn.DataParallel wrapper and the wrapped
# module is used afterwards.
# NOTE(review): presumably the checkpoint was saved from a DataParallel model — confirm.
avnet = nn.DataParallel(model)
checkpoint = torch.load(pretrained)
avnet.load_state_dict(checkpoint['state_dict'])
print("loaded pretrained model from", pretrained)
# pre_model: the reference copy; no cell in this notebook trains it.
pre_model = avnet.module
pre_model.cuda()

In [None]:
%%capture
# NOTE: %%capture suppresses this cell's output, including the print below.
local_path = '/content/drive/MyDrive/COSC89/upmixing-final-exp-1-flip-checkpoint-best.pth.tar' # model checkpoint stored on Google Drive

# build a second, independent network from the same checkpoint
model = UpmixResnet18Scratch()
pretrained = (local_path)
# The state dict is loaded through an nn.DataParallel wrapper and the wrapped
# module is used afterwards.
# NOTE(review): presumably the checkpoint was saved from a DataParallel model — confirm.
avnet = nn.DataParallel(model)
checkpoint = torch.load(pretrained)
avnet.load_state_dict(checkpoint['state_dict'])
print("loaded pretrained model from", pretrained)
# adv_model: the copy that the training cell fine-tunes on masked videos.
adv_model = avnet.module
adv_model.cuda()

### Preliminaries: Converting Stereo to Mono and Adding Noise

#### Clip generator

Clip generator generates clips of video and corresponding audio for a given video and audio file.

In [None]:
class ClipGenerator(object):
    '''Generate clips of video and corresponding audio for a given video and audio file.

    A clip spans `clip_length` seconds. Consecutive clips start `hop_length`
    seconds apart, and one extra clip aligned to the end of the video is always
    produced last.
    '''

    def __init__(self, video_fps=30, video_downsample_factor=5, audio_sr=16000, clip_length=2.87, hop_length=2):
        '''
        video_fps: frame rate of the input video
        video_downsample_factor: every n-th frame of a clip is kept
        audio_sr: audio sampling rate
        clip_length: clip duration in seconds
        hop_length: offset in seconds between the starts of consecutive clips
        '''
        self.video_fps = video_fps
        self.video_downsample_factor = video_downsample_factor
        self.audio_sr = audio_sr
        self.clip_length = clip_length
        self.hop_length = hop_length

        self.n_video_frames = int(video_fps*clip_length)
        self.n_audio_samples = int(audio_sr*clip_length)

        self.n_video_frames_hop = int(video_fps*hop_length)

    def generator(self, videofile, audiofile, mask_ratio, noise=True):
        '''Yield one clip dictionary (see get_clip) per clip of the given video.

        videofile / audiofile: paths passed to load_video / load_audio
        mask_ratio, noise: forwarded to load_video
        Videos with fewer frames than one clip needs are skipped with a message.
        '''
        video, total_frames = load_video(videofile, mask_ratio, noise)
        audio = load_audio(audiofile)

        # The reported frame count may be larger than the number of frames that
        # were actually loaded; never index past the loaded frames.
        total_frames = min(total_frames, len(video))

        last_start = total_frames - self.n_video_frames
        if last_start < 0:
            # A negative start index would silently wrap around and yield a
            # truncated clip, so skip the video instead.
            print(f"Skipping {videofile}: {total_frames} frames, but one clip needs {self.n_video_frames}.")
            return

        start_idx = 0
        while start_idx < last_start:
            yield self.get_clip(video, audio, start_idx)
            start_idx += self.n_video_frames_hop

        # final clip, aligned to the end of the video
        yield self.get_clip(video, audio, last_start, True)

    def get_clip(self, video, audio, start_idx, last_clip=False):
        '''
        INPUT
        video: numpy array of video frames
        audio: numpy array of audio samples (two channels)
        start_idx: starting video frame index for the clip
        last_clip: whether this is the last clip in the video

        OUTPUT
        dictionary {
          'start_frame': index of start video frame,
          'end_frame': index of end video frame,
          'start_audio_frame': index of start audio sample,
          'end_audio_frame': index of end audio sample,
          'video': video tensor (last input axis moved to the front, plus a leading batch axis),
          'audio': numpy array holding the mean of the two channels, duplicated in both rows,
          'audio_sum_spec': spectrogram tensor for the sum of the two audio channels,
          'audio_diff_spec': spectrogram tensor for the difference of the two audio channels
        }
        Each spectrogram stacks real and imaginary parts on the first axis and
        has its last two axes swapped relative to _get_stft's output.
        '''
        videoclip = video[start_idx : start_idx+self.n_video_frames : self.video_downsample_factor]
        videoclip = torch.from_numpy(videoclip).float()
        videoclip = videoclip.permute(3,0,1,2)

        if last_clip:
            # Align the audio to the end of the track; clamp at 0 so a track
            # shorter than one clip does not produce a wrapped negative index.
            audio_start_idx = max(0, audio.shape[1]-self.n_audio_samples)
        else:
            audio_start_idx = int(start_idx*self.audio_sr/self.video_fps)
        audio_end_idx = audio_start_idx+self.n_audio_samples

        audioclip = audio[:, audio_start_idx : audio_end_idx]
        left, right = audioclip[0], audioclip[1]

        # mono mix, duplicated so it keeps the two-channel layout
        mono = (left + right)/2
        monoclip = np.stack((mono, mono), axis=0)

        audio_sum_spec = self._get_stft(left + right)
        audio_sum_spec = torch.from_numpy(audio_sum_spec).float().permute(0,2,1)

        audio_diff_spec = self._get_stft(left - right)
        audio_diff_spec = torch.from_numpy(audio_diff_spec).float().permute(0,2,1)

        return {'start_frame': start_idx,
                'end_frame': start_idx+self.n_video_frames,
                'start_audio_frame': audio_start_idx,
                'end_audio_frame': audio_end_idx,
                'video': videoclip.unsqueeze(0),
                'audio': monoclip,
                'audio_sum_spec': audio_sum_spec.unsqueeze(0),
                'audio_diff_spec': audio_diff_spec.unsqueeze(0)}

    def _get_stft(self, raw):
        '''returns a complex-valued spectrogram in the form of a numpy array
        (real and imaginary parts stacked on the first axis, last frequency row dropped).'''
        stft = librosa.core.stft(np.ascontiguousarray(raw), n_fft=512, hop_length=160, win_length=400, center=True)
        return np.stack((np.real(stft), np.imag(stft)))[:,:-1,:]

    def stft_to_waveform(self, stft):
        '''converts a complex-valued spectrogram (real/imag stacked on the first axis) to raw audio'''
        stft = stft[0,:,:] + (1j * stft[1,:,:])
        raw = librosa.core.istft(stft, hop_length=160, win_length=400, center=True)
        return raw

Loading video as a tensor and adding noise

In [None]:
def load_video(videofile, mask_ratio, noise):
    """Read a video into a numpy array of 224x224 frames scaled to [0, 1].

    Args:
        videofile: path of the video to read.
        mask_ratio: fraction of frames to black out when `noise` is true
            (passed to add_noise).
        noise: whether to mask random frames with add_noise.

    Returns:
        (v_tensor, n_frames): the stacked frames and the number of frames
        actually read (which can be lower than the count the file reports).

    Raises:
        ValueError: if no frame could be read from the file.
    """
    capture = cv2.VideoCapture(videofile)
    try:
        cap_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) # number of frames the file reports

        frames = []
        for _ in range(cap_frames):
            ret, frame = capture.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # always free the decoder handle, even if reading fails
        capture.release()

    if not frames:
        raise ValueError(f"No frames could be read from video file: {videofile}")

    # Use the number of frames that were really read; the reported count may be larger.
    n_frames = len(frames)

    # NOTE(review): frames are used in the channel order OpenCV delivers them
    # (no colour conversion is applied here) — confirm this matches what the model expects.
    v_tensor = [Image.fromarray(np.uint8(frame)).resize((224,224)) for frame in frames] # convert numpy array to PIL Image, resized to 224x224
    v_tensor = np.stack(v_tensor)/255 # stack the frames and normalize to [0,1]
    if noise:
        # mask random frames of the video (in place)
        add_noise(v_tensor, n_frames, mask_ratio)

    return v_tensor, n_frames

def add_noise(v_tensor, cap_frames, r):
    '''Black out a random subset of the frames of `v_tensor`, in place. (Our contribution.)

    v_tensor: numpy array of frames, indexed by frame on the first axis
    cap_frames: number of frames to choose from (capped at len(v_tensor))
    r: fraction of frames to mask, between 0 and 1 (e.g., r=0.1 means masking out 10% of the frames)

    Returns None. Raises ValueError if r is outside [0, 1].
    '''
    if not 0 <= r <= 1:
        raise ValueError(f"mask ratio r must be between 0 and 1, got {r}")

    # Never sample frame indices beyond the frames that are actually present.
    n_available = min(cap_frames, len(v_tensor))

    n = int(n_available * r) # number of frames to mask out
    masked_frames = random.sample(range(0, n_available), n) # randomly sample frames to mask out

    # masked non-sequential: the chosen frames are set to black
    for frame in masked_frames:
        v_tensor[frame] = 0

def load_audio(audiofile):
    """Load an audio file at 16000 Hz as a peak-normalized two-channel array.

    Args:
        audiofile: path of the audio file.

    Returns:
        numpy array with one row per channel; single-channel input is
        duplicated into two identical rows.

    Raises:
        ValueError: if the file contains no samples.
    """
    # resample audio to a sampling rate of 16000 Hz
    # mono=False keeps the channels separate instead of averaging them.
    audio, _ = librosa.load(audiofile, mono=False, sr=16000)

    if audio.size == 0:
        raise ValueError(f"Audio file contains no samples: {audiofile}")

    # Peak-normalize; a completely silent track is left as is, because
    # dividing by a zero peak would fill the array with NaN.
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio/peak

    if len(audio.shape) == 1: # single-channel audio is duplicated to form a two-channel array
        audio = np.stack((audio, audio), axis=0)

    return audio

### Train the Model

In [None]:
# NOTE(review): this path is relative, so it is resolved against the current
# working directory — confirm it points at the extracted train split.
folder_path = 'content/telling-left-from-right/fine_tune_processed_train/'

# The folder holds an .mp4 and an .mp3 per video; listing every file would add
# each video twice, so only the .mp4 files are used to collect the names.
# Sorting makes the processing order reproducible.
video_file_names = sorted(
    os.path.splitext(os.path.basename(file_name))[0]
    for file_name in os.listdir(folder_path)
    if file_name.lower().endswith(".mp4")
    and os.path.isfile(os.path.join(folder_path, file_name))
)

print(f"{len(video_file_names)} videos appended to list.")

160 videos appended to list.


Train!

In [None]:
# initialization
mask_ratio = 0.5 # set r between 0 and 1 (e.g., r=0.1 means masking out 10% of the frames)
criterion = nn.L1Loss()
v_count = 0
total_loss = 0
n_count = 0
clip_generator = ClipGenerator()
adv_model.train()
optimizer = torch.optim.SGD(adv_model.parameters(), lr=1e-3, momentum = 0.9)

# NOTE(review): relative path, resolved against the current working directory.
train_dir = "content/telling-left-from-right/fine_tune_processed_train/"
# model inputs plus the ground-truth difference spectrogram used by the loss
keys = adv_model.keys + ['audio_diff_spec']

for file_name in video_file_names:
    video_input_path = train_dir + file_name + ".mp4"
    audio_input_path = train_dir + file_name + ".mp3"

    loader = clip_generator.generator(video_input_path, audio_input_path, mask_ratio, noise=True)
    for sample in loader:
        # `batch` instead of `vars`, which would shadow the Python builtin
        batch = {k: sample[k].cuda() for k in keys}

        # clear gradients left over from the previous step (or a previous run of this cell)
        optimizer.zero_grad()

        # compare the predicted with the ground truth
        out = adv_model(batch)
        loss = criterion(out['pred'], batch['audio_diff_spec'])

        # Backpropagation
        loss.backward()
        optimizer.step()

        # running loss, measured on each clip before its update step
        total_loss += loss.item()
        n_count += 1

        # release cached GPU memory
        torch.cuda.empty_cache()

    v_count += 1

# avoid a ZeroDivisionError when no clip was processed
mean_loss = total_loss/n_count if n_count else float('nan')

print("===================================================")
print(f"Masking ratio: {mask_ratio}")
print(f"Training on {v_count} videos / {n_count} clips.")
print(f"total_loss: {total_loss}")
print(f"mean_loss: {mean_loss}")

Masking ratio: 0.5
Evaluation on 160 videos / 800 clips.
total_loss: 107.32094969693571
mean_loss: 0.13415118712116963


### Test with the Best-Trained Model and the Adversarial Model

In [None]:
# NOTE(review): this path is relative, so it is resolved against the current
# working directory — confirm it points at the extracted test split.
folder_path = 'content/telling-left-from-right/fine_tune_processed_test/'

# The folder holds an .mp4 and an .mp3 per video; listing every file would add
# each video twice, so only the .mp4 files are used to collect the names.
# Sorting makes the processing order reproducible.
video_file_names = sorted(
    os.path.splitext(os.path.basename(file_name))[0]
    for file_name in os.listdir(folder_path)
    if file_name.lower().endswith(".mp4")
    and os.path.isfile(os.path.join(folder_path, file_name))
)

print(f"{len(video_file_names)} videos appended to list.")

48 videos appended to list.


In [None]:
# initialization
mask_ratio = 0.5 # set r between 0 and 1 (e.g., r=0.1 means masking out 10% of the frames)
criterion = nn.L1Loss()
v_count = 0
total_loss = 0
n_count = 0
clip_generator = ClipGenerator()
adv_model.eval()

# NOTE(review): relative path, resolved against the current working directory.
test_dir = "content/telling-left-from-right/fine_tune_processed_test/"
# model inputs plus the ground-truth difference spectrogram used by the loss
keys = adv_model.keys + ['audio_diff_spec']

# add_noise() picks the masked frames with the `random` module; a fixed seed
# makes the masks, and therefore the reported loss, reproducible across runs.
random.seed(0)

# Inference only: no_grad() avoids building an autograd graph for every clip.
with torch.no_grad():
    for file_name in video_file_names:
        video_input_path = test_dir + file_name + ".mp4"
        audio_input_path = test_dir + file_name + ".mp3"

        loader = clip_generator.generator(video_input_path, audio_input_path, mask_ratio, noise=True)
        for sample in loader:
            # `batch` instead of `vars`, which would shadow the Python builtin
            batch = {k: sample[k].cuda() for k in keys}

            # compare the predicted with the ground truth
            out = adv_model(batch)
            loss = criterion(out['pred'], batch['audio_diff_spec'])

            total_loss += loss.item()
            n_count += 1

            # release cached GPU memory
            torch.cuda.empty_cache()

        v_count += 1

# avoid a ZeroDivisionError when no clip was processed
mean_loss = total_loss/n_count if n_count else float('nan')

print("===================================================")
print(f"Masking ratio: {mask_ratio}")
print(f"Evaluation on {v_count} videos / {n_count} clips.")
print(f"total_loss: {total_loss}")
print(f"mean_loss: {mean_loss}")

Masking ratio: 0.5
Evaluation on 48 videos / 240 clips.
total_loss: 33.74488641601056
mean_loss: 0.140603693400044


In [None]:
# initialization
mask_ratio = 0.5 # set r between 0 and 1 (e.g., r=0.1 means masking out 10% of the frames)
criterion = nn.L1Loss()
v_count = 0
total_loss = 0
n_count = 0
clip_generator = ClipGenerator()
pre_model.eval()

# NOTE(review): relative path, resolved against the current working directory.
test_dir = "content/telling-left-from-right/fine_tune_processed_test/"
# model inputs plus the ground-truth difference spectrogram used by the loss
keys = pre_model.keys + ['audio_diff_spec']

# add_noise() picks the masked frames with the `random` module; a fixed seed
# makes the masks, and therefore the reported loss, reproducible across runs.
random.seed(0)

# Inference only: no_grad() avoids building an autograd graph for every clip.
with torch.no_grad():
    for file_name in video_file_names:
        video_input_path = test_dir + file_name + ".mp4"
        audio_input_path = test_dir + file_name + ".mp3"

        loader = clip_generator.generator(video_input_path, audio_input_path, mask_ratio, noise=True)
        for sample in loader:
            # `batch` instead of `vars`, which would shadow the Python builtin
            batch = {k: sample[k].cuda() for k in keys}

            # compare the predicted with the ground truth
            out = pre_model(batch)
            loss = criterion(out['pred'], batch['audio_diff_spec'])

            total_loss += loss.item()
            n_count += 1

            # release cached GPU memory
            torch.cuda.empty_cache()

        v_count += 1

# avoid a ZeroDivisionError when no clip was processed
mean_loss = total_loss/n_count if n_count else float('nan')

print("===================================================")
print(f"Masking ratio: {mask_ratio}")
print(f"Evaluation on {v_count} videos / {n_count} clips.")
print(f"total_loss: {total_loss}")
print(f"mean_loss: {mean_loss}")

Masking ratio: 0.5
Evaluation on 48 videos / 240 clips.
total_loss: 33.10120473615825
mean_loss: 0.13792168640065938
