# Explore skeletons

https://www.section.io/engineering-education/handpose-detection-using-mediapipe-and-python/

In [None]:
!pip install tensorflow_docs
!pip install mediapipe opencv-python

Collecting tensorflow_docs
  Downloading tensorflow_docs-2023.5.24.56664-py3-none-any.whl (183 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/183.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━[0m [32m92.2/183.6 kB[0m [31m2.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.6/183.6 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Collecting astor (from tensorflow_docs)
  Downloading astor-0.8.1-py2.py3-none-any.whl (27 kB)
Installing collected packages: astor, tensorflow_docs
Successfully installed astor-0.8.1 tensorflow_docs-2023.5.24.56664
Collecting mediapipe
  Downloading mediapipe-0.10.8-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (34.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.5/34.5 MB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading soundd

In [None]:
# Mount Google Drive (Colab only) so the files under /content/drive/MyDrive
# that later cells reference become readable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import mediapipe as mp
from google.colab.patches import cv2_imshow
import uuid
import cv2
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm

In [None]:
# Short aliases for the MediaPipe solution modules used below.
mp_drawing = mp.solutions.drawing_utils # drawing utility to help us draw all the landmarks on our hands
mp_holistic = mp.solutions.holistic # Holistic model

In [None]:
# Side length, in pixels, that every center-cropped frame is resized to before detection.
IMG_SIZE = 224

def crop_center_square(frame):
    """Return the largest square region centered in ``frame``.

    Args:
        frame: Array whose first two axes are (height, width); any trailing
            axes (e.g. channels) are kept untouched.

    Returns:
        A view of ``frame`` whose height and width both equal
        ``min(height, width)``.
    """
    height, width = frame.shape[:2]
    side = min(height, width)
    top = height // 2 - side // 2
    left = width // 2 - side // 2
    return frame[top:top + side, left:left + side]

def mediapipe_detection(image, mp_model):
    """Run a MediaPipe model on one BGR frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        mp_model: Object exposing ``process(rgb_image)``.

    Returns:
        ``(bgr_image, results)``: the frame converted back to BGR and whatever
        ``mp_model.process`` returned.
    """
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # The buffer is flagged read-only for the duration of the inference call
    # and made writable again right after.
    rgb_image.flags.writeable = False
    detection = mp_model.process(rgb_image)
    rgb_image.flags.writeable = True
    return cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR), detection


def _hand_xy(hand, num_points=21):
    """Return ``[x, y]`` pairs for one hand, or zeros when the hand was not detected."""
    if hand is None:
        return [[0.0, 0.0] for _ in range(num_points)]
    return [[point.x, point.y] for point in hand.landmark]


def load_video(path, begin, end, max_frames=0):
    """Extract per-frame hand landmarks from frames ``begin + 1 .. end`` of a video.

    Each frame is center-cropped to a square, resized to ``IMG_SIZE`` and passed
    through MediaPipe Holistic. The left-hand ``(x, y)`` pairs are followed by
    the right-hand ones; a hand that was not detected contributes 21 zero pairs.

    The Holistic model is created once per call (not once per frame) and the
    capture is positioned once and then read sequentially. Building the model
    for every frame dominated the runtime and prevented MediaPipe from tracking
    landmarks between consecutive frames.

    Args:
        path: Path of the video file.
        begin: Index of the frame preceding the first frame to process.
        end: Index of the last frame to process (inclusive).
        max_frames: Stop after this many frames; ``0`` (default) means no limit.

    Returns:
        ``(results, hand_landmarks)``: the raw MediaPipe result per frame and a
        list with one ``(1, features)`` tensor per frame. Both lists are empty
        when no frame could be read.
    """
    cap = cv2.VideoCapture(path)
    results = []
    hand_landmarks = []
    try:
        # Seek once; every cap.read() afterwards advances to the next frame.
        cap.set(cv2.CAP_PROP_POS_FRAMES, begin + 1)
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            for _ in range(begin + 1, end + 1):
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(crop_center_square(frame), (IMG_SIZE, IMG_SIZE))
                _, result = mediapipe_detection(frame, holistic)
                coords = _hand_xy(result.left_hand_landmarks) + _hand_xy(result.right_hand_landmarks)

                results.append(result)
                hand_landmarks.append(torch.tensor(coords).view(1, -1))

                if max_frames > 0 and len(hand_landmarks) >= max_frames:
                    break
    finally:
        cap.release()
    return results, hand_landmarks

In [None]:
# Try load_video on a single clip (frames begin + 1 .. end, no frame limit).
results, hand_landmarks = load_video(path='/content/drive/MyDrive/slovo/animals/0cc94c7b-2f1b-498c-96af-33c195b07083.mp4', begin=19, end=98, max_frames=0)

In [None]:
# Peek at the MediaPipe result object produced for the first processed frame.
results[0]

mediapipe.python.solution_base.SolutionOutputs

In [None]:
# https://github.com/google/mediapipe/blob/master/docs/solutions/holistic.md

#results[10].left_hand_landmarks #21 landmark
#results[10].right_hand_landmarks
#results[10].pose_landmarks
#results[10].face_landmarks

# left_hand_landmarks = [[(data_point.x, data_point.y) for data_point in results[i].left_hand_landmarks.landmark] if results[i].left_hand_landmarks is not None
#                   else [(0,0) for i in range(21)] for i in range(len(results) )
# ]

In [None]:
# (number of processed frames, rows per frame tensor, features per row)
len(hand_landmarks), len(hand_landmarks[0]),len(hand_landmarks[0][0])

(79, 1, 84)

In [None]:
# Combine the per-frame tensors into one tensor with a new leading (frame) axis.
torch.stack(hand_landmarks)

tensor([[[0.6688, 0.9022, 0.6400,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.6653, 0.7932, 0.6179,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.6615, 0.7333, 0.6375,  ..., 0.0000, 0.0000, 0.0000]],

        ...,

        [[0.7499, 0.2916, 0.7005,  ..., 0.7028, 0.8638, 0.7001]],

        [[0.7613, 0.2913, 0.7141,  ..., 0.7059, 0.8737, 0.7052]],

        [[0.7641, 0.2911, 0.7150,  ..., 0.7080, 0.8765, 0.7079]]])

In [None]:
class BasicVideoDataset(Dataset):
    """Dataset that turns annotated video clips into hand-landmark sequences.

    Each item is ``(hand_landmarks, one_hot_label)``:

    * ``hand_landmarks`` - float32 tensor of shape ``(num_frames, 1, features)``
      holding, per frame, the left-hand ``(x, y)`` pairs followed by the
      right-hand ones (``features == 84`` when each hand contributes 21 points).
      A hand that was not detected is filled with zeros.
    * ``one_hot_label`` - int64 tensor of length ``len(labels_list)``.

    Args:
        labels_list: Ordered list of all class names; defines the one-hot layout.
        labels_df: DataFrame with the columns ``attachment_id``, ``text``,
            ``begin`` and ``end`` (one row per clip).
        video_dir: Directory containing ``<attachment_id>.mp4`` files.
        IMG_SIZE: Side length frames are resized to before detection.
    """

    # Number of zero points substituted for a hand MediaPipe did not detect.
    NUM_HAND_LANDMARKS = 21

    def __init__(self, labels_list, labels_df, video_dir, IMG_SIZE):
        self.video_labels = labels_df
        self.video_dir = video_dir
        self.IMG_SIZE = IMG_SIZE
        self.labels_list = labels_list

    def __len__(self):
        """Return the number of annotated clips."""
        return len(self.video_labels)

    def crop_center_square(self, frame):
        """Return the largest square region centered in ``frame``."""
        y, x = frame.shape[0:2]
        min_dim = min(y, x)
        start_x = (x // 2) - (min_dim // 2)
        start_y = (y // 2) - (min_dim // 2)
        return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

    def mediapipe_detection(self, image, mp_model):
        """Run ``mp_model`` on a BGR frame; return ``(bgr_image, results)``."""
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # model expects RGB
        image.flags.writeable = False                   # read-only during inference
        results = mp_model.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return image, results

    def _hand_xy(self, hand):
        """Return ``[x, y]`` pairs for one hand, or zeros when it was not detected."""
        if hand is None:
            return [[0.0, 0.0] for _ in range(self.NUM_HAND_LANDMARKS)]
        return [[point.x, point.y] for point in hand.landmark]

    def load_video(self, path, begin, end, max_frames=0):
        """Extract hand landmarks for frames ``begin + 1 .. end`` of ``path``.

        The Holistic model is created once per clip and the capture is
        positioned once and read sequentially; building the model for every
        frame dominated the runtime and prevented tracking between frames.

        Args:
            path: Path of the video file.
            begin: Index of the frame preceding the first frame to process.
            end: Index of the last frame to process (inclusive).
            max_frames: Stop after this many frames; ``0`` means no limit.

        Returns:
            float32 tensor of shape ``(num_frames, 1, features)``. When no frame
            could be read the tensor is empty (``num_frames == 0``) instead of
            ``torch.stack`` raising on an empty list.
        """
        cap = cv2.VideoCapture(path)
        hand_landmarks = []
        try:
            # Seek once; every cap.read() afterwards advances to the next frame.
            cap.set(cv2.CAP_PROP_POS_FRAMES, begin + 1)
            with mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                for _ in range(begin + 1, end + 1):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame = self.crop_center_square(frame)
                    frame = cv2.resize(frame, (self.IMG_SIZE, self.IMG_SIZE))
                    _, result = self.mediapipe_detection(frame, holistic)
                    coords = self._hand_xy(result.left_hand_landmarks) + self._hand_xy(result.right_hand_landmarks)
                    # Explicit dtype: an all-zero frame must not become an int tensor.
                    hand_landmarks.append(torch.tensor(coords, dtype=torch.float32).view(1, -1))

                    if max_frames > 0 and len(hand_landmarks) >= max_frames:
                        break
        finally:
            cap.release()

        if not hand_landmarks:
            # Two hands, (x, y) per landmark.
            return torch.zeros((0, 1, 4 * self.NUM_HAND_LANDMARKS), dtype=torch.float32)
        return torch.stack(hand_landmarks)

    def __getitem__(self, idx):
        """Return ``(hand_landmarks, one_hot_label)`` for the clip at row ``idx``."""
        row = self.video_labels.iloc[idx]
        filename = os.path.join(self.video_dir, row['attachment_id'] + ".mp4")
        hand_landmarks = self.load_video(filename, int(row['begin']), int(row['end']))
        # Use the instance's label list, not whatever global happens to exist.
        one_hot = torch.tensor(
            [1 if name == row['text'] else 0 for name in self.labels_list],
            dtype=torch.int64,
        )
        return hand_landmarks, one_hot


* The `__init__` function is run once when instantiating the Dataset object. Here it stores the list of class labels, the annotations DataFrame, the directory containing the videos, and the target frame size.
* The `__len__` function returns the number of samples in our dataset.
* The `__getitem__` function loads and returns the sample at the given index `idx`: the hand-landmark tensor of the clip and its one-hot label.

In [None]:
# Data locations and loader settings; the trailing comments keep the alternative (non-Colab) paths.
annotations_file = "/content/drive/MyDrive/slovo/SLOVO_DF_SHORT.tsv" #"/home/jupyter/mnt/s3/rsl-videos/slovo/slovo_annotations/SLOVO_DATAFRAME.tsv"
video_dir = "/content/drive/MyDrive/slovo/animals" #"/home/jupyter/mnt/s3/rsl-videos/slovo/slovo"
IMG_SIZE = 224
BATCH_SIZE = 1

In [None]:
# Load the annotations and assign every clip to a split by its running index
# within its class: the first 15 go to train, the next 3 to val, the rest to test.
video_labels = pd.read_csv(annotations_file, sep='\t').assign(
    group_rank=lambda df: df.groupby('text').cumcount() + 1,
    dataset=lambda df: np.where(
        df['group_rank'] < 16,
        'train',
        np.where(df['group_rank'] < 19, 'val', 'test'),
    ),
)
video_labels.head(5)

Unnamed: 0.1,Unnamed: 0,attachment_id,text,begin,end,group_rank,dataset
0,0,8f4d3be1-3a09-4d76-94ef-f8b1dbfa686b,пингвин,29,100,1,train
1,1,4f9e3cb5-b9de-48bc-a51d-875b8fea8e10,пингвин,21,79,2,train
2,2,1de7b5b0-ce08-419f-aeed-e7e480da953d,пингвин,7,59,3,train
3,3,72f70640-6931-4f57-8c72-a68e48032cfb,пингвин,22,87,4,train
4,4,6933a0f1-a0e1-48d8-91be-b445ca6c80ce,пингвин,9,64,5,train


In [None]:
# Class names in order of first appearance; their positions define the one-hot layout.
labels_list = video_labels['text'].unique().tolist()
num_classes = len(labels_list)
labels_list[:5]

['пингвин', 'жираф', 'лягушка', 'бегемот', 'козел']

In [None]:
# One dataset per split, all sharing the same label list, video directory and frame size.
training_data, val_data, test_data = (
    BasicVideoDataset(
        labels_list=labels_list,
        video_dir=video_dir,
        IMG_SIZE=IMG_SIZE,
        labels_df=video_labels[video_labels['dataset'] == split],
    )
    for split in ('train', 'val', 'test')
)

In [None]:
# Wrap each split in a DataLoader; all three are shuffled.
# NOTE(review): shuffling val/test does not change aggregate metrics but makes
# the evaluation order non-deterministic - confirm whether that is intended.
train_dataloader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
# Pull a single batch from the training loader for shape inspection below.
train_frames, train_labels = next(iter(train_dataloader))

In [None]:
# Size of the second axis of the sampled feature batch.
train_frames.shape[1]

84

In [None]:
# Shape of the one-hot label batch.
train_labels.shape

torch.Size([1, 30])

In [None]:
class LSTMTagger(torch.nn.Module):
    """Sequence classifier: an LSTM followed by a linear layer and log-softmax.

    Args:
        embedding_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        tagset_size: Number of output classes.
    """

    def __init__(self, embedding_dim, hidden_dim, tagset_size):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.lstm = torch.nn.LSTM(embedding_dim, hidden_dim)
        # Projects the final hidden state onto the class scores.
        self.hidden2tag = torch.nn.Linear(hidden_dim, tagset_size)

    def forward(self, landmarks):
        """Return log-probabilities over the classes for one landmark sequence.

        ``landmarks`` is reshaped to ``(landmarks.shape[1], 1, -1)``, i.e. its
        second axis is used as the time axis with an LSTM batch size of one.
        """
        seq_len = landmarks.shape[1]
        sequence = landmarks.view(seq_len, 1, -1)
        _, (final_hidden, _) = self.lstm(sequence)
        # Only the hidden state of the last layer feeds the classifier.
        logits = self.hidden2tag(final_hidden[-1])
        return torch.log_softmax(logits, dim=1)

In [None]:
# Model hyper-parameters.
embedding_dim = 84          # features per frame: 2 hands x 21 landmarks x (x, y)
hidden_dim = 10
# Derive the number of classes from the label list instead of hard-coding 30,
# so the output layer cannot drift out of sync with the annotations.
tagset_size = num_classes

model = LSTMTagger(embedding_dim, hidden_dim, tagset_size)

In [None]:
# Shape of the score vector the untrained model produces for the last sample of the batch.
model(train_frames)[-1].shape

torch.Size([30])

In [None]:
# Shape of the matching one-hot label vector.
train_labels[-1].shape

torch.Size([30])

In [None]:
def check_some_predictions(n):
    """Print the true and predicted label for up to ``n`` training batches.

    Uses the module-level ``train_dataloader``, ``model`` and ``labels_list``.
    For each batch only its last sample is reported. The loader is iterated
    once (instead of building a fresh iterator per sample) and empty clips are
    skipped, so fewer than ``n`` lines may be printed.

    Args:
        n: Maximum number of batches to draw.
    """
    model_device = next(model.parameters()).device
    with torch.no_grad():
        # zip() stops as soon as range(n) is exhausted, so no extra batch is loaded.
        for _, (frames, label) in zip(range(n), train_dataloader):
            if frames.numel() == 0:
                continue
            true_label = labels_list[torch.argmax(label[-1]).item()]
            label_scores = model(frames.to(model_device))
            pred_label = labels_list[torch.argmax(label_scores[-1]).item()]
            print(f"Label: {true_label}; Predicted: {pred_label}")
check_some_predictions(5)

Label: бегемот; Predicted: кролик
Label: козел; Predicted: кролик
Label: собака; Predicted: кролик
Label: кролик; Predicted: кролик
Label: лев; Predicted: кролик


In [None]:
# Device that batches are moved to in the training loop.
device = "cpu"

In [None]:
# Training configuration and bookkeeping.
previous_epochs = 0
epoches = 15
losses = []        # mean training loss per epoch
test_losses = []   # mean evaluation loss per epoch
save = True
best_acc = 0.0     # best evaluation accuracy so far; must exist before the first comparison
weights_dir = "/content/drive/MyDrive/slovo/weights"

loss_function = torch.nn.NLLLoss()
model.to(device)   # keep the model on the same device the batches are moved to
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

for epoch in range(previous_epochs + 1, epoches + 1):
    print(f"epoch: {epoch}", end=":")

    # ---- train ----
    model.train()
    total_loss = []
    pbar = tqdm(train_dataloader, desc=f'Train Epoch{epoch}/{epoches}')
    for frames, label in pbar:
        if frames.numel() == 0:
            # Clip produced no frames; nothing to learn from.
            continue
        frames = frames.to(device)
        # NLLLoss expects class indices, the dataset yields one-hot vectors.
        target = torch.argmax(label.to(device), 1)
        optimizer.zero_grad()
        loss = loss_function(model(frames), target)
        loss.backward()
        optimizer.step()
        total_loss.append(loss.item())
        pbar.set_description(f'Train Epoch:{epoch}/{epoches} train_loss:{round(np.mean(total_loss), 4)}')
    losses.append(np.mean(total_loss) if total_loss else float('nan'))

    # ---- evaluate ----
    # NOTE(review): model selection below uses test_dataloader while
    # val_dataloader is never used - confirm this is intended.
    model.eval()
    total_loss = []
    correct = 0
    total = 0
    predict_acc = 0.0  # stays defined even if every batch is skipped
    pbar = tqdm(test_dataloader, desc=f'Test Epoch{epoch}/{epoches}', mininterval=0.3)
    with torch.no_grad():
        for frames, label in pbar:
            if frames.numel() == 0:
                continue
            frames = frames.to(device)
            target = torch.argmax(label.to(device), 1)
            output = model(frames)
            total_loss.append(loss_function(output, target).item())
            correct += (torch.argmax(output, 1) == target).sum().item()
            total += len(label)
            predict_acc = correct / total
            pbar.set_description(f'Test Epoch:{epoch}/{epoches} acc:{predict_acc:.3f}')
    epoch_test_loss = np.mean(total_loss) if total_loss else float('nan')
    test_losses.append(epoch_test_loss)

    # Checkpoint only when the evaluation accuracy improves.
    if save and predict_acc > best_acc:
        best_acc = predict_acc
        os.makedirs(weights_dir, exist_ok=True)
        torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': round(epoch_test_loss, 2)
            },
                weights_dir + f'/SkeletonLSTM-Epoch-{epoch}-Test_acc-{best_acc:.3f}.pth')
    check_some_predictions(5)

epoch: 1:

Train Epoch:1/15 train_loss:3.4201:  36%|███▌      | 160/450 [1:11:21<2:09:19, 26.76s/it]


KeyboardInterrupt: ignored

In [None]:


# for epoch in range(10):
#     print(f"epoch: {epoch}", end=":")
#     total_loss=0
#     for train_frames, train_labels in tqdm(train_dataloader):
#         model.zero_grad()
#         tag_scores = model(train_frames)[-1]

#         # Step 4. Compute the loss, gradients, and update the parameters by
#         #  calling optimizer.step()
#         loss = loss_function(tag_scores, train_labels[-1])
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()
#     print(f"total_loss: {total_loss}")
#     losses.append(total_loss)
#     check_some_predictions(5)


epoch: 0:

100%|██████████| 450/450 [3:18:43<00:00, 26.50s/it]


total_loss: 93.19428644003347
Label: бабочка; Predicted: пингвин
Label: орел; Predicted: пингвин
Label: мышь; Predicted: пингвин
Label: лебедь; Predicted: пингвин
Label: собака; Predicted: пингвин
epoch: 1:

 28%|██▊       | 128/450 [56:51<2:23:02, 26.65s/it]


KeyboardInterrupt: ignored