In [None]:
# Mount Google Drive so the project folder is reachable from the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

# Make the shared project folder the working directory for the cells below.
%cd /content/drive/MyDrive/EECS_442 Final Proj/

Mounted at /content/drive
/content/drive/.shortcut-targets-by-id/1Sc7VSl-7PU4L0hxklxePzLVSFNQ7pnLE/EECS_442 Final Proj


In [None]:
import os
import torch
import numpy as np
import pandas as pd
import math
import cv2
import json
import glob
import torchvision.transforms as transforms
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import models as torch_models
from tqdm.auto import tqdm
from torchsummary import summary
import torch.optim as optim
from sklearn.metrics import accuracy_score

In [None]:
# Move into the WLASL folder: later cells use paths relative to it (e.g. './data/splits/...').
%cd /content/drive/MyDrive/EECS_442 Final Proj/WLASL/


/content/drive/.shortcut-targets-by-id/1Sc7VSl-7PU4L0hxklxePzLVSFNQ7pnLE/EECS_442 Final Proj/WLASL


In [None]:
# Pick the compute device once; later cells move the model and batches onto it.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("GPU" if device == 'cuda' else "CPU")

GPU


# Import Dataset

In [None]:
from torch.utils.data import Dataset
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
import random


def labels2cat(label_encoder, list):
    """Encode labels with an already-fitted encoder.

    Args:
        label_encoder: object exposing a ``transform`` method (the dataset
            classes in this notebook pass a fitted ``LabelEncoder``).
        list: the labels to encode. The name shadows the builtin but is kept
            so keyword callers keep working.

    Returns:
        Whatever ``label_encoder.transform`` returns for the given labels.
    """
    encoded = label_encoder.transform(list)
    return encoded

def compute_difference(x):
    """Return all pairwise differences of ``x``, skipping each element with itself.

    Row ``i`` of the result holds ``x[i] - x[j]`` for every ``j != i``, in the
    order of ``j``; an input of length ``n`` therefore yields ``n`` rows of
    ``n - 1`` values each.
    """
    return [
        [minuend - subtrahend for col, subtrahend in enumerate(x) if col != row]
        for row, minuend in enumerate(x)
    ]



