# Lip Reader


In [66]:
# Mount Google Drive in this Colab runtime; the model checkpoint and test
# videos used later in the notebook are read from /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [67]:
# Install MediaPipe (version not pinned) and download the FaceLandmarker model.
# The download is saved under the file name that the detector setup cell
# passes to BaseOptions(model_asset_path=...).
!pip install -q mediapipe
!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task

In [68]:
import os
import cv2
import time
import random
import pickle
import warnings
import numpy as np
import mediapipe as mp
from base64 import b64encode
from IPython.display import HTML
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from google.colab.patches import cv2_imshow

import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import LabelEncoder

# Suppress specific deprecation warnings
warnings.filterwarnings("ignore", message="SymbolDatabase.GetPrototype() is deprecated")

In [85]:
def show_video(input_path):
    """Re-encode a video with H.264 and return an inline HTML5 player for it.

    The re-encoded copy is written to the current working directory as
    ``<stem of input file>.mp4`` (the name is also printed), then embedded in
    the returned HTML as a base64 ``data:`` URL.

    :param input_path: path of the video file to display.
    :return: ``IPython.display.HTML`` object containing a ``<video>`` element.
    :raises subprocess.CalledProcessError: if ffmpeg exits with an error.
    :raises FileNotFoundError: if ffmpeg is not installed or the output is missing.
    """
    import subprocess  # function-scope import: only this helper shells out

    # Compressed video path: input file name with its extension replaced.
    # splitext/basename keep working when a directory name contains a dot.
    stem = os.path.splitext(os.path.basename(input_path))[0]
    compressed_path = stem + '.mp4'
    print (compressed_path)

    # ffmpeg cannot write onto its own input; in that case embed the file as is.
    if os.path.abspath(compressed_path) != os.path.abspath(input_path):
        # Argument list (no shell) so spaces/metacharacters in the path are safe.
        # -y overwrites a stale output left by an earlier run instead of
        # silently keeping (and showing) the old file.
        subprocess.run(
            ['ffmpeg', '-y', '-i', input_path, '-vcodec', 'libx264', compressed_path],
            stdin=subprocess.DEVNULL,
            check=True,
        )

    # Show video
    with open(compressed_path, 'rb') as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML("""
    <video width=400 controls>
          <source src="%s" type="video/mp4">
    </video>
    """ % data_url)

# input_path = '/content/drive/MyDrive/lipread_mp4/SOMETHING/test/SOMETHING_00050.mp4'
# show_video(input_path)

In [70]:
# Build a single module-level FaceLandmarker from the model file downloaded
# above; get_lip_landmark_mediapipe() calls `detector.detect(...)` on it.
base_options = python.BaseOptions(
    model_asset_path='face_landmarker_v2_with_blendshapes.task',
)
options = vision.FaceLandmarkerOptions(
    base_options=base_options,
    num_faces=1,  # only the first detected face is ever read downstream
    output_face_blendshapes=True,
    output_facial_transformation_matrixes=True,
)
detector = vision.FaceLandmarker.create_from_options(options)

