# Importing dependencies

In [None]:
import pandas as pd
import numpy as np
import os
import sys
from matplotlib import pyplot as plt
# import dlib to deal with face recognition and lip detection for images
import dlib #(pip install dlib or create a virtual conda environement to install dlib)
import cv2 #(pip install opencv-python)
import time
import shutil
import imageio.v2 as imageio
from imutils import face_utils
import imutils
import torch
import torchvision.transforms as transform
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import StratifiedShuffleSplit
import random
from skimage.transform import resize
from sklearn.utils import shuffle
import torch.nn.functional as F
import torch.nn as nn

# Face Detection

![Image of Keys to Words](miscellaneous/Key_Words_Data.png)

In [None]:
# Identifiers of every speaker in the dataset
people = [
    'F01', 'F02', 'F04', 'F05', 'F06', 'F07', 'F08', 'F09', 'F10', 'F11',
    'M01', 'M02', 'M04', 'M07', 'M08',
]
# Kinds of recordings that are processed
data_types = ['words']
# Zero-padded two-digit folder names '01'..'10': folder_enum is iterated once per
# word and instances once per repetition of that word by the loops further down
folder_enum = [f'{number:02d}' for number in range(1, 11)]
instances = [f'{number:02d}' for number in range(1, 11)]

In [None]:
# Spoken words in label order; words_di maps each integer label to its word
words = [
    'Begin', 'Choose', 'Connection', 'Navigation', 'Next',
    'Previous', 'Start', 'Stop', 'Hello', 'Web',
]
words_di = dict(enumerate(words))

# Extraction of lip image from a body image

