In [1]:
import cv2
import numpy as np
import torch
from torchvision import transforms
from torch import nn
import time

# Directory that holds the input videos (trailing separator included, because
# file names are appended to it by plain string concatenation).
# Every backslash is escaped: sequences such as "\D" or "\G" are invalid escapes
# that Python only tolerates with a warning. The resulting value is unchanged.
VIDEOS_LOCATION = "C:\\Users\\trett\\Documents\\GitHub\\ThirdYearProject\\pytorch\\videos\\"

# Callable applied to every decoded frame before it is buffered for the model.
transform = transforms.ToTensor()

# Frame size in pixels -- presumably the resolution of the input videos; TODO confirm.
width, height = 640, 360

In [2]:
# Directory that holds the input videos (trailing separator included).
# All backslashes are escaped explicitly; the value is the same as before, but
# the literal no longer relies on invalid escape sequences such as "\D".
VIDEOS_LOCATION = "C:\\Users\\trett\\Documents\\GitHub\\ThirdYearProject\\pytorch\\videos\\"


def _video_entries(*file_names):
    """Build one ``{"file": name, "fps": None}`` record per file name.

    ``"fps"`` starts as ``None``; it is filled in with the frame rate reported
    by OpenCV when the file is opened. Each record is a fresh dict, so the
    lists below never share entries.
    """
    return [{"file": file_name, "fps": None} for file_name in file_names]


# General-purpose clips.
VIDEOS_TRAIN = _video_entries("soldiers.mp4", "running.mp4")
VIDEOS_TEST = _video_entries("nato.mp4")

# Ski footage.
VIDEOS_SKI_TRAIN = _video_entries("drone_shot.mp4", "crevace2.mp4")
VIDEOS_SKI_TEST = _video_entries("powder_maybe2.mp4")

class VideoDataLoader:
    """Streams overlapping three-frame groups from a list of video files.

    Every group is a list ``[first, middle, last]`` of frames that have been
    passed through the module-level ``transform``. Consecutive groups overlap
    by one frame: the ``last`` frame of a group is the ``first`` frame of the
    next group read from the same file.

    Files are read in list order from ``VIDEOS_LOCATION``. When a file is
    opened, the frame rate reported by OpenCV is stored in that entry's
    ``"fps"`` key, so the ``videos`` list passed in is mutated.

    All state is per instance. Creating a new loader (for example once per
    epoch) always starts with an empty buffer, and every group that was decoded
    is eventually returned: after the last file ends, the remaining buffered
    groups are drained one per ``getNext()`` call.

    Typical use::

        loader = VideoDataLoader(VIDEOS_TRAIN)
        while loader.hasNext():
            first, middle, last = loader.getNext()
    """

    # Number of groups decoded ahead each time a file is opened.
    PREFETCH_GROUPS = 10
    # getNext() advances to the next file when fewer groups than this are buffered.
    MIN_BUFFERED_GROUPS = 5

    def __init__(self, videos):
        """Open the first video and pre-fill the buffer.

        Args:
            videos: list of dicts, each with a ``"file"`` name (appended to
                ``VIDEOS_LOCATION``) and an ``"fps"`` slot that is overwritten
                when the file is opened.

        Raises:
            IndexError: if ``videos`` is empty.
        """
        self.videos = videos
        self.video_index = 0
        self.cap = None
        self.first_frame = None
        # Must be created per instance: a list defined on the class would be
        # shared by every loader and carry stale groups from one to the next.
        self.frame_batch_buffer = []
        self._open_current_video()

    def _open_current_video(self):
        """Open ``videos[video_index]``, record its fps and prime the buffer."""
        entry = self.videos[self.video_index]
        self.cap = cv2.VideoCapture(VIDEOS_LOCATION + entry["file"])
        entry["fps"] = self.cap.get(cv2.CAP_PROP_FPS)
        if not self.cap.isOpened():
            # Best effort: report and carry on. The reads below then fail, so
            # this file simply contributes no groups.
            print("Error: could not open video file")

        _, first_frame = self.cap.read()
        for _ in range(self.PREFETCH_GROUPS):
            ok, middle_frame = self.cap.read()
            if not ok:
                break
            ok, last_frame = self.cap.read()
            if not ok:
                break
            self.frame_batch_buffer.append(
                [transform(first_frame), transform(middle_frame), transform(last_frame)]
            )
            # Slide the window: the next group starts where this one ended.
            first_frame = last_frame
        self.first_frame = first_frame

    def _pop_batch(self):
        """Remove and return the oldest buffered group.

        Raises:
            IndexError: if nothing is buffered (guard calls with ``hasNext()``).
        """
        if not self.frame_batch_buffer:
            raise IndexError("no buffered frame groups left; check hasNext() first")
        return self.frame_batch_buffer.pop(0)

    def hasNext(self):
        """Return True while at least one group is still buffered."""
        return bool(self.frame_batch_buffer)

    def nextFile(self):
        """Move on to the next video (if any) and return the oldest buffered group.

        The current capture is released and the next file, when there is one,
        is opened and primed. Groups already buffered from earlier files are
        kept and returned first. Once every file has been consumed this only
        drains the buffer.

        Raises:
            IndexError: if no group is buffered.
        """
        if self.video_index < len(self.videos):
            self.cap.release()
            self.video_index += 1
            if self.video_index < len(self.videos):
                self._open_current_video()
        return self._pop_batch()

    def getNext(self):
        """Return the next ``[first, middle, last]`` group.

        While a file is being read, two more frames are decoded, the resulting
        group is appended to the buffer and the oldest group is returned. When
        the current file runs out of frames, or the buffer is shorter than
        ``MIN_BUFFERED_GROUPS``, the loader switches to the next file via
        ``nextFile()``.

        Raises:
            IndexError: if no group is buffered.
        """
        files_exhausted = self.video_index >= len(self.videos)
        if files_exhausted or len(self.frame_batch_buffer) < self.MIN_BUFFERED_GROUPS:
            return self.nextFile()

        ok, middle_frame = self.cap.read()
        if not ok:
            return self.nextFile()
        ok, last_frame = self.cap.read()
        if not ok:
            return self.nextFile()

        self.frame_batch_buffer.append(
            [transform(self.first_frame), transform(middle_frame), transform(last_frame)]
        )
        self.first_frame = last_frame
        return self._pop_batch()


