In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torchvision import transforms, datasets, models
from torchvision.datasets import ImageFolder
from torch.utils.data import random_split, DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm
import mediapipe as mp
import numpy as np
import cv2 as cv2

In [2]:
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

In [3]:
# import zipfile
# with zipfile.ZipFile("train.zip","r") as zip_ref:
#     zip_ref.extractall("train")

In [4]:
train_dir = 'train/asl_alphabet_train/asl_alphabet_train'
test_dir = 'test/asl-alphabet-test'

In [5]:
STANDARD_HEIGHT = 200
STANDARD_WIDTH = 200
MIN_CONFIDENCE_LEVEL = 0.7

class MediaPipe(object):
    def __call__(self, sample):
        image = np.array(sample)
        mp_hands = mp.solutions.hands
        with mp_hands.Hands(static_image_mode = True,max_num_hands = 2,
            min_detection_confidence = MIN_CONFIDENCE_LEVEL) as hands:

            #For training change this line, don't need to flip (since images appear to be from back-facing camera) 
            #Convert cv2 BGR image to RGB image and flip (since image coming from front-facing camera)  
            processed = hands.process(cv2.flip(image, 1))

            #No hand detected (Figure out how we want to handle, 126 vector with all 0s?): 
            if not processed.multi_hand_landmarks: 
                zeros = torch.tensor(np.array([0] * 126), dtype=torch.float32)
                return zeros

            feature_vector = [] 
            #Could have one or two hands: 
            for hand in processed.multi_hand_landmarks: 
                for curr_landmark in hand.landmark: 
                    x = curr_landmark.x 
                    feature_vector.append(x)

                    y = curr_landmark.y 
                    feature_vector.append(y)

                    z = curr_landmark.z
                    feature_vector.append(z)

            #If we have just one hand, zero out the remaining (to ensure constant vector size of 126)
            #Might cause problems in one-hand case if we care which hand is visible/showing sign language
            #Solution to this is to use processed.multi_handedness
            if (len(feature_vector) == 63):
                zero_vector = [0] * 63 
                feature_vector.extend(zero_vector)
            
            output = torch.tensor(np.array(feature_vector), dtype=torch.float32)

            return output

class ExtractHandFeatures: 
    def __call__(self, sample):
        image = np.array(sample)
        mp_hands = mp.solutions.hands
        with mp_hands.Hands(static_image_mode = True,max_num_hands = 2,
            min_detection_confidence = MIN_CONFIDENCE_LEVEL) as hands:
            
            #For training change this line, don't need to flip (since images appear to be from back-facing camera) 
            #Convert cv2 BGR image to RGB image and flip (since image coming from front-facing camera)  
            processed = hands.process(cv2.flip(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), 1))
            # processed = hands.process(cv2.cvtColor(self.raw_image, cv2.COLOR_BGR2RGB)) 

            #No hand detected (Figure out how we want to handle, 126 vector with all 0s?): 
            if not processed.multi_hand_landmarks: 
                zeros = torch.tensor(np.array([0] * 126), dtype=torch.float32)
                return zeros
            
            feature_vector = []         
            hands = [] 

            for idx, hand_handedness in enumerate(processed.multi_handedness):
                hands.append(hand_handedness.classification[0].label)
                

            #Left hand is first 63, Right hand is last 63
            #LEFT HAND ONLY CASE: 
            if (len(hands) == 1 and hands[0] == "Left"):
                for hand in processed.multi_hand_landmarks: 
                    for curr_landmark in hand.landmark: 
                        x = curr_landmark.x 
                        feature_vector.append(x)

                        y = curr_landmark.y 
                        feature_vector.append(y)

                        z = curr_landmark.z
                        feature_vector.append(z)
                zero_vector = [0] * 63 
                feature_vector.extend(zero_vector)

            #RIGHT HAND ONLY CASE: 
            if (len(hands) == 1 and hands[0] == "Right"):
