In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install mediapipe

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import mediapipe as mp

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import transforms
from torchvision import models

import os
from os.path import isfile, join
import numpy as np
import pandas as pd
from tqdm import tqdm

import pickle

import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import cv2
from google.colab.patches import cv2_imshow

import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [4]:
path_correct = '/content/drive/MyDrive/ml/correct100.mp4'
path_back = '/content/drive/MyDrive/ml/curved_back100_2.mp4'
path_dont_sit_down = '/content/drive/MyDrive/ml/dont_sit_down100.mp4'
path_knees_fell = '/content/drive/MyDrive/ml/knees_fell.mp4'
path_uneven = '/content/drive/MyDrive/ml/uneven100.mp4'

In [5]:
paths_list = [path_correct, path_back, path_dont_sit_down, path_knees_fell, path_uneven]

In [6]:
key_points = [0, 11, 12, 23, 24, 25, 26, 27, 28]

In [229]:
def get_points(path, key_points):
    vidcap = cv2.VideoCapture(path)
    points_list = []
    for _ in tqdm(range(int(cv2.VideoCapture(path).get(cv2.CAP_PROP_FRAME_COUNT)))):
    # for _ in tqdm(range(10)):
        _, image = vidcap.read()
        try:
            with mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.3, model_complexity=1) as pose:
                results = pose.process(image)
            points = []
            for i, point in enumerate(results.pose_world_landmarks.landmark):
                if i in key_points:
                    points.append([point.x, point.y, point.z])
            points_list.append(np.array(points).reshape(-1))
        except:
            points_list.append(np.zeros(len(key_points) * 3))
    return np.array(points_list)

In [230]:
for path in paths_list:
    name = path.split('/')[-1][:-4]
    points_list = get_points(path, key_points)
    with open('/content/drive/MyDrive/ml/' + name + '.p', 'wb') as f:
        pickle.dump(points_list, f)

100%|██████████| 7076/7076 [25:56<00:00,  4.55it/s]
100%|██████████| 6931/6931 [24:57<00:00,  4.63it/s]
100%|██████████| 5898/5898 [21:15<00:00,  4.62it/s]
100%|██████████| 6962/6962 [24:32<00:00,  4.73it/s]
100%|██████████| 7292/7292 [25:57<00:00,  4.68it/s]


### load model 4 squats

In [7]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [8]:
class SimleNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(SimleNetwork,self).__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim * 2)
        self.linear2 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, hidden_dim)
        self.out = nn.Linear(hidden_dim, output_dim)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.out(x)
        return x

In [9]:
input_dim = 27
hidden_dim = 200
output_dim = 1

In [10]:
model = SimleNetwork(input_dim, hidden_dim, output_dim)

In [11]:
model.load_state_dict(torch.load('/content/drive/MyDrive/ml/model_mov_v0.pt'));

### Get predicts

In [236]:
path = '/content/drive/MyDrive/ml/VID_20220822_162003_146.mp4'

vidcap = cv2.VideoCapture(path)
predicts = []
for _ in tqdm(range(int(cv2.VideoCapture(path).get(cv2.CAP_PROP_FRAME_COUNT)))):
    _, image = vidcap.read()
    try:
        with mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.3, model_complexity=1) as pose:
            results = pose.process(image)
        points = []
        for i, point in enumerate(results.pose_world_landmarks.landmark):
            if i in key_points:
                points.append([point.x, point.y, point.z])
        points = np.array(points).reshape(-1)
        with torch.no_grad():
            predict = torch.sigmoid(model(torch.FloatTensor(points))).cpu().detach().numpy()[0]
        predicts.append(predict)
    except:
        predicts.append(-1)

100%|██████████| 1683/1683 [05:56<00:00,  4.72it/s]


In [12]:
class Cleaner:


    def __init__(self, wide=10, threshold=15):
        self.wide = wide
        self.threshold = threshold
    

    def clean(self, predicts):
        predicts = self._reduse_unrecognized(predicts)
        predicts = self._sliding_window(predicts)
        predicts = self._hysteresis(predicts)
        predicts = self._mount_filter(predicts)
        return predicts


    def _reduse_unrecognized(self, predicts):
        predicts_rep = []
        for predict in predicts:
            if predict == -1:
                try:
                    predicts_rep.append(predicts_rep[-1])
                except:
                    predicts_rep.append(predict)
            else:
                predicts_rep.append(predict)
        return predicts_rep


    def _sliding_window(self, predicts):
        smoothing = []
        for i in range(len(predicts) - self.wide):
            smoothing.append(sum(predicts[i:i + self.wide]) / self.wide)
        return [smoothing[0]] * self.wide + smoothing


    def _hysteresis(self, predicts):
        hyst = []
        if predicts[0] < 0.5:
            hyst.append(0)
        else:
            hyst.append(1)
        for i in range(1, len(predicts)):
            if hyst[-1] == 0:
                if predicts[i] > 0.9:
                    hyst.append(1)
                else:
                    hyst.append(0)
            else:
                if predicts[i] < 0.1:
                    hyst.append(0)
                else:
                    hyst.append(1)
        return hyst


    def _mount_filter(self, predicts):
        accumulator = []
        result = []

        position = predicts[0]

        for predict in predicts:
            if predict != position:
                if len(accumulator) < self.threshold:
                    result += [predict] * len(accumulator)
                else:
                    result += accumulator
                    position = predict
                accumulator = [predict]
            else:
                accumulator.append(predict)
        if accumulator:
            result += accumulator
        return result

