# Imports and setup

In [2]:
# Import modules

import os
import time
import cv2
import numpy as np
import subprocess as sp

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T

from PIL import Image
from matplotlib import pyplot as plt

In [3]:
# Inference device. It is pinned to the CPU; the disabled helper underneath is
# the CUDA-aware alternative (first GPU when available, otherwise the CPU).
device = torch.device("cpu")

# def set_device():
#     if torch.cuda.is_available():
#         dev = "cuda:0"
#     else:
#         dev = "cpu"
#     return torch.device(dev)

# device = set_device()

# Show which device was selected.
print(device)

cpu


In [4]:
# Notebook configuration

# Input files: trained weights and the video to run the classifier on.
model_path = "./model/best_model.pth"
video_path = "./test2.mp4"

# Frame size (in pixels) used to slice ffmpeg's raw output into frames.
WIDTH, HEIGHT = 1920, 1080

# Class labels, indexed by the model's output position.
classes = ["open", "close"]
NUM_CLASSES = len(classes)

# Load the model

In [5]:
class Net(nn.Module):
    """Small CNN classifier: three conv -> ReLU -> 2x2 max-pool stages, then two linear layers.

    The first linear layer is sized from a dummy 3x224x224 input, so inputs to
    ``forward`` must be batches of shape ``(N, 3, 224, 224)``. The output has
    ``NUM_CLASSES`` columns; element-wise sigmoid is applied when there are two
    classes, softmax over the class dimension otherwise.

    Layer attribute names (``conv1`` .. ``fc2``) are part of the checkpoint
    format and must not be renamed.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 5)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.conv3 = nn.Conv2d(64, 128, 3)

        # Run one dummy sample through the conv stack to discover the
        # flattened feature size needed by the first linear layer. No
        # gradients are needed for this shape probe.
        self._to_linear = None
        with torch.no_grad():
            self.convs(torch.randn(1, 3, 224, 224))

        self.fc1 = nn.Linear(self._to_linear, 512)
        self.fc2 = nn.Linear(512, NUM_CLASSES)

    def convs(self, x):
        """Apply the three conv/ReLU/max-pool stages and return the feature maps.

        On the first call this also records the per-sample flattened feature
        size in ``self._to_linear``.
        """
        # max pooling over 2x2
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv2(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv3(x)), (2, 2))

        if self._to_linear is None:
            # channels * height * width of a single sample
            self._to_linear = x[0].numel()
        return x

    def forward(self, x):
        """Return per-class scores of shape ``(N, num_classes)`` for a batch ``x``."""
        x = self.convs(x)
        # Flatten per sample while keeping the batch dimension; a wrongly
        # sized input then fails in fc1 instead of being silently re-batched.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        # Decide on the head that was actually built rather than a mutable global.
        if self.fc2.out_features == 2:
            # torch.sigmoid replaces the deprecated F.sigmoid.
            return torch.sigmoid(x)
        return F.softmax(x, dim=1)

In [6]:
# Build the network and restore the trained weights.
model = Net()

# map_location remaps tensors saved from another device (e.g. a CUDA
# checkpoint) onto the configured device; without it loading such a file
# fails on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load checkpoints from a
# trusted source.
state_dict = torch.load(model_path, map_location=device)
model.load_state_dict(state_dict)

# Move to the target device and switch to inference mode.
model = model.to(device).eval()

# Prediction

In [7]:
# Preprocessing pipeline factory
def get_transform():
    """Build the preprocessing pipeline: resize to 224x224, then normalise.

    Normalisation computes ``(x - 127.5) / 127.5`` with the same scalar mean
    and std for every channel, which maps values in 0..255 onto -1..1.
    """
    steps = [
        T.Resize((224, 224)),
        T.Normalize(mean=127.5, std=127.5),
    ]
    return T.Compose(steps)

# Text overlay helper (for testing)
def print_text(
    img,
    text: str,
    org=(100, 100),
    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
    fontScale=1.5,
    color=(0, 255, 0),
    thickness=2,
):
    """Draw ``text`` onto ``img`` in place with ``cv2.putText``.

    ``org`` is the text origin passed to OpenCV; the remaining arguments are
    forwarded unchanged. Nothing is returned.
    """
    # Positional order follows cv2.putText's signature:
    # (img, text, org, fontFace, fontScale, color, thickness).
    cv2.putText(img, text, org, fontFace, fontScale, color, thickness)

In [8]:
# Prediction function
def classify(model, image_transforms, img, classes):
    """Classify a single image crop and return its class label.

    Args:
        model: callable taking a float tensor of shape ``(1, C, H', W')`` and
            returning scores of shape ``(1, len(classes))``.
        image_transforms: callable applied to the ``(1, C, H, W)`` float
            tensor before it is passed to ``model``.
        img: NumPy array laid out as ``(H, W, C)``.
        classes: sequence of labels indexed by the model's output position.

    Returns:
        The element of ``classes`` with the highest score.
    """
    # HWC array -> 1xCxHxW float tensor
    batch = torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0).float()
    batch = image_transforms(batch)

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        output = model(batch)

    prediction = output.argmax(dim=1)
    return classes[prediction.item()]

In [9]:
workdir = os.getcwd()

# Size in bytes of one raw rgb24 frame (3 bytes per pixel).
frame_bytes = WIDTH * HEIGHT * 3

# Decode the video with ffmpeg and stream raw rgb24 frames over stdout.
command = ['ffmpeg',
           '-i', str(video_path),
           '-f', 'image2pipe',
           '-pix_fmt', 'rgb24',
           '-vcodec', 'rawvideo', '-']

# ffmpeg logs continuously to stderr; an unread PIPE would eventually fill up
# and block ffmpeg, so the log output is discarded instead.
process = sp.Popen(command, stdout=sp.PIPE, stderr=sp.DEVNULL)

# The preprocessing pipeline does not change between frames: build it once.
image_transforms = get_transform()

try:
    while True:

        ### INPUT ###
        arr = np.frombuffer(process.stdout.read(frame_bytes), dtype=np.uint8)

        # End of stream; a truncated trailing frame is dropped as well since
        # it cannot be reshaped into a full image.
        if arr.size < frame_bytes:
            break

        frame = arr.reshape((HEIGHT, WIDTH, 3))
        # ffmpeg delivers RGB while OpenCV displays BGR, so swap the channels.
        # The classifier below is fed this same channel-swapped image.
        img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        ### PREDICTION ###

        # Crop the region on the right of the frame, and the mirrored region
        # on the left (flip horizontally, then take the same window).
        img_right = img[300:900, 1450:1700]
        img_left = cv2.flip(img, 1)
        img_left = img_left[300:900, 1450:1700]
        result_right = classify(model, image_transforms, img_right, classes)
        result_left = classify(model, image_transforms, img_left, classes)

        print_text(img, str(result_right), org=(1600, 400))
        print_text(img, str(result_left), org=(300, 400))

        cv2.imshow("Image", img)
        # waitKey(0) blocks until a key is pressed: any key advances to the
        # next frame, ESC (27) stops.
        k = cv2.waitKey(0)
        if k == 27:
            break
finally:
    # Always release the window and the ffmpeg process, including when a
    # frame fails to process.
    cv2.destroyAllWindows()
    process.terminate()
    process.stdout.close()
    process.wait()  # reap the child so it does not linger as a zombie

