# KP Detection
## 1-1 Training Dataset Generation

In [1]:
import json
import os
import sys
import random
import yaml

import matplotlib.pyplot as plt
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
# from ultralytics.data.annotator import auto_annotate

In [2]:
path_current = os.path.dirname(os.path.abspath('__file__'))
os.path.split(path_current)[0]
sys.path.append('/workspaces/MoonClimbers/app')

In [3]:
from app_sys import AppSys

## Controls

In [4]:
neffect_max = 4
param_filtering = 1
method_filtering = 'percent'
generate = True
split = True
coco2yolo = False
yolo_config_yaml = False
annotate = False
# Instance of the AppSys
app_sys = AppSys()

In [5]:
class Effect:
    """
    Frame modifications
    """
    def __init__(self, neffects):
        self.neffects = neffects
        self.dict_effects = {
            'resize': self.resize,
            'rotate': self.rotate,
            'skew': self.skew,
            'afin': self.afin,
            'blur': self.blur,
            # 'noise': self.noise,
            'gray': self.gray,
            # 'dark': self.dark,
            # 'light': self.light,
            # 'color_shift': self.color_shift,
            'color_shuffle': self.color_shuffle
            }
        self.get_effects()
    
    def get_effects(self):
        """
        Get a list of effects to be applied. If 'gray' comes with 'colour shift/shuffle', remove 'colour shift/shuffle'
        """
        self.list_effects = random.sample(self.dict_effects.keys(), self.neffects)
        if 'gray' in self.list_effects:
            try:
                for ee in ['color_shift', 'color_shuffle']:
                    self.list_effects.remove(ee)
            except ValueError:
                pass

    def apply_effects(self, img):
        """
        Apply the effects and generate a string to log what effects were applied
        """
        e = ''
        _img = img
        for effect in self.list_effects:
            print(effect, _img.dtype)
            e += f'_{effect}'
            _img = self.dict_effects[effect](_img)
        return _img, e
    
    # Utils

    def gamma_correction(self, img, gamma):
        lookup_table = np.zeros((256, 1), dtype='uint8')
        for i in range(256):
            lookup_table[i][0] = 255 * pow(float(i) / 255, 1.0 / gamma)
        return cv2.LUT(img, lookup_table)
        
    # Geometry

    def resize(self, img):
        zoom = random.uniform(0.3, 1.)
        # aspect = random.choice([0, 1, 4, 16])
        h, w = img.shape[:2]
        img = cv2.resize(img, (round(w * zoom), round(h * zoom)))
        return img
    
    def rotate(self, img):
        h, w = img.shape[:2]
        ss = max(h, w)
        deg = random.uniform(-179, 180)
        if abs(deg) > 1e-3:
            zoom = random.uniform(0.6, 1.)
            M = cv2.getRotationMatrix2D((w / 2, h / 2), deg, zoom)
            img = cv2.warpAffine(img, M, (round(ss * zoom), round(ss * zoom)), borderValue=(128,128,128))
            return img
        else:
            return img

    def skew(self, img):
        h, w = img.shape[:2]
        deg = random.uniform(-30, 30)
        if abs(deg) > 1e-3:
            a = np.tan(np.deg2rad(deg))
            M = np.array([[1, a, 0], [0, 1, 0]], dtype=np.float32)
            img = cv2.warpAffine(img, M, (int(w + h * a), h), borderValue=(128,128,128))
        return img

    def afin(self, img):
        """ rotate + skew, defined by projection of 3 points """
        h, w = img.shape[:2]
        pts1 = np.float32([[0, 50], [50, 0], [10, 50]])
        pts2 = np.float32([[0, 50], [40, 10], [10, 50]])
        M = cv2.getAffineTransform(pts1, pts2)
        img = cv2.warpAffine(img, M, (w, h), borderValue=(128,128,128))
        return img
    
    # Image sharpness
    
    def blur(self, img):
        size = random.randint(3, 9)
        img = cv2.blur(img, (size, size))
        return img

    # def noise(self, img):
    #     noise_level = random.randint(1, 30)
    #     noise = np.random.randint(0, noise_level, img.shape)
    #     img = img + noise.astype('uint8')
    #     return img
    
    # Colours

    def gray(self, img):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        return img

    # def dark(self, img):
    #     img = self.gamma_correction(img, round(random.uniform(0.7, 1), 1))
    #     return img

    def light(self, img):
        img = self.gamma_correction(img, round(random.uniform(1, 2), 1))
        return img

    def color_shift(self, img):
        """
        Random colour shift by look-up table
        """
        coeff = random.uniform(0.05, 0.8)
        idx = np.arange(256)
        sigmoid = 1/(1+(np.exp(-coeff*(idx - 256//2 + 1))))
        lut = (sigmoid * 256).astype(np.uint8)
        return cv2.LUT(img, lut)
    
    def color_shuffle(self, img):
        """
        Shuffle RBG channels
        """
        lis_bgr = list(cv2.split(img))
        random.shuffle(lis_bgr)
        return cv2.merge((tuple(lis_bgr)))



In [6]:
class SampleImage():
    """
    A class to extract random frames from a video and apply random effects to them.
    """
    def __init__(self, video, neffect_max=3) -> None:
        # Max number of effects applied
        self.neffects_max = neffect_max
        # Number of frames to be extracted from each video

        self.vpath = video
        self.vname = os.path.split(self.vpath)[1]
        # Directpry to save the augmented frames
        self.saveto = app_sys.PATH_ASSET_HOLD_DETECT
        print('save to: ', self.saveto)
        os.makedirs(self.saveto, exist_ok=True)

        # Open the video and get the number of frames
        self.vidcap = cv2.VideoCapture(self.vpath)
        self.num_frames = int(self.vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(self.num_frames)
        # Extract frames from the video and apply random effects (max 3 effects)

    def standardize_fsize(self, img, target_size=640):
        """
        Standardize the frame size to 640x640 pixels for YOLOv8.
        Resize while keeping the aspect ratio. Padding the frame with black pixels.
        """

        h, w = img.shape[:2]
        scale = target_size / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)

        resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
        # Create a blank canvas with padding
        # If grayscale image
        if len(img.shape) == 2:
            padded_img = np.ones((target_size, target_size), dtype=np.uint8) * 128  # Gray padding
        else:
            padded_img = np.ones((target_size, target_size, 3), dtype=np.uint8) * 128  # Gray padding
        pad_top = (target_size - new_h) // 2
        pad_left = (target_size - new_w) // 2
        padded_img[pad_top:pad_top+new_h, pad_left:pad_left+new_w] = resized

        return padded_img

    def get_frame(self, nf, method):
        """ Extract the nf-th frame / random frame of the video """
        # If selecting random frames, overwite nf.
        if method == 'random':
            nf = random.randint(0, self.num_frames - 1)
        self.vidcap.set(cv2.CAP_PROP_POS_FRAMES, nf)
        success, img = self.vidcap.read()
        if success:
            return img

    def get_modified_frames(self, arg, method='percent'):
        """
        Parameters:
            arg: - number of frames to extract per video for method == 'random'
                 - percent of the number of frames to extract per video for method == 'percent'
            methodf: 'random' or 'percent'
        """
        if method == 'random':
            if min(0, arg) < 0:
                arg = 0
            elif max(self.num_frames, arg) > self.num_frames:
                arg = self.num_frames
            list_nfs = np.arange(arg)
        else:
            if min(0, arg) < 0:
                arg = 0
            elif max(100, arg) > 100:
                arg = 100
            list_nfs = np.unique(np.linspace(0, self.num_frames-1, num=int(self.num_frames*arg/100)).astype(int))

        for i in list_nfs:
            img = self.get_frame(i, method)
            if img is None:
                continue
            neffets = random.randint(0, self.neffects_max)
            if neffets == 0:
                img_std = self.standardize_fsize(img, target_size=640)
                cv2.imwrite(os.path.join(self.saveto, f"{self.vname}_{i}.jpg"), img_std)
            else:
                _img = img
                # Instance of Effect class
                effect = Effect(neffets)
                _img, ff = effect.apply_effects(_img)
                img_std = self.standardize_fsize(_img, target_size=640)
                cv2.imwrite(os.path.join(self.saveto, f"{self.vname}_{i}_{ff}.jpg"), img_std)


In [7]:
def rename(lis_imgs, base=0):
    # Rename images in the list of image names
    imgs = [f for f in lis_imgs if f.endswith('.jpg')]
    imgs_shuffled = shuffle(imgs, random_state=1)
    lis_new_names = []
    for e, i in enumerate(imgs_shuffled):
        index = base + e
        img_name_new = f'{index:05d}.jpg'
        print(i, img_name_new)
        lis_new_names.append(img_name_new)
    return lis_new_names

In [8]:
# def auto_annotater(dir_data, dir_out):
#     # auto_annotate(data=dir_data, det_model='yolov8n.pt', sam_model='sam_b.pt', output_dir=dir_out)
#     auto_annotate(data=dir_data, det_model='yolov8n.pt', sam_model='sam2_b.pt', output_dir=dir_out)
#     # auto_annotate(data=dir_data, det_model='yolov9e.pt', sam_model='sam_l.pt', output_dir=dir_out)
#     # auto_annotate(data=dir_data, det_model=os.path.join(os.getcwd(), "results", "8_epochs-2", "weights", "best.pt"), sam_model='sam_l.pt', output_dir=dir_out)
#     # auto_annotate(data=dir_data, det_model=os.path.join(os.getcwd(), "results", "8_epochs-2", "weights", "best.pt"), output_dir=dir_out)


## Generating frame images

In [15]:
# test, valid = 'Stargaze_luke.mp4', 'Potato_has_Fallen_jonah.mp4'
test, valid = '', ''

In [16]:
if generate:
    samples = os.listdir(app_sys.PATH_ASSET_RAW)
    for sample in samples:
        if sample.endswith('.mp4'):
            if sample not in [test, valid]:
                print(sample)
                sample_ = SampleImage(os.path.join(app_sys.PATH_ASSET_RAW, sample), neffect_max)
                sample_.get_modified_frames(param_filtering, method=method_filtering)

New_Heart_Design_luke.mp4
save to:  /workspaces/MoonClimbers/app/asset/hold_detect
2933
resize uint8
afin uint8


since Python 3.9 and will be removed in a subsequent version.
  self.list_effects = random.sample(self.dict_effects.keys(), self.neffects)


resize uint8
gray uint8
rotate uint8
gray uint8
resize uint8
rotate uint8
afin uint8
resize uint8
gray uint8
skew uint8
gray uint8
skew uint8
blur uint8
blur uint8
gray uint8
skew uint8
color_shuffle uint8
gray uint8
afin uint8
blur uint8
rotate uint8
gray uint8
blur uint8
rotate uint8
gray uint8
afin uint8
gray uint8
skew uint8
resize uint8
color_shuffle uint8
rotate uint8
skew uint8
resize uint8
afin uint8
gray uint8
color_shuffle uint8
afin uint8
resize uint8
skew uint8
resize uint8
color_shuffle uint8
rotate uint8
blur uint8
skew uint8
afin uint8
blur uint8
resize uint8
resize uint8
rotate uint8
blur uint8
color_shuffle uint8
MB2024_scanning.mp4
save to:  /workspaces/MoonClimbers/app/asset/hold_detect
3474
blur uint8
afin uint8
blur uint8
blur uint8
resize uint8
color_shuffle uint8
afin uint8
blur uint8
rotate uint8
resize uint8
resize uint8
afin uint8
gray uint8
resize uint8
blur uint8
gray uint8
skew uint8
resize uint8
skew uint8
blur uint8
gray uint8
skew uint8
color_shuffle uin

## Splitting Dataset into Train, Test, Valid

In [17]:
if split:
    direc_data = app_sys.PATH_ASSET_HOLD_DETECT
    files = os.listdir(direc_data)
    imgs = [f for f in files if f.endswith('.jpg')]
    X = shuffle(imgs, random_state=1)
    train, test_valid = train_test_split(X, test_size=0.2, random_state=1)
    test, valid = train_test_split(test_valid, test_size=0.5, random_state=1)
    dict_data = dict(zip(["Train", "Test", "Validation"], [train, test, valid]))
    for ndirec in dict_data.keys():
        new_direc = os.path.join(direc_data, ndirec)
        os.makedirs(new_direc, exist_ok=True)
        n_pre_exist = len(os.listdir(new_direc))
        print(f'{n_pre_exist} images exists in {ndirec} directory.\nNewly added files counts staret from: {n_pre_exist}')
        for nfile, nfile_new in zip(dict_data[ndirec], rename(dict_data[ndirec], base=n_pre_exist)):
            # Number of files exist in the directory already.
            os.rename(os.path.join(direc_data, nfile), os.path.join(new_direc, nfile_new))

0 images exists in Train directory.
Newly added files counts staret from: 0
Open_hands_zac.mp4_541.jpg 00000.jpg
MB2024_scanning.mp4_631__resize_afin.jpg 00001.jpg
Potato_has_Fallen_jonah.mp4_0__afin_skew_gray_resize.jpg 00002.jpg
Open_hands_zac.mp4_1299__rotate.jpg 00003.jpg
New_Heart_Design_luke.mp4_209__resize_gray.jpg 00004.jpg
Stargaze_luke.mp4_238.jpg 00005.jpg
New_Heart_Design_luke.mp4_1151__blur_rotate_gray.jpg 00006.jpg
New_Heart_Design_luke.mp4_1361__gray_afin.jpg 00007.jpg
New_Heart_Design_luke.mp4_2722__resize_rotate_blur_color_shuffle.jpg 00008.jpg
Mollys_Pinches_luke.mp4_845.jpg 00009.jpg
MB2024_scanning.mp4_1578__color_shuffle_resize.jpg 00010.jpg
Open_hands_zac.mp4_1082.jpg 00011.jpg
Mollys_Pinches_luke.mp4_422__afin_blur_skew.jpg 00012.jpg
Potato_has_Fallen_jonah.mp4_1271__afin_resize_color_shuffle.jpg 00013.jpg
Mollys_Pinches_luke.mp4_1690__afin.jpg 00014.jpg
MB2024_scanning.mp4_3367__rotate_afin_resize_blur.jpg 00015.jpg
Potato_has_Fallen_jonah.mp4_462__gray.jpg 0001

In [12]:
if coco2yolo:
    for mode in ["train", "valid"]:
        cocojson_dir = os.path.join(os.getcwd(), "data", mode, f"{mode}.json")
        imgs_dir = os.path.join(os.getcwd(), "data", mode, "images")
        txt_dir = os.path.join(os.getcwd(), "data", mode, "labels")
        with open(cocojson_dir) as f:
            cocojson = json.load(f)
        print(cocojson.keys())
        for img in cocojson["images"]:
            print(img["id"])
            anns = [ann for ann in cocojson["annotations"] if ann["image_id"] == img["id"]]
            w, h = img["width"], img["height"]
            if len(anns) > 0:
                with open(os.path.join(txt_dir, img["file_name"].split(".")[0] + ".txt"), "w") as ff:
                    for ann in anns:
                        # Yolo txt label counts labels from zero
                        category = ann["category_id"] - 1
                        # Normalise the coordinates between 0 and 1
                        polygon = ann["segmentation"][0]
                        norm_polygon = [format(coord / w if i % 2 == 0 else coord / h, ".6f") for i, coord in enumerate(polygon)]
                        ff.write(f"{category} " + " ".join(map(str, norm_polygon)) + "\n")


In [13]:
if yolo_config_yaml:
    class_names = ["hold", "volume", "wall", "mat"]
    nc = len(class_names)
    path = os.path.join(os.getcwd(), "data")
    train_dir = os.path.join(path, "train", "images")
    valid_dir = os.path.join(path, "valid", "images")
    test_dir = ""
    config = {
        "names": class_names,
        "nc": nc,
        "path": path,
        "train": train_dir,
        "val": valid_dir
    }
    with open(os.path.join(os.getcwd(), "data.yaml"), "w") as f:
        yaml.dump(config, f, default_flow_style=False)

In [14]:
# if annotate:
#     dir_data = os.path.join(os.getcwd(), "data", "train", "unused")
#     dir_out = os.path.join(os.getcwd(), "data", "train", "unused", "label")
#     auto_annotater(dir_data, dir_out)

#     from random import randint

#     img = cv2.imread(os.path.join(dir_data, "sample1.mp4_0.jpg"))
#     h, w = img.shape[:2]
#     with open(os.path.join(dir_out, "sample1.mp4_0.txt"), 'r') as f:
#         labels = f.read().splitlines()

#     for label in labels:
#         class_id, *poly = label.split(' ')

#         poly = np.asarray(poly, dtype=np.float16).reshape(-1, 2)  # Read poly, reshape
#         poly *= [w, h]  # Unscale

#         cv2.polylines(img, [poly.astype('int')], True, (randint(0, 255), randint(0, 255), randint(0, 255)), 2)  # Draw Poly Lines

#     import matplotlib.pyplot as plt
#     plt.imshow(img)