In [1]:
import math
import random

import numpy as np
import torch
from torch import nn
from torchvision.io import VideoReader
from torchvision.io import write_jpeg, read_image
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms
import os
from note_detector.python.video_note_detector import generate_labels
import matplotlib.pyplot as plt

device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cuda device


In [2]:
# Function to convert videos to valid training data that we can use, in the structure shown in ./training_data.
# Calling this with a video on an existing subfolder will convert that video to training data and ADD it to the existing subfolder
# If the subfolder does not exist, the training data created from the video will be added to a new directory (indicated by out_subfolder_name)

def convert_video_to_training_data(vid_path, vid_name, out_subfolder_name, resize_h, resize_w):
    out_dir = "./training_data/%s" % out_subfolder_name
    out_npy_file = out_dir + "/labels.npy"
    out_frames_dir = out_dir + "/frames"
    
    start_num = 0
    cur_labels = []
    if os.path.isdir(out_dir):
        if os.path.isfile(out_npy_file):
            cur_labels = np.load(out_npy_file, allow_pickle=True).tolist()
            start_num = len(cur_labels)
        if not os.path.isdir(out_frames_dir):
            os.mkdir(out_frames_dir)
    else:
        os.mkdir(out_dir)
        os.mkdir(out_frames_dir)
    
    print("processing file", vid_name)
    
    # get frames of video and save sequentially in training_data/frames
    reader = VideoReader(vid_path + vid_name, "video")
    for frame in reader:
        cur_img = frame["data"]
        img_to_save = transforms.Resize((resize_h, resize_w))(cur_img)
        img_name = out_frames_dir + "/%s.jpg" % start_num
        start_num += 1
        write_jpeg(img_to_save, img_name)

    print("done w video read")

    # use library to get labels for each frame
    cur_video_labels, num_frames = generate_labels(vid_path, vid_name)
    tmp_label_aggregator = [[] for _ in range(num_frames)]
    for frame, note in cur_video_labels:
        tmp_label_aggregator[int(frame)].append(note)
    cur_labels.extend(tmp_label_aggregator)
    np.save(out_npy_file, np.array(cur_labels, dtype=object))

    print("done w labeling")

In [3]:
# Convert a list of videos to training data using the function above

def generate_training_data_set(prefix_video_path, list_of_vids, folder_subname, resize_h, resize_w):
    for vid in list_of_vids:
        convert_video_to_training_data(prefix_video_path, vid, folder_subname, resize_h, resize_w)
    print("!!! Done creating training set !!!")

In [4]:
# Call the above function to generate usable training data

FRAME_HEIGHT = 1080
FRAME_WIDTH = 1920

# set 1 contains all the training data we obtained
TRAINING_SET_1_NAME = "set_1"

answer = input("Which training set would you like to generate or modify? (#/n) ; Make sure the video list is up to date")

vids_path = "./vids/"
video_list = [ ] # modify before making a change to a training set

if answer == "1":
    generate_training_data_set(vids_path, video_list, TRAINING_SET_1_NAME, FRAME_HEIGHT, FRAME_WIDTH)
else:
    print("Cell passed")

Cell passed


In [5]:
# define the one hot encoding structure of the labels so it is consistent
# The 60 possible notes are C_2 - C_6, C#_2 - C#_6, ..., B_2 - B_6, and we encode these in a vector of length 60, where a 1 means the note is being played and 0 means it is not

# given an instance of the labeled training data, the corresponding encoded label (of 0s or 1s) can be returned, and vice versa

ordered_notes = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]
label_map = {}
label_num = 0
for note_letter in ordered_notes:
    for subdiv in range(2, 7):
        cur_label = str(note_letter) + "_" + str(subdiv)
        label_map[cur_label] = label_num
        label_num += 1
        

def convert_to_onehot(note_arr):
    ret = [0 for _ in range(len(label_map))]
    for note in note_arr:
        ret[label_map[note]] = 1
    return ret