#                 print("Detected only right hand")
                for hand in processed.multi_hand_landmarks: 
                    for curr_landmark in hand.landmark: 
                        x = curr_landmark.x 
                        feature_vector.append(x)

                        y = curr_landmark.y 
                        feature_vector.append(y)

                        z = curr_landmark.z
                        feature_vector.append(z)
                zero_vector = [0] * 63 
                feature_vector = zero_vector + feature_vector
            
            #BOTH HANDS CASE: 
            if (len(hands) == 2):
                # print("Detected both hands")
                zeros = torch.tensor(np.array([0] * 126), dtype=torch.float32)
                return zeros

            output = torch.tensor(np.array(feature_vector), dtype=torch.float32)
            #print(output)
            return output


def get_angle_between_vectors(u: np.ndarray, v: np.ndarray) -> float:
    dot_product = np.dot(u, v)
    norm = np.linalg.norm(u) * np.linalg.norm(v)
    return np.arccos(dot_product / norm)        

class ExtractAngleFeatures: 
    def __call__(self, sample):
        image = np.array(sample)
        mp_hands = mp.solutions.hands
        with mp_hands.Hands(static_image_mode = True,max_num_hands = 2,
            min_detection_confidence = MIN_CONFIDENCE_LEVEL) as hands:

            #For training change this line, don't need to flip (since images appear to be from back-facing camera) 
            #Convert cv2 BGR image to RGB image and flip (since image coming from front-facing camera)  
            processed = hands.process(cv2.flip(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), 1))
            #processed = hands.process(cv2.cvtColor(raw_image, cv2.COLOR_BGR2RGB)) #CHANGE THIS TO self.raw_image 

            #No hand detected (Figure out how we want to handle, 882 vector with all 0s?): 
            if not processed.multi_hand_landmarks: 
#                 print("no hand")
                zeros = torch.tensor(np.array([0] * 882), dtype=torch.float32)
                return zeros

            angles_list = []         
            hands = [] 

            for idx, hand_handedness in enumerate(processed.multi_handedness):
                hands.append(hand_handedness.classification[0].label)

            #LEFT HAND ONLY CASE: 
            if (len(hands) == 1 and hands[0] == "Left"):

                landmarks = np.zeros((21, 3))
                index = 0 
#                 print("Detected only left hand")
                for hand in processed.multi_hand_landmarks:   
                    for curr_landmark in hand.landmark: 
                        x = curr_landmark.x 
                        y = curr_landmark.y 
                        z = curr_landmark.z
                        landmarks[index] = [x, y, z]
                        index += 1

                # print("Landmarks is:")
                # print(landmarks)

                connections = mp_hands.HAND_CONNECTIONS
                # print(connections)
                # print(len(connections))

                difference_connect_vector = list(map(lambda t: landmarks[t[1]] - landmarks[t[0]], connections))
                # print(difference_connect_vector)
                # print(len(difference_connect_vector))

                for connection_from in difference_connect_vector:
                    for connection_to in difference_connect_vector:
                        angle = get_angle_between_vectors(connection_from, connection_to)
                        # If the angle is not null we store it else we store 0
                        if angle == angle:
                            angles_list.append(angle)
                        else:
                            angles_list.append(0)
                # print("Angles list is:")
                # print(angles_list)
                # print(len(angles_list))
                zero_vector = [0] * 441 
                angles_list.extend(zero_vector)

            #RIGHT HAND ONLY CASE: 
            if (len(hands) == 1 and hands[0] == "Right"):
                landmarks = np.zeros((21, 3))
                index = 0 
#                 print("Detected only right hand")
                for hand in processed.multi_hand_landmarks:   
                    for curr_landmark in hand.landmark: 
                        x = curr_landmark.x 
                        y = curr_landmark.y 
                        z = curr_landmark.z
                        landmarks[index] = [x, y, z]
                        index += 1

                # print("Landmarks is:")
                # print(landmarks)
                connections = mp_hands.HAND_CONNECTIONS
                # print(connections)
                # print(len(connections))

                difference_connect_vector = list(map(lambda t: landmarks[t[1]] - landmarks[t[0]], connections))
                # print(difference_connect_vector)
                # print(len(difference_connect_vector))

                for connection_from in difference_connect_vector:
                    for connection_to in difference_connect_vector:
                        angle = get_angle_between_vectors(connection_from, connection_to)
                        # If the angle is not null we store it else we store 0
                        if angle == angle:
                            angles_list.append(angle)
                        else:
                            angles_list.append(0)
                zero_vector = [0] * 441 
                angles_list = zero_vector + angles_list

            #BOTH HANDS CASE: 
            if (len(hands) == 2):
                print("Detected both hands")
                zeros = torch.tensor(np.array([0] * 882), dtype=torch.float32)
                return zeros

            output = torch.tensor(np.array(angles_list), dtype=torch.float32)
            return output