You must first download the MIRACL-VC1 dataset from [Kaggle](https://www.kaggle.com/datasets/apoorvwatsky/miraclvc1) and extract it into the `dataset/` folder.

In [None]:
# Path of the dlib 68-point landmark model and a cache for the objects built from it
_LANDMARK_MODEL_PATH = 'dlib_shape_predictor_model/shape_predictor_68_face_landmarks.dat'
_face_models = {}


def _get_face_models():
    """Return the shared (detector, predictor) pair, creating it on first use."""
    if not _face_models:
        _face_models['detector'] = dlib.get_frontal_face_detector()
        _face_models['predictor'] = dlib.shape_predictor(_LANDMARK_MODEL_PATH)
    return _face_models['detector'], _face_models['predictor']


def crop_and_save_image(img_path, write_img_path, img_name, output_root='lip_cropped'):
    """Detect the single face in an image and save a grayscale crop of its mouth.

    The image is resized to a width of 500 pixels and converted to grayscale.
    When exactly one face is found, the bounding box of landmarks 48..67 (the
    'mouth' range) is cropped, resized to a width of 250 pixels and written to
    ``{output_root}/{write_img_path}``.

    Parameters
    ----------
    img_path : str
        Path of the input image.
    write_img_path : str
        Destination path relative to ``output_root``.
    img_name : str
        Unused; kept so existing callers keep working.
    output_root : str, optional
        Folder the crop is written under (default ``'lip_cropped'``).

    Returns
    -------
    None
        An error message is printed and nothing is written when the image
        cannot be read, when zero or several faces are detected, when the
        mouth crop is empty, or when the crop cannot be saved.
    """
    image = cv2.imread(img_path)
    # cv2.imread signals an unreadable or missing file by returning None
    if image is None:
        print(f"ERROR: could not read image {img_path}")
        return

    # Reuse the detector and landmark model instead of reloading them per image
    detector, predictor = _get_face_models()

    image = imutils.resize(image, width=500)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # detect faces in the grayscale image
    rects = detector(gray, 1)
    if len(rects) > 1:
        print( "ERROR: more than one face detected")
        return
    if len(rects) < 1:
        print( "ERROR: no faces detected")
        return

    landmarks = face_utils.shape_to_np(predictor(gray, rects[0]))
    mouth_start, mouth_end = 48, 68

    (x, y, w, h) = cv2.boundingRect(np.array([landmarks[mouth_start:mouth_end]]))
    # Clamp the origin: a negative index would wrap around when slicing
    roi = gray[max(y, 0):y + h, max(x, 0):x + w]
    if roi.size == 0:
        print(f"ERROR: empty mouth region in {img_path}")
        return

    roi = imutils.resize(roi, width=250, inter=cv2.INTER_CUBIC)
    # cv2.imwrite returns False instead of raising when it cannot write the file
    if not cv2.imwrite(f'{output_root}/{write_img_path}', roi):
        print(f"ERROR: could not write {output_root}/{write_img_path}")

In [None]:
save_folder = 'lip_cropped'
# exist_ok keeps the cell re-runnable: a plain os.mkdir raises once the folder exists
os.makedirs(save_folder, exist_ok=True)

for i, person_ID in enumerate(people, start=1):
    start_time = time.time()

    for data_type in data_types:
        for phrase_ID in folder_enum:
            for instance_ID in instances:
                dir_temp = f'{person_ID}/{data_type}/{phrase_ID}/{instance_ID}/'
                directory = f'dataset/{dir_temp}'
                output_dir = f'{save_folder}/{dir_temp}'

                # An existing output folder means this instance was already
                # handled by a previous run, so it is skipped
                if os.path.exists(output_dir):
                    continue

                # List the inputs first so a missing dataset folder fails
                # before an (empty) output folder is created for it
                filelist = os.listdir(directory)
                os.makedirs(output_dir)

                for img_name in filelist:
                    if img_name.startswith('color'):
                        crop_and_save_image(directory + img_name,
                                            dir_temp + img_name, img_name)

    end_time = time.time()
    print(f'Iteration : {i}. Time taken: {end_time - start_time}')

#### Cropped lip images of a person saying a word

In [None]:
sample_dir = 'lip_cropped/F01/words/01/01'
# os.listdir returns names in arbitrary order; sort them so the frames are
# displayed in file-name order, as the loading cell does
for frame_name in sorted(os.listdir(sample_dir)):
    plt.figure()
    plt.title(frame_name)
    plt.imshow(cv2.imread(f'{sample_dir}/{frame_name}'))

# Feature extraction and normalization

In [None]:
# Size every frame is resized to, and the number of frames each sequence is padded to
MAX_WIDTH, MAX_HEIGHT = 100, 100
max_seq_length = 22

# Empty accumulators for the inputs (X) and labels (y) of each split
X_train, y_train = [], []
X_val, y_val = [], []
X_test, y_test = [], []

In [None]:
# Split by speaker: 2 speakers for the test set, 1 for validation, the rest for training.
# A dedicated seeded generator keeps the split identical across kernel restarts;
# change the seed to draw a different split
split_rng = random.Random(0)
test_ds = split_rng.sample(people, 2)
train_ds = [person for person in people if person not in test_ds]
val_ds = split_rng.sample(train_ds, 1)
train_ds.remove(val_ds[0])

In [None]:
def _load_padded_sequence(frames_dir, seq_length, frame_shape):
    """Load the 'color*' frames of one utterance as a fixed-length uint8 array.

    Frames are read in sorted file-name order, resized to ``frame_shape`` and
    rescaled to 0..255. Sequences longer than ``seq_length`` are truncated and
    shorter ones are padded with all-zero frames, so the result always has
    ``seq_length`` frames.
    """
    frames = []
    for img_name in sorted(os.listdir(frames_dir)):
        if not img_name.startswith('color'):
            continue
        image = imageio.imread(os.path.join(frames_dir, img_name))
        image = resize(image, frame_shape)
        # Rescale to 0..255 and store as integer pixels
        frames.append((255 * image).astype(np.uint8))

    # Truncate over-long sequences so every sample stacks into one regular array
    frames = frames[:seq_length]
    # uint8 padding keeps the whole sequence uint8 instead of upcasting it to float64
    padding = [np.zeros(frame_shape, dtype=np.uint8)] * (seq_length - len(frames))
    return np.array(frames + padding)


# Start from empty accumulators so re-running this cell never duplicates samples
X_train, y_train = [], []
X_val, y_val = [], []
X_test, y_test = [], []

t1 = time.time()
for person_id in people:
    tx1 = time.time()

    # Every sample of a speaker goes to the split that speaker was assigned to
    if person_id in test_ds:
        X_split, y_split = X_test, y_test
    elif person_id in val_ds:
        X_split, y_split = X_val, y_val
    else:
        X_split, y_split = X_train, y_train

    for data_type in data_types:
        for word_index, word in enumerate(folder_enum):
            print(f"Word : '{words[word_index]}'")
            for iteration in instances:
                path = os.path.join('lip_cropped', person_id, data_type, word, iteration)
                X_split.append(
                    _load_padded_sequence(path, max_seq_length, (MAX_WIDTH, MAX_HEIGHT))
                )
                y_split.append(word_index)

    tx2 = time.time()
    print(f'Finished reading images for person {person_id}. Time taken : {tx2 - tx1} secs.')

t2 = time.time()
print(f"Time taken for creating constant size 3D Tensors from the cross lip images : {t2 - t1} secs.")

##### Normalize the X values to the range [0, 1]

In [None]:
def normalize(X):
    """Min-max scale every frame of X to the range [0, 1].

    The minimum and maximum are taken over axes 2 and 3, i.e. separately for
    each frame of each sequence. Constant frames (such as all-zero padding)
    have no value range and are mapped to 0 without performing a 0/0 division.

    Parameters
    ----------
    X : array-like with at least 4 dimensions
        Frames are expected on axes 2 and 3.

    Returns
    -------
    numpy.ndarray
        Array of the same shape as X. Floating-point inputs keep their dtype;
        any other dtype is converted to float64.
    """
    X = np.asarray(X)
    if not np.issubdtype(X.dtype, np.floating):
        X = X.astype(np.float64)

    v_min = X.min(axis=(2, 3), keepdims=True)
    v_max = X.max(axis=(2, 3), keepdims=True)
    span = v_max - v_min

    # Divide only where the frame has a non-zero range; the other entries keep
    # the 0 they were initialised with, so no NaN or RuntimeWarning is produced
    scaled = np.zeros_like(X)
    np.divide(X - v_min, span, out=scaled, where=span != 0)

    # Non-finite input values are still replaced, as before
    return np.nan_to_num(scaled, copy=False)

In [None]:
# Normalize the inputs of each split, then shuffle inputs and labels together
# (same permutation for both) with a fixed random_state
X_train, y_train = shuffle(normalize(np.array(X_train)), y_train, random_state=0)
X_test, y_test = shuffle(normalize(np.array(X_test)), y_test, random_state=0)
X_val, y_val = shuffle(normalize(np.array(X_val)), y_val, random_state=0)

In [None]:
def _with_channel_axis(batch):
    """Insert a size-1 axis at position 4; batches that are already 5-D are returned as-is.

    The guard makes the cell safe to re-run: without it every run would insert
    one more axis and the data would no longer match the model's input shape.
    """
    batch = np.asarray(batch)
    return batch if batch.ndim == 5 else np.expand_dims(batch, axis=4)


X_train = _with_channel_axis(X_train)
X_val = _with_channel_axis(X_val)
X_test = _with_channel_axis(X_test)

In [None]:
# Import through the public `keras.layers` / `keras.utils` namespaces only:
# `keras.layers.convolutional` and `keras.layers.core` are internal module paths
# that newer Keras releases no longer expose
from keras.layers import (Activation, Conv3D, Dense, Dropout, Flatten, GRU, LSTM,
                          MaxPooling3D, Reshape, TimeDistributed, ZeroPadding3D)
from keras.models import Sequential
from keras.utils import plot_model, to_categorical

In [None]:
# One class per entry of `words`
NUM_CLASSES = len(words)


def _one_hot(labels):
    """One-hot encode integer labels; labels that are already 2-D are returned as-is.

    The guard makes the cell safe to re-run: encoding an already one-hot
    array a second time would add another dimension.
    """
    labels = np.asarray(labels)
    return labels if labels.ndim == 2 else to_categorical(labels, NUM_CLASSES)


y_train = _one_hot(y_train)
y_test = _one_hot(y_test)
y_val = _one_hot(y_val)

In [None]:
# 3D-CNN classifier: three Conv3D + MaxPooling3D stages, then two dense layers
# with dropout and a 10-way softmax output
model = Sequential([
    # 1st layer group
    Conv3D(64, (3, 3, 3), strides=1, input_shape=(22, 100, 100, 1),
           activation='relu', padding='valid'),
    MaxPooling3D(pool_size=(2, 2, 2), strides=2),

    # 2nd layer group
    Conv3D(128, (3, 3, 3), activation='relu', strides=1),
    MaxPooling3D(pool_size=(2, 2, 2), strides=2),

    # 3rd layer group
    Conv3D(256, (2, 2, 2), activation='relu', strides=1),
    MaxPooling3D(pool_size=(2, 2, 2), strides=2),

    Flatten(),

    # Fully connected layers
    Dense(4096, activation='relu'),
    Dropout(.5),
    Dense(2048, activation='relu'),
    Dropout(.5),

    # Output layer: one probability per class
    Dense(10, activation='softmax'),
])

model.compile(loss='categorical_crossentropy', optimizer='Adagrad', metrics=['accuracy'])
model.summary()

In [None]:
# Train for 45 epochs, evaluating on the validation split, and report the wall-clock time
fit_started = time.time()
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=45,
)
fit_finished = time.time()
print()
print(f"Training time : {fit_finished - fit_started} secs.")

