# Intialize

In [1]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm

from torchvision import models

import matplotlib.pyplot as plt
import numpy as np
import torch
import cv2
import os

In [2]:
device = torch.device('cpu')

# Data Load

In [3]:
from torch.utils.data import Dataset, DataLoader

In [4]:
DATA_PATH = './data/'
DATA_PATH_LIST = os.listdir(DATA_PATH)
DATA_PATH_LIST = [DATA_PATH + path for path in DATA_PATH_LIST]

print(f"Data path list: {DATA_PATH_LIST}")
image_paths = []
for path in tqdm(DATA_PATH_LIST):
    # Remove .DS_Store
    if path == './data/.DS_Store':
        continue

    for file in os.listdir(path):
        image_paths.append(path + '/' + file)

print(f"Image paths: {image_paths}")
print(f"Image path length: {len(image_paths)}")
images = []
for path in tqdm(image_paths[:500]):
    image = cv2.imread(path)
    if image is not None:   image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    else:                   continue
    images.append(image)
images = np.array(images, dtype='float32')
print(f"Images shape: {images.shape}")

Data path list: ['./data/059-1-1-03-Z4_C', './data/115-1-1-05-Z3_C', './data/019-1-1-01-Z4_C', './data/005-1-1-01-Z58_C', './data/093-1-1-04-Z3_C', './data/006-1-1-01-Z17_C', './data/040-1-1-02-Z4_C', './data/030-1-1-01-Z72_C', './data/002-1-1-01-Z58_C', './data/001-1-1-01-Z17_C', './data/078-1-1-03-Z3_C', './data/110-1-1-04-Z3_C', './data/005-1-1-01-Z4_C', './data/017-1-1-01-Z58_C', './data/010-1-1-01-Z58_C', './data/010-1-1-01-Z4_C', './data/050-1-1-03-Z4_C', './data/039-1-1-02-Z4_C', './data/022-1-1-01-Z116_C', './data/026-1-1-01-Z58_C', './data/105-1-1-04-Z3_C', './data/021-1-1-01-Z58_C', './data/009-1-1-01-Z109_C', './data/008-1-1-01-Z116_C', './data/086-1-1-04-Z3_C', './data/022-1-1-01-Z4_C', './data/062-1-1-03-Z4_C', './data/031-1-1-01-Z116_C', './data/026-1-1-01-Z116_C', './data/100-1-1-04-Z3_C', './data/018-1-1-01-Z116_C', './data/019-1-1-01-Z109_C', './data/011-1-1-01-Z58_C', './data/021-1-1-01-Z116_C', './data/016-1-1-01-Z58_C', './data/119-1-1-05-Z3_C', './data/055-1-1-03-Z

100%|██████████| 229/229 [00:00<00:00, 24845.33it/s]


Image paths: ['./data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000005.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000011.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000010.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000004.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000012.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000006.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000007.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000013.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000017.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000003.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000002.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000016.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000014.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000028.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000029.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000015.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000001.jpg', './data/059-1-1-03-Z4_C/059-1-1-03-Z4_C-0000018.jpg', './data/059-1-

100%|██████████| 500/500 [00:03<00:00, 146.32it/s]


Images shape: (500, 1080, 1920, 3)


In [5]:
print(f"Converting to numpy array...")
images = torch.Tensor(images).to(device)

Converting to numpy array...


In [6]:
images = images.permute(0, 3, 1, 2)
print(f"Iamges shape: {images.shape}")

Iamges shape: torch.Size([500, 3, 1080, 1920])


In [7]:
skeleton_extractor = models.detection.keypointrcnn_resnet50_fpn(pretrained=True).to(device).eval()
bounding_boxer = models.detection.fasterrcnn_resnet50_fpn(pretrained=True).to(device).eval()



In [8]:
# Image -> Bounding Box -> Skeleton -> Skeleton point as list (Vector)
skeleton_list = []
for image in tqdm(images):
    image = image / 255.0

    with torch.no_grad():
        prediction = bounding_boxer([image.to(device)])
    prediction = prediction[0]
    x1, y1, x2, y2 = prediction['boxes'][0]

    cropped_image = image[:, int(y1):int(y2), int(x1):int(x2)]
    cropped_image = torch.nn.functional.interpolate(cropped_image.unsqueeze(0), size=(256, 512)).squeeze(0)

    with torch.no_grad():
        prediction = skeleton_extractor([cropped_image.to(device)])
    prediction = prediction[0]

    skeleton_point = []
    for i in range(17):
        try:    skeleton_point.append(prediction['keypoints'][0][i][0:2].cpu().numpy())
        except: continue
    skeleton_list.append(skeleton_point)

skeleton_points = np.array(skeleton_list)
print(f"Skeleton points shape: {skeleton_points.shape}")

 17%|█▋        | 87/500 [03:56<18:18,  2.66s/it]

In [None]:
skeleton_1, skeleton_2, _, _ = train_test_split(skeleton_points, skeleton_points, test_size=0.5, random_state=42, shuffle=True)
print(f"Skeleton 1 shape: {skeleton_1.shape}")
print(f"Skeleton 2 shape: {skeleton_2.shape}")

Skeleton 1 shape: (50,)
Skeleton 2 shape: (50,)


In [None]:
def jaccard_score(skeleton_1, skeleton_2):
    skeleton_1, skeleton_2 = np.array(skeleton_1), np.array(skeleton_2)
    skeleton_1 = skeleton_1.reshape(-1)
    skeleton_2 = skeleton_2.reshape(-1)

    intersection = np.sum(np.minimum(skeleton_1, skeleton_2))
    union = np.sum(np.maximum(skeleton_1, skeleton_2))

    return intersection / union

skeletons_1, skeletons_2 = [], []
labels = []
for i in tqdm(range(len(skeleton_1)), desc="Calculating labels...", total=len(skeleton_1)):
    if len(skeleton_1[i]) == 0 or len(skeleton_2[i]) == 0:
        continue

    labels.append(jaccard_score(skeleton_1[i], skeleton_2[i]))
    skeletons_1.append(skeleton_1[i])
    skeletons_2.append(skeleton_2[i])

labels = np.array(labels)

print(f"Labels shape: {labels.shape}")
print(labels)

Calculating labels...: 100%|██████████| 50/50 [00:00<00:00, 46561.99it/s]

Labels shape: (28,)
[0.6533449  0.6028683  0.56084406 0.48734745 0.771055   0.73819226
 0.47683257 0.70777667 0.6710856  0.76640093 0.38798934 0.75373924
 0.7051418  0.84474593 0.49723864 0.391453   0.3808763  0.6332195
 0.62517685 0.5202835  0.5948114  0.78188026 0.7037459  0.6193155
 0.7217617  0.6572142  0.6182686  0.7218814 ]





In [None]:
guide_X_train, guide_X_test, guide_y_train, guide_y_test = train_test_split(skeleton_1, labels, test_size=0.2, random_state=42)
guide_X_train = guide_X_train.reshape(-1, 34)
guide_X_test = guide_X_test.reshape(-1, 34)

print(f"Guide X train shape: {guide_X_train.shape}")
print(f"Guide y train shape: {guide_y_train.shape}")
print(f"Guide X test shape: {guide_X_test.shape}")
print(f"Guide y test shape: {guide_y_test.shape}")

In [None]:
user_X_train, user_X_test, user_y_train, user_y_test = train_test_split(skeleton_2, labels, test_size=0.2, random_state=42)
user_X_train, user_X_test = user_X_train.reshape(-1, 34), user_X_test.reshape(-1, 34)
print(f"User X train shape: {user_X_train.shape}")
print(f"User y train shape: {user_y_train.shape}")
print(f"User X test shape: {user_X_test.shape}")
print(f"User y test shape: {user_y_test.shape}")

In [None]:
for idx, y in enumerate(guide_y_train):
    if y == user_y_train[idx]:  continue
    else:   print(f"Guide y: {y}, User y: {user_y_train[idx]}")

In [None]:
class TrainDataset(Dataset):
    def __init__(self, guide_X, guide_y, user_X, user_y):
        self.guide_X = guide_X
        self.guide_y = guide_y
        self.user_X = user_X
        self.user_y = user_y

    def __len__(self):
        return len(self.guide_X)
    
    def __getitem__(self, idx):
        return self.guide_X[idx], self.guide_y[idx], self.user_X[idx], self.user_y[idx]
    
class TestDataset(Dataset):
    def __init__(self, guide_X, guide_y, user_X, user_y):
        self.guide_X = guide_X
        self.guide_y = guide_y
        self.user_X = user_X
        self.user_y = user_y

    def __len__(self):
        return len(self.guide_X)
    
    def __getitem__(self, idx):
        return self.guide_X[idx], self.guide_y[idx], self.user_X[idx], self.user_y[idx]
    
train_dataset = TrainDataset(guide_X_train, guide_y_train, user_X_train, user_y_train)
test_dataset = TestDataset(guide_X_test, guide_y_test, user_X_test, user_y_test)

train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=True)

## Model

In [None]:
import torch.optim as optim
import torch.nn as nn
import torch

In [None]:
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.guide_points_skeleton = nn.Linear(34, 1)
        self.consumer_points_skeleton = nn.Linear(34, 1)
        self.score = nn.Linear(2, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, guide_X, user_X):
        guide_X = self.relu(self.guide_points_skeleton(guide_X))
        user_X = self.relu(self.consumer_points_skeleton(user_X))

        x = torch.cat((guide_X, user_X), dim=1)

        x = self.sigmoid(self.score(x))

        return x
    
model = Model().to(device)
criterion = nn.SmoothL1Loss()
optimizer = optim.Adam(model.parameters(), lr=0.003)

In [None]:
epoch = 100 
train_loss, validate_loss = [], []

for epoch in tqdm(range(epoch)):
    running_loss = 0.0
    p_bar = tqdm(enumerate(train_dataloader), total=len(train_dataloader))
    for i, data in p_bar:
        guide_X, guide_y, user_X, user_y = data
        guide_X, guide_y, user_X, user_y = guide_X.to(device), guide_y.to(device), user_X.to(device), user_y.to(device)

        optimizer.zero_grad()

        outputs = model(guide_X, user_X)
        loss = criterion(outputs, torch.unsqueeze(user_y, 1))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        p_bar.set_description(f"Epoch: {epoch}, Loss: {running_loss}")
    train_loss.append(running_loss / len(train_dataloader))

    running_loss = 0.0
    for i, data in enumerate(test_dataloader):
        guide_X, guide_y, user_X, user_y = data
        guide_X, guide_y, user_X, user_y = guide_X.to(device), guide_y.to(device), user_X.to(device), user_y.to(device)

        outputs = model(guide_X, user_X)
        loss = criterion(outputs, torch.unsqueeze(user_y, 1))
        
        running_loss += loss.item()
    validate_loss.append(running_loss / len(test_dataloader))

torch.save(model.state_dict(), './model.pth')

In [None]:
plt.figure(figsize=(10, 5))
plt.title("Loss")
plt.plot(train_loss, label="Train")
plt.plot(validate_loss, label="Validate")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()