## Instruction:
1. Make sure your computer webcam works properly. 
2. Run all code cells below sequentially.
3. Run the camera runner cell at the end to open the camera and test

In [3]:
import torch
from torch import nn
from PIL import Image
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, random_split
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

import pandas as pd
import numpy as np
import random
import time
from tqdm import tqdm

import cv2
import tkinter as tk
from tkinter import Label
from PIL import Image, ImageTk
import matplotlib as plt

Check the device (CPU or GPU) and set it to variable device.

In [4]:
# Pick the compute device once: the GPU when CUDA is usable, the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Report the choice so the reader can see which hardware the notebook runs on.
if device.type == "cuda":
    print(f"CUDA is available. Using GPU: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA is not available. Using CPU")

CUDA is not available. Using CPU


Seed randomness for reproducibility

In [5]:
seed = 42

# Feed the same seed to every random number generator the notebook touches
# (PyTorch CPU, NumPy, Python's `random`, then the current and all CUDA devices).
for seed_fn in (
    torch.manual_seed,
    np.random.seed,
    random.seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    seed_fn(seed)

# Models

## Custom model

This is the custom model code. If you would like to run the camera code with custom model, run this code and skip the next two code cells, which belong to transfer models. 

In [15]:
class FactorizedConv2d(nn.Module):
    """A k x k convolution factorized into a 1 x k convolution followed by a k x 1 one.

    Each convolution is followed by batch normalization and a ReLU.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by both convolutions.
        kernel_size: Length ``k`` of the two one-dimensional kernels.
        padding: Zero padding applied along the axis each kernel spans.
        stride: Overall stride of the block. Either an int or a
            ``(stride_h, stride_w)`` pair. Defaults to 1.
    """

    def __init__(self, in_channels, out_channels, kernel_size, padding, stride=1):
        super(FactorizedConv2d, self).__init__()

        # Each 1-D convolution strides only along the axis its kernel spans.
        # Passing the scalar stride to both layers would stride both axes twice
        # and downsample by stride**2 instead of stride.
        if isinstance(stride, int):
            stride_h, stride_w = stride, stride
        else:
            stride_h, stride_w = stride

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=(1, kernel_size),
                               stride=(1, stride_w), padding=(0, padding), bias=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=(kernel_size, 1),
                               stride=(stride_h, 1), padding=(padding, 0), bias=True)

        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # One shared activation module instead of building a new one per forward pass.
        # ReLU holds no parameters or buffers, so the state_dict keys are unchanged.
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply conv(1 x k) -> BN -> ReLU -> conv(k x 1) -> BN -> ReLU to ``x``."""
        x = self.relu(self.bn1(self.conv1(x)))
        return self.relu(self.bn2(self.conv2(x)))
class InceptionResNetBlock(nn.Module):
    """Residual block with four parallel branches of 32 output channels each.

    The branch outputs are concatenated (128 channels), projected back to
    ``in_channels`` by a 1x1 convolution plus batch normalization, scaled by
    ``scale`` and added to the input before a final ReLU.

    Args:
        in_channels: Number of channels of the input (and of the output).
        scale: Multiplier applied to the residual before it is added to the input.
    """

    def __init__(self, in_channels, scale=1.0):
        super(InceptionResNetBlock, self).__init__()
        self.scale = scale

        # Branch 0 is a plain 1x1 bottleneck.
        self.branch0 = nn.Sequential(*self._bottleneck(in_channels))

        # Branches 1-3 differ only in the kernel sizes of their factorized convolutions.
        self.branch1 = self._factorized_branch(in_channels, 3, 5)
        self.branch2 = self._factorized_branch(in_channels, 3, 7)
        self.branch3 = self._factorized_branch(in_channels, 5, 7)

        # 4 branches x 32 channels = 128 channels going into the projection.
        self.conv = nn.Conv2d(128, in_channels, kernel_size=1, bias=True)
        self.bn = nn.BatchNorm2d(in_channels)

    @staticmethod
    def _bottleneck(in_channels):
        """Return the 1x1 conv -> BN -> ReLU layers that open every branch."""
        return [
            nn.Conv2d(in_channels, 32, kernel_size=1, bias=True),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
        ]

    @classmethod
    def _factorized_branch(cls, in_channels, first_kernel, second_kernel):
        """Build bottleneck -> two factorized convs (32->48->64) -> 1x1 conv to 32 channels.

        The padding of each factorized convolution is half its kernel size.
        """
        return nn.Sequential(
            *cls._bottleneck(in_channels),
            FactorizedConv2d(32, 48, kernel_size=first_kernel, padding=first_kernel // 2),
            FactorizedConv2d(48, 64, kernel_size=second_kernel, padding=second_kernel // 2),
            nn.Conv2d(64, 32, kernel_size=1, bias=True),
        )

    def forward(self, x):
        """Return ``relu(x + scale * bn(conv(concat(branches(x)))))``."""
        branches = (self.branch0, self.branch1, self.branch2, self.branch3)
        mixed = torch.cat([branch(x) for branch in branches], dim=1)
        residual = self.bn(self.conv(mixed))
        return torch.relu(x + self.scale * residual)

class InceptionResNet(nn.Module):
    """Custom Inception-ResNet style network ending in a linear layer.

    Layout: stem -> 2 residual blocks (192 ch) -> reduction -> 2 residual blocks
    (384 ch) -> reduction -> 2 residual blocks (1024 ch) -> global average
    pooling -> dropout -> linear layer with ``num_classes`` outputs.

    Args:
        num_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, num_classes=3):
        super(InceptionResNet, self).__init__()
        # Filled in by get_model(); kept as a plain attribute (not a sub-module).
        self.model = None

        self.stem = nn.Sequential(
            *self._conv_bn_relu(3, 32, kernel_size=3, stride=2, padding=0),
            *self._conv_bn_relu(32, 64, kernel_size=3, padding=1),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=0),
            *self._conv_bn_relu(64, 80, kernel_size=1, padding=0),
            *self._conv_bn_relu(80, 192, kernel_size=3, padding=0),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=0)
        )

        self.inception_resnet_a = nn.Sequential(
            InceptionResNetBlock(192, scale=0.2),
            InceptionResNetBlock(192, scale=0.2),
        )

        self.reduction_a = nn.Sequential(
            *self._conv_bn_relu(192, 384, kernel_size=3, stride=2, padding=0)
        )

        self.inception_resnet_b = nn.Sequential(
            InceptionResNetBlock(384, scale=0.2),
            InceptionResNetBlock(384, scale=0.2),
        )

        self.reduction_b = nn.Sequential(
            *self._conv_bn_relu(384, 1024, kernel_size=3, stride=2, padding=0)
        )

        self.inception_resnet_c = nn.Sequential(
            InceptionResNetBlock(1024, scale=0.2),
            InceptionResNetBlock(1024, scale=0.2),
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(1024, num_classes)

    @staticmethod
    def _conv_bn_relu(in_channels, out_channels, **conv_kwargs):
        """Return [Conv2d, BatchNorm2d, ReLU] to be unpacked into a Sequential.

        The layers are returned flat (not nested) so the Sequential indices, and
        therefore the state_dict keys, match checkpoints saved by earlier versions.
        """
        return [
            nn.Conv2d(in_channels, out_channels, bias=True, **conv_kwargs),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]

    def forward(self, x):
        """Run ``x`` through the network and return the ``num_classes`` outputs per sample."""
        x = self.stem(x)
        x = self.inception_resnet_a(x)
        x = self.reduction_a(x)
        x = self.inception_resnet_b(x)
        x = self.reduction_b(x)
        x = self.inception_resnet_c(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.dropout(x)
        x = self.fc(x)
        return x

    def get_model(self):
        """Create a fresh network on the global ``device`` and return it.

        The new network has the same number of outputs as this instance (the
        previous implementation always built the 3-output default).

        Returns:
            A newly initialized ``InceptionResNet`` moved to ``device``.
        """
        model = InceptionResNet(num_classes=self.fc.out_features).to(device)
        # A plain `self.model = model` would register the new network as a
        # sub-module, adding "model.*" keys to this instance's state_dict and
        # breaking later load_state_dict() calls. Store it as an ordinary
        # attribute instead.
        object.__setattr__(self, "model", model)
        return model

## Efficient Net

This is the code for the EfficientNetV2 transfer-learning model. It downloads the pretrained model from torchvision for fine-tuning. If you would like to run the camera code with the EfficientNet model, run this code cell and skip the custom-model cell above and the Inception-ResNet cell below.

In [17]:
class EfficientNetV2Regression(nn.Module):
    """EfficientNetV2-S with ImageNet weights and a regression head.

    Args:
        num_outputs: Number of regression targets produced by the replaced
            classifier layer.
    """

    def __init__(self, num_outputs=3):
        super(EfficientNetV2Regression, self).__init__()
        self.model = models.efficientnet_v2_s(weights=models.EfficientNet_V2_S_Weights.IMAGENET1K_V1)

        # Replace the classifier layer to output `num_outputs` regression targets
        self.model.classifier[1] = nn.Linear(self.model.classifier[1].in_features, num_outputs)

    def forward(self, x):
        """Return the wrapped network's output for the batch ``x``."""
        return self.model(x)

    def get_model(self, channels):
        """Build a model with ``channels`` outputs on the global ``device``.

        Previously this passed ``channels`` straight through the network, which
        fails for an integer output count. An integer now creates a new
        ``EfficientNetV2Regression``; any other argument keeps the old behaviour
        and is forwarded through the wrapped network.

        Args:
            channels: Number of regression outputs (int), or an input batch.

        Returns:
            A new model on ``device`` for an int, otherwise the network output.
        """
        if isinstance(channels, int):
            return EfficientNetV2Regression(num_outputs=channels).to(device)
        return self.model(channels)

## Inception ResNet

This is the code for the Inception-ResNet transfer-learning model. It downloads the pretrained model with the timm library for fine-tuning. If you would like to run the camera code with the Inception-ResNet model, run this code cell and skip the above two code cells.

In [8]:
import timm
def incepResNet(out_channels):
    """Return a pretrained timm Inception-ResNet-v2 with a new linear head.

    Args:
        out_channels: Number of outputs of the replacement ``classif`` layer.

    Returns:
        The model, with both backbone and new head on the global ``device``.
    """
    backbone = timm.create_model('inception_resnet_v2', pretrained=True).to(device)

    # Swap the pretrained classifier for a freshly initialized head of the requested width.
    head = nn.Linear(backbone.classif.in_features, out_channels)
    backbone.classif = head.to(device)

    return backbone

The following code cell sets up the model to be used with the camera application. It loads the saved checkpoint into the chosen model and returns the model. Change the checkpoint paths to match your file system.

In [28]:
def get_model(out_channels = 3, type=""):
    """Build the requested network and load its saved checkpoint.

    Args:
        out_channels: Number of model outputs. 3 selects the head-pose
            checkpoint; any other value selects the eye-gaze checkpoint.
        type: Which architecture to build: ``"custom"``, ``"incep"`` or
            ``"effinet"``.

    Returns:
        The model with the checkpoint's ``'model_state_dict'`` loaded.

    Raises:
        ValueError: If ``type`` is not one of the supported names. The previous
            version silently returned ``None``, which only failed later inside
            the camera app.
    """
    path_to_headpose_checkpoint = "path/to/head pose/model/checkpoint" # Change the path to your checkpoints path
    path_to_eyegaze_checkpoint = "path/to/eye gaze/model/checkpoint" # Change the path to your checkpoints path

    if type == "custom":
        model = InceptionResNet(out_channels)
    elif type == "incep":
        model = incepResNet(out_channels)
    elif type == "effinet":
        model = EfficientNetV2Regression(out_channels)
    else:
        raise ValueError(
            f"Unknown model type {type!r}; expected 'custom', 'incep' or 'effinet'."
        )

    # The checkpoint is chosen by the output count, independent of the architecture.
    if out_channels == 3:
        checkpoint_path = path_to_headpose_checkpoint
    else:
        checkpoint_path = path_to_eyegaze_checkpoint

    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model

# Camera

This is the main camera code. It holds a class with all the functions needed to capture a frame, preprocess the input image, predict the head pose and eye gaze, etc.

In [29]:
import cv2
import tkinter as tk
from tkinter import Label
from PIL import Image, ImageTk
import torch
from torchvision import transforms
import torch.nn.functional as F

class CameraApp:
    """Tkinter webcam application that checks head pose and eye gaze on a captured frame.

    The live feed is drawn on a canvas. Pressing "Capture" grabs a frame, crops
    the face with a Haar cascade, runs the head-pose model on it and, when the
    pose is within the threshold and exactly two eyes are detected, runs the
    gaze model on the eye crop. The captured frame is saved and shown with a
    green border when every check passes and a red border otherwise.

    Args:
        window: Tk root (or toplevel) window that hosts the application.
        window_title: Title set on ``window``.
        model: Head-pose model; its first three outputs are shown as yaw, roll, pitch.
        gaze_model: Eye-gaze model; its first two outputs are shown as X and Y.
        head_threshold: A head-pose value ``v`` is accepted when ``-t <= v <= t``.
        eye_threshold: Same acceptance rule for the gaze outputs.

    Raises:
        IOError: If a Haar cascade file cannot be loaded.
        ValueError: If the default camera cannot be opened.

    Note:
        The constructor starts the Tk main loop and therefore blocks until the
        window is closed.
    """

    def __init__(self, window, window_title, model, gaze_model, head_threshold, eye_threshold):
        self.window = window
        self.window.title(window_title)
        self.video_source = 0  # Default camera
        self.model = model
        self.gaze_model = gaze_model

        self.head_threshold = head_threshold
        self.eye_threshold = eye_threshold

        # Load Haar Cascade for face and eye detection
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')

        if self.face_cascade.empty() or self.eye_cascade.empty():
            raise IOError("Failed to load one or more cascade classifiers. Check the paths to the XML files.")

        # Open the video source
        self.vid = cv2.VideoCapture(self.video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", self.video_source)

        # Create a frame to hold the video feed and buttons
        self.video_frame = tk.Frame(window)
        self.video_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        # Create a canvas that can fit the video source size
        self.canvas = tk.Canvas(self.video_frame, width=self.vid.get(cv2.CAP_PROP_FRAME_WIDTH), height=self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.canvas.pack()

        # Button to snapshot
        self.btn_snapshot = tk.Button(self.video_frame, text="Capture", width=50, command=self.snapshot)
        self.btn_snapshot.pack(anchor=tk.CENTER, expand=True)

        # Button to close
        self.btn_close = tk.Button(self.video_frame, text="Close", width=50, command=self.on_closing)
        self.btn_close.pack(anchor=tk.CENTER, expand=True)

        # Create a frame to hold the snapshot and status message
        self.right_frame = tk.Frame(window)
        self.right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

        # Snapshot display areas
        self.snapshot_area = tk.Label(self.right_frame)
        self.snapshot_area.pack(side=tk.TOP, expand=True)

        self.face_snapshot_area = tk.Label(self.right_frame)
        self.face_snapshot_area.pack(side=tk.TOP, expand=True)

        self.eye_snapshot_area = tk.Label(self.right_frame)
        self.eye_snapshot_area.pack(side=tk.TOP, expand=True)

        # Label to display status messages
        self.status_label = tk.Label(self.right_frame, text="Ready", font=("Helvetica", 12))
        self.status_label.pack(side=tk.BOTTOM, fill=tk.X)

        self.head_pose_threshold = tk.Label(self.right_frame)
        self.head_pose_threshold.pack(pady=5)
        self.head_pose_threshold.config(text=f"Head Pose Threshold: {self.head_threshold}")

        self.eye_gaze_threshold = tk.Label(self.right_frame)
        self.eye_gaze_threshold.pack(pady=5)
        self.eye_gaze_threshold.config(text=f"Eye-Gaze Threshold: {self.eye_threshold}")

        self.head_pose_pred = tk.Label(self.right_frame, text="Head Pose Angles Prediction: ")
        self.head_pose_pred.pack(pady=5)

        self.eye_gaze_pred = tk.Label(self.right_frame, text="Eye-gaze Predictions: ")
        self.eye_gaze_pred.pack(pady=5)

        # After it is called once, the update method will be automatically called every delay milliseconds
        self.delay = 15

        # Store the latest image to avoid garbage collection
        self.photo = None

        # Canvas item that shows the live frame (created on the first frame, then reused)
        # and the id of the pending `after` callback (cancelled on close).
        self._canvas_image_id = None
        self._after_id = None

        self.update()
        self.window.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.window.mainloop()

    @staticmethod
    def _out_of_range(values, threshold):
        """Return True when any value lies outside ``[-threshold, threshold]``."""
        return any(v < -1 * threshold or v > threshold for v in values)

    @staticmethod
    def _predict(model, image):
        """Run ``model`` on ``image`` in eval mode without gradients; return a CPU tensor.

        The input is moved to the device of the model's first parameter so a
        model on the GPU works with the CPU tensors produced by the transforms,
        and the output is moved back to the CPU so callers can call ``.numpy()``.
        """
        model.eval()
        first_param = next(model.parameters(), None)
        if first_param is not None:
            image = image.to(first_param.device)
        with torch.no_grad():
            output = model(image)
        return output.cpu()

    def snapshot(self):
        """Capture one frame, run the pose/gaze checks, then save and display the result.

        Writes ``cropped_face-frame-<n>.jpg`` and ``frame-<n>.jpg`` to the
        current directory, where ``<n>`` is the capture's reported frame position.
        """
        # Get a frame from the video source
        ret, frame = self.vid.read()
        if not ret:
            return

        # Flip the frame horizontally for mirror effect
        frame = cv2.flip(frame, 1)

        border_color_green = (0, 255, 0)  # Green (BGR)
        border_color_red = (0, 0, 255)  # Red (BGR)
        border_size = 10

        # Detect and crop the face region
        face_image = self.detect_and_crop_face(frame)
        if face_image is None:
            print("Face not detected!")
            self.log("Face not detected!")
            # Tkinter ignores options set to None; an empty string clears the image.
            self.snapshot_area.config(image="")
            self.snapshot_area.image = None
            return

        frame_index = int(self.vid.get(cv2.CAP_PROP_POS_FRAMES))

        # Save the cropped face and convert it to PIL format for the model
        cv2.imwrite("cropped_face-frame-" + str(frame_index) + ".jpg", face_image)
        face_img_pil = Image.fromarray(cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB))

        # Transform the image and make a prediction
        image_tensor = self.transform_image_for_pose(face_img_pil)
        prediction = np.round(self.predict_pose(image_tensor).numpy(), 2)
        self.head_pose_pred.config(text=f"Predicted Head Pose: (Yaw: {prediction[0][0]:.2f}, Roll: {prediction[0][1]:.2f}, Pitch: {prediction[0][2]:.2f})")
        print("Head Pose Prediction: ", prediction)

        # Start from "success"; any failed check below switches the border to red.
        border_color = border_color_green
        self.log("Photo Captured Successfully")

        head_pose_is_right = not self._out_of_range(prediction[0][:3], self.head_threshold)
        if not head_pose_is_right:
            border_color = border_color_red
            self.log("Adjust Your Head Position!".title())

        # The gaze check only runs when the head pose is acceptable and both eyes are found
        eye = self.detect_and_crop_eyes(face_image) if head_pose_is_right else None
        if eye is not None:
            eye_img_pil = Image.fromarray(cv2.cvtColor(eye, cv2.COLOR_BGR2RGB))

            # Transform the image and make a gaze prediction
            gaze_tensor = self.transform_image_for_gaze(eye_img_pil)
            gaze_prediction = np.round(self.predict_gaze(gaze_tensor).numpy(), 2)
            self.eye_gaze_pred.config(text=f"Eye-gaze Prediction: (X: {gaze_prediction[0][0]:.2f}, Y: {gaze_prediction[0][1]:.2f})")

            print(f'Eye Gaze Prediction: {gaze_prediction}')

            if self._out_of_range(gaze_prediction[0][:2], self.eye_threshold):
                border_color = border_color_red
                self.log("Look Straight at the Camera!".title())

        frame_with_border = cv2.copyMakeBorder(frame, border_size, border_size, border_size, border_size, cv2.BORDER_CONSTANT, value=border_color)

        # Save the captured image
        cv2.imwrite("frame-" + str(frame_index) + ".jpg", frame_with_border)

        # Convert the image to PIL format and display it on the side
        img_pil = Image.fromarray(cv2.cvtColor(frame_with_border, cv2.COLOR_BGR2RGB))
        img_tk = ImageTk.PhotoImage(image=img_pil)
        self.snapshot_area.config(image=img_tk)
        self.snapshot_area.image = img_tk  # keep a reference so Tk does not drop the image

    def detect_and_crop_face(self, image):
        """Return the first detected face cropped from ``image`` (BGR), or None if no face is found.

        The crop is extended by 10% of the face height above and below the detection.
        """
        # Convert the image to grayscale
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Detect face in the image
        faces = self.face_cascade.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        if len(faces) == 0:
            return None

        # Assume the first detected face is the main face
        x, y, w, h = faces[0]

        # Extend the region vertically by 10% of the face height on each side
        margin_y = int(h * 0.10)
        y1 = max(0, y - margin_y)  # Ensure the top edge does not go below 0
        y2 = y + h + margin_y  # Slicing clamps this to the image height

        return image[y1:y2, x:x + w]

    def detect_and_crop_eyes(self, face_image):
        """Return one crop spanning both eyes, or None unless exactly two eyes are detected.

        The bounding box around both detections is trimmed by 20% of its height
        at the top and at the bottom.
        """
        # Convert the face image to grayscale
        gray_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)

        # Detect eyes in the face region
        eyes = self.eye_cascade.detectMultiScale(gray_face, scaleFactor=1.1, minNeighbors=10, minSize=(30, 30))

        if len(eyes) != 2:
            return None

        # Sort detected eyes by x coordinate to ensure left-to-right order
        eyes = sorted(eyes, key=lambda box: box[0])

        # Get the coordinates for the combined bounding box
        x1 = min(eyes[0][0], eyes[1][0])
        y1 = min(eyes[0][1], eyes[1][1])
        x2 = max(eyes[0][0] + eyes[0][2], eyes[1][0] + eyes[1][2])
        y2 = max(eyes[0][1] + eyes[0][3], eyes[1][1] + eyes[1][3])

        # Trim the bounding box vertically: 20% of its height from the top and the bottom
        margin_y = int((y2 - y1) * 0.20)

        x1 = max(0, x1)
        y1 = max(0, y1 + margin_y)
        x2 = min(face_image.shape[1], x2)
        y2 = min(face_image.shape[0], y2 - margin_y)

        eye_region = face_image[y1:y2, x1:x2]
        # An empty crop cannot be color-converted by the caller; treat it as "no eyes".
        if eye_region.size == 0:
            return None
        return eye_region

    def transform_image_for_pose(self, image):
        """Resize a PIL image to 299x299, normalize it and return a 1-image batch tensor."""
        transform = transforms.Compose([
            transforms.Resize((299, 299)),
            transforms.ToTensor(),
            transforms.Normalize((0.4702, 0.3964, 0.3711), (0.2337, 0.2362, 0.2483))
        ])
        image = transform(image)
        image = image.unsqueeze(0)  # Add batch dimension
        return image

    def transform_image_for_gaze(self, image):
        """Resize a PIL image to 229x229, normalize it and return a 1-image batch tensor."""
        # NOTE(review): 229 differs from the 299 used for the pose model; confirm it
        # matches the size the gaze model was trained with.
        transform = transforms.Compose([
            transforms.Resize((229, 229)),
            transforms.ToTensor(),
            transforms.Normalize((0.6965, 0.5065, 0.40670), (0.2380, 0.2134, 0.1928))
        ])
        image = transform(image)
        image = image.unsqueeze(0)  # Add batch dimension
        return image

    def predict_pose(self, image):
        """Return the head-pose model's output for ``image`` as a CPU tensor."""
        return self._predict(self.model, image)

    def predict_gaze(self, image):
        """Return the gaze model's output for ``image`` as a CPU tensor."""
        return self._predict(self.gaze_model, image)

    def update(self):
        """Draw the next camera frame on the canvas and schedule the following update."""
        # Get a frame from the video source
        ret, frame = self.vid.read()

        if ret:
            # Flip the frame horizontally for mirror effect
            frame = cv2.flip(frame, 1)

            # Convert the image to PIL format
            pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            self.photo = ImageTk.PhotoImage(image=pil_image)  # Store the photo as an instance variable

            # Reuse one canvas item. Creating a new item per frame (every `delay`
            # milliseconds) would let canvas items pile up without bound.
            image_id = getattr(self, "_canvas_image_id", None)
            if image_id is None:
                self._canvas_image_id = self.canvas.create_image(0, 0, image=self.photo, anchor=tk.NW)
            else:
                self.canvas.itemconfig(image_id, image=self.photo)

        self._after_id = self.window.after(self.delay, self.update)

    def on_closing(self):
        """Stop the update loop, release the camera and destroy the window."""
        # Cancel the pending update so it does not fire on a destroyed window
        after_id = getattr(self, "_after_id", None)
        if after_id is not None:
            self.window.after_cancel(after_id)
            self._after_id = None

        # Release the video source
        if self.vid.isOpened():
            self.vid.release()
        self.window.destroy()

    def log(self, message):
        """Update the text of the status label."""
        self.status_label.config(text=f"Status: {message}")

# Run the camera

This cell is the camera app runner. It creates objects for head pose, eye gaze models, and the camera class and sends the two models as parameters to the camera object. When you create the chosen model object from the three models, correctly set the "type" parameter: `custom` for the custom model, `effinet` for the EfficientNetv2 model, and `incep` for the inception-resnet model.

Set the required threshold for the optimal range of both head pose and eye gaze estimation. For example, if the threshold value for head pose is 10, it means that the predicted yaw, roll, and pitch values of head pose are considered optimal when they fall between -10 and 10.

In [30]:
# Build the two networks used by the camera app: 3 outputs for head pose,
# 2 outputs for eye gaze. Both use the EfficientNet variant here.
pose_model = get_model(out_channels=3, type="effinet")
gaze_model = get_model(out_channels=2, type="effinet")

# A prediction is accepted when every value lies within [-threshold, threshold].
head_threshold, eye_threshold = 10.0, 8.0

# Create the Tk window and hand it to the application (blocks until the window closes).
root = tk.Tk()
app = CameraApp(
    window=root,
    window_title="MSC Project Experimental camera",
    model=pose_model,
    gaze_model=gaze_model,
    head_threshold=head_threshold,
    eye_threshold=eye_threshold,
)


Head Pose Prediction:  [[ 0.5   1.82 22.4 ]]
Head Pose Prediction:  [[-1.85 -3.76 12.08]]
Head Pose Prediction:  [[ 0.21  6.45 13.42]]
Head Pose Prediction:  [[-1.69 10.19  3.8 ]]