class Sign_Dataset2(Dataset):
    """Pose dataset that yields one flattened keypoint vector per sampled frame.

    Each item is ``(x, y, video_id)``: ``x`` stacks the flattened per-frame
    poses returned by ``read_pose_file`` along dim 0 (``num_samples`` rows),
    ``y`` is the label-encoded gloss and ``video_id`` comes from the index file.
    """

    def __init__(self, index_file_path, split, pose_root, sample_strategy='rnd_start', num_samples=25, num_copies=4,
                 img_transforms=None, video_transforms=None, test_index_file=None):
        """Index every instance of the requested split(s) that has pose data.

        Args:
            index_file_path: JSON index; a list of entries with ``gloss`` and ``instances``.
            split: one split name or a collection of split names to keep.
            pose_root: directory containing one sub-directory per video id.
            sample_strategy: frame sampling strategy; only ``'rnd_start'`` is implemented.
            num_samples: number of frames every item is sampled / padded to.
            num_copies: stored on the instance; not used by the code in this class.
            img_transforms: optional callable applied to each frame's pose.
            video_transforms: optional callable applied to the assembled item tensor.
            test_index_file: optional JSON index; when given, labels are still fitted
                on ``index_file_path`` but instances are read from this file.
        """
        assert os.path.exists(index_file_path), "Non-existent indexing file path: {}.".format(index_file_path)
        assert os.path.exists(pose_root), "Path to poses does not exist: {}.".format(pose_root)

        self.data = []
        self.label_encoder, self.onehot_encoder = LabelEncoder(), OneHotEncoder(categories='auto')

        # `type(split) == 'str'` compared a type with a string and was never true,
        # so a single split name was later matched by substring instead of equality.
        if isinstance(split, str):
            split = [split]

        self.test_index_file = test_index_file
        self._make_dataset(index_file_path, pose_root, split)

        self.index_file_path = index_file_path
        self.pose_root = pose_root
        self.framename = 'image_{}_keypoints.json'
        self.sample_strategy = sample_strategy
        self.num_samples = num_samples

        self.img_transforms = img_transforms
        self.video_transforms = video_transforms

        self.num_copies = num_copies

    def __len__(self):
        """Number of indexed instances."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(x, y, video_id)`` for the instance at ``index``."""
        video_id, gloss_cat, frame_start, frame_end = self.data[index]
        x = self._load_poses(video_id, frame_start, frame_end, self.sample_strategy, self.num_samples)

        if self.video_transforms:
            x = self.video_transforms(x)

        y = gloss_cat

        return x, y, video_id

    def _make_dataset(self, index_file_path, pose_root, split):
        """Fit the label encoders and fill ``self.data`` with matching instances.

        An instance is kept when its ``video_id`` has a directory under
        ``pose_root`` and its ``split`` is one of the requested splits.
        """
        with open(index_file_path, 'r') as f:
            content = json.load(f)

        # A set keeps the per-instance membership test cheap.
        vid_ids = set(os.listdir(pose_root))

        glosses = sorted([gloss_entry['gloss'] for gloss_entry in content])

        self.label_encoder.fit(glosses)
        self.onehot_encoder.fit(self.label_encoder.transform(self.label_encoder.classes_).reshape(-1, 1))

        if self.test_index_file is not None:
            print('Trained on {}, tested on {}'.format(index_file_path, self.test_index_file))
            with open(self.test_index_file, 'r') as f:
                content = json.load(f)

        # make dataset
        for gloss_entry in content:
            gloss, instances = gloss_entry['gloss'], gloss_entry['instances']
            gloss_cat = labels2cat(self.label_encoder, [gloss])[0]

            for instance in instances:
                if instance['video_id'] not in vid_ids:
                    continue
                if instance['split'] not in split:
                    continue

                frame_end = instance['frame_end']
                frame_start = instance['frame_start']
                video_id = instance['video_id']

                instance_entry = video_id, gloss_cat, frame_start, frame_end
                self.data.append(instance_entry)

    def _load_poses(self, video_id, frame_start, frame_end, sample_strategy, num_samples):
        """Load the sampled frames of one video as a ``(num_samples, D)`` tensor.

        ``D`` is the length of one flattened pose. A frame whose pose cannot be
        read reuses the previous frame's pose; if there is no previous frame its
        path is printed and the frame is skipped. Short sequences are padded at
        the end by repeating the last pose.

        Raises:
            NotImplementedError: for any ``sample_strategy`` other than ``'rnd_start'``.
            RuntimeError: if no pose could be loaded for any sampled frame.
        """
        if sample_strategy != 'rnd_start':
            raise NotImplementedError('Unimplemented sample strategy found: {}.'.format(sample_strategy))
        frames_to_sample = rand_start_sampling(frame_start, frame_end, num_samples)

        poses = []
        for i in frames_to_sample:
            pose_path = os.path.join(self.pose_root, video_id, self.framename.format(str(i).zfill(5)))
            pose = read_pose_file(pose_path)

            if pose is not None:
                if self.img_transforms:
                    pose = self.img_transforms(pose)
                poses.append(torch.flatten(pose))
            elif poses:
                poses.append(poses[-1])
            else:
                # Nothing to copy from yet; report the missing frame and move on.
                print(pose_path)

        if not poses:
            raise RuntimeError('No pose could be loaded for video {}.'.format(video_id))

        # Pad by the number of poses actually collected (skipped leading frames
        # make that smaller than len(frames_to_sample)).
        if len(poses) < num_samples:
            poses.extend([poses[-1]] * (num_samples - len(poses)))

        # torch.Tensor(list_of_tensors) cannot build this batch; stack the rows instead.
        return torch.stack(poses)

