In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
from glob import glob
import shutil
import math
import json
import subprocess

In [2]:
mp_drawing = mp.solutions.drawing_utils #Drawing utilities
mp_holistic = mp.solutions.holistic   #Holistic model

In [12]:
frame_width, frame_height = (256,256)

In [18]:
num_frame_per_video = 50

In [16]:
with open("WLASL/start_kit/WLASL_v0.3.json", "r") as file:
    content = json.load(file)

In [3]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) #color conversion
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

In [4]:
def draw_landmark(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS)
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

In [5]:
def draw_styled_landmark(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                                mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1))
    
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                 mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2))
    
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                 mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2))
    
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                 mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2))

In [6]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(132)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(1404)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

In [7]:
def blank_keypoints():
    pose = np.zeros(132)
    face = np.zeros(1404)
    lh = np.zeros(21*3)
    rh = np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

In [8]:
def createFolder(directory):
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
    except OSError:
        print ('Error: Creating directory. ' +  directory)

In [9]:
def makedirbyname(path):
    try:
        os.mkdir(path)
    except:
        shutil.rmtree(path)
        os.mkdir(path)

In [11]:
def get_data(train_percent=0.90):
    train_sequences, train_labels = [], []
    test_sequences, test_labels = [], []
    words = glob("Data_Numpy/*")
    for word in words:
        actual_label = word.split("/")[-1]
        videos = glob(word + "/*")
        #print(f"-------------{actual_label}-------")
        train_proportion = math.ceil(round(train_percent * len(videos)))
        for index,video_ in enumerate(videos):
            #print(f"********---> Video   {video_} ----- ")
            window = []
            for frame_num in range(1,num_frame_per_video+1):
                np_file = f"{video_}/{frame_num}.npy"
                #print(np_file)
                res = np.load(np_file)
                window.append(res)
            if index <= train_proportion:
                train_sequences.append(window)
                train_labels.append(label_map[actual_label])
            else:
                test_sequences.append(window)
                test_labels.append(label_map[actual_label])
                
    train_sequences = np.array(train_sequences)
    train_labels = np.array(train_labels)
    test_sequences = np.array(test_sequences)
    test_labels = np.array(test_labels)
                
    return train_sequences, train_labels, test_sequences, test_labels


In [13]:
def get_num_frame(filename):
    video_frame_count = 0
    try:
        cap = cv2.VideoCapture(filename)
        video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    except:
        pass
    finally:
        cap.release()
    return video_frame_count

In [14]:
def get_duration(filename):
    result = subprocess.run(["ffprobe", "-v", "error", "-show_entries",
                             "format=duration", "-of",
                             "default=noprint_wrappers=1:nokey=1", filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    return float(result.stdout)

In [15]:
def create_folders_video(content):
    for i in content:
        folder_path = f"Data/{i['gloss']}"
        createFolder(folder_path)
        for j in i['instances']:
            if os.path.exists(f"WLASL/start_kit/videos/{j['video_id']}.mp4"):
                shutil.copy(f"WLASL/start_kit/videos/{j['video_id']}.mp4", f"Data/{i['gloss']}/{j['video_id']}.mp4")

In [17]:
def get_actions(content):
    actions = []
    for i in content:
        actions.append(i['gloss'])
    return actions

## Generate numpy data for videos

In [None]:
words = glob("Data/*")
makedirbyname("Data_Numpy")
for word in words:
    videos = glob(word + "/*")
    for video in videos:
        video_path = "/".join("".join(video.split(".")[0:-1]).split("/")[1:])
        createFolder(f"Data_Numpy/{video_path}")
        print(f"Processing collection for... Data_Numpy/{video_path}")
        actual_video_frame = get_num_frame(video)
        if actual_video_frame >= 75:
            remind = actual_video_frame - 75
            to_start = int(remind / 3)
            j = 0
            index = 0
            try:
                cap = cv2.VideoCapture(video)
                with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                    while True:
                        ret, frame = cap.read()
                        j+=1
                        if j < to_start or j%3 != to_start%3:
                            continue
                        
                        index+=1
                        if index > 25:
                            break
                            
                        if ret == False:
                            cap.release()
                            break
                            
                        cv2.imshow(video, frame)
                        if cv2.waitKey(10) and 0xFF == ord("q"):
                            break
                            
                    cap.release()
                    cv2.destroyAllWindows()
            finally:
                cap.release()
                cv2.destroyAllWindows()
        elif actual_video_frame >=50:
            remind = actual_video_frame - 50
            to_start = int(remind / 3)
            j = 0
            index = 0
            try:
                cap = cv2.VideoCapture(video)
                with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                    while True:
                        ret, frame = cap.read()
                        j+=1

                        if j < to_start or j%2 != to_start%2:
                            continue

                        index+=1
                        if index > 25:
                            break

                        if ret == False:
                            cap.release()
                            break

                        cv2.imshow(video, frame)
                        if cv2.waitKey(10) and 0xFF == ord("q"):
                            break

                    cap.release()
                    cv2.destroyAllWindows()
            finally:
                cap.release()
                cv2.destroyAllWindows()
        elif actual_video_frame >=25:
            remind = actual_video_frame - 25
            to_start = int(remind / 3)
            j = 0
            index = 0
            try:
                cap = cv2.VideoCapture(video)
                with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                    while True:
                        ret, frame = cap.read()
                        j+=1

                        if j < to_start:
                            continue

                        index+=1
                        if index > 25:
                            break

                        if ret == False:
                            cap.release()
                            break

                        cv2.imshow(video, frame)
                        if cv2.waitKey(10) and 0xFF == ord("q"):
                            break

                    cap.release()
                    cv2.destroyAllWindows()
            finally:
                cap.release()
                cv2.destroyAllWindows()
        else:
            j = 0
            index = 0
            try:
                cap = cv2.VideoCapture(video)
                with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                    while True:
                        ret, frame = cap.read()
                        j+=1

                        if j < to_start:
                            continue

                        index+=1
                        if index > 25:
                            break

                        if ret == False:
                            cap.release()
                            break

                        cv2.imshow(video, frame)
                        last_frame = frame
                        if cv2.waitKey(10) and 0xFF == ord("q"):
                            break

                    cap.release()
                    cv2.destroyAllWindows()
            finally:
                while index <=25:
                    cv2.imshow(video, last_frame)
                    index+=1
                cap.release()
                cv2.destroyAllWindows()