In [6]:
transform = transforms.Compose([
    ExtractAngleFeatures(),
])
test_transform = transforms.Compose([
    transforms.Resize((200,200)),
    ExtractAngleFeatures(),
])
dataset = ImageFolder(train_dir, transform=transform)
test = ImageFolder(test_dir, transform=test_transform)

In [7]:
dataset[6]

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


(tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.

In [8]:
dataset_len = len(dataset)
train_len_proportion = 0.9
train_len = int(train_len_proportion * dataset_len)
val_len = dataset_len - train_len
train_dataset, val_dataset = random_split(dataset, [train_len, val_len])

In [9]:
len(train_dataset), len(val_dataset)

(78300, 8700)

In [10]:
batch_size = 50
train_dl = DataLoader(train_dataset, batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_dl = DataLoader(val_dataset, batch_size*2, num_workers=2, pin_memory=True)

In [11]:
classes = dataset.classes
print(len(classes))

29


In [12]:
# iter_data = iter(train_dl)
# fig, axes = plt.subplots(figsize=(12, 12), ncols=5)
# for i in range(5):
#     img, label = next(iter_data)
#     ax = axes[i]
#     ax.imshow(img[0].permute(1, 2, 0))
#     ax.title.set_text(''.join('%5s' % classes[label[0]]))
# plt.show()

In [13]:
def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    """Move tensor(s) to chosen device"""
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)
            
    def __len__(self):
        """Number of batches"""
        return len(self.dl)

In [14]:
device = get_default_device()
device

device(type='cuda')

In [15]:
train_dl = DeviceDataLoader(train_dl, device)
valid_dl = DeviceDataLoader(val_dl, device)

In [16]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

In [17]:
class ImageClassificationBase(nn.Module):
    def training_step(self, batch):
        images, labels = batch 
        out = self(images)                  # Generate predictions
        loss = F.cross_entropy(out, labels) # Calculate loss
        return loss
    
    def validation_step(self, batch):
        images, labels = batch 
        out = self(images)                    # Generate predictions
        loss = F.cross_entropy(out, labels)   # Calculate loss
        acc = accuracy(out, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc}
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print("Epoch [{}], val_loss: {:.4f}, val_acc: {:.4f}".format(epoch, result['val_loss'], result['val_acc']))

In [18]:
class ASLResnet(ImageClassificationBase):
    def __init__(self):
        super().__init__()        
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(882, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(p=0.25),
            nn.Linear(128, 29),            
        )
        
        self.network = self.linear_relu_stack
    
    def forward(self, xb):
        return self.network(xb)
    
    def freeze(self):
        # To freeze the residual layers
        for param in self.network.parameters():
            param.require_grad = False
#         for param in self.network.fc.parameters():
#             param.require_grad = True
    
    def unfreeze(self):
        # Unfreeze all layers
        for param in self.network.parameters():
            param.require_grad = True

In [19]:
model = to_device(ASLResnet(), device)
model

ASLResnet(
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=882, out_features=256, bias=True)
    (1): ReLU()
    (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Linear(in_features=256, out_features=256, bias=True)
    (4): ReLU()
    (5): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): Linear(in_features=256, out_features=256, bias=True)
    (7): ReLU()
    (8): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): Linear(in_features=256, out_features=128, bias=True)
    (10): ReLU()
    (11): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (12): Dropout(p=0.25, inplace=False)
    (13): Linear(in_features=128, out_features=29, bias=True)
  )
  (network): Sequential(
    (0): Linear(in_features=882, out_features=256, bias=True)
    (1): ReLU()
    (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_

In [20]:
from tqdm import tqdm
@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    outputs = [model.validation_step(batch) for batch in tqdm(val_loader)]
    return model.validation_epoch_end(outputs)

def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

def fit_one_cycle(epochs, max_lr, model, train_loader, val_loader, 
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    torch.cuda.empty_cache()
    history = []
    
    # Set up custom optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # Set up one-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(train_loader))
    
    for epoch in range(epochs):
        # Training Phase 
        model.train()
        train_losses = []
        lrs = []
        for batch in tqdm(train_loader):
            loss = model.training_step(batch)
            train_losses.append(loss)
            loss.backward()
            
            # Gradient clipping
            if grad_clip: 
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)
            
            optimizer.step()
            optimizer.zero_grad()
            
            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()
        
        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [21]:
history = [evaluate(model, valid_dl)]
history

100%|███████████████████████████████████████████| 87/87 [10:38<00:00,  7.34s/it]


[{'val_loss': 3.3680648803710938, 'val_acc': 0.03735632076859474}]

In [22]:
# model.freeze()
epochs = 5
max_lr = 1e-4
grad_clip = 0.1
weight_decay = 1e-4
opt_func = torch.optim.Adam

In [None]:
%%time
history += fit_one_cycle(epochs, max_lr, model, train_dl, valid_dl, 
                             grad_clip=grad_clip, 
                             weight_decay=weight_decay, 
                             opt_func=opt_func)

 87%|████████████████████████████████▏    | 1362/1566 [1:19:25<11:36,  3.41s/it]

Detected both hands


 97%|███████████████████████████████████▉ | 1520/1566 [1:28:25<02:37,  3.42s/it]

Detected both hands


100%|█████████████████████████████████████| 1566/1566 [1:31:02<00:00,  3.49s/it]
100%|███████████████████████████████████████████| 87/87 [10:07<00:00,  6.98s/it]


Epoch [0], val_loss: 3.7160, val_acc: 0.1040


  7%|██▌                                   | 105/1566 [06:13<1:25:24,  3.51s/it]

Detected both hands


 67%|████████████████████████▉            | 1053/1566 [1:00:26<29:17,  3.43s/it]

Detected both hands


100%|█████████████████████████████████████| 1566/1566 [1:29:40<00:00,  3.44s/it]
100%|███████████████████████████████████████████| 87/87 [10:09<00:00,  7.00s/it]


Epoch [1], val_loss: 3.9499, val_acc: 0.1029


  2%|▉                                      | 39/1566 [02:19<1:28:49,  3.49s/it]

Detected both hands


 59%|███████████████████████▋                | 927/1566 [53:39<37:38,  3.53s/it]

Detected both hands


100%|█████████████████████████████████████| 1566/1566 [1:30:26<00:00,  3.47s/it]
100%|███████████████████████████████████████████| 87/87 [10:04<00:00,  6.95s/it]


Epoch [2], val_loss: 3.6111, val_acc: 0.1267


 16%|█████▉                                | 243/1566 [14:03<1:16:57,  3.49s/it]

Detected both hands


 67%|██████████████████████████             | 1044/1566 [59:37<29:19,  3.37s/it]

Detected both hands


100%|█████████████████████████████████████| 1566/1566 [1:29:00<00:00,  3.41s/it]
100%|███████████████████████████████████████████| 87/87 [10:15<00:00,  7.07s/it]


Epoch [3], val_loss: 3.5559, val_acc: 0.1325


 26%|█████████▊                            | 402/1566 [23:03<1:06:15,  3.42s/it]

Detected both hands


 67%|██████████████████████████             | 1044/1566 [59:34<29:27,  3.39s/it]

In [None]:
torch.save(model.state_dict(), 'asl-colored-extractangle-mvp3.pth')

In [None]:
### Test
def predict_image(img, model):
    # Convert to a batch of 1
    xb = to_device(img.unsqueeze(0), device)
    # Get predictions from model
    yb = model(xb)
    # Pick index with highest probability
    _, preds  = torch.max(yb, dim=1)
    # Retrieve the class label
    return dataset.classes[preds[0].item()]
len(test)
test_dl = DataLoader(test, batch_size, num_workers=4, pin_memory=True)
test_dl = DeviceDataLoader(test_dl, device)
evaluate(model, test_dl)

In [None]:
# Sanity Check
model2 = to_device(ASLResnet(), device)
model2.load_state_dict(torch.load('asl-colored-extractangle-mvp3.pth'))
evaluate(model2, valid_dl)