def read_pose_file(filepath):
    """Return the normalised (x, y) keypoints of one frame, using a feature cache.

    The cache file is ``./data/features/<video dir>/<first 11 chars of the file
    name>_ft.pt``. When it exists, its first two columns are returned. Otherwise
    the keypoint JSON at ``filepath`` is parsed, the feature matrix
    ``[xy, x_diff, y_diff, orient]`` is built and cached, and its first two
    columns are returned.

    Args:
        filepath: path to a keypoint JSON with a ``people`` list whose first entry
            holds ``pose_keypoints_2d``, ``hand_left_keypoints_2d`` and
            ``hand_right_keypoints_2d`` as flat (x, y, confidence) triples.

    Returns:
        A ``(num_keypoints, 2)`` float tensor, or ``None`` when the JSON cannot be
        read or contains no person.
    """
    body_pose_exclude = {9, 10, 11, 22, 23, 24, 12, 13, 14, 19, 20, 21}

    pose_dir, filename = os.path.split(filepath)
    frame_id = filename[:11]
    vid = os.path.split(pose_dir)[-1]

    save_to = os.path.join('./data/features', vid)
    cache_path = os.path.join(save_to, frame_id + '_ft.pt')

    try:
        ft = torch.load(cache_path)
        return ft[:, :2]
    except FileNotFoundError:
        pass  # not cached yet: build the features from the raw JSON below

    try:
        # Context manager so the file handle is closed even when parsing fails.
        with open(filepath) as f:
            content = json.load(f)["people"][0]
    except Exception:
        # Best effort: a missing/unreadable frame is reported to the caller as None.
        return None

    keypoints = (content["pose_keypoints_2d"]
                 + content["hand_left_keypoints_2d"]
                 + content["hand_right_keypoints_2d"])

    # Every third value is x (offset 0) / y (offset 1); offset 2 is the confidence.
    x = [v for i, v in enumerate(keypoints) if i % 3 == 0 and i // 3 not in body_pose_exclude]
    y = [v for i, v in enumerate(keypoints) if i % 3 == 1 and i // 3 not in body_pose_exclude]

    x = 2 * ((torch.FloatTensor(x) / 256.0) - 0.5)
    y = 2 * ((torch.FloatTensor(y) / 256.0) - 0.5)

    # Pairwise differences without the diagonal: row i holds v[i] - v[j] for j != i.
    n = x.shape[0]
    off_diagonal = ~torch.eye(n, dtype=torch.bool)
    x_diff = (x.unsqueeze(1) - x.unsqueeze(0))[off_diagonal].view(n, n - 1) / 2
    y_diff = (y.unsqueeze(1) - y.unsqueeze(0))[off_diagonal].view(n, n - 1) / 2

    orient = y_diff / x_diff
    # Boolean mask zeroes exactly the undefined slopes; indexing with the
    # (K, 2) output of nonzero() would zero entire rows instead.
    orient[x_diff == 0] = 0

    xy = torch.stack([x, y], dim=1)
    ft = torch.cat([xy, x_diff, y_diff, orient], dim=1)

    # makedirs creates missing parents and tolerates a directory created
    # concurrently by another DataLoader worker.
    os.makedirs(save_to, exist_ok=True)
    torch.save(ft, cache_path)
    return ft[:, :2]


def rand_start_sampling(frame_start, frame_end, num_samples):
    """Pick a random window of ``num_samples`` consecutive frame indices.

    ``frame_start`` and ``frame_end`` are both inclusive. When the clip has at
    most ``num_samples`` frames, every frame index is returned instead.

    Returns:
        A list of consecutive frame indices within ``[frame_start, frame_end]``.
    """
    num_frames = frame_end - frame_start + 1

    if num_frames <= num_samples:
        return list(range(frame_start, frame_end + 1))

    # randint is inclusive on both ends, so the final valid window (the one
    # ending exactly on frame_end) can be drawn too; the previous half-open
    # range() silently excluded it.
    sample_start = random.randint(frame_start, frame_end - num_samples + 1)
    return list(range(sample_start, sample_start + num_samples))


In [None]:
from torch.utils.data import Dataset
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
import random


def labels2cat(label_encoder, list):
    """Map labels to their encoded categories via ``label_encoder.transform``.

    ``list`` is the collection of labels to encode (the parameter name shadows
    the builtin but is part of the existing signature). The encoder's return
    value is passed through unchanged.
    """
    categories = label_encoder.transform(list)
    return categories

def compute_difference(x):
    """Build the table of pairwise differences of ``x`` without the diagonal.

    For every position ``i`` the result contains one row with ``x[i] - x[j]``
    for all ``j != i`` (in increasing ``j``).
    """
    rows = []
    for row_idx, minuend in enumerate(x):
        rows.append([
            minuend - subtrahend
            for col_idx, subtrahend in enumerate(x)
            if col_idx != row_idx
        ])
    return rows



class Sign_Dataset(Dataset):
    """Pose dataset that concatenates the sampled frames' keypoints column-wise.

    Each item is ``(x, y, video_id)``: ``x`` is the concatenation along dim 1 of
    the per-frame poses returned by ``read_pose_file`` (``num_samples`` frames),
    ``y`` is the label-encoded gloss and ``video_id`` comes from the index file.
    """

    def __init__(self, index_file_path, split, pose_root, sample_strategy='rnd_start', num_samples=25, num_copies=4,
                 img_transforms=None, video_transforms=None, test_index_file=None):
        """Index every instance of the requested split(s) that has pose data.

        Args:
            index_file_path: JSON index; a list of entries with ``gloss`` and ``instances``.
            split: one split name or a collection of split names to keep.
            pose_root: directory containing one sub-directory per video id.
            sample_strategy: frame sampling strategy; only ``'rnd_start'`` is implemented.
            num_samples: number of frames every item is sampled / padded to.
            num_copies: stored on the instance; not used by the code in this class.
            img_transforms: optional callable applied to each frame's pose.
            video_transforms: optional callable applied to the assembled item tensor.
            test_index_file: optional JSON index; when given, labels are still fitted
                on ``index_file_path`` but instances are read from this file.
        """
        assert os.path.exists(index_file_path), "Non-existent indexing file path: {}.".format(index_file_path)
        assert os.path.exists(pose_root), "Path to poses does not exist: {}.".format(pose_root)

        self.data = []
        self.label_encoder, self.onehot_encoder = LabelEncoder(), OneHotEncoder(categories='auto')

        # `type(split) == 'str'` compared a type with a string and was never true,
        # so a single split name was later matched by substring instead of equality.
        if isinstance(split, str):
            split = [split]

        self.test_index_file = test_index_file
        self._make_dataset(index_file_path, pose_root, split)

        self.index_file_path = index_file_path
        self.pose_root = pose_root
        self.framename = 'image_{}_keypoints.json'
        self.sample_strategy = sample_strategy
        self.num_samples = num_samples

        self.img_transforms = img_transforms
        self.video_transforms = video_transforms

        self.num_copies = num_copies

    def __len__(self):
        """Number of indexed instances."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(x, y, video_id)`` for the instance at ``index``."""
        video_id, gloss_cat, frame_start, frame_end = self.data[index]
        x = self._load_poses(video_id, frame_start, frame_end, self.sample_strategy, self.num_samples)

        if self.video_transforms:
            x = self.video_transforms(x)

        y = gloss_cat

        return x, y, video_id

    def _make_dataset(self, index_file_path, pose_root, split):
        """Fit the label encoders and fill ``self.data`` with matching instances.

        An instance is kept when its ``video_id`` has a directory under
        ``pose_root`` and its ``split`` is one of the requested splits.
        """
        with open(index_file_path, 'r') as f:
            content = json.load(f)

        # A set keeps the per-instance membership test cheap.
        vid_ids = set(os.listdir(pose_root))

        glosses = sorted([gloss_entry['gloss'] for gloss_entry in content])

        self.label_encoder.fit(glosses)
        self.onehot_encoder.fit(self.label_encoder.transform(self.label_encoder.classes_).reshape(-1, 1))

        if self.test_index_file is not None:
            print('Trained on {}, tested on {}'.format(index_file_path, self.test_index_file))
            with open(self.test_index_file, 'r') as f:
                content = json.load(f)

        # make dataset
        for gloss_entry in content:
            gloss, instances = gloss_entry['gloss'], gloss_entry['instances']
            gloss_cat = labels2cat(self.label_encoder, [gloss])[0]

            for instance in instances:
                if instance['video_id'] not in vid_ids:
                    continue
                if instance['split'] not in split:
                    continue

                frame_end = instance['frame_end']
                frame_start = instance['frame_start']
                video_id = instance['video_id']

                instance_entry = video_id, gloss_cat, frame_start, frame_end
                self.data.append(instance_entry)

    def _load_poses(self, video_id, frame_start, frame_end, sample_strategy, num_samples):
        """Load the sampled frames of one video, concatenated along dim 1.

        A frame whose pose cannot be read reuses the previous frame's pose; if
        there is no previous frame its path is printed and the frame is skipped.
        Short sequences are padded at the end by repeating the last pose, so the
        result always spans ``num_samples`` poses.

        Raises:
            NotImplementedError: for any ``sample_strategy`` other than ``'rnd_start'``.
            RuntimeError: if no pose could be loaded for any sampled frame.
        """
        if sample_strategy != 'rnd_start':
            raise NotImplementedError('Unimplemented sample strategy found: {}.'.format(sample_strategy))
        frames_to_sample = rand_start_sampling(frame_start, frame_end, num_samples)

        poses = []
        for i in frames_to_sample:
            pose_path = os.path.join(self.pose_root, video_id, self.framename.format(str(i).zfill(5)))
            pose = read_pose_file(pose_path)

            if pose is not None:
                if self.img_transforms:
                    pose = self.img_transforms(pose)
                poses.append(pose)
            elif poses:
                poses.append(poses[-1])
            else:
                # Nothing to copy from yet; report the missing frame and move on.
                print(pose_path)

        if not poses:
            raise RuntimeError('No pose could be loaded for video {}.'.format(video_id))

        # Pad by the number of poses actually collected: counting
        # frames_to_sample instead under-pads when leading frames were skipped,
        # which gives samples of different widths that cannot be batched.
        if len(poses) < num_samples:
            poses.extend([poses[-1]] * (num_samples - len(poses)))

        return torch.cat(poses, dim=1)

def read_pose_file(filepath):
    """Return the normalised (x, y) keypoints of one frame, using a feature cache.

    The cache file is ``./data/features/<video dir>/<first 11 chars of the file
    name>_ft.pt``. When it exists, its first two columns are returned. Otherwise
    the keypoint JSON at ``filepath`` is parsed, the feature matrix
    ``[xy, x_diff, y_diff, orient]`` is built and cached, and its first two
    columns are returned.

    Args:
        filepath: path to a keypoint JSON with a ``people`` list whose first entry
            holds ``pose_keypoints_2d``, ``hand_left_keypoints_2d`` and
            ``hand_right_keypoints_2d`` as flat (x, y, confidence) triples.

    Returns:
        A ``(num_keypoints, 2)`` float tensor, or ``None`` when the JSON cannot be
        read or contains no person.
    """
    body_pose_exclude = {9, 10, 11, 22, 23, 24, 12, 13, 14, 19, 20, 21}

    pose_dir, filename = os.path.split(filepath)
    frame_id = filename[:11]
    vid = os.path.split(pose_dir)[-1]

    save_to = os.path.join('./data/features', vid)
    cache_path = os.path.join(save_to, frame_id + '_ft.pt')

    try:
        ft = torch.load(cache_path)
        return ft[:, :2]
    except FileNotFoundError:
        pass  # not cached yet: build the features from the raw JSON below

    try:
        # Context manager so the file handle is closed even when parsing fails.
        with open(filepath) as f:
            content = json.load(f)["people"][0]
    except Exception:
        # Best effort: a missing/unreadable frame is reported to the caller as None.
        return None

    keypoints = (content["pose_keypoints_2d"]
                 + content["hand_left_keypoints_2d"]
                 + content["hand_right_keypoints_2d"])

    # Every third value is x (offset 0) / y (offset 1); offset 2 is the confidence.
    x = [v for i, v in enumerate(keypoints) if i % 3 == 0 and i // 3 not in body_pose_exclude]
    y = [v for i, v in enumerate(keypoints) if i % 3 == 1 and i // 3 not in body_pose_exclude]

    x = 2 * ((torch.FloatTensor(x) / 256.0) - 0.5)
    y = 2 * ((torch.FloatTensor(y) / 256.0) - 0.5)

    # Pairwise differences without the diagonal: row i holds v[i] - v[j] for j != i.
    n = x.shape[0]
    off_diagonal = ~torch.eye(n, dtype=torch.bool)
    x_diff = (x.unsqueeze(1) - x.unsqueeze(0))[off_diagonal].view(n, n - 1) / 2
    y_diff = (y.unsqueeze(1) - y.unsqueeze(0))[off_diagonal].view(n, n - 1) / 2

    orient = y_diff / x_diff
    # Boolean mask zeroes exactly the undefined slopes; indexing with the
    # (K, 2) output of nonzero() would zero entire rows instead.
    orient[x_diff == 0] = 0

    xy = torch.stack([x, y], dim=1)
    ft = torch.cat([xy, x_diff, y_diff, orient], dim=1)

    # makedirs creates missing parents and tolerates a directory created
    # concurrently by another DataLoader worker.
    os.makedirs(save_to, exist_ok=True)
    torch.save(ft, cache_path)
    return ft[:, :2]


def rand_start_sampling(frame_start, frame_end, num_samples):
    """Pick a random window of ``num_samples`` consecutive frame indices.

    ``frame_start`` and ``frame_end`` are both inclusive. When the clip has at
    most ``num_samples`` frames, every frame index is returned instead.

    Returns:
        A list of consecutive frame indices within ``[frame_start, frame_end]``.
    """
    num_frames = frame_end - frame_start + 1

    if num_frames <= num_samples:
        return list(range(frame_start, frame_end + 1))

    # randint is inclusive on both ends, so the final valid window (the one
    # ending exactly on frame_end) can be drawn too; the previous half-open
    # range() silently excluded it.
    sample_start = random.randint(frame_start, frame_end - num_samples + 1)
    return list(range(sample_start, sample_start + num_samples))


# Training Split

In [None]:
# Index file, pose directory and the number of frames sampled per video.
split_file = './data/splits/asl100.json'
keypoint_dir = './data/pose_per_individual_videos/'
num_samples = 50

# One dataset per split, built with identical settings.
# (Swap Sign_Dataset for Sign_Dataset2 here to get one flattened pose per frame.)
train_dataset, val_dataset, test_dataset = (
    Sign_Dataset(index_file_path=split_file, split=split_name, pose_root=keypoint_dir,
                 img_transforms=None, video_transforms=None, num_samples=num_samples)
    for split_name in ('train', 'val', 'test')
)

In [None]:
# Shape of one training sample's pose tensor (item 2 is an arbitrary pick).
train_dataset[2][0].shape

torch.Size([55, 100])

In [None]:
batch_size = 32

# pin_memory is a boolean flag (the previous value 4 only worked because it is truthy).
# Only the training loader is shuffled; validation and test batches keep a fixed order
# so evaluation runs are repeatable.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

  cpuset_checked))


In [None]:
# Number of distinct glosses the label encoder was fitted on.
NUM_CLASSES = len(train_dataset.label_encoder.classes_)
# Sign_Dataset concatenates one (x, y) pair per sampled frame along dim 1, so the
# last dimension of a sample is 2 * num_samples (100 for num_samples = 50).
NUM_FEATURES = 2 * num_samples



```
# This is formatted as code
```

# My Models

In [None]:
class PoseGRU(nn.Module):
  """Four stacked GRU blocks followed by a two-layer linear classifier.

  The GRUs are built with ``batch_first=True``, so the input is
  ``(batch, sequence, input_size)``. ``forward`` returns unnormalised class
  scores (logits) of shape ``(batch, output_size)``.

  Args:
    input_size: size of the last input dimension.
    hidden_size: width of the last two GRU blocks; the first two use half of it
      (128 -> 64/64/128/128, the previously hard-coded layout).
    output_size: number of classes.
    num_layers: layers inside each GRU block.
    drop: dropout probability applied to the final GRU output.
  """
  def __init__(self,input_size=100,hidden_size = 128,output_size = 100, num_layers =2, drop = 0.0):
    super().__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.num_layers = num_layers
    self.drop = drop

    # Derive the GRU widths from hidden_size so fc1 always matches gru4's output.
    narrow = self.hidden_size // 2
    self.gru1 = nn.GRU(input_size = self.input_size, hidden_size = narrow, num_layers = self.num_layers, batch_first = True)
    self.gru2 = nn.GRU(narrow, hidden_size = narrow, num_layers = self.num_layers, batch_first = True)
    self.gru3 = nn.GRU(narrow, hidden_size = self.hidden_size, num_layers = self.num_layers, batch_first = True)
    self.gru4 = nn.GRU(self.hidden_size, hidden_size = self.hidden_size, num_layers = self.num_layers, batch_first = True)

    self.dropout = nn.Dropout(p=self.drop)
    self.fc1 = nn.Linear(self.hidden_size,self.input_size//2)
    self.fc2 = nn.Linear(self.input_size//2,self.output_size)
    self.relu = nn.ReLU()
    # Kept for callers that want probabilities: self.softmax(model(x)).
    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x):
    """Return class logits for a batch of sequences."""
    output, h1 = self.gru1(x)
    output, h2 = self.gru2(output, h1)
    # gru3 is wider than gru2, so gru2's hidden state has the wrong size to
    # seed it; gru3 starts from its default zero state instead.
    output, h3 = self.gru3(output)
    output, _ = self.gru4(output, h3)
    output = self.dropout(output)

    # Classify from the last time step only.
    output = self.fc1(output[:,-1,:])
    # Logits, not softmax: nn.CrossEntropyLoss applies log-softmax itself.
    return self.fc2(output)


In [None]:
class PoseGRU2(nn.Module):
  """Single GRU block whose outputs are average- and max-pooled over time.

  The GRU is built with ``batch_first=True``, so the input is
  ``(batch, sequence, input_size)``. ``forward`` returns unnormalised class
  scores (logits) of shape ``(batch, output_size)``.

  Args:
    input_size: size of the last input dimension.
    hidden_size: GRU hidden width; the classifier sees ``2 * hidden_size`` features.
    output_size: number of classes.
    num_layers: layers inside the GRU.
    drop: dropout probability applied to the pooled features.
  """
  def __init__(self,input_size=55,hidden_size = 128,output_size = 100, num_layers =2, drop = 0.0):
    super().__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.num_layers = num_layers
    self.drop = drop

    # The attribute is named `lstm` but holds a GRU; the name is kept so existing
    # references and state_dict keys stay valid.
    self.lstm = nn.GRU(input_size = self.input_size, hidden_size = self.hidden_size, num_layers = self.num_layers, batch_first = True)
    self.dropout = nn.Dropout(p=self.drop)
    self.fc1 = nn.Linear(self.hidden_size*2,self.output_size)

    self.relu = nn.ReLU()

  def forward(self, x):
    """Return class logits for a batch of sequences."""
    # The final hidden state is stored on the module, as before.
    output,self.h = self.lstm(x,None)
    # The 1d pools reduce the LAST dimension, so move time there:
    # (batch, time, hidden) -> (batch, hidden, time) -> (batch, hidden).
    per_channel = output.transpose(1, 2)
    avg_pool = F.adaptive_avg_pool1d(per_channel,1).squeeze(-1)
    max_pool = F.adaptive_max_pool1d(per_channel,1).squeeze(-1)
    # Concatenate along the feature axis to get the 2 * hidden_size inputs fc1 expects.
    pooled = self.dropout(torch.cat([avg_pool,max_pool],dim=1))
    # Logits, not softmax: nn.CrossEntropyLoss applies log-softmax itself.
    return self.fc1(pooled)


# Train


In [None]:
model = PoseGRU(drop=0.3, input_size=NUM_FEATURES, output_size=NUM_CLASSES).to(device)

# torchsummary takes the per-sample input shape (no batch dimension). It has to
# match what the dataset yields, (55, NUM_FEATURES); the previous (50, 110) did
# not match the model's input_size.
summary(model, (55, NUM_FEATURES), device=device)

In [None]:
# Smoke test: push a dummy batch of two samples through the model and show the output shape.
x = torch.ones(2, 55, 100, device=device)
model(x).shape

In [None]:
# Optimisation hyper-parameters: number of epochs, Adam learning rate and Adam epsilon.
EPOCHS, LR, EPS = 200, 0.001, 1e-3

# Loss and optimizer used by the training loop.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LR, eps=EPS)

In [None]:
def evaluate(model, loader):
    """Compute and print classification accuracy of ``model`` over ``loader``.

    Args:
        model: module mapping a pose batch to per-class scores ``(batch, classes)``.
        loader: iterable of ``(pose, y, video_id)`` batches exposing ``.dataset``
            (its length is the accuracy denominator).

    Returns:
        Fraction of correctly classified samples; ``0.0`` for an empty dataset.
        The model is left in evaluation mode.
    """
    model.eval()  # Set the model to evaluation mode
    correct = 0
    with torch.no_grad():  # No gradients needed for evaluation
        # Each batch is a 3-tuple; there is no enumerate() here, so it must not
        # be unpacked as (index, batch).
        for pose, y, _video_ids in tqdm(loader):
            pose, y = pose.to(device), y.to(device).view(-1)
            # Run the model on this batch (previously `pred` was never assigned).
            pred = model(pose)
            correct += (torch.argmax(pred, dim=1) == y).sum().item()

    total = len(loader.dataset)
    acc = correct / total if total else 0.0
    print("\n Evaluation accuracy: {}".format(acc))
    return acc

In [None]:
# Labels / predictions accumulated over every epoch (kept for later inspection).
all_labels = []
all_preds = []
for e in range(EPOCHS):
    tot_loss = 0.0
    labels = []
    preds = []
    count = 0

    model.train()
    for batch, (pose, y, _video_ids) in enumerate(train_loader):
        pose, y = pose.to(device), y.to(device).view(-1)

        optimizer.zero_grad()
        out = model(pose)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()

        # .item() stores a plain float; adding the tensor itself would keep every
        # batch's autograd graph referenced for the whole epoch.
        tot_loss += loss.item()
        pred = torch.argmax(out, dim=1)

        # y and pred are already 1-D; squeezing them first would turn a batch of
        # size one into a scalar that cannot be passed to list.extend().
        batch_labels = y.detach().cpu().tolist()
        batch_preds = pred.detach().cpu().tolist()
        labels.extend(batch_labels)
        preds.extend(batch_preds)

        acc = accuracy_score(batch_labels, batch_preds)
        print("Batch:{} Accuracy :{}".format(batch,acc))

        count += 1

    # Per-epoch validation is currently disabled: evaluate(model, val_loader)
    accuracy = accuracy_score(labels,preds)
    all_labels.extend(labels)
    all_preds.extend(preds)
    # max(count, 1) avoids a division by zero when the loader yields no batch.
    print("\nEpoch:{} Training loss:{} Training Accuracy:{}".format(e+1,tot_loss/max(count, 1),accuracy))





  