def convert_from_onehot(onehot):
    ret = []
    for idx, val in enumerate(onehot):
        if val == 1:
            ret.append(list(label_map.keys())[list(label_map.values()).index(idx)])
    return ret

In [6]:
# define the dataset based on the structure of the training data generation above
# The labels are converted to length 60 vectors as described in the cell above

class NoteDataset(Dataset):
    def __init__(self, train_dataset_name):
        label_path = "./training_data/%s/labels.npy" % train_dataset_name
        self.frames_path = "./training_data/%s/frames" % train_dataset_name
        self.labels = np.load(label_path, allow_pickle=True).tolist()
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return read_image(self.frames_path + "/%s.jpg" % idx).float(), torch.tensor(convert_to_onehot(self.labels[idx])).float()

In [7]:
# define the CNN

# each input to the network is 3x1080x1920

class NeuralNetwork(nn.Module):
    def __init__(self, drop_prob):
        super().__init__()
        self.stack = nn.Sequential(
            # input 3x1080x1920
            nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # nn.BatchNorm2d(32),
            nn.Dropout2d(drop_prob),
            
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # nn.BatchNorm2d(64),
            nn.Dropout2d(drop_prob),
            
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # nn.BatchNorm2d(128),
            nn.Dropout2d(drop_prob),
            
            # input 128x17x30
            nn.Flatten(),
            # input one single array of values of length 128 * 17 * 30
            nn.Linear(128 * 17 * 30, 512),
            nn.ReLU(),
            nn.Dropout(drop_prob),
            nn.Linear(512, 60)
        )

    def forward(self, x):
        logits = self.stack(x)
        return logits

In [8]:
# Since there are 60 labels, and most of them are 0 for any given frame, the model can get great accuracy by predicting all 0s (i.e. no notes being played) for every frame.
# To prevent this, we use custom weights in the BCE loss function (weights defined by this function) to heavily penalize an incorrect prediction when the true label is 1.
# This method works, and forces the network to know that it needs to predict 1-3 notes at any given time unless there is truly nothing being played

def find_weights_from_batch(batch):
    HIGH_WEIGHT_VAL = 5
    LOW_WEIGHT_VAL = 1
    
    copy = batch.cpu().numpy()
    sums = np.sum(copy, axis=0)
    weights = []
    for s in sums:
        if s > 0:
            weights.append(HIGH_WEIGHT_VAL)
        else:
            weights.append(LOW_WEIGHT_VAL)
    return torch.tensor(weights).float().to(device)

In [9]:
# Train the given model using an SGD optimizer and custom BCE loss, as described in the cell above

def train_model(model, dataloader, epochs, lr):
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    
    loss_over_time = []
    cu_loss = 0
    
    for e in range(epochs):
        model.train()
        
        for batch, (X, y) in enumerate(dataloader):
            X = X.to(device)
            y = y.to(device)
            
            wts = find_weights_from_batch(y)
            loss_fnc = nn.BCEWithLogitsLoss(pos_weight=wts)
            
            pred = model(X)
            
            loss_val = loss_fnc(pred, y)
            loss_val.backward()
            
            optimizer.step()
            optimizer.zero_grad()
            
            loss_over_time.append(loss_val.item())
            cu_loss += loss_val.item()
            
            if batch % int(128/len(X)) == 0:
                loss, current = cu_loss, (batch + 1) * len(X)
                cu_loss = 0
                size = len(dataloader.dataset)
                print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
                
    return loss_over_time

In [10]:
# Incrementally train a model given a set of data generated using the pipeline at the start of this notebook
# If the model exists, it is loaded, trained, and re-saved. Otherwise, a new model is created and saved