In [129]:
def get_pose_predict(cap, model):
    _, image = cap.read()
    try:
        with mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.5, model_complexity=0) as pose:
            results = pose.process(image)
        points = []
        for i, point in enumerate(results.pose_world_landmarks.landmark):
            if i in key_points:
                points.append([point.x, point.y, point.z])
        points = np.array(points).reshape(-1)
        with torch.no_grad():
            predict = torch.sigmoid(model(torch.FloatTensor(points))).cpu().detach().numpy()[0]
    except:
        predict = -1
        points = np.zeros(27)
    return predict, points, image

In [14]:
def start_detector(predicts, start_delay=30):
    cnt = 0
    for predict in predicts:
        if cnt >= start_delay:
            return 1
        if predict:
            cnt = 0
        else:
            cnt += 1     
    return 0

In [15]:
def farme_detector(predicts):
    for i in range(2, len(predicts)):
        if (predicts[i - 2] == 1) and (predicts[i - 1] == 0):
            return 1
    return 0

In [16]:
def frame_equalizer(predicts, points_list):
    l = sum(predicts)
    m = len(predicts) - l - 1
    if m > l:
       return points_list[len(points_list) - l * 2 - 1:]
    return points_list

In [17]:
def gap_filler(sequence):
    result = []
    # find first nonzero frame
    for i, frame in enumerate(sequence):
        if (frame != np.zeros(27)).all():
            result = [frame] * (i + 1)
            start = len(result)
            break
    for frame in sequence[start:]:
        if (frame != np.zeros(27)).all():
            result.append(frame)
        else:
            result.append(result[-1])
    return result

