In [3]:
from __future__ import unicode_literals, print_function, division
import sys

In [1]:
import itertools
import numpy
import torch
import pickle
from scipy import ndimage as ndimage
from sklearn.utils import shuffle
import time
import math

In [19]:
try:
    from tensorboardX import SummaryWriter
except:
    # tensorboardX is not installed, just fail silently
    class SummaryWriter():
        def __init__(self):
            pass
        def add_scalar(self, tag, scalar_value, global_step=None, walltime=None):
            pass

In [4]:
print('Using python {}.{}, with modules versions'.format(sys.version_info.major, sys.version_info.minor))
print('-'*40)
print('numpy == {}'.format(numpy.__version__))
print('torch == {}'.format(torch.__version__))

Using python 3.8, with modules versions
----------------------------------------
numpy == 1.20.1
torch == 1.8.1


In [14]:
def load_data(filepath='./shrec_data.pckl'):
    """
    Returns hand gesture sequences (X) and their associated labels (Y).
    Each sequence has two different labels.
    The first label  Y describes the gesture class out of 14 possible gestures (e.g. swiping your hand to the right).
    The second label Y describes the gesture class out of 28 possible gestures (e.g. swiping your hand to the right with your index pointed, or not pointed).
    """
    file = open(filepath, 'rb')
    data = pickle.load(file, encoding='latin1')  # <<---- change to 'latin1' to 'utf8' if the data does not load
    file.close()
    return data['x_train'], data['x_test'], data['y_train_14'], data['y_train_28'], data['y_test_14'], data['y_test_28']

In [7]:
def resize_sequences_length(x_train, x_test, final_length=100):
    """
    Resize the time series by interpolating them to the same length
    """
    # please use python3. if you still use python2, important note: redefine the classic division operator / by importing it from the __future__ module
    x_train = numpy.array([numpy.array([ndimage.zoom(x_i.T[j], final_length / len(x_i), mode='reflect') for j in range(numpy.size(x_i, 1))]).T for x_i in x_train])
    x_test  = numpy.array([numpy.array([ndimage.zoom(x_i.T[j], final_length / len(x_i), mode='reflect') for j in range(numpy.size(x_i, 1)) ]).T for x_i in x_test])
    return x_train, x_test

In [8]:
def shuffle_dataset(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28):
    """Shuffle the train/test data consistently."""
    # note: add random_state=0 for reproducibility
    x_train, y_train_14, y_train_28 = shuffle(x_train, y_train_14, y_train_28)
    x_test,  y_test_14,  y_test_28  = shuffle(x_test,  y_test_14,  y_test_28)
    return x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28

In [9]:
def preprocess_data(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28):
    """
    Preprocess the data as you want: update as you want!
        - possible improvement idea: make a PCA here
    """
    x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28 = shuffle_dataset(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28)
    x_train, x_test = resize_sequences_length(x_train, x_test, final_length=100)
    return x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28

In [10]:
def convert_to_pytorch_tensors(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28):
    # as numpy
    y_train_14, y_train_28, y_test_14, y_test_28 = numpy.array(y_train_14), numpy.array(y_train_28), numpy.array(y_test_14), numpy.array(y_test_28)
    
    # -- REQUIRED by the pytorch loss function implementation --
    # Remove 1 to all classes items (1-14 => 0-13 and 1-28 => 0-27)
    y_train_14, y_train_28, y_test_14, y_test_28 = y_train_14 - 1, y_train_28 - 1, y_test_14 - 1, y_test_28 - 1
    
    # as torch
    x_train, x_test = torch.from_numpy(x_train), torch.from_numpy(x_test)
    y_train_14, y_train_28, y_test_14, y_test_28 = torch.from_numpy(y_train_14), torch.from_numpy(y_train_28), torch.from_numpy(y_test_14), torch.from_numpy(y_test_28)

    # -- REQUIRED by the pytorch loss function implementation --
    # correct the data type (for the loss function used)
    x_train, x_test = x_train.type(torch.FloatTensor), x_test.type(torch.FloatTensor)
    y_train_14, y_train_28, y_test_14, y_test_28 = y_train_14.type(torch.LongTensor), y_train_28.type(torch.LongTensor), y_test_14.type(torch.LongTensor), y_test_28.type(torch.LongTensor)
    
    return x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28

