In [1]:
# Parameters
boostnet_dir = "./datasets/labeldata" 
output_dir = "./datasets/keypointrcnn_data" 

In [None]:
import os
import imagesize
# import pybboxes as pbx
import pandas as pd
import shutil
import json

In [3]:
# 주어진 랜드마크를 x좌표와 y좌표로 나누어 반환
def split_XY(landmarks):
    first_half = landmarks[:68]
    second_half = landmarks[68:]
    return (first_half, second_half)

# 주어진 x, y 좌표 목록에서 최소 및 최대 값을 찾아 bounding box의 경계 값을 반환
# 아래의 get_bounding_boxes에서 경계를 찾는 데 활용
def find_min_max(x, y):
    # 최소값 계산
    x_min = min(x)
    y_min = min(y)

    # 최대값 계산
    x_max = max(x)
    y_max = max(y)
    return(x_min, y_min, x_max, y_max)

# 정규화된 랜드마크를 기반으로 bounding box를 계산하여 반환
# bounding box : 각 척추를 감싸는 사각형의 경계, 특정 척추(keypoint)들을 포함하는 경계 좌표라고 생각하면 됨 
# 척추의 위치를 정의하는 데 사용 => 모델이 bounding box를 통해 객체(척추)를 식별하고 위치를 파악
def get_bounding_boxes(normalized_landmarks):
  """returns bounding boxes in ``[x1,y1,x2,y2]`` normalized format,
    ``0 <= x1 < x2 <= 1`` and ``0 <= y1 < y2 <= 1``
  """
  x_list, y_list = split_XY(normalized_landmarks)
  bounding_boxes = []
  POINTS_PER_VERTEBRAE = 4  # 각 척추에 대한 포인트 수
  VERTEBRAE_COUNT = int(len(x_list) / POINTS_PER_VERTEBRAE)  # 17 : 총 척추의 수

  for i in range(VERTEBRAE_COUNT):
    x = x_list[POINTS_PER_VERTEBRAE * i: POINTS_PER_VERTEBRAE * i + 4]
    y = y_list[POINTS_PER_VERTEBRAE * i: POINTS_PER_VERTEBRAE * i + 4]
    min_max = find_min_max(x,y)
    bounding_boxes.append((min_max[0],min_max[1],min_max[2],min_max[3]))

  return bounding_boxes

# 정규화된 랜드마크를 사용하여 각 인스턴스의 키포인트를 생성하고, 모델이 사용할 수 있는 형식으로 반환
# 반환 형식 : [x, y, visibility]
# keypoint : 척추의 곡률을 평가하는 데 필요한 주요 포인트, 각 척추들의 위치 좌표라고 생각하면 됨 
def get_keypoints(normalized_landmarks):
  """returns keypoints (normalized), where
  keypoints (FloatTensor[N, K, 3]): the K keypoints location for each of the N instances, in the
        format [x, y, visibility], where visibility=0 means that the keypoint is not visible.
  """
  x_list, y_list = split_XY(normalized_landmarks)

  keypoints = [] # 전체 이미지의 keypoint 저장
  POINTS_PER_VERTEBRAE = 4  # 각 인스턴스에 대한 포인트 수
  VERTEBRAE_COUNT = int(len(x_list) / POINTS_PER_VERTEBRAE)  # 17 : 총 인스턴스 수

  # 각 인스턴스에 대해 반복문 
  for i in range(VERTEBRAE_COUNT):
    x_points = x_list[POINTS_PER_VERTEBRAE * i: POINTS_PER_VERTEBRAE * i + 4]
    y_points = y_list[POINTS_PER_VERTEBRAE * i: POINTS_PER_VERTEBRAE * i + 4]

    # [x, y, visibility] 형식의 리스트 생성
    # visibility는 항상 1로 설정 : 해당 keypoint가 보이는 상태
    # 같은 bounding box에 포함되는 keypoint들은 하나의 인스턴스에 속함
    instance_keypoints = [[x,y,1] for x, y in zip(x_points, y_points)]
    keypoints.append(instance_keypoints)

  return keypoints