def get_lip_landmark_mediapipe(img, visualize=True):
    """
    Using MediaPipe to extract lip landmark features.
    Number of points: 40 (the distinct landmark indices in ``lip_connections``).

    :param img: image array. It is passed through
        ``cv2.cvtColor(img, cv2.COLOR_BGR2RGB)`` before detection. When
        ``visualize`` is true the landmarks are drawn onto ``img`` in place.
    :param visualize: if true, draw the raw lip landmarks (at their image
        position) and the min-max-scaled landmarks (in a 100x100 pixel area at
        the top-left corner) onto ``img``. If false, ``img`` is left untouched.
    :return: ``(lip_points, scaled_lip_points, img)`` where ``lip_points`` is a
        list of ``(x, y)`` landmark coordinates as reported by MediaPipe and
        ``scaled_lip_points`` is the same list min-max scaled per axis to
        ``[0, 1]``. Returns ``(None, None, None)`` when no face (or no lip
        landmark) is found.
    """
    # Convert the color space from BGR to RGB
    # NOTE(review): get_lip_points_video() in this notebook already converts the
    # frame to RGB before calling this function, so this second swap hands the
    # detector BGR-ordered data. Left as is because the checkpoint was
    # presumably trained on features extracted this way - confirm before changing.
    image_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_rgb)
    detection_result = detector.detect(image)

    # Edges of the outer and inner lip contours, as pairs of landmark indices.
    lip_connections = [
        (61, 146), (146, 91), (91, 181), (181, 84), (84, 17),
        (17, 314), (314, 405), (405, 321), (321, 375), (375, 291),
        (61, 185), (185, 40), (40, 39), (39, 37), (37, 0),
        (0, 267), (267, 269), (269, 270), (270, 409), (409, 291),
        (78, 95), (95, 88), (88, 178), (178, 87), (87, 14),
        (14, 317), (317, 402), (402, 318), (318, 324), (324, 308),
        (78, 191), (191, 80), (80, 81), (81, 82), (82, 13),
        (13, 312), (312, 311), (311, 310), (310, 415), (415, 308)
    ]
    # Flatten and remove duplicates
    unique_indices = {idx for pair in lip_connections for idx in pair}

    if len(detection_result.face_landmarks) == 0:
        return None, None, None

    # Collect lip landmarks of the first face, in ascending landmark-index order.
    lip_points = [
        (landmark.x, landmark.y)
        for i, landmark in enumerate(detection_result.face_landmarks[0])
        if i in unique_indices
    ]
    if not lip_points:
        # Nothing to scale; treat like "no face" instead of failing in min().
        return None, None, None

    # Min-max scale lip points
    xs = [x for x, _ in lip_points]
    ys = [y for _, y in lip_points]
    min_x, max_x = min(xs), max(xs)
    min_y, max_y = min(ys), max(ys)
    scaled_lip_points = [
        (
            (x - min_x) / (max_x - min_x) if max_x != min_x else 0,
            (y - min_y) / (max_y - min_y) if max_y != min_y else 0,
        )
        for (x, y) in lip_points
    ]

    if visualize:
        height, width = img.shape[0], img.shape[1]
        # Draw lip landmarks on the lip (landmark coordinates are multiplied by
        # the image width/height to get pixel positions).
        for x, y in lip_points:
            cv2.circle(img, (int(x * width), int(y * height)), 2, (0, 255, 0), -1)

        # Draw scaled lip points
        scale = 100
        for scaled_x, scaled_y in scaled_lip_points:
            cv2.circle(img, (int(scaled_x * scale), int(scaled_y * scale)), 3, (255, 255, 0), -1)

    return lip_points, scaled_lip_points, img

In [71]:
def systematic_sampling(data, sample_count=29):
    """Pick ``sample_count`` evenly spaced items from ``data``.

    The step is ``len(data) // sample_count`` and the first index is drawn at
    random from the first step-sized interval (using NumPy's global RNG).

    :param data: indexable sequence to sample from.
    :param sample_count: number of items to return (must be positive).
    :return: list of ``sample_count`` items, or a copy of all items when
        ``data`` has fewer than ``sample_count`` elements.
    :raises ValueError: if ``sample_count`` is not positive.
    """
    if sample_count <= 0:
        raise ValueError("sample_count must be a positive integer")

    n = len(data)
    k = n // sample_count
    if k == 0:
        # Fewer items than requested: a zero step would make randint(0) and
        # range(..., 0) raise, so return everything there is.
        return list(data)

    start = np.random.randint(k)  # Random start within the first interval
    # start <= k - 1, so the last index start + k*(sample_count-1) is < n.
    return [data[i] for i in range(start, start + k * sample_count, k)]

# Example usage
# data = list(range(100))  # An example list of 100 elements
# sampled_elements = systematic_sampling(data)
# print(sampled_elements)

In [72]:
def get_lip_points_video(video_path, average_sampling=False):
    """Extract min-max-scaled lip landmarks for every frame of a video.

    Each frame is converted with ``cv2.COLOR_BGR2RGB`` and passed to
    ``get_lip_landmark_mediapipe`` (with drawing enabled); the second return
    value of that function (the scaled points) is appended to one flat list.

    :param video_path: path of the video file.
    :param average_sampling: if true, the returned image list is thinned with
        ``systematic_sampling``.
        NOTE(review): only the images are sampled; the landmark list still
        covers every frame - confirm that this is intended.
    :return: ``(lip_points_video, img_list)`` - a flat list of ``(x, y)`` points
        across all frames and the list of annotated frames. Returns
        ``(None, None)`` if the video cannot be opened or if any frame yields
        no landmarks.
    """
    # Open the video file
    cap = cv2.VideoCapture(video_path)

    # Check if the video opened successfully
    if not cap.isOpened():
        print("Error: Could not open video.")
        # Same shape as the other failure path, so callers can always unpack.
        return None, None

    lip_points_video = []
    img_list = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            _, lip_points_frame, img = get_lip_landmark_mediapipe(img, visualize=True)
            if lip_points_frame is None:
                # One frame without landmarks invalidates the whole sequence.
                return None, None

            img_list.append(img)
            lip_points_video.extend(lip_points_frame)
    finally:
        # Release the video capture object, also when a frame fails to process
        cap.release()

    if average_sampling:
        img_list = systematic_sampling(img_list)
    return lip_points_video, img_list