In [None]:
# # convert from arrays to torch tensor
# train_input = torch.tensor(X_train)
# train_output = torch.tensor(y_train)
# validation_input = torch.tensor(X_val)
# validation_output = torch.tensor(y_val)
# test_input = torch.tensor(X_test)
# test_output = torch.tensor(y_test)

##### Load train, validation, and test dataset

In [None]:
# class LipCroppedDataset(Dataset):
#     def __init__(self, x, y):
#         super(Dataset)
#         self.x = x
#         self.y = y
    
#     def __len__(self):
#         return len(self.y)
    
#     def __getitem__(self, index):
#         return self.x[index], self.y[index]

In [None]:
# # load train and test_dataset
# train_dataset = LipCroppedDataset(train_input, train_output)
# test_dataset = LipCroppedDataset(test_input, test_output)
# validation_dataset = LipCroppedDataset(validation_input, validation_output)
# train_loader = torch.utils.data.DataLoader(dataset = train_dataset,
#                                           batch_size = 500,
#                                           shuffle = True) 
# test_loader = torch.utils.data.DataLoader(dataset = test_dataset,
#                                           batch_size = len(test_dataset),
#                                           shuffle = True)
# validation_loader = torch.utils.data.DataLoader(dataset = validation_dataset,
#                                                batch_size = len(validation_dataset),
#                                                shuffle = True)