In [18]:
def get_keyframes(sequence, N=16):
    if len(sequence) >= N:
        sequence = gap_filler(sequence)
        idx = [((len(sequence) * (i + 1)) // N) - 1 for i in range(N)]
        result = [frame for i, frame in enumerate(sequence) if i in idx]
    else:
        result = sequence + [sequence[-1]] * (N - len(sequence))
    return result

In [244]:
path = '/content/drive/MyDrive/ml/VID_20220822_162003_146.mp4'

cap = cv2.VideoCapture(path)
cleaner = Cleaner(wide=10, threshold=15)

frames_list = []
predicts = []
cnt = 0
cnt_list = []
start = 0
i_list = []
predicts_src = []
points_list = []
for i in tqdm(range(int(cv2.VideoCapture(path).get(cv2.CAP_PROP_FRAME_COUNT)))):
    predict, points = get_pose_predict(cap, model)
    predicts.append(predict)
    predicts_src.append(predict)
    points_list.append(points)
    if len(predicts) >= 30:
        # smoothing for all data
        predicts_clean = cleaner.clean(predicts_src)[-len(predicts):]
        if start:
            if farme_detector(predicts_clean):
                frame = frame_equalizer(predicts_clean, points_list)
                frames_list.append(get_keyframes(frame))
                predicts = []
                points_list = []
                cnt += 1
        else:
            if start_detector(predicts_clean, ):
                predicts = []
                points_list = []
                start = 1
    # video stream reconstruction
    cnt_list.append(cnt)

100%|██████████| 1683/1683 [05:03<00:00,  5.54it/s]


In [245]:
len(frames_list[0])

16

### generate data for movement classifier

In [19]:
path = '/content/drive/MyDrive/ml/correct100.p'

In [20]:
with open(path, 'rb') as f:
    data = pickle.load(f)

In [22]:
cleaner = Cleaner(wide=10, threshold=15)

In [23]:
def get_data(data, cleaner):
    frames_list = []
    predicts = []
    start = 0
    i_list = []
    predicts_src = []
    points_list = []

    for points in tqdm(data):
        with torch.no_grad():
            predict = torch.sigmoid(model(torch.FloatTensor(points))).cpu().detach().numpy()[0]
        predicts.append(predict)
        predicts_src.append(predict)
        points_list.append(points)
        if len(predicts) >= 30:
            # smoothing for all data
            predicts_clean = cleaner.clean(predicts_src)[-len(predicts):]
            if start:
                if farme_detector(predicts_clean):
                    frame = frame_equalizer(predicts_clean, points_list)
                    frames_list.append(get_keyframes(frame))
                    predicts = []
                    points_list = []
            else:
                if start_detector(predicts_clean, start_delay=30):
                    predicts = []
                    points_list = []
                    start = 1
    return frames_list

In [24]:
data_list = []
for path in paths_list:
    name = path.split('/')[-1][:-4]
    with open('/content/drive/MyDrive/ml/' + name + '.p', 'rb') as f:
        data = pickle.load(f)
    data_list.append(get_data(data, cleaner))

100%|██████████| 7076/7076 [01:49<00:00, 64.72it/s]
100%|██████████| 6931/6931 [01:43<00:00, 67.09it/s]
100%|██████████| 5898/5898 [02:14<00:00, 43.84it/s]
100%|██████████| 6962/6962 [01:39<00:00, 69.70it/s]
100%|██████████| 7292/7292 [01:57<00:00, 61.85it/s]


In [25]:
with open('/content/drive/MyDrive/ml/squat_dataset.p', 'wb') as f:
    pickle.dump(data_list, f)

In [26]:
with open('/content/drive/MyDrive/ml/squat_dataset.p', 'rb') as f:
    data_list = pickle.load(f)

data_list.pop(2);

### Create datset 4 multiclass classificaion

In [27]:
points_list = []
labels_list = []
for i, data in enumerate(data_list):
    for sequence in data:
        points_list.append(np.asarray(sequence))
        labels_list.append(i)

In [28]:
df = pd.DataFrame()
df['points'] = points_list
df['label'] = labels_list

In [29]:
train, test = train_test_split(df, test_size=0.2, random_state=42)

In [30]:
train = train.reset_index(drop=True)
test = test.reset_index(drop=True)

In [31]:
class CustomDataset(Dataset):
    
    def __init__(self, points, labels):
        self.points = points
        self.labels = labels
        
    def __getitem__(self, index):
        return self.points[index].reshape(-1), torch.FloatTensor(self.labels[index])
        
    def __len__ (self):
        return len(self.points)

In [32]:
train_dataset = CustomDataset(torch.FloatTensor(list(train['points'])), torch.FloatTensor(list(train['label'])))

  """Entry point for launching an IPython kernel.


In [33]:
test_dataset = CustomDataset(torch.FloatTensor(list(test['points'])), torch.FloatTensor(list(test['label'])))

In [34]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=2, shuffle=True)

In [79]:
model = SimleNetwork(432, 300, 4)

In [80]:
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

model.apply(init_weights);

In [81]:
model.to(device);

In [82]:
optimizer = optim.AdamW(model.parameters(), lr=0.001)

In [83]:
loss_fn = nn.CrossEntropyLoss()

In [84]:
loss_fn.to(device)

CrossEntropyLoss()

In [85]:
def eval(model, test_dataset):
    predictions = []
    labels = []
    model.eval()
    with torch.no_grad():
        for X, label in test_dataset:
            prediction = model(X)
            prediction = prediction.max(0, keepdim=True)[1]
            prediction = prediction.reshape(1, -1).cpu().detach().numpy()[0]
            predictions.append(prediction)
            labels.append(label)
    return predictions, labels

In [86]:
import copy

best_score = 0

for epoch in range(20):
    model.train()
    for data in tqdm(train_loader):
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        try:
            # outputs = torch.sigmoid(model(inputs))
            outputs = model(inputs)
            loss = loss_fn(outputs, labels.long())
            loss.backward()
            optimizer.step()
        except:
            pass
        
    predictions, labels = eval(model, test_dataset)
    score = f1_score(labels, predictions, average='macro')
    if score > best_score:
        best_model = copy.deepcopy(model)
        best_score = score
        print(epoch, score)

100%|██████████| 163/163 [00:01<00:00, 114.95it/s]


0 0.6724129605485538


100%|██████████| 163/163 [00:01<00:00, 127.89it/s]


1 0.9462811107996139


100%|██████████| 163/163 [00:01<00:00, 128.70it/s]
100%|██████████| 163/163 [00:01<00:00, 127.60it/s]


3 0.9855252274607114


100%|██████████| 163/163 [00:01<00:00, 94.91it/s]
100%|██████████| 163/163 [00:01<00:00, 82.12it/s]
100%|██████████| 163/163 [00:01<00:00, 85.40it/s]
100%|██████████| 163/163 [00:01<00:00, 84.53it/s]


7 1.0


100%|██████████| 163/163 [00:02<00:00, 79.65it/s]
100%|██████████| 163/163 [00:02<00:00, 78.22it/s]
100%|██████████| 163/163 [00:02<00:00, 75.96it/s]
100%|██████████| 163/163 [00:02<00:00, 74.22it/s]
100%|██████████| 163/163 [00:02<00:00, 75.82it/s]
100%|██████████| 163/163 [00:02<00:00, 72.15it/s]
100%|██████████| 163/163 [00:02<00:00, 72.03it/s]
100%|██████████| 163/163 [00:02<00:00, 72.17it/s]
100%|██████████| 163/163 [00:02<00:00, 73.07it/s]
100%|██████████| 163/163 [00:02<00:00, 72.60it/s]
100%|██████████| 163/163 [00:02<00:00, 74.18it/s]
100%|██████████| 163/163 [00:02<00:00, 72.93it/s]


In [87]:
# Save NN
path = '/content/drive/MyDrive/ml/model_squat_classifier_v0.pt'
torch.save(best_model.state_dict(), path)

In [88]:
test_model = SimleNetwork(432, 300, 4)

In [90]:
test_model.load_state_dict(torch.load(path));

In [91]:
predictions, labels = eval(test_model, test_dataset)

In [92]:
f1_score(labels, predictions, average='macro')

1.0

### Test on video

In [106]:
model_frame = SimleNetwork(27, 200, 1)

In [107]:
model_frame.load_state_dict(torch.load('/content/drive/MyDrive/ml/model_mov_v0.pt'));

In [108]:
model_cls = SimleNetwork(432, 300, 4)

In [109]:
model_cls.load_state_dict(torch.load('/content/drive/MyDrive/ml/model_squat_classifier_v0.pt'));

In [None]:
def get_squat_kind(model, sequence):
    labels = []
    model.eval()
    with torch.no_grad():
            prediction = model(sequence)
            prediction = prediction.max(0, keepdim=True)[1]
            prediction = prediction.reshape(1, -1).cpu().detach().numpy()[0]
    return int(prediction)

In [140]:
def add_predict(image, text):
    font = cv2.FONT_HERSHEY_SIMPLEX
    bottomLeftCornerOfText = (10, 1200)
    fontScale = 2
    fontColor = (255,0,0)
    thickness = 2
    lineType = 2

    cv2.putText(
        image,
        text, 
        bottomLeftCornerOfText, 
        font, 
        fontScale,
        fontColor,
        thickness,
        lineType
        )

In [132]:
squad_dict = {
    0: 'ok',
    1: 'back',
    2: 'knees',
    3: 'uneven',
    -1: '...'
}

In [141]:
path = '/content/drive/MyDrive/ml/VID_20220831_154203_059.mp4'

cap = cv2.VideoCapture(path)
cleaner = Cleaner(wide=10, threshold=15)

predicts = []
cnt = 0
start = 0
predicts_src = []
points_list = []
kind = -1
images = []
for i in tqdm(range(int(cv2.VideoCapture(path).get(cv2.CAP_PROP_FRAME_COUNT)))):
    predict, points, image = get_pose_predict(cap, model_frame)
    predicts.append(predict)
    predicts_src.append(predict)
    points_list.append(points)
    if len(predicts) >= 30:
        # smoothing for all data
        predicts_clean = cleaner.clean(predicts_src)[-len(predicts):]
        if start:
            if farme_detector(predicts_clean):
                frame = frame_equalizer(predicts_clean, points_list)
                frame = get_keyframes(frame)
                kind = get_squat_kind(model_cls, torch.FloatTensor(np.asarray(frame)).reshape(-1))
                predicts = []
                points_list = []
                cnt += 1
        else:
            if start_detector(predicts_clean, start_delay=30):
                predicts = []
                points_list = []
                start = 1
    # video stream reconstruction
    # cnt, kind
    text = str(cnt) + ': ' + squad_dict[kind]
    add_predict(image, text)
    images.append(image)

100%|██████████| 1683/1683 [05:18<00:00,  5.28it/s]


In [142]:
width = images[0].shape[1]
height = images[0].shape[0]

In [143]:
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
video = cv2.VideoWriter('/content/drive/MyDrive/ml/ml_pipeline_test_v0.mp4',fourcc,30,(width,height))

In [144]:
for image in tqdm(images):
    video.write(image)

video.release()

100%|██████████| 1683/1683 [00:12<00:00, 134.07it/s]
