**Fine-Tuned ResNet Video Stream**

This notebook applies our fine-tuned face recognition model to a live video stream. Specifically, it detects faces in every frame of the video stream (using either a Haar cascade classifier or MTCNN), classifies each detected face with our model, and then draws the results onto the screen.

In [None]:
!pip install facenet_pytorch
!pip install mtcnn



In [None]:
from IPython.display import display, Javascript, Image, HTML, JSON
from google.colab import output
from google.colab.output import eval_js
from base64 import b64decode, b64encode
from mtcnn import MTCNN
from torchvision import transforms
from PIL import Image
from io import BytesIO
from functools import wraps
import sys
import io
import torch
import matplotlib.pyplot as plt
import cv2
import numpy as np
import PIL
import io
import html
import time
import dlib
from scipy.spatial import Delaunay
import warnings

# Suppress warnings
warnings.filterwarnings("ignore")

In [None]:
# Mount Google Drive under /content/drive; the saved model file loaded later
# in this notebook is read from a path below this mount point.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Function to convert the JavaScript object into an OpenCV image

def js_to_image(js_reply):
  """
  Decode a webcam frame delivered as a base64 data URL.

  Params: js_reply: string of the form "<header>,<base64 payload>"
  Returns: OpenCV BGR image decoded from the payload
  """
  # Everything after the first comma is the base64-encoded JPEG payload
  encoded_payload = js_reply.split(',')[1]
  raw_buffer = np.frombuffer(b64decode(encoded_payload), dtype=np.uint8)
  return cv2.imdecode(raw_buffer, flags=1)

# Function to convert OpenCV bounding box image into a base64 byte string
# this is used to overlay the bounding box over the video stream

def bbox_to_bytes(bbox_array):
  """
  Encode an RGBA overlay as a PNG data URL.

  Params: bbox_array: Numpy array to overlay on video stream.
  Returns: Base64 image byte string prefixed with the PNG data-URL header
  """
  png_buffer = io.BytesIO()
  PIL.Image.fromarray(bbox_array, 'RGBA').save(png_buffer, format='png')
  encoded_png = b64encode(png_buffer.getvalue()).decode('utf-8')
  return 'data:image/png;base64,' + encoded_png

In [None]:
# Register the Python function to stop the video stream

def stop_video_stream():
    """Placeholder stop handler: only reports that the stream is stopping."""
    # No real teardown happens here yet; this just emits a message.
    stop_message = "Stopping video stream"
    print(stop_message)

# Register stop_video_stream with Colab under the name 'stop_video_stream'.
# NOTE(review): the JavaScript defined in this notebook never appears to invoke
# this callback (its stop handlers only set a local flag) — confirm it is needed.
output.register_callback('stop_video_stream', stop_video_stream)

In [None]:
# JavaScript to properly create our live video stream using a webcam as the input