def incrementally_train(model_name, dataset_name, shuffle=False, batch_size=4, epochs=2, lr=.001, dropout=.2):
    model_file_name = "./models/%s.pth" % model_name
    cur_model = None
    
    if os.path.isfile(model_file_name):
        cur_model = torch.load(model_file_name).to(device)
    else:
        cur_model = NeuralNetwork(dropout).to(device)

    cur_dataset = NoteDataset(dataset_name)
    dataloader = DataLoader(cur_dataset, batch_size=batch_size, shuffle=shuffle)
    
    loss_graph = train_model(cur_model, dataloader, epochs, lr)
    
    torch.save(cur_model, model_file_name)
    
    loss_name = "./losses/%s_%s_%s" % (model_name, random.randint(0, 99999), random.randint(0, 99999))
    np.save(loss_name, np.array(loss_graph))
    
    print("!!! Done training model !!!")

In [11]:
MODEL_NAME_MAIN = "main"

In [12]:
answer = input("Train the main model on training set 1?")

if answer == "y":
    incrementally_train(MODEL_NAME_MAIN, TRAINING_SET_1_NAME, shuffle=True, batch_size=1, epochs=1, lr=.001, dropout=.35)

In [13]:
# make test data (stored in training_data folder, but the set name contains "TEST")

TEST_SET_NAME_1 = "TEST_mary_had_a_little_lamb"
TEST_SET_NAME_2 = "TEST_fast"
TEST_SET_NAME_3 = "TEST_robust"

answer = input("Which test set would you like to generate or modify? (#/n) ; Make sure the video list is up to date")

vids_path_test = "./vids/"
video_list_test = [ ] # modify before making a change to a test set

if answer == "1":
    generate_training_data_set(vids_path_test, video_list_test, TEST_SET_NAME_1, FRAME_HEIGHT, FRAME_WIDTH)
elif answer == "2":
    generate_training_data_set(vids_path_test, video_list_test, TEST_SET_NAME_2, FRAME_HEIGHT, FRAME_WIDTH)
elif answer == "3":
    generate_training_data_set(vids_path_test, video_list_test, TEST_SET_NAME_3, FRAME_HEIGHT, FRAME_WIDTH)
else:
    print("Cell passed")

Cell passed


In [14]:
# Test the model on a given converted video, and output predictions for each frame along with the true labels for each frame

def test(model_name, test_set_name, show_losses=False):
    eval_model = torch.load("./models/%s.pth" % model_name)

    test_dataset = NoteDataset(test_set_name)
    test_dataloader = DataLoader(test_dataset, batch_size=15, shuffle=False)
    test_loss_function = nn.BCEWithLogitsLoss()
    eval_model.eval()
    test_predictions = []
    test_gt = []
    with torch.no_grad():
        for batch_num, (Xt, yt) in enumerate(test_dataloader):
            Xt = Xt.to(device)
            yt = yt.to(device)
            
            cur_prediction = eval_model(Xt)
            
            sigmoid = torch.nn.Sigmoid()
            
            # BCEWithLogitsLoss() automatically applies sigmoid, during testing we need to apply it (and clamp the values to 0 or 1)
            cur_prediction_activated = sigmoid(cur_prediction).cpu().numpy().tolist()
            cur_ground_truth = yt.cpu().numpy().tolist()
            for z in range(len(cur_prediction_activated)):
                clamped_predictions = [1 if i > .5 else 0 for i in cur_prediction_activated[z]]
                test_predictions.append(convert_from_onehot(clamped_predictions))
                test_gt.append(convert_from_onehot(cur_ground_truth[z]))
            
            # loss again computed by directly passing logits to BCEWithLogitsLoss()
            if show_losses:
                print("loss: ", test_loss_function(cur_prediction, yt).item())
    
    print("Done!")
    return [test_predictions, test_gt]

In [15]:
import pandas as pd

# nicely display the predictions and the true values for the notes being played in each frame side-by-side

def display_raw_comparisons(predictions, ground_truths):
    df = pd.DataFrame({"Actual": ground_truths, "Predictions": predictions})
    df.index.name = "Frame #"
    print(df.to_string())

In [16]:
# Show a more digestible comparison of the predictions vs the true values by aggregating predictions over each set of 10 frames, and cutting out low-probability predicitons