In [3]:
class Autoencoder(nn.Module):
    """Two-branch convolutional autoencoder that fuses two frames into one.

    Each input frame is encoded by its own branch (three stride-2 3x3
    convolutions, 3 -> 128 -> 64 -> 16 channels, each followed by ReLU). The
    two 16-channel encodings are concatenated along the channel axis and a
    three-layer transposed-convolution decoder (32 -> 16 -> 64 -> 3 channels)
    upsamples the result again.

    Each encoder layer halves an even height/width and each decoder layer
    doubles it, so the output has the input's spatial size only when height
    and width are divisible by 8.
    """

    @staticmethod
    def _make_encoder():
        """Build one encoder branch; both branches share this layout, not weights."""
        return nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(in_channels=128, out_channels=64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(in_channels=64, out_channels=16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(True)
        )

    def __init__(self):
        super().__init__()

        # Creation order (left, right, decoder) and attribute names are kept so
        # that parameter initialisation and state_dict keys stay the same.
        self.lefthand_encoder = self._make_encoder()
        self.righthand_encoder = self._make_encoder()

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(in_channels=32, out_channels=16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(True),
            nn.ConvTranspose2d(in_channels=16, out_channels=64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(True),
            nn.ConvTranspose2d(in_channels=64, out_channels=3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(True)
        )

    def forward(self, left, right):
        """Encode both frames, fuse them and decode a single frame.

        Args:
            left: frame tensor shaped (3, H, W) or (N, 3, H, W).
            right: frame tensor with the same shape as ``left``.

        Returns:
            Tensor with 3 channels; same shape as the inputs when H and W are
            divisible by 8.
        """
        left = self.lefthand_encoder(left)
        # Each input goes through its own branch so that both get trained.
        right = self.righthand_encoder(right)
        # dim=-3 is the channel axis for both unbatched (C, H, W) and batched
        # (N, C, H, W) input; for unbatched input it equals dim 0.
        encoded = torch.cat((left, right), dim=-3)
        return self.decoder(encoded)


In [4]:
# Objective: mean squared error between the model output and the target frame.
loss_fn = torch.nn.MSELoss()
lr = 0.0001

# Seeding is left disabled; enable for repeatable initial weights:
# torch.manual_seed(0)
autoencoder = Autoencoder()

# A single parameter group that covers every weight of the model.
params_to_optimize = [dict(params=autoencoder.parameters())]
optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-08)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Selected device: {device}')

autoencoder = autoencoder.to(device)

Selected device: cuda


In [5]:
# Training: for each epoch, walk through all training videos once and take one
# optimiser step per frame group.
autoencoder.train()
epochs = 5
train_loss_epochs = []  # one list of per-group losses per epoch
for i in range(epochs):
    train_loss = []
    start = time.time()
    # A new loader starts again from the first training video.
    videoDataLoader = VideoDataLoader(VIDEOS_TRAIN)
    while videoDataLoader.hasNext():
        batch = videoDataLoader.getNext()
        first_frame, middle_frame, last_frame = (frame.to(device) for frame in batch)

        # Predict a frame from the two outer frames and compare it with the
        # real middle frame.
        res_frame = autoencoder(first_frame, last_frame)
        loss = loss_fn(res_frame, middle_frame)

        # Standard update: clear old gradients, backpropagate, step.
        optim.zero_grad()
        loss.backward()
        optim.step()

        print('\t %d partial train loss (single batch): %f' % (i, loss.data))
        train_loss.append(loss.detach().cpu().numpy())
    train_loss_epochs.append(train_loss)

	 0 partial train loss (single batch): 0.238477
	 0 partial train loss (single batch): 0.237833
	 0 partial train loss (single batch): 0.237531
	 0 partial train loss (single batch): 0.237068
	 0 partial train loss (single batch): 0.236734
	 0 partial train loss (single batch): 0.236251
	 0 partial train loss (single batch): 0.235790
	 0 partial train loss (single batch): 0.235126
	 0 partial train loss (single batch): 0.234466
	 0 partial train loss (single batch): 0.233775
	 0 partial train loss (single batch): 0.233368
	 0 partial train loss (single batch): 0.232657
	 0 partial train loss (single batch): 0.232327
	 0 partial train loss (single batch): 0.231742
	 0 partial train loss (single batch): 0.231298
	 0 partial train loss (single batch): 0.230579
	 0 partial train loss (single batch): 0.230434
	 0 partial train loss (single batch): 0.230080
	 0 partial train loss (single batch): 0.229541
	 0 partial train loss (single batch): 0.228697
	 0 partial train loss (single batch): 0

In [22]:
# Evaluation: run the trained model over the ski test video, show each input
# frame next to the generated frame and record the per-group loss.
videoDataLoader = VideoDataLoader(VIDEOS_SKI_TEST)

# fps and fourcc are only needed by the optional video writer (disabled).
fps = videoDataLoader.videos[0]["fps"]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# out = cv2.VideoWriter('24fpsFromModel.mp4', fourcc, fps, (width, height), isColor=True)

autoencoder.eval()
i = 0
test_loss = []
try:
    with torch.no_grad():
        while videoDataLoader.hasNext():
            batch = videoDataLoader.getNext()
            # (C, H, W) tensors -> (H, W, C) arrays, the layout cv2.imshow expects.
            first_f = np.transpose(batch[0].numpy(), (1, 2, 0))
            # Real middle frame in display layout; not shown at the moment.
            original_f = np.transpose(batch[1].numpy(), (1, 2, 0))

            first_frame = batch[0].to(device)
            middle_frame = batch[1].to(device)
            last_frame = batch[2].to(device)

            res_frame = autoencoder(first_frame, last_frame)
            middle_f = np.transpose(res_frame.detach().cpu().numpy(), (1, 2, 0))

            # Left: first input frame, right: generated frame. Only one imshow
            # per waitKey: an earlier image drawn into the same window would be
            # replaced before the window is refreshed.
            cv2.imshow('generated frame', cv2.hconcat([first_f, middle_f]))
            # out.write(cv2.hconcat([first_f, middle_f]))
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break

            loss = loss_fn(res_frame, middle_frame)
            print('\t %d partial test loss (single batch): %f' % (i, loss.item()))
            i = i + 1
            test_loss.append(loss.detach().cpu().numpy())
finally:
    # Close the preview window even when the loop stops because of an error.
    cv2.destroyAllWindows()
# out.release()

	 0 partial test loss (single batch): 0.015459
	 1 partial test loss (single batch): 0.015469
	 2 partial test loss (single batch): 0.015116
	 3 partial test loss (single batch): 0.014673
	 4 partial test loss (single batch): 0.014595
	 5 partial test loss (single batch): 0.014959
	 6 partial test loss (single batch): 0.017008
	 7 partial test loss (single batch): 0.020824
	 8 partial test loss (single batch): 0.021248
	 9 partial test loss (single batch): 0.020254
	 10 partial test loss (single batch): 0.017925
	 11 partial test loss (single batch): 0.015959
	 12 partial test loss (single batch): 0.014515
	 13 partial test loss (single batch): 0.014320
	 14 partial test loss (single batch): 0.013801
	 15 partial test loss (single batch): 0.013829
	 16 partial test loss (single batch): 0.014609
	 17 partial test loss (single batch): 0.015271
	 18 partial test loss (single batch): 0.015427
	 19 partial test loss (single batch): 0.015177
	 20 partial test loss (single batch): 0.014783
	 