In [None]:
import os
import json
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import cv2
import numpy as np
from tensorflow.keras import layers, models

## Separating images in folder by label

In [None]:
HEIGHT = 300
WIDTH = 300

target_size = (HEIGHT, WIDTH)

with open('jiu_annotations.json', 'r') as file:
    annotations = json.load(file)

labels = {'standing': 0, 'takedown': 1, 'open_guard': 2, 'half_guard': 3, 'closed_guard': 4, '5050_guard': 5, 'side_control': 6, 'mount': 7, 'back': 8, 'turtle': 9}

num_labels = len(labels)

image_folder = 'D:\jiu_dataset\images'

num_keypoints = 17

images_normalized = []
annotations_formatted = []

NUM_IMAGES = 10000

separate_images_folder = 'D:\jiu_dataset\images_separated_by_position'

if not os.path.exists(separate_images_folder):
    os.makedirs(separate_images_folder)

try:
    for annotation in annotations:
        image_filename = annotation['image'] + '.jpg'
        image_path = os.path.join(image_folder, image_filename)
        
        if not os.path.exists(image_path):
            continue
        
        print(image_path)
        
        read_image = cv2.imread(image_path)
        
        label = annotation['position']
        if label[-1].isdigit():
            label = label[:-1]
        
        if not os.path.exists(os.path.join(separate_images_folder, 'tmp', label)):
            os.makedirs(os.path.join(separate_images_folder, 'tmp', label))
            
        cv2.imwrite(os.path.join(separate_images_folder, 'tmp', label, image_filename), read_image)
    
except Exception as e:
    print("Error:", e)


## Train and test split 

In [None]:
from random import shuffle

TEST_DIR = os.path.join(separate_images_folder, 'test')
TRAIN_DIR = os.path.join(separate_images_folder, 'train')
TEMP_DIR = os.path.join(separate_images_folder, 'tmp')
PERCENTAGE = 0.8

if not os.path.exists(TEST_DIR):
    os.makedirs(TEST_DIR)
    
if not os.path.exists(TRAIN_DIR):
    os.makedirs(TRAIN_DIR)
    
for category in os.listdir(TEMP_DIR):
    if not os.path.isdir(os.path.join(TEMP_DIR, category)):
        continue
    
    images = os.listdir(os.path.join(TEMP_DIR, category))
    
    shuffle(images)

    num_images = len(images)
    
    train_images = images[:int(num_images * PERCENTAGE)]
    print('NUMBER OF TRAIN IMAGES FOR', category, len(train_images))
    test_images = images[int(num_images * PERCENTAGE):]
    print('NUMBER OF TEST IMAGES FOR', category, len(test_images))
    
    for image in train_images:
        image_path = os.path.join(TEMP_DIR, category, image)
        new_path = os.path.join(TRAIN_DIR, category, image)
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        os.rename(image_path, new_path)
        
    for image in test_images:
        image_path = os.path.join(TEMP_DIR, category, image)
        new_path = os.path.join(TEST_DIR, category, image)
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        os.rename(image_path, new_path)

## Creating the image generator

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

TEST_DIR = os.path.join('D:\jiu_dataset\images_separated_by_position', 'test')
TRAIN_DIR = os.path.join('D:\jiu_dataset\images_separated_by_position', 'train')

# Data augmentation and normalization for training
train_datagen = ImageDataGenerator(
      rescale=1./255,
      rotation_range=40,
      width_shift_range=0.2,
      height_shift_range=0.2,
      shear_range=0.2,
      zoom_range=0.2,
      horizontal_flip=True,
      fill_mode='nearest')

train_generator = train_datagen.flow_from_directory(
        TRAIN_DIR, 
        target_size=(300, 300), 
        class_mode='categorical')

test_data_gen = ImageDataGenerator(rescale=1./255)

test_generator = test_data_gen.flow_from_directory(
            TEST_DIR, 
            target_size=(300, 300), 
            class_mode='categorical')

## Creating the neural model

In [None]:
def create_pose_detection_model(input_shape, labels=10):
    model = models.Sequential()

    model.add(layers.Conv2D(16, (3, 3), activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(32, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    model.add(layers.Flatten())
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(labels, activation='softmax'))

    return model

input_shape = (300, 300, 3)

pose_detection_model = create_pose_detection_model(input_shape)

# sparse_categorical_crossentropy (?)
pose_detection_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

pose_detection_model.summary()


## Training the model

In [None]:
class callback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs={}):
        if(logs.get('accuracy')>0.85):
            print("\nReached 85% accuracy so cancelling training!")
            self.model.stop_training = True

num_epochs = 10

history = pose_detection_model.fit(
    train_generator, 
    epochs=num_epochs, 
    callbacks=[callback()],
    validation_data=test_generator
    )

In [None]:
# Save the model
pose_detection_model.save('jiu_pose_detection_model.h5')

## Load the model

In [None]:
pose_detection_model = tf.keras.models.load_model('jiu_pose_detection_model.h5')

## Seeing model history

In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['loss'], label = 'loss')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0.5, 1])
plt.legend(loc='lower right')

## Getting predictions in a random image

In [None]:
from random import randint

image_folder = 'D:\jiu_dataset\images'

while True:
    try:
        random_n = str(randint(1, 1500000))

        while len(random_n) < 7:
            random_n = '0' + random_n

        image_path = image_folder + '/' + random_n + '.jpg'
        image = load_img(image_path, target_size=(300, 300))
        
        break
    except:
        continue

image_array = img_to_array(image)
image_array = image_array / 255.0
image_array = np.expand_dims(image_array, axis=0)

prediction = pose_detection_model.predict(image_array)
categories = train_generator.class_indices

categories_keys = list(categories.keys())

prob_by_category = {}

for i, prev in enumerate(prediction[0]):
    l = categories_keys[i]
    prob_by_category[l] = '{:.4f}%'.format(float(prev) * 100)

prob_by_category = dict(sorted(prob_by_category.items(), key=lambda item: float(item[1][:-1]), reverse=True))

print(prob_by_category)

label = categories_keys[np.argmax(prediction)]

plt.imshow(image)
plt.show()

print(label)
        


In [None]:
test_images_dir = 'test_images'

for image in os.listdir(test_images_dir):
    image_path = os.path.join(test_images_dir, image)
    image = load_img(image_path, target_size=(300, 300))
    image_array = img_to_array(image)
    image_array = image_array / 255.0
    image_array = np.expand_dims(image_array, axis=0)

    prediction = pose_detection_model.predict(image_array)
    categories = train_generator.class_indices

    categories_keys = list(categories.keys())

    prob_by_category = {}

    for i, prev in enumerate(prediction[0]):
        l = categories_keys[i]
        prob_by_category[l] = '{:.4f}%'.format(float(prev) * 100)

    prob_by_category = dict(sorted(prob_by_category.items(), key=lambda item: float(item[1][:-1]), reverse=True))

    print(prob_by_category)

    label = categories_keys[np.argmax(prediction)]

    plt.imshow(image)
    plt.show()

    print(label)
    
    print('-----------------')

## Video test

In [None]:
import cv2
import numpy as np
from time import sleep

video_path = 'video.mp4'

cap = cv2.VideoCapture(video_path)

while True:
    ret, frame = cap.read()
    
    if not ret:
        break
    
    frame = cv2.resize(frame, (300, 300))
    
    cv2.imshow('frame', frame)
    
    frame = np.expand_dims(frame, axis=0)
    
    prediction = pose_detection_model.predict(frame)
    categories = train_generator.class_indices

    categories_keys = list(categories.keys())

    label = categories_keys[np.argmax(prediction)]
    
    print(label)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