def display_final_tabs(predictions, ground_truths):
    frame_divisions = 10
    probability_cutoff = .2
    predicted_tab = []
    gt_tab = []
    
    cur_pred_dict = {}
    cur_gt_dict = {}
    cur_pred_tot = 0
    cur_gt_tot = 0
    for i in range(len(predictions)):
        for note in predictions[i]:
            cur_pred_dict[note] = 1 if note not in cur_pred_dict else cur_pred_dict[note] + 1
            cur_pred_tot += 1
        for note in ground_truths[i]:
            cur_gt_dict[note] = 1 if note not in cur_gt_dict else cur_gt_dict[note] + 1
            cur_gt_tot += 1
        
        if i != 0 and i % frame_divisions == 0:
            probs_predictions = sorted(cur_pred_dict.keys(), key=cur_pred_dict.get, reverse=True)
            probs_gt = sorted(cur_gt_dict.keys(), key=cur_gt_dict.get, reverse=True)
            
            probs_predictions = [(note, round(cur_pred_dict[note] / cur_pred_tot, 3)) for note in probs_predictions]
            probs_gt = [(note, round(cur_gt_dict[note] / cur_gt_tot, 3)) for note in probs_gt]
            
            probs_predictions = [t for t in probs_predictions if t[1] > probability_cutoff]
            probs_gt = [t for t in probs_gt if t[1] > probability_cutoff]
            
            predicted_tab.append(probs_predictions)
            gt_tab.append(probs_gt)
            
            cur_pred_dict = {}
            cur_gt_dict = {}
            cur_pred_tot = 0
            cur_gt_tot = 0
    
    df = pd.DataFrame({"Actual": gt_tab, "Predictions": predicted_tab})
    df.index.name = "Frame set (of 10)"
    print(df.to_string())
    return predicted_tab, gt_tab
                

In [17]:
# calculate the average probability error as described in our project report

def batch_frame_metric(prediction_probability_batches, gt_probability_batches):
    metric = 0
    # do not count the first and last couple of batches to account for un-trimmed videos
    # for 30fps videos, 3 batches is 1 second if batch size is 10
    range_of_batches = range(3, len(prediction_probability_batches) - 3)
    for i in range_of_batches:
        cur_frame_pred_probs = prediction_probability_batches[i]
        cur_frame_gt_probs = gt_probability_batches[i]
        for note, gt_prob in cur_frame_gt_probs:
            pred_prob = 0
            for other_note, other_prob in cur_frame_pred_probs:
                if other_note == note:
                    pred_prob = other_prob
                    break
            metric += abs(gt_prob - pred_prob)
    return round(metric / len(range_of_batches), 4)

In [18]:
# test model and show results

preds_1, gt_1 = test(MODEL_NAME_MAIN, TEST_SET_NAME_1, False)
pred_tab_1, gt_tab_1 = display_final_tabs(preds_1, gt_1)
print("\nAverage probability error across frame batches:")
print(batch_frame_metric(pred_tab_1, gt_tab_1))

Done!
                                                                     Actual                                    Predictions