In [73]:
def get_frame_number(file_path):
    """Count the frames of a video by reading it from start to end.

    :param file_path: path of the video file.
    :return: number of frames that could be read, or ``None`` (after printing
        an error message) if the video cannot be opened.
    """
    capture = cv2.VideoCapture(file_path)
    if not capture.isOpened():
        print("Error: Could not open video.")
        return None

    total = 0
    # read() returns (ok, frame); ok becomes false once no more frames come back.
    while capture.read()[0]:
        total += 1

    # Release the video capture object
    capture.release()
    return total

In [74]:
# Words the classifier can output. predict_video() turns the model's class
# index into a word via label_classes[index].
# NOTE(review): the order presumably mirrors the label encoding used at
# training time - confirm before editing or reordering.
label_classes = [
    'ABOUT',
    'ANSWER',
    'FAMILY',
    'FRIDAY',
    'MIDDLE',
    'PRICE',
    'RIGHT',
    'SEVEN',
    'SOMETHING',
    'THEIR',
]

# LSTM

In [75]:
def set_seed(seed_value=42):
    """Seed Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices).

    :param seed_value: integer seed passed to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,  # if using CUDA
    )
    for apply_seed in seeders:
        apply_seed(seed_value)

In [76]:
class LipReadingLSTM(nn.Module):
    """LSTM sequence classifier: the last time step's output feeds a linear layer.

    :param input_size: number of features per time step.
    :param hidden_size: size of the LSTM hidden state.
    :param num_classes: number of output logits.
    :param num_layers: number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_classes, num_layers):
        super().__init__()
        # The attribute names `lstm` and `fc` determine the state-dict keys,
        # so they must stay as they are for saved checkpoints to load.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Return class logits for ``x``; with ``batch_first=True`` the input
        is laid out as (batch, time, features)."""
        sequence_out, _ = self.lstm(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

In [88]:
def predict_video(video_path):
    """Classify a 29-frame video and return the most likely word.

    Lip landmarks are extracted with ``get_lip_points_video`` and arranged as
    29 rows of 80 values (40 points x 2 coordinates per frame). That array is
    fed to the module-level ``lstm_model``. Timing information is printed.

    Note: a later cell in this notebook defines another ``predict_video`` that
    returns a top-3 list instead; whichever cell ran last is the one in effect.

    :param video_path: path of the video file.
    :return: ``(predicted_label, prob)`` - the label from ``label_classes`` and
        its softmax probability as a float.
    :raises ValueError: if landmarks could not be extracted or the video does
        not contain exactly 29 frames' worth of landmarks.
    """
    frames_expected = 29
    values_per_frame = 80  # 40 lip points x (x, y)

    # Prepare data
    start = time.time()
    extracted = get_lip_points_video(video_path)
    # Tolerate a bare None as well as a (None, None) failure result.
    lip_points = extracted[0] if extracted is not None else None
    if lip_points is None:
        raise ValueError(f'Could not extract lip landmarks from {video_path}')
    end_frame = time.time()

    flat_coordinates = [coord for pair in lip_points for coord in pair]
    if len(flat_coordinates) != frames_expected * values_per_frame:
        raise ValueError(
            f'Expected {frames_expected} frames of {values_per_frame} values, '
            f'got {len(flat_coordinates)} values from {video_path}'
        )
    reshaped_coordinates = np.array(flat_coordinates).reshape(frames_expected, values_per_frame)
    feature_tensor = torch.tensor(reshaped_coordinates, dtype=torch.float32)
    feature_tensor = feature_tensor.unsqueeze(0)  # add the batch dimension

    # Model inference
    lstm_model.eval()
    with torch.no_grad():
        logits = lstm_model(feature_tensor)
        probabilities = F.softmax(logits, dim=1)
        end_inference = time.time()
        print (f'Total Time: {end_inference - start:.2f}s, Preprocess: {end_frame-start:.2f}s, Inference: {end_inference-end_frame:.2f}s')
        # Convert to a plain int so the list lookup does not depend on
        # tensor-to-index coercion.
        predicted_index = int(probabilities.argmax(dim=1).item())
        prob = probabilities[0, predicted_index].item()
        predicted_label = label_classes[predicted_index]
        return predicted_label, prob

In [78]:
# Recreate the network (80 input features, 128 hidden units, one LSTM layer,
# one logit per entry of label_classes) and restore the saved weights on CPU.
lstm_model = LipReadingLSTM(
    input_size=80,
    hidden_size=128,
    num_classes=len(label_classes),
    num_layers=1,
)
lstm_model.load_state_dict(
    torch.load(
        '/content/drive/MyDrive/lipreader_models/lip_reading_model_lstm_fixed_len_128_1.pth',
        map_location=torch.device('cpu'),
    )
)

<All keys matched successfully>

# Round 1

In [86]:
# Round 1: display the first test clip inline.
input_path_1 = '/content/drive/MyDrive/lipreader_test_videos/1.mp4'
show_video(input_path_1)

1.mp4


In [92]:
# Classify the round 1 clip and print the predicted word with its probability.
# This unpacking needs the predict_video that returns (label, prob); the
# redefinition in the recording cell returns a top-3 list instead.
label, prob = predict_video(input_path_1)
print (f'label is {label}, prob is {prob:.2f}')

Total Time: 1.12s, Preprocess: 1.12s, Inference: 0.00s
label is FRIDAY, prob is 0.97


# Round 2

In [93]:
# Round 2: display the second test clip inline.
input_path_2 = '/content/drive/MyDrive/lipreader_test_videos/2.mp4'
show_video(input_path_2)

2.mp4


In [94]:
# Classify the round 2 clip and print the predicted word with its probability.
# This unpacking needs the predict_video that returns (label, prob); the
# redefinition in the recording cell returns a top-3 list instead.
label, prob = predict_video(input_path_2)
print (f'label is {label}, prob is {prob:.2f}')

Total Time: 1.12s, Preprocess: 1.11s, Inference: 0.00s
label is SOMETHING, prob is 0.96


In [83]:
#@title Record Video
from IPython.display import display, Javascript,HTML
from google.colab.output import eval_js
from base64 import b64decode

def record_video(filename):
  """Record a clip from the browser's camera/microphone and save it to disk.

  Injects a small JavaScript recorder into the Colab output area: a live
  preview with a "Start Recording" button that turns into "Stop Recording".
  The recording is sent back to Python base64-encoded and written to
  ``filename``. The recorder is configured for ``video/webm; codecs=vp9``, so
  that is the format requested regardless of the extension in ``filename``.

  Errors (for example a denied camera permission) are printed, not raised; in
  that case no file is written.

  :param filename: path of the file to write the recording to.
  """
  js=Javascript("""
    async function recordVideo() {
      const options = { mimeType: "video/webm; codecs=vp9" };
      const div = document.createElement('div');
      const capture = document.createElement('button');
      const stopCapture = document.createElement("button");

      capture.textContent = "Start Recording";
      capture.style.background = "orange";
      capture.style.color = "white";

      stopCapture.textContent = "Stop Recording";
      stopCapture.style.background = "red";
      stopCapture.style.color = "white";
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';

      const stream = await navigator.mediaDevices.getUserMedia({audio:true, video: true});

      let recorder = new MediaRecorder(stream, options);
      document.body.appendChild(div);
      div.appendChild(video);

      video.srcObject = stream;
      video.muted = true;

      await video.play();

      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      await new Promise((resolve) => {
        capture.onclick = resolve;
      });
      recorder.start();
      capture.replaceWith(stopCapture);

      await new Promise((resolve) => stopCapture.onclick = resolve);
      // attach the handler before stop() so the final data event cannot be missed
      const dataReady = new Promise((resolve) => recorder.ondataavailable = resolve);
      recorder.stop();
      let recData = await dataReady;
      let arrBuff = await recData.data.arrayBuffer();

      // stop every track (camera AND microphone) and remove the video element
      stream.getTracks().forEach((track) => track.stop());
      div.remove();

      let binaryString = "";
      let bytes = new Uint8Array(arrBuff);
      bytes.forEach((byte) => {
        binaryString += String.fromCharCode(byte);
      })
    return btoa(binaryString);
    }
  """)
  try:
    display(js)
    # Blocks until the JavaScript promise resolves with the base64 payload.
    data=eval_js('recordVideo({})')
    binary=b64decode(data)
    with open(filename,"wb") as video_file:
      video_file.write(binary)
    print(f"Finished recording video at:{filename}")
  except Exception as err:
    # Best effort: report the problem and carry on without a recording.
    print(str(err))

from IPython.display import HTML
from base64 import b64encode


def predict_video(video_path):
    """Classify a video of any length and return the top-3 words.

    This redefines the earlier ``predict_video`` of this notebook, which
    returned a single ``(label, prob)`` pair for exactly 29 frames.

    Side effect: every annotated frame returned by ``get_lip_points_video`` is
    saved next to the video as ``<video path without extension>_<i>.jpg``.

    NOTE(review): for videos longer than 29 frames only the saved images are
    thinned by ``average_sampling``; the landmark sequence fed to the model
    still contains every frame - confirm that this is intended.

    :param video_path: path of the video file.
    :return: list of up to three ``(label, probability)`` tuples, most likely
        first.
    :raises ValueError: if the video cannot be opened, has no frames, or lip
        landmarks could not be extracted.
    """
    frame_count = get_frame_number(video_path)
    if frame_count is None:
        raise ValueError(f'Could not open video: {video_path}')
    average_sampling = frame_count > 29

    # Prepare data
    extracted = get_lip_points_video(video_path, average_sampling=average_sampling)
    # Tolerate a bare None as well as a (None, None) failure result.
    lip_points, img_list = extracted if extracted is not None else (None, None)
    if lip_points is None:
        raise ValueError(f'Could not extract lip landmarks from {video_path}')
    if not lip_points:
        raise ValueError(f'No frames could be read from {video_path}')

    # splitext keeps paths such as "./test.mp4" intact (split('.') would not).
    frame_stem = os.path.splitext(video_path)[0]
    for i, img in enumerate(img_list):
        img_name = frame_stem + '_' + str(i) + '.jpg'
        # The frames were converted to RGB by get_lip_points_video, while
        # cv2.imwrite expects BGR; convert back so the colours are not swapped.
        cv2.imwrite(img_name, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))

    flat_coordinates = [coord for pair in lip_points for coord in pair]
    reshaped_coordinates = np.array(flat_coordinates).reshape(-1, 80)  # -1 allows for variable length
    feature_tensor = torch.tensor(reshaped_coordinates, dtype=torch.float32)
    feature_tensor = feature_tensor.unsqueeze(0)  # add the batch dimension

    # Model inference
    lstm_model.eval()
    with torch.no_grad():
        logits = lstm_model(feature_tensor)
        probabilities = F.softmax(logits, dim=1)

        # Get the top 3 probabilities (fewer if the model has fewer classes)
        k = min(3, probabilities.shape[1])
        top_prob, top_indices = torch.topk(probabilities, k, dim=1, largest=True, sorted=True)

        # Plain ints/floats so the label lookup does not index a list with tensors
        top_labels = [label_classes[idx] for idx in top_indices[0].tolist()]
        top_probabilities = top_prob[0].tolist()

        # Return top labels and their probabilities
        return list(zip(top_labels, top_probabilities))


def show_video(video_path, video_width = 600):
  """Return an inline HTML5 player for a video file, without re-encoding.

  This redefines the earlier ``show_video(input_path)`` of this notebook,
  which re-encoded the file with ffmpeg first.

  :param video_path: path of the video file to embed.
  :param video_width: width of the player in pixels.
  :return: ``IPython.display.HTML`` object with the file embedded as a base64
      ``data:`` URL.
  """
  # Read-only mode is enough (and works on files without write permission);
  # the context manager closes the handle.
  with open(video_path, "rb") as video_file:
    video_bytes = video_file.read()

  video_url = f"data:video/mp4;base64,{b64encode(video_bytes).decode()}"
  return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

# Record a clip in the browser and save it as test.mp4 in the working directory.
# NOTE(review): record_video requests a WebM/VP9 recording, so the file content
# is presumably WebM despite the .mp4 name - confirm downstream readers cope.
video_path = "test.mp4"
record_video(video_path)

<IPython.core.display.Javascript object>

KeyboardInterrupt: 

In [None]:
# Play back the recorded clip inline.
show_video(video_path)

In [None]:
# NOTE(review): if these lines form a single cell, this call is not the cell's
# last expression, so its HTML result is presumably not rendered.
show_video(video_path)

# Rebuild the model and reload the checkpoint (same arguments as the earlier
# loading cell), then print the top-3 predictions for the recorded clip.
lstm_model = LipReadingLSTM(input_size=80, hidden_size=128, num_classes=len(label_classes), num_layers=1)
lstm_model.load_state_dict(torch.load('/content/drive/MyDrive/lipreader_models/lip_reading_model_lstm_fixed_len_128_1.pth', map_location=torch.device('cpu')))
print (predict_video(video_path))