def video_stream():
    """
    Inject the JavaScript that drives the live webcam stream into the page.

    The script defines `stream_frame(label, imgData)`, which lazily builds the
    video/overlay DOM on first use, updates the status label and the overlay
    image, and resolves with a JPEG data URL of the next captured frame.
    Clicking the video, the overlay, the instruction text or the stop button
    sets a shutdown flag; `stream_frame` then tears the DOM down and returns
    an empty string.

    Returns: None (the script is shown with `display`).
    """
    js = Javascript('''
        let video;
        let videoStream;
        let captureCanvas;

        let mainDiv = null;
        let imgElement;
        let labelElement;

        let pendingResolve = null;
        let shutdown = false;

        function stopVideoStream() {
            shutdown = true;
            removeDom();
        }

        function removeDom() {
            // Every step is guarded so that a repeated call (for example
            // stopVideoStream() followed by stream_frame()) is a no-op
            // instead of a null dereference.
            if (videoStream) {
                videoStream.getVideoTracks().forEach((track) => track.stop());
            }
            if (video) {
                video.remove();
            }
            video = null;
            videoStream = null;
            captureCanvas = null;

            if (mainDiv) {
                mainDiv.remove();
            }
            mainDiv = null;
            imgElement = null;
            labelElement = null;
        }

        function onAnimationFrame() {
            if (!shutdown) {
                window.requestAnimationFrame(onAnimationFrame);
            }
            if (pendingResolve) {
                var result = "";
                if (!shutdown) {
                    captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
                    result = captureCanvas.toDataURL('image/jpeg', 0.8)
                }
                var lp = pendingResolve;
                pendingResolve = null;
                lp(result);
            }
        }

        async function createDom() {
            if (mainDiv !== null) {
                return videoStream;
            }

            mainDiv = document.createElement('div');
            mainDiv.style.padding = '4px';
            mainDiv.style.border = '2px solid white';
            mainDiv.style.maxWidth = '800px';
            mainDiv.style.width = '792px';
            document.body.appendChild(mainDiv);

            const modelOut = document.createElement('div');
            modelOut.innerHTML = "<span>Status:</span>";
            labelElement = document.createElement('span');
            labelElement.innerText = 'Data is empty';
            labelElement.style.fontWeight = 'bold';
            modelOut.appendChild(labelElement);
            mainDiv.appendChild(modelOut);

            video = document.createElement('video');
            video.style.display = 'block';
            video.width = window.innerWidth * 0.6;
            video.setAttribute('playsinline', '');
            video.onclick = () => { shutdown = true; };
            try {
                videoStream = await navigator.mediaDevices.getUserMedia(
                    { video: { facingMode: "environment" } });
            } catch (err) {
                // Camera unavailable or permission denied: undo the partial
                // DOM so a later call starts from a clean state instead of
                // reusing a half-built one and waiting forever for a frame.
                removeDom();
                throw err;
            }
            mainDiv.appendChild(video);

            imgElement = document.createElement('img');
            imgElement.style.position = 'absolute';
            imgElement.style.zIndex = 1;
            imgElement.onclick = () => { shutdown = true; };
            mainDiv.appendChild(imgElement);

            const instruction = document.createElement('div');
            instruction.innerHTML =
                '<span style="color: white; font-weight: bold;">' +
                'Click here or on the video to stop live stream</span>';
            mainDiv.appendChild(instruction);

            // Add stop button
            const stopButton = document.createElement('button');
            stopButton.innerText = 'Stop Video Stream';
            stopButton.onclick = () => { shutdown = true; };
            mainDiv.appendChild(stopButton);

            instruction.onclick = () => { shutdown = true; };

            video.srcObject = videoStream;
            await video.play();

            captureCanvas = document.createElement('canvas');
            captureCanvas.width = 640;
            captureCanvas.height = 480;
            window.requestAnimationFrame(onAnimationFrame);

            return videoStream;
        }

        async function stream_frame(label, imgData) {
            if (shutdown) {
                removeDom();
                shutdown = false;
                return '';
            }

            var preCreate = Date.now();
            videoStream = await createDom();

            var preShow = Date.now();
            if (label != "") {
                labelElement.innerHTML = label;
            }

            if (imgData != "") {
                var videoRect = video.getClientRects()[0];
                imgElement.style.top = videoRect.top + "px";
                imgElement.style.left = videoRect.left + "px";
                imgElement.style.width = videoRect.width + "px";
                imgElement.style.height = videoRect.height + "px";
                imgElement.src = imgData;
            }

            var preCapture = Date.now();
            var result = await new Promise(function (resolve, reject) {
                pendingResolve = resolve;
            });
            if (shutdown) {
                // Stop was requested while this capture was pending: honour
                // it now rather than clearing the flag and handing back an
                // empty frame that the caller cannot decode.
                removeDom();
                shutdown = false;
                return '';
            }

            return {
                'create': preShow - preCreate,
                'show': preCapture - preShow,
                'capture': Date.now() - preCapture,
                'img': result
            };
        }
    ''')

    display(js)