In [None]:
# class ConvNet(nn.Module):
#     def __init__(self):
#         super(ConvNet, self).__init__()
#         self.conv1 = nn.Conv3d(22, 64, 3, stride = 1)
#         self.pool = nn.MaxPool3d(2, stride = 2)
#         self.conv2 = nn.Conv3d(64 , 128, 3, stride = 1)
#         self.conv3 = nn.Conv3d(128, 256, 3, stride = 1)
#         self.fc1 = nn.Linear(256 * 3 * 3, 120)
#         self.fc2 = nn.Linear(120, 84)
#         self.fc3 = nn.Linear(84, 10)
#         self.softmax = nn.Softmax(dim = 1)
        
#     def forward(self, x):
#         out = self.pool(F.relu(self.conv1(x)))
#         out = self.pool(F.relu(self.conv2(out)))
#         out = self.pool(F.relu(self.conv3(out)))
#         out = out.view(-1, 256)
#         out = F.relu(self.fc1(out))
#         out = F.relu(self.fc2(out))
#         out = self.fc3(out)

#         return self.softmax(out)

In [None]:
# model = ConvNet()
# criterion = nn.CrossEntropyLoss()
# learning_rate = 0.1
# optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate)

In [None]:
# def train_one_epoch(model, data, optimizer, loss_fnc):
#     for inputs, targets in data:
        
#         inputs = inputs.to(torch.float32)

#         # forward pass and calculate loss
#         predictions = model(inputs)
#         loss = loss_fnc(predictions, targets)
        
#         # back propagate and update weights
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()


#     print(f"Loss = {loss.item()}")

In [None]:
# for epoch in range(1):
#     print(f"Epoch: {epoch + 1}")
#     train_one_epoch(model, train_dataset, optimizer, learning_rate)