Frame set (of 10)                                                                                                         
0                                                                        []                  [(F_2, 0.222), (G#_4, 0.222)]
1                                             [(F_2, 0.429), (F#_2, 0.286)]                                 [(F_2, 0.258)]
2                                              [(D_2, 0.417), (B_2, 0.417)]                   [(B_2, 0.444), (D_2, 0.389)]
3                  [(E_2, 0.231), (E_4, 0.231), (F_3, 0.231), (A_2, 0.231)]                                             []
4                                                [(F#_2, 0.5), (F#_3, 0.5)]                   [(F#_2, 0.4), (F#_3, 0.333)]
5                                                              [(D_2, 1.0)]                                   [(D_2, 1.0)]
6         

In [19]:
preds_2, gt_2 = test(MODEL_NAME_MAIN, TEST_SET_NAME_2, False)
pred_tab_2, gt_tab_2 = display_final_tabs(preds_2, gt_2)
print("\nAverage probability error across frame batches:")
print(batch_frame_metric(pred_tab_2, gt_tab_2))

Done!
                                                                      Actual                                               Predictions
Frame set (of 10)                                                                                                                     
0                                   [(B_2, 0.5), (F_2, 0.273), (E_2, 0.227)]                   [(B_2, 0.44), (F_2, 0.32), (E_2, 0.24)]
1                                                             [(E_2, 0.909)]                              [(E_2, 0.769), (B_2, 0.231)]
2                                                             [(E_2, 0.357)]                              [(E_2, 0.294), (F_3, 0.235)]
3                                               [(E_2, 0.474), (B_2, 0.316)]                              [(E_2, 0.385), (B_2, 0.346)]
4                                                  [(G#_2, 0.6), (F_2, 0.4)]                                            [(G#_2, 0.35)]
5                      [(G_2, 0.25), (G#_3, 0.25)

In [20]:
preds_3, gt_3 = test(MODEL_NAME_MAIN, TEST_SET_NAME_3, False)
pred_tab_3, gt_tab_3 = display_final_tabs(preds_3, gt_3)
print("\nAverage probability error across frame batches:")
print(batch_frame_metric(pred_tab_3, gt_tab_3))

Done!
                                                                         Actual                   Predictions
Frame set (of 10)                                                                                            
0                                                                  [(E_2, 1.0)]                  [(B_2, 1.0)]
1                                                                  [(E_2, 1.0)]                  [(B_2, 1.0)]
2                                                                  [(E_2, 1.0)]                  [(B_2, 1.0)]
3                                                      [(D_2, 0.7), (E_2, 0.3)]                  [(B_2, 1.0)]
4                                                      [(F_2, 0.6), (D_2, 0.4)]                            []
5                                                                  [(F_2, 1.0)]                            []
6                                                                [(F_2, 0.385)]                            []
7   

In [49]:
import cv2
from moviepy.editor import VideoFileClip, VideoClip
from moviepy.video.tools.cuts import FramesMatches

def annotate_video(input_video_path, output_video_path, frame_batch_predictions):
    # clip = VideoFileClip(input_video_path)
    # new_frames = []
    # text_to_write = ""
    # cur_idx = 0
    # for i, frame in enumerate(clip.iter_frames(fps=clip.fps)):
    #     if i % 10 == 0:
    #         if cur_idx >= len(frame_batch_predictions):
    #             cur_idx = len(frame_batch_predictions) - 1
    #         text_to_write = str(frame_batch_predictions[cur_idx])
    #         cur_idx += 1
    #     font = cv2.FONT_HERSHEY_SIMPLEX
    #     position = (100, 100)
    #     font_scale = 1
    #     font_color = (0, 0, 0)  # White color
    #     thickness = 2
    # 
    #     cv2.putText(frame, text_to_write, position, font, font_scale, font_color, thickness, cv2.LINE_AA)
    #     new_frames.append(frame)
    #     
    # new_frames = np.array(new_frames)
    # 
    # output_clip = VideoClip(duration=clip.duration).set_audio(clip.audio)
    # # output_clip = output_clip.set_duration(len(new_frames) / clip.fps)
    # output_clip = output_clip.set_fps(clip.fps)
    # for i in range(int(output_clip.duration)):
    #     output_clip.set_frame(i, new_frames[i])
    # output_clip.write_videofile(output_video_path, codec="libx264", audio_codec="aac", temp_audiofile="temp-audio.m4a", remove_temp=True)
    # 
    
    
    cap = cv2.VideoCapture(input_video_path)

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    temp_file_name = output_video_path + "_tmp.mp4"
    final_file_name = output_video_path + ".mp4"
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(temp_file_name, fourcc, fps, (width, height))

    frame_counter = 0
    text_to_write = ""
    cur_idx = 0
    while True:
        ret, frame = cap.read()

        if not ret:
            break

        if frame_counter % 10 == 0:
            if cur_idx >= len(frame_batch_predictions):
                cur_idx = len(frame_batch_predictions) - 1
            text_to_write = str(frame_batch_predictions[cur_idx])
            cur_idx += 1

        font = cv2.FONT_HERSHEY_SIMPLEX
        position = (100, 100)
        font_scale = 1
        font_color = (0, 0, 0)
        thickness = 2

        cv2.putText(frame, text_to_write, position, font, font_scale, font_color, thickness, cv2.LINE_AA)

        frame_counter += 1

        out.write(frame)

    cap.release()
    out.release()
    cv2.destroyAllWindows()
    
    video_clip_in = VideoFileClip(input_video_path)
    video_clip_out = VideoFileClip(temp_file_name)
    video_clip_out = video_clip_out.set_audio(video_clip_in.audio)
    video_clip_out.write_videofile(final_file_name)
    
    os.remove(temp_file_name)

In [50]:
annotate_video("./vids/mary.MOV", "./annotated_vids/out_mary", pred_tab_1)

Moviepy - Building video ./annotated_vids/out_mary.mp4.
MoviePy - Writing audio in out_maryTEMP_MPY_wvf_snd.mp3


                                                                   

MoviePy - Done.
Moviepy - Writing video ./annotated_vids/out_mary.mp4


                                                              

Moviepy - Done !
Moviepy - video ready ./annotated_vids/out_mary.mp4


In [None]:
# quick debug commands:

# vid_path, vid_name = "./vids/", "dumb_scale_youtube.mp4"
# # cur_video_labels, num_frames = generate_labels(vid_path, vid_name)
# reader = VideoReader(vid_path + vid_name, "video")
# frame = next(reader)["data"]
# print(frame.shape)

In [None]:
# a = np.load("training_data/set_1/labels.npy", allow_pickle=True)
# print(len(a))
# print(a[235])

In [None]:
# test dataset

# tmp_d = NoteDataset(TRAINING_SET_1_NAME)
# f, l = tmp_d[0]
# print(len(tmp_d))
# print(type(f))
# print(type(l))
# print(f.shape)
# print(l.shape)
# print(l)
# print(convert_from_onehot(l))

In [None]:
# test dataloader

# tmp_d = NoteDataset(TRAINING_SET_1_NAME)
# tmp_dl = DataLoader(tmp_d, batch_size=64, shuffle=True)
# b, (X, y) = next(enumerate(tmp_dl))
# print(b)
# print(X.shape)
# print(y.shape)

In [None]:
# processed_X = []
# processed_y = []
# 
# tmp_frames = []
# mn_H = math.inf
# mn_W = math.inf
# 
# # loop over each training video to assign a label to each frame and aggregate them all in one training array(s)
# for file in os.listdir("./training_data"):
#     print("processing file", file)
#     
#     # get frames of video and update the dimensions to resize to later
#     v_frames, _, _ = read_video("./training_data/%s" % file, output_format="TCHW")
#     mn_H = min(mn_H, v_frames.shape[2])
#     mn_W = min(mn_W, v_frames.shape[3])
#     for frame_num in range(v_frames.shape[0]):
#         tmp_frames.append(v_frames[frame_num])
#     
#     print("done w video read")
# 
#     
#     # use library to get labels for each frame
#     cur_video_labels, num_frames = generate_labels("./training_data/", file)
#     tmp_label_aggregator = [[] for i in range(num_frames)]
#     for frame, note in cur_video_labels:
#         tmp_label_aggregator[int(frame)].append(note)
#     processed_y.extend(tmp_label_aggregator)
#     
#     print("done w labeling")
# 
# # resize all frames to be the same size so the NN can handle them
# for frame in tmp_frames:
#     tnsr = transforms.Resize((mn_H, mn_W))(frame)
#     processed_X.append(tnsr.numpy())
# 
# processed_X = np.array(processed_X)
# processed_y = np.array(processed_y, dtype=object)
# 
# np.save("processed_frame_data", processed_X)
# np.save("processed_labels", processed_y)