def video_frame(label, bbox):
    """
    Push the status label and overlay to the browser and fetch the next frame.

    Params:
      label: text/HTML shown in the status line (an empty string leaves it as is)
      bbox: overlay image as a data-URL string (an empty string leaves it as is)
    Returns: whatever `stream_frame` resolves to — a dict whose 'img' entry holds
      the captured frame, or a falsy value (e.g. '') once the stream has been
      stopped. Returns None if the reply is a dict carrying an 'error' entry.
    """
    import json  # local import: only needed to build the JavaScript call safely

    # json.dumps emits properly quoted and escaped string literals, so quotes,
    # backslashes or newlines in the arguments can no longer break (or inject
    # into) the generated JavaScript expression.
    js_call = 'stream_frame({}, {})'.format(json.dumps(str(label)), json.dumps(str(bbox)))
    data = eval_js(js_call)

    # Falsy replies ('' or None) mean the stream ended; pass them through
    # unchanged instead of probing them for an 'error' key.
    if not data:
        return data
    if isinstance(data, dict) and 'error' in data:
        # Handle the error, e.g., video stream has been stopped
        print(data['error'])
        return None
    return data


<h1><strong> Video Stream Capture </strong></h1>

<h4><strong> Face detection model </strong></h4>
There are two options for face detection from video stream: <br />
1) Haar cascade classifier <br />
2) MTCNN <br />
<br />
Haar cascade classifier is significantly quicker and is able to track your face more frequently, however it is fairly inaccurate. For example, it does not detect faces which are slightly tilted. MTCNN is far slower (roughly 1.5 frames per second) than Haar but is more accurate - it allows for tilted faces. <br />
<br />
To use the Haar cascade classifier, set <code>useHaar</code> to <code>True</code>. To use MTCNN, set <code>useHaar</code> to <code>False</code>.

In [None]:
# Detector switch read by the video-stream loop:
# True -> Haar cascade (face_cascade), False -> MTCNN (faceDetection).
useHaar = True

<strong> Loading the fine-tuned model </strong>

In [None]:
"""
EDIT HERE: Change model_path to point to your face_recog_2.h5 path
"""
model_path = '/content/drive/My Drive/581 Final Project/Code Workspace/face_recog_2.h5'

# map_location="cpu": inference in this notebook runs on the CPU, so remap any
# tensors that were saved from a GPU; otherwise loading fails on a runtime
# without CUDA.
# weights_only=False: the file holds a whole pickled model object (it is used
# directly as a module below), which newer PyTorch releases refuse to load by
# default. Unpickling can execute arbitrary code, so only load a file you trust.
model = torch.load(model_path, map_location="cpu", weights_only=False)

# Switch to inference mode (affects layers such as dropout / batch norm)
model.eval()

NameError: ignored

<strong> MTCNN Model </strong>