In [11]:
def batch(tensor, batch_size=32):
    """Return a list of (mini) batches"""
    tensor_list = []
    length = tensor.shape[0]
    i = 0
    while True:
        if (i + 1) * batch_size >= length:
            tensor_list.append(tensor[i * batch_size: length])
            return tensor_list
        tensor_list.append(tensor[i * batch_size: (i + 1) * batch_size])
        i += 1


def time_since(since):
    now = time.time()
    s = now - since
    m = math.floor(s / 60)
    s -= m * 60
    return '{:02d}m {:02d}s'.format(int(m), int(s))


def get_accuracy(model, x, y_ref):
    """Get the accuracy of the pytorch model on a batch"""
    acc = 0.
    model.eval()
    with torch.no_grad():
        predicted = model(x)
        _, predicted = predicted.max(dim=1)
        acc = 1.0 * (predicted == y_ref).sum().item() / y_ref.shape[0]

    return acc

In [12]:
class HandGestureNet(torch.nn.Module):
    """
    [Devineau et al., 2018] Deep Learning for Hand Gesture Recognition on Skeletal Data

    Summary
    -------
        Deep Learning Model for Hand Gesture classification using pose data only (no need for RGBD)
        The model computes a succession of [convolutions and pooling] over time independently on each of the 66 (= 22 * 3) sequence channels.
        Each of these computations are actually done at two different resolutions, that are later merged by concatenation
        with the (pooled) original sequence channel.
        Finally, a multi-layer perceptron merges all of the processed channels and outputs a classification.
    
    TL;DR:
    ------
        input ------------------------------------------------> split into n_channels channels [channel_i]
            channel_i ----------------------------------------> 3x [conv/pool/dropout] low_resolution_i
            channel_i ----------------------------------------> 3x [conv/pool/dropout] high_resolution_i
            channel_i ----------------------------------------> pooled_i
            low_resolution_i, high_resolution_i, pooled_i ----> output_channel_i
        MLP(n_channels x [output_channel_i]) -------------------------> classification

    Article / PDF:
    --------------
        https://ieeexplore.ieee.org/document/8373818

    Please cite:
    ------------
        @inproceedings{devineau2018deep,
            title={Deep learning for hand gesture recognition on skeletal data},
            author={Devineau, Guillaume and Moutarde, Fabien and Xi, Wang and Yang, Jie},
            booktitle={2018 13th IEEE International Conference on Automatic Face \& Gesture Recognition (FG 2018)},
            pages={106--113},
            year={2018},
            organization={IEEE}
        }
    """
    
    def __init__(self, n_channels=66, n_classes=14, dropout_probability=0.2):

        super(HandGestureNet, self).__init__()
        
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.dropout_probability = dropout_probability

        # Layers ----------------------------------------------
        self.all_conv_high = torch.nn.ModuleList([torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=1, out_channels=8, kernel_size=7, padding=3),
            torch.nn.ReLU(),
            torch.nn.AvgPool1d(2),

            torch.nn.Conv1d(in_channels=8, out_channels=4, kernel_size=7, padding=3),
            torch.nn.ReLU(),
            torch.nn.AvgPool1d(2),

            torch.nn.Conv1d(in_channels=4, out_channels=4, kernel_size=7, padding=3),
            torch.nn.ReLU(),
            torch.nn.Dropout(p=self.dropout_probability),
            torch.nn.AvgPool1d(2)
        ) for joint in range(n_channels)])

        self.all_conv_low = torch.nn.ModuleList([torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=1, out_channels=8, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.AvgPool1d(2),

            torch.nn.Conv1d(in_channels=8, out_channels=4, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.AvgPool1d(2),

            torch.nn.Conv1d(in_channels=4, out_channels=4, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.Dropout(p=self.dropout_probability),
            torch.nn.AvgPool1d(2)
        ) for joint in range(n_channels)])

        self.all_residual = torch.nn.ModuleList([torch.nn.Sequential(
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2)
        ) for joint in range(n_channels)])

        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=9 * n_channels * 12, out_features=1936),  # <-- 12: depends of the sequences lengths (cf. below)
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=1936, out_features=n_classes)
        )

        # Initialization --------------------------------------
        # Xavier init
        for module in itertools.chain(self.all_conv_high, self.all_conv_low, self.all_residual):
            for layer in module:
                if layer.__class__.__name__ == "Conv1d":
                    torch.nn.init.xavier_uniform_(layer.weight, gain=torch.nn.init.calculate_gain('relu'))
                    torch.nn.init.constant_(layer.bias, 0.1)

        for layer in self.fc:
            if layer.__class__.__name__ == "Linear":
                torch.nn.init.xavier_uniform_(layer.weight, gain=torch.nn.init.calculate_gain('relu'))
                torch.nn.init.constant_(layer.bias, 0.1)

    def forward(self, input):
        """
        This function performs the actual computations of the network for a forward pass.

        Arguments
        ---------
            input: a tensor of gestures of shape (batch_size, duration, n_channels)
                   (where n_channels = 3 * n_joints for 3D pose data)
        """

        # Work on each channel separately
        all_features = []

        for channel in range(0, self.n_channels):
            input_channel = input[:, :, channel]

            # Add a dummy (spatial) dimension for the time convolutions
            # Conv1D format : (batch_size, n_feature_maps, duration)
            input_channel = input_channel.unsqueeze(1)

            high = self.all_conv_high[channel](input_channel)
            low = self.all_conv_low[channel](input_channel)
            ap_residual = self.all_residual[channel](input_channel)

            # Time convolutions are concatenated along the feature maps axis
            output_channel = torch.cat([
                high,
                low,
                ap_residual
            ], dim=1)
            all_features.append(output_channel)

        # Concatenate along the feature maps axis
        all_features = torch.cat(all_features, dim=1)
        
        # Flatten for the Linear layers
        all_features = all_features.view(-1, 9 * self.n_channels * 12)  # <-- 12: depends of the initial sequence length (100).
        # If you have shorter/longer sequences, you probably do NOT even need to modify the modify the network architecture:
        # resampling your input gesture from T timesteps to 100 timesteps will (surprisingly) probably actually work as well!

        # Fully-Connected Layers
        output = self.fc(all_features)

        return output