In [4]:
# 주어진 이미지와 랜드마크를 사용하여 KeyPoint R-CNN format의 주석 생성
# 이미지의 크기를 기반으로 bounding box와 keypoint를 비정규화하여 반환
def preprocess_image(image_dir, landmarks):
  """return KeyPoint RCNN Format annotation of 1 image

  During training, the model expects both the input tensors, as well as a targets (list of dictionary),
  containing:
      - boxes (``FloatTensor[N, 4]``): the ground-truth boxes in ``[x1, y1, x2, y2]`` format, with
          ``0 <= x1 < x2 <= W`` and ``0 <= y1 < y2 <= H``.
      - labels (Int64Tensor[N]): the class label for each ground-truth box
      - keypoints (FloatTensor[N, K, 3]): the K keypoints location for each of the N instances, in the
        format [x, y, visibility], where visibility=0 means that the keypoint is not visible.
  """

  # width, height = imagesize.get(image_dir)  # 이미지 크기 가져오기 (비정규화할 때 사용)
  voc_bboxes = get_bounding_boxes(landmarks)
  keypoints = get_keypoints(landmarks)

  # voc_bboxes = [[int(bbox[0]*width), int(bbox[1]*height), int(bbox[2]*width), int(bbox[3]*height)] for bbox in voc_bboxes]  # 비정규화
  labels = [0 for bbox in voc_bboxes] # 모든 bounding box에 대해 레이블 생성 : 모든 bounding box가 척추이므로 0으로 설정

  # 각 keypoint 비정규화 
  # x, y 좌표를 이미지 크기로 곱하여 픽셀 단위로 변환. visibility는 유지 
  denormalized_keypoints = []
  for instance in keypoints:
    instance_keypoints = []
    for kp in instance:
      # instance_keypoints.append([int(kp[0]*width), int(kp[1]*height), kp[2]])
      instance_keypoints.append([int(kp[0]), int(kp[1]), kp[2]]) # 비정규화 X
    denormalized_keypoints.append(instance_keypoints)

  # 반환값 : 딕셔너리
  return {
      "boxes": voc_bboxes, # 비정규화된 bounding box
      "labels": labels, # 0
      "keypoints": denormalized_keypoints # 비정규화된 keypoint
  }

In [5]:
# data_dir = boostnet_dir + "/data"
# labels_dir = boostnet_dir + "/labels"
# splits = ['/test', '/train']

# # loop through split
# # 각 split에 대해 파일을 읽고 결함 있는 행을 제거한 후, 각 이미지에 대해 주석을 생성하고 JSON 파일로 저장
# for split in splits:
#     filenames = pd.read_csv(labels_dir+split+"/filenames.csv") # indexer
#     landmarks_df = pd.read_csv(labels_dir+split+"/landmarks.csv")

#     print(labels_dir+split+"/filenames.csv")
#     print(labels_dir+split+"/landmarks.csv")
#     print()
    
#     for i, filename in enumerate(filenames.filenames):
#         print(data_dir, split, filename, '//////////', landmarks_df.iloc[i].to_list())

In [6]:
# boostnet_dir의 데이터를 읽고 전처리하여 output_dir에 저장
def preprocess_data(boostnet_dir, output_dir):
  data_dir = boostnet_dir + "/data"
  labels_dir = boostnet_dir + "/labels"
  splits = ['/test', '/train']

  # loop through split
  # 각 split에 대해 파일을 읽고 결함 있는 행을 제거한 후, 각 이미지에 대해 주석을 생성하고 JSON 파일로 저장
  for split in splits:
    filenames = pd.read_csv(labels_dir+split+"/filenames.csv") # indexer
    landmarks_df = pd.read_csv(labels_dir+split+"/landmarks.csv")
    landmarks_list = landmarks_df.landmarks.apply(lambda x: [float(y.strip()) for y in x.split(',')])

    # loop through image
    for i, filename in enumerate(filenames.filenames):
      # preprocess_image(image_dir, landmarks) : boxes, laels, keypoints가 포함된 딕셔너리 반환
      # preprocess_image 함수를 호출하여 주석(annotation) 생성
      annotation = preprocess_image(data_dir+split+"/"+filename, landmarks_list[i])

      # 주석파일을 저장할 디렉토리 이름 설정
      # 테스트 : /val, 훈련 : /train
      split_dirname = "/val" if split == "/test" else "/train"

      # 출력 디렉토리가 존재하지 않으면 생성
      if (not os.path.exists(output_dir)):
        os.mkdir(output_dir)
      if (not os.path.exists(output_dir+"/labels")):
        os.mkdir(output_dir+"/labels")
      if (not os.path.exists(output_dir+"/labels"+split_dirname)):
        os.mkdir(output_dir+"/labels"+split_dirname)

      # 생성된 주석을 JSON 형식으로 저장
      annotation_filename = os.path.splitext(filename)[0]+".json"
      with open(output_dir+"/labels"+split_dirname+"/"+annotation_filename, 'w') as outfile:
        json.dump(annotation, outfile)

      # 출력 디렉토리에 이미지를 저장할 디렉토리 생성
      if (not os.path.exists(output_dir+"/images")):
        os.mkdir(output_dir+"/images")
      if (not os.path.exists(output_dir+"/images"+split_dirname)):
        os.mkdir(output_dir+"/images"+split_dirname)

      # 원본 이미지를 지정된 출력 디렉토리로 복사
      original = data_dir+split+"/"+filename
      target = output_dir+"/images/"+split_dirname

      shutil.copy(original, target)
  print(f"Finished writing to {output_dir}")

In [7]:
preprocess_data(boostnet_dir, output_dir)

Finished writing to ./datasets/keypointrcnn_data


In [8]:
# change rcnn_data to the output_dir
# !zip -r keypointrcnn_data.zip rcnn_data