In [None]:
"""
MTCNN produces output which moves our video stream
To prevent this, we use a wrapper function to capture output
"""
def capture_output(func):
    """
    Decorator that silences anything `func` prints to stdout.

    While `func` runs, sys.stdout points at a throwaway in-memory buffer; the
    previous stream is put back afterwards, even when `func` raises.
    Returns: a wrapper with the same call signature and return value as `func`.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        saved_stream = sys.stdout
        sys.stdout = io.StringIO()
        try:
            result = func(*args, **kwargs)
        finally:
            # Always restore the original stream
            sys.stdout = saved_stream
        return result

    return wrapper

In [None]:
# MTCNN detector shared by every faceDetection() call; created on first use so
# the network is not rebuilt for each video frame.
_mtcnn_detector = None


def faceDetection(frame, min_confidence=0.9):
    """
    Return the bounding boxes of the faces MTCNN is confident about.

    Params:
      frame: image passed straight to MTCNN's detect_faces
      min_confidence: detections must score strictly above this value to be
        kept (defaults to 0.9)
    Returns: list with the 'box' entry of every kept detection, or an empty
      list when nothing is detected. Callers unpack each box as (x, y, w, h).
    """
    global _mtcnn_detector
    if _mtcnn_detector is None:
        _mtcnn_detector = MTCNN()

    # Run detection with stdout captured so MTCNN's output does not shift
    # the video stream on the page.
    quiet_detect = capture_output(_mtcnn_detector.detect_faces)
    detections = quiet_detect(frame)
    if not detections:
        return []
    return [found['box'] for found in detections if found['confidence'] > min_confidence]

<strong> Haar Cascade Classifier Model </strong>

In [None]:
# Initialize the Haar Cascade face detection model
# Resolve the frontal-face cascade XML that ships with OpenCV, then load it
_haar_cascade_file = cv2.samples.findFile(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)
face_cascade = cv2.CascadeClassifier(_haar_cascade_file)

<strong> Video Stream </strong>

In [None]:
label_html = 'Capturing...'
video_stream()

# Overlay sent back to the browser; empty until the first frame is processed
bbox = ''

# Most recent raw frame (OpenCV BGR); still available after the loop ends
captured_img = None

# Resize each face crop to 160x160 and convert it to a tensor for the model
transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
])

while True:
    js_reply = video_frame(label_html, bbox)
    # A falsy reply, or a reply without frame data, means the stream was stopped
    if not js_reply or not js_reply.get("img"):
        break

    # Decode the frame once. cv2.imdecode yields BGR channel order, so build an
    # RGB copy for MTCNN and for the PIL crops that are fed to the model.
    frame_bgr = js_to_image(js_reply["img"])
    if frame_bgr is None:
        # Undecodable frame: skip it and ask for the next one
        continue
    captured_img = frame_bgr
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    frame_pil = Image.fromarray(frame_rgb)

    # Transparent RGBA overlay with the same height/width as the frame
    bbox_array = np.zeros((frame_bgr.shape[0], frame_bgr.shape[1], 4), dtype=np.uint8)

    if useHaar:
        # If using Haar, detect with face_cascade on a grayscale copy.
        # The frame is BGR, so BGR2GRAY is the matching conversion.
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(50, 50))
    else:
        # If using MTCNN, detect with faceDetection on a float RGB array
        faces = faceDetection(frame_rgb.astype('f'))

    # Classify and annotate every detected face
    for (x, y, w, h) in faces:
        x, y, w, h = int(x), int(y), int(w), int(h)

        # Both detectors now crop from the same RGB image, so the model always
        # sees the same channel order.
        cropped_face = frame_pil.crop((x, y, x + w, y + h))

        # Transform before predicting
        input_tensor = transform(cropped_face).unsqueeze(0).to("cpu")
        with torch.no_grad():
            # Named `logits` rather than `output` so the imported
            # google.colab `output` module is not overwritten.
            logits = model(input_tensor)
        leo, not_leo = torch.sigmoid(logits[0]).tolist()

        # If the face is more likely to be leo, display leo
        if leo > not_leo:
            display_string = f'Leonardo DiCaprio ({leo * 100:.2f}%)'
        else:
            display_string = f'NOT Leonardo DiCaprio ({not_leo * 100:.2f}%)'

        cv2.rectangle(bbox_array, (x, y), (x + w, y + h), (255, 255, 255), 2)
        cv2.putText(bbox_array, display_string, (x, y - 10), cv2.FONT_HERSHEY_TRIPLEX, 0.6, (255, 255, 255), 1)

    # Make every drawn pixel opaque and leave the rest fully transparent
    bbox_array[:, :, 3] = (bbox_array.max(axis=2) > 0).astype(np.uint8) * 255

    bbox = bbox_to_bytes(bbox_array)

<IPython.core.display.Javascript object>

KeyboardInterrupt: ignored