In [15]:
# -------------
# Data
# -------------

# Load the dataset
x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28 = load_data()

# Shuffle sequences and resize sequences
x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28 = preprocess_data(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28)

# Convert to pytorch variables
x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28 = convert_to_pytorch_tensors(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28)

In [16]:
# -------------
# Network instantiation
# -------------
model = HandGestureNet(n_channels=66, n_classes=14)

In [21]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [26]:
# -------------
# Training
# -------------


def train(model, criterion, optimizer,
          x_train, y_train, x_test, y_test,
          force_cpu=False, num_epochs=5):
    
    # use a GPU (for speed) if you have one
    device = torch.device("cuda") if torch.cuda.is_available() and not force_cpu else torch.device("cpu")
    model = model.to(device)
    x_train, y_train, x_test, y_test = x_train.to(device), y_train.to(device), x_test.to(device), y_test.to(device)
    
    # (bonus) log accuracy values to visualize them in tensorboard:
    writer = SummaryWriter()
    
    # Prepare all mini-batches
    x_train_batches = batch(x_train)
    y_train_batches = batch(y_train)
    
    # Training starting time
    start = time.time()

    print('[INFO] Started to train the model.')
    print('Training the model on {}.'.format('GPU' if device == torch.device('cuda') else 'CPU'))
    
    for ep in range(num_epochs):

        # Ensure we're still in training mode
        model.train()

        current_loss = 0.0

        for idx_batch, train_batches in enumerate(zip(x_train_batches, y_train_batches)):

            # get a mini-batch of sequences
            x_train_batch, y_train_batch = train_batches

            # zero the gradient parameters
            optimizer.zero_grad()

            # forward
            outputs = model(x_train_batch)

            # backward + optimize
            # backward
            loss = criterion(outputs, y_train_batch)
            loss.backward()
            # optimize
            optimizer.step()
            # for an easy access
            current_loss += loss.item()
        
        train_acc = get_accuracy(model, x_train, y_train)
        test_acc = get_accuracy(model, x_test, y_test)
        
        writer.add_scalar('data/accuracy_train', train_acc, ep)
        writer.add_scalar('data/accuracy_test', test_acc, ep)
        print('Epoch #{:03d} | Time elapsed : {} | Loss : {:.4e} | Accuracy_train : {:.4e} | Accuracy_test : {:.4e}'.format(
                ep + 1, time_since(start), current_loss, train_acc, test_acc))

    print('[INFO] Finished training the model. Total time : {}.'.format(time_since(start)))

In [27]:
num_epochs = 20

train(model=model, criterion=criterion, optimizer=optimizer,
      x_train=x_train, y_train=y_train_14, x_test=x_test, y_test=y_test_14,
      num_epochs=num_epochs)

[INFO] Started to train the model.
Training the model on GPU.
Epoch #001 | Time elapsed : 00m 16s | Loss : 8.7516e+04 | Accuracy_train : 9.5765e-01 | Accuracy_test : 8.1481e-01
Epoch #002 | Time elapsed : 00m 33s | Loss : 8.5650e+04 | Accuracy_train : 9.5102e-01 | Accuracy_test : 7.9570e-01


KeyboardInterrupt: 

In [23]:
torch.save(model.state_dict(), './model_20_epochs.pth')

In [28]:
import cv2
import mediapipe as mp
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

In [None]:
cap = cv2.VideoCapture(0)
with mp_hands.Hands(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as hands:
  while cap.isOpened():
    success, image = cap.read()
    if not success:
      print("Ignoring empty camera frame.")
      # If loading a video, use 'break' instead of 'continue'.
      continue

    # Flip the image horizontally for a later selfie-view display, and convert
    # the BGR image to RGB.
    image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
    # To improve performance, optionally mark the image as not writeable to
    # pass by reference.
    image.flags.writeable = False
    results = hands.process(image)

    # Draw the hand annotations on the image.
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    if results.multi_hand_landmarks:
      for hand_landmarks in results.multi_hand_landmarks:
        mp_drawing.draw_landmarks(
            image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
    cv2.imshow('MediaPipe Hands', image)
    if cv2.waitKey(5) & 0xFF == ord('q'):
      break
cap.release()

In [30]:
!

Collecting facenet_pytorch
  Downloading facenet_pytorch-2.5.2-py3-none-any.whl (1.9 MB)
Installing collected packages: facenet-pytorch
Successfully installed facenet-pytorch-2.5.2


In [49]:
import numpy as np
from facenet_pytorch import MTCNN
from PIL import Image
import mediapipe as mp
# mp_drawing = mp.solutions.drawing_utils
# mp_hands = mp.solutions.hands
 

# Класс детектирования и обработки лица с веб-камеры 
class FaceDetector(object):

    def __init__(self, mtcnn, mp, resnet,channels=1):
        # Создаем объект для считывания потока с веб-камеры(обычно вебкамера идет под номером 0. иногда 1)
        self.cap = cv2.VideoCapture(0) 
        self.mtcnn = mtcnn
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.emodel = resnet
        self.channels = channels
        self.mp = mp

    # Функция рисования найденных параметров на кадре
    def _draw(self, frame, boxes, probs, landmarks):
        try:
            for box, prob, ld in zip(boxes, probs, landmarks):
                # Рисуем обрамляющий прямоугольник лица на кадре
                cv2.rectangle(frame,
                              (int(box[0]), int(box[1])),
                              (int(box[2]), int(box[3])),
                              (0, 0, 255),
                              thickness=2)

                # пишем на кадре какая эмоция распознана
#                 cv2.putText(frame, 
#                      (int(box[2]), int(box[3])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

                # Рисуем особенные точки
                cv2.circle(frame, (int(ld[0][0]),int(ld[0][1])), 5, (0, 0, 255), -1)
                cv2.circle(frame, (int(ld[1][0]),int(ld[1][1])), 5, (0, 0, 255), -1)
                cv2.circle(frame, (int(ld[2][0]),int(ld[2][1])), 5, (0, 0, 255), -1)
                cv2.circle(frame, (int(ld[3][0]),int(ld[3][1])), 5, (0, 0, 255), -1)
                cv2.circle(frame, (int(ld[4][0]),int(ld[4][1])), 5, (0, 0, 255), -1)
        except Exception as e:
            print('Something wrong im draw function!')
            print(f'error : {e}')

        return frame
    
    # Функция для вырезания лиц с кадра
    @staticmethod
    def crop_faces(frame, boxes):
        faces = []
        for i, box in enumerate(boxes):
            faces.append(frame[int(box[1]-40):int(box[3]+40), 
                int(box[0]-40):int(box[2]+40)])
        return faces
    
    @staticmethod
    def digit_to_classname(digit):
        if digit == 0:
            return 'sad'
        elif digit == 1:
            return 'disgust'
        elif digit == 2:
            return 'happy'
        elif digit == 3:
            return 'surprise'
        elif digit == 4:
            return 'neutral'
        elif digit == 5:
            return 'fear'
        elif digit == 6:
            return 'angry'
       
    # Функция в которой будет происходить процесс считывания и обработки каждого кадра
    def run(self):
        mp_drawing = self.mp.solutions.drawing_utils
        mp_hands = self.mp.solutions.hands
        with mp_hands.Hands(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as hands:
        # Заходим в бесконечный цикл
            while True:
                # Считываем каждый новый кадр - frame
                # ret - логическая переменая. Смысл - считали ли мы кадр с потока или нет
                ret, frame = self.cap.read()
                try:
                    # детектируем расположение лица на кадре, вероятности на сколько это лицо
                    # и особенные точки лица
                    boxes, probs, landmarks = self.mtcnn.detect(frame, landmarks=True)


    #                 # Вырезаем лицо из кадра
    #                 face = self.crop_faces(frame, boxes)[0]
    #                 # Меняем размер изображения лица для входа в нейронную сеть
    #                 face_img = cv2.resize(face,(48,48))
    #                 face = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
    #                 # Превращаем в 1-канальное серое изображение
    #                 face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)

    #                 # Далее мы подготавливаем наш кадр для считывания нс
    #                 # Для этого перегоним его в формат pil_image
    #                 face = Image.fromarray(face)
    #                 #face = face.resize((48,48))
    #                 face = np.asarray(face).astype('float')
    #                 face = torch.as_tensor(face)


    #                 # Превращаем numpy-картинку вырезанного лица в pytorch-тензор
    #                 torch_face = face.unsqueeze(0).to(self.device).float()
    #                 # Загужаем наш тензор лица в нейронную сеть и получаем предсказание
    #                 emotion = self.emodel(torch_face[None, ...])
    #                 # Интерпретируем предсказание как строку нашей эмоции
    #                 emotion = self.digit_to_classname(emotion[0].argmax().item())

                    # Рисуем на кадре
                    self._draw(frame, boxes, probs, landmarks)

                    frame = cv2.cvtColor(cv2.flip(frame, 1), cv2.COLOR_BGR2RGB)
                    # To improve performance, optionally mark the image as not writeable to
                    # pass by reference.
                    frame.flags.writeable = False
                    results = hands.process(frame)

                    # Draw the hand annotations on the image.
                    frame.flags.writeable = True
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    if results.multi_hand_landmarks:
                      for hand_landmarks in results.multi_hand_landmarks:
                        mp_drawing.draw_landmarks(
                            frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

#                     cv2.imshow("Gray face", face_img)

                except Exception as e:
                    print('Something wrong im main cycle!')
                    print(f'error : {e}')

                # Показываем кадр в окне, и назвываем его(окно) - 'Face Detection'
                cv2.imshow('Face Detection', frame)


                # Функция, которая проверяет нажатие на клавишу 'q'
                # Если нажатие произошло - выход из цикла. Конец работы приложения
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            # Очищаем все объекты opencv, что мы создали
        self.cap.release()
        cv2.destroyAllWindows()

In [50]:
mtcnn = MTCNN()
device = torch.device("cuda")
ourResNet = HandGestureNet(66, 14).to(device)
ourResNet.load_state_dict(torch.load('model_20_epochs.pth'))

# ourResNet = FERModel(1, 7).to(device)
# ourResNet.load_state_dict(torch.load('./models/model2_50_epochs.pth'))


ourResNet.eval()
# Создаем объект нашего класса приложения
fcd = FaceDetector(mtcnn, mp, HandGestureNet)
# Запускаем
fcd.run()

Something wrong im draw function!
error : 'NoneType' object is not iterable
Something wrong im draw function!
error : 'NoneType' object is not iterable
Something wrong im draw function!
error : 'NoneType' object is not iterable
