# Faster R-CNN 구현
### 텐서플로우 기반
### 주석은 한글 위주로 작성

In [1]:
import numpy as np
import cv2
import xmltodict
from tqdm import tqdm
import tensorflow as tf
import random
from glob import glob

## 훈련 이미지 가져오기

In [2]:
train_x_path = '/Users/minguinho/Documents/AI_Datasets/PASCAL_VOC_2007/train/VOCdevkit/VOC2007/JPEGImages'
train_y_path = '/Users/minguinho/Documents/AI_Datasets/PASCAL_VOC_2007/train/VOCdevkit/VOC2007/Annotations'

In [None]:
# 파일 경로 휙득
list_train_x = sorted([x for x in glob.glob(train_x_path + '/**')])    
list_train_y = sorted([x for x in glob.glob(train_y_path + '/**')]) 

## 훈련용 데이터로 제작

In [4]:
# 입력용 이미지 생성. 224, 224로 변환시키고 채널 값(0~255)를 0~1 사이의 값으로 정규화 시켜줌
def make_input(image_file_list): 
    images_list = []
    
    for i in tqdm(range(0, len(image_file_list)), desc="get image") :
        
        image = cv2.imread(image_file_list[i])
        image = cv2.resize(image, (224, 224))/255
        
        images_list.append(image)
    
    return np.asarray(images_list)

## RPN 훈련을 위한 데이터셋 생성에 필요한 함수

In [5]:
# 이미지에 어떤 Ground Truth Box가 있는지
def get_Ground_Truth_Box_fromImage(xml_file_path): # xml_file_path은 파일 하나의 경로를 나타낸다

    f = open(xml_file_path)
    xml_file = xmltodict.parse(f.read()) 

    # 우선 원래 이미지 크기를 얻는다. 왜냐하면 앵커는 224*224 기준으로 만들었는데 원본 이미지는 224*224가 아니기 때문.
    # 224*224에 맞게 줄일려고 하는거다
    Image_Height = float(xml_file['annotation']['size']['height'])
    Image_Width  = float(xml_file['annotation']['size']['width'])

    Ground_Truth_Box_list = [] 

    # multi-objects in image
    try:
        for obj in xml_file['annotation']['object']:
            
            # 박스 좌표(왼쪽 위, 오른쪽 아래) 얻기
            x_min = float(obj['bndbox']['xmin']) 
            y_min = float(obj['bndbox']['ymin'])
            x_max = float(obj['bndbox']['xmax']) 
            y_max = float(obj['bndbox']['ymax'])

            # 224*224에 맞게 변형시켜줌
            x_min = float((224/Image_Width)*x_min)
            y_min = float((224/Image_Height)*y_min)
            x_max = float((224/Image_Width)*x_max)
            y_max = float((224/Image_Height)*y_max)

            Ground_Truth_Box = [x_min, y_min, x_max, y_max]
            Ground_Truth_Box_list.append(Ground_Truth_Box)

    # single-object in image
    except TypeError as e : 
        # 박스 좌표(왼쪽 위, 오른쪽 아래) 얻기
        x_min = float(xml_file['annotation']['object']['bndbox']['xmin']) 
        y_min = float(xml_file['annotation']['object']['bndbox']['ymin']) 
        x_max = float(xml_file['annotation']['object']['bndbox']['xmax']) 
        y_max = float(xml_file['annotation']['object']['bndbox']['ymax']) 

        # 224*224에 맞게 변형시켜줌
        x_min = float((224/Image_Width)*x_min)
        y_min = float((224/Image_Height)*y_min)
        x_max = float((224/Image_Width)*x_max)
        y_max = float((224/Image_Height)*y_max)

        Ground_Truth_Box = [x_min, y_min, x_max, y_max]  
        Ground_Truth_Box_list.append(Ground_Truth_Box)

    
    Ground_Truth_Box_list = np.asarray(Ground_Truth_Box_list)
    Ground_Truth_Box_list = np.reshape(Ground_Truth_Box_list, (-1, 4))

    return Ground_Truth_Box_list # 이미지에 있는 Ground Truth Box 리스트 받기(numpy)


## RPN - 앵커 준비
#### 입력 이미지 기준으로 앵커를 생성한다.
#### 풀링을 3번 하므로 2^3 = 8이니 (8, 8)부터 (16, 8)...등 8픽셀씩 중심 좌표를 옮겨가며 앵커들을 k개씩 생성한다
#### 생성한 앵커들 중 사용할 가치가 있는 앵커를 걸러낸다(이미지 범위를 벗어나지 않는 앵커들만 선정)
#### 선정한 앵커 중 실제 물체의 box와 얼마나 곂치는지(IoU) 계산해본다. 확실히 겹친다 하는 애들을 Positive 앵커로, 거의 안겹친다 하는 애들은 Negataive 앵커로 선정한다. 애매한 애들은 거른다.
#### 이렇게 생성된 앵커들로 미니배치를 생성한다. Positive 128개, Negative 128개로 만드는게 ideal한 구성이긴 한데 Positive한 앵커가 별로 없다. 그래서 Positive를 128개 못채웠으면 Negataive 앵커로 채워준다. 

In [6]:
# 앵커 생성 함수. 
def make_anchor(anchor_size, anchor_aspect_ratio) :
    # 입력 이미지(그래봤자 224*224긴 하지만)에 맞춰 앵커를 생성해보자 

    anchors = [] # [x,y,w,h]로 이루어진 리스트 
    anchors_state = [] # 이 앵커를 훈련에 쓸건가? 각 앵커별로 사용 여부를 나타낸다. 

    # 앵커 중심좌표 간격
    interval_x = 16
    interval_y = 16
    Center_max_x = 208 # 224 - 16, 중심좌표가 224가 될 수는 없다.
    Center_max_y = 208 # 224 - 16

    # 2단 while문 생성
    x = 8
    y = 8
    index_count = 0
    while(y <= 224): # 8~208 = 14개 
        while(x <= 224): # 8~208 = 14개 
            # k개의 앵커 생성. 여기서 k = len(anchor_size) * len(anchor_aspect_ratio)다
            for i in range(0, len(anchor_size)) : 
                for j in range(0, len(anchor_aspect_ratio)) :
                    anchor_width = anchor_aspect_ratio[j][0] * anchor_size[i]
                    anchor_height = anchor_aspect_ratio[j][1] * anchor_size[i]

                    anchor = [x, y, anchor_width, anchor_height]
                    anchors.append(anchor)
                    # 앵커가 이미지 경계선을 넘나드나? 필터링
                    if((x - (anchor_width/2) >= 0) and (y - (anchor_height/2) >= 0) and
                    (x + (anchor_width/2) <= 224) and (y + (anchor_height/2) <= 224)):
                        # 경계 안에 있으면 1
                        anchors_state.append(int(1))
                    else :
                        anchors_state.append(int(0))
            x = x + interval_x 
        y = y + interval_y
        x = 8
    return np.asarray(anchors), np.asarray(anchors_state) # 넘파이로 반환

### IoU 계산 후 Positive, Negative 앵커 분류

In [7]:
# 앵커들을 Positive, Negative 앵커로 나누고 각 앵커가 참고한 Ground Truth Box와 Class를 반환하자
# RPN에는 '어떤 클래스인가?'는 알 필요가 없다. '객체인가 아닌가'이거 하나만 필요할 뿐. 
def align_anchor(anchors, anchors_state, Ground_Truth_Box_list):

    # 각 앵커는 해당 위치에서 구한 여러가지 Ground truth Box와의 ioU 중 제일 높은거만 가져온다. 
    IoU_List = np.array([])
    Ground_truth_box_Highest_IoU_List = [] # 각 앵커가 어떤 Ground Truth Box를 보고 IoU를 계산했는가?

    #start = time.time()

    for i in range(0, len(anchors)):
        if anchors_state[i] == 0 :
            IoU_List = np.append(IoU_List, 0)
            Ground_truth_box_Highest_IoU_List.append([0,0,0,0])

            if i % 9 == 8 :
                IoU_List_inOneSpot = IoU_List[i-8:i+1]
                for num in list(range(i-8, i + 1)):
                    if IoU_List[num] > 0.7 or (max(IoU_List_inOneSpot) == IoU_List[num] and IoU_List[num] >= 0.3): # positive anchor
                        anchors_state[num] = 2
                    elif IoU_List[num] < 0.3 : # negative anchor
                        anchors_state[num] = 1
                    else: # 애매한 앵커들
                        anchors_state[num] = 0    
        else:
            anchor_minX = anchors[i][0] - (anchors[i][2]/2)
            anchor_minY = anchors[i][1] - (anchors[i][3]/2)
            anchor_maxX = anchors[i][0] + (anchors[i][2]/2)
            anchor_maxY = anchors[i][1] + (anchors[i][3]/2)

            anchor = [anchor_minX, anchor_minY, anchor_maxX, anchor_maxY]

            # 연산 속도 때문에 Box대신 Ground Truth Box의 인덱스를 저장
            IoU_max = 0
            ground_truth_box_Highest_IoU = [0,0,0,0]

            for j in range(0, len(Ground_Truth_Box_list)):

                ground_truth_box = Ground_Truth_Box_list[j]

                InterSection_min_x = max(anchor[0], ground_truth_box[0])
                InterSection_min_y = max(anchor[1], ground_truth_box[1])

                InterSection_max_x = min(anchor[2], ground_truth_box[2])
                InterSection_max_y = min(anchor[3], ground_truth_box[3])

                InterSection_Area = 0

                if (InterSection_max_x - InterSection_min_x + 1) >= 0 and (InterSection_max_y - InterSection_min_y + 1) >= 0 :
                    InterSection_Area = (InterSection_max_x - InterSection_min_x + 1) * (InterSection_max_y - InterSection_min_y + 1)

                box1_area = (anchor[2] - anchor[0]) * (anchor[3] - anchor[1])
                box2_area = (ground_truth_box[2] - ground_truth_box[0]) * (ground_truth_box[3] - ground_truth_box[1])
                Union_Area = box1_area + box2_area - InterSection_Area

                IoU = (InterSection_Area/Union_Area)
                if IoU > IoU_max :
                    IoU_max = IoU
                    ground_truth_box_Highest_IoU = ground_truth_box

            IoU_List = np.append(IoU_List, IoU_max)
            Ground_truth_box_Highest_IoU_List.append(ground_truth_box_Highest_IoU)

            # 한 위치에 9개의 앵커 존재 -> 9개 앵커에 대한 IoU를 계산할 때마다 모아서 Positive, Negative 앵커 분류
            if i % 9 == 8 :
                IoU_List_inOneSpot = IoU_List[i-8:i+1]
                for num in list(range(i-8, i + 1)):
                    if IoU_List[num] > 0.7 or (max(IoU_List_inOneSpot) == IoU_List[num] and IoU_List[num] >= 0.3): # positive anchor
                        anchors_state[num] = 2
                    elif IoU_List[num] < 0.3 : # negative anchor
                        anchors_state[num] = 1
                    else: # 애매한 앵커들
                        anchors_state[num] = 0     

    Ground_truth_box_Highest_IoU_List = np.asarray(Ground_truth_box_Highest_IoU_List)
    Ground_truth_box_Highest_IoU_List = np.reshape(Ground_truth_box_Highest_IoU_List, (-1, 4))
            
    return anchors_state, Ground_truth_box_Highest_IoU_List # 각 앵커의 상태, (모든)앵커가 IoU 계산에 참조한 Ground Truth Box

## RPN을 위한 데이터셋 생성

In [8]:
# RPN훈련을 위한 데이터셋. 
# RPN과 Detector는 별개의 모델이다. 즉, 두 모델을 훈련시킬 때 필요한 데이터셋은 따로따로 만들어야한다. 
# 여기선 RPN 훈련에 필요한 데이터만 만들거다. 
def make_dataset_forRPN(input_list) :
    image_file_list = input_list[0]
    xml_file_list = input_list[1]
    anchors = input_list[2]
    anchors_state = input_list[3]

    image_list = make_input(image_file_list) # 입력
    # 출력
    cls_layer_label_list = np.array([])
    reg_layer_label_list = np.array([])

    # 값 계속 생성하는거 막기위한 변수
    cls_label_forPositive = np.array([1.0,0.0])
    cls_label_forNegative = np.array([0.0,1.0])
    cls_label_forUseless  = np.array([0.0,0.0])

    reg_label_forNotPositive = np.array([0.0, 0.0, 0.0, 0.0])

    for i in tqdm(range(0, len(xml_file_list)), desc="get label"): # 각 이미지별로 데이터셋 생성(5011개)

        anchors_state_for = anchors_state # anchors_state는 매 사진마다 다르니까 원본값(?)을 복사해서 쓴다. 
        Ground_Truth_Box_list = get_Ground_Truth_Box_fromImage(xml_file_list[i]) # 여기서는 Ground Truth Box에 대한 정보만 필요하다
        anchors_state_for, Ground_truth_box_Highest_IoU_List = align_anchor(anchors, anchors_state_for, Ground_Truth_Box_list)
        # 어떤 앵커가 Pos, neg 앵커인지, (모든)앵커가 참조한 ground truth box는 뭔지
    
        #start = time.time()
        # 연산 시간 때문에 생성(1761개치 모아놨다가 한 번에 추가하기)
        cls_layer_label_list_for = np.array([])
        reg_layer_label_list_for = np.array([])
        for j in range(0, len(anchors_state_for)) :
            if anchors_state_for[j] == 2 : # positive
                Ground_truth_box = np.array([Ground_truth_box_Highest_IoU_List[j][0] + Ground_truth_box_Highest_IoU_List[j][2]/2, Ground_truth_box_Highest_IoU_List[j][1] + Ground_truth_box_Highest_IoU_List[j][2]/2, Ground_truth_box_Highest_IoU_List[j][2] - Ground_truth_box_Highest_IoU_List[j][0], Ground_truth_box_Highest_IoU_List[j][3] - Ground_truth_box_Highest_IoU_List[j][1]])
                cls_layer_label_list_for = np.append(cls_layer_label_list_for, cls_label_forPositive)
                reg_layer_label_list_for = np.append(reg_layer_label_list_for, Ground_truth_box) # IoU계산에 참조한(pos, neg 분류에 기여한) Ground Truth Box의 정보 휙득
            elif anchors_state_for[j] == 1 : # negative는 Ground Truth Box 정보가 필요없으니 [0,0,0,0]을 넣는다. 
                cls_layer_label_list_for = np.append(cls_layer_label_list_for, cls_label_forNegative) # 해당 앵커 output이 [0,1] -> negative
                reg_layer_label_list_for = np.append(reg_layer_label_list_for, reg_label_forNotPositive)
            else : 
                cls_layer_label_list_for = np.append(cls_layer_label_list_for, cls_label_forUseless) # 해당 앵커 output이 [0.5, 0.5] -> 무의미한 값
                reg_layer_label_list_for = np.append(reg_layer_label_list_for, reg_label_forNotPositive)
        
        # 넘파이 배열로 변환
        cls_layer_label_list = np.append(cls_layer_label_list, cls_layer_label_list_for)
        reg_layer_label_list = np.append(reg_layer_label_list, reg_layer_label_list_for)
        #print("\nmaking label time :", time.time() - start)

    # 논문에서 말한 출력값 크기에 맞게 reshape
    cls_layer_label_list = np.reshape(cls_layer_label_list, (-1, 1764, 2)) 
    reg_layer_label_list = np.reshape(reg_layer_label_list, (-1, 1764, 4))

    image_list = image_list.astype('float32')

    return image_list, cls_layer_label_list, reg_layer_label_list # 훈련 데이터들(입, 출력)

## RPN 생성

In [10]:
class RPN(tf.keras.Model):
    def __init__(self, initializer, regularizer, SharedConvNet, anchors, anchor_size, anchor_aspect_ratio):
        super(RPN, self).__init__(name='rpn')

        self.anchors = anchors
        self.anchor_size = anchor_size
        self.anchor_aspect_ratio = anchor_aspect_ratio
        self.Optimizers = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9)

        # 공용 레이어
        self.conv1_1 = SharedConvNet.layers[0]
        self.conv1_2 = SharedConvNet.layers[1]
        self.pooling_1 = SharedConvNet.layers[2]

        self.conv2_1 = SharedConvNet.layers[3]
        self.conv2_2 = SharedConvNet.layers[4]
        self.pooling_2 = SharedConvNet.layers[5]

        self.conv3_1 = SharedConvNet.layers[6]
        self.conv3_2 = SharedConvNet.layers[7]
        self.conv3_3 = SharedConvNet.layers[8]
        self.pooling_3 = SharedConvNet.layers[9]

        self.conv4_1 = SharedConvNet.layers[10]
        self.conv4_2 = SharedConvNet.layers[11]
        self.conv4_3 = SharedConvNet.layers[12]
        self.pooling_4 = SharedConvNet.layers[13]

        self.conv5_1 = SharedConvNet.layers[14]
        self.conv5_2 = SharedConvNet.layers[15]
        self.conv5_3 = SharedConvNet.layers[16]
    
        # RPN만의 레이어
        self.intermediate_layer = tf.keras.layers.Conv2D(512, (3, 3), padding = 'SAME' , activation = 'relu', name = "intermediate_layer", dtype='float32')
        self.cls_Layer = tf.keras.layers.Conv2D(18, (1, 1), kernel_initializer=initializer, padding = 'SAME' ,kernel_regularizer = regularizer, name = "output_1", dtype='float32')
        self.reg_layer = tf.keras.layers.Conv2D(36, (1, 1), kernel_initializer=initializer, padding = 'SAME' ,kernel_regularizer = regularizer, name = "output_2", dtype='float32')
    
    def call(self, inputs):
        # 정방향 연산
        # inputs = np.array(inputs)
        output = self.conv1_1(inputs)
        output = self.conv1_2(output)
        output = self.pooling_1(output)

        output = self.conv2_1(output)
        output = self.conv2_2(output)
        output = self.pooling_2(output)

        output = self.conv3_1(output)
        output = self.conv3_2(output)
        output = self.conv3_3(output)
        output = self.pooling_3(output)

        output = self.conv4_1(output)
        output = self.conv4_2(output)
        output = self.conv4_3(output)
        output = self.pooling_4(output)

        output = self.conv5_1(output)
        output = self.conv5_2(output)
        shared_output = self.conv5_3(output)
        # RPN
        shared_output = self.intermediate_layer(shared_output)
        cls_Layer_output = self.cls_Layer(shared_output) # (1,14,14,18)
        reg_Layer_output = self.reg_layer(shared_output) # (1,14,14,36)

        cls_layer_output = tf.reshape(cls_layer_output[0], [1764,2])
        cls_layer_output = tf.nn.softmax(cls_layer_output) 
        
        reg_layer_output = tf.reshape(reg_Layer_output[0], [1764,4])
        anchor_Use = np.reshape(self.anchors, (-1,4))
        anchor_tensor = tf.convert_to_tensor(anchor_Use, dtype=tf.float32)
        reg_layer_output = tf.math.add(reg_layer_output, anchor_tensor)
        
        return cls_Layer_output, reg_Layer_output # (1764, 2), (1764, 4) 텐서 반환

    def get_minibatch_index(self, cls_layer_output): # 라벨이니까 (1764,2) 넘파이 온다

        index_list = np.array([])
        
        index_list = np.zeros(1764) # 각 앵커가 미니배치 뽑혔나 안뽑혔나
        index_pos = np.array([])
        index_neg = np.array([])
        # cls_layer_output을 보고 긍정, 부정 앵커 분류. 그렇게 데이터셋을 구성함
        for i in range(0, 1764):
            if cls_layer_output[i][0] == 1.0 : # positive anchor
                index_pos = np.append(index_pos, i)
            elif cls_layer_output[i][0] == 0.0 : # negative anchor
                index_neg = np.append(index_neg, i)

        max_for = min([128, len(index_pos)])
        ran_list = random.sample(range(0, len(index_pos)), max_for)

        for i in range(0, len(ran_list)) :
            index = int(index_pos[ran_list[i]])
            index_list[index] = 1

        ran_list = random.sample(range(0, len(index_neg)), 256 - max_for) # 랜덤성 증가?를 위해 또다시 난수 생성
        for i in range(0, len(ran_list)) :
            index = int(index_neg[ran_list[i]])
            index_list[index] = 1

        return index_list # (1764,1) <- 1,0으로 이루어진 boolean 넘파이 배열

    # multi task loss
    def multi_task_loss(self ,image ,cls_layer_output_label, reg_layer_output_label):

        cls_layer_output, reg_layer_output = self.call(image) # (1764,2), (1764,4) 텐서 휙득
        minibatch_index_list = self.get_minibatch_index(cls_layer_output_label) # 미니배치 인덱스 휙득

        # label은 (1764,2)와 (1764,4)임
        tensor_cls_label = tf.convert_to_tensor(cls_layer_output_label, dtype=tf.float32)
        tensor_reg_label = tf.convert_to_tensor(reg_layer_output_label, dtype=tf.float32)

        # loss 계산(Loss 텐서에서 미니배치에 해당되는 애들만 걸러내야함)
        Cls_Loss = tf.nn.softmax_cross_entropy_with_logits(labels=tensor_cls_label, logits = cls_layer_output) # (1764,1) 텐서

        filter_x = tf.Variable([[1.0],[0.0],[0.0], [0.0]])
        filter_y = tf.Variable([[0.0],[1.0],[0.0], [0.0]])
        filter_w = tf.Variable([[0.0],[0.0],[1.0], [0.0]])
        filter_h = tf.Variable([[0.0],[0.0],[0.0], [1.0]])

        x = tf.matmul(reg_layer_output,filter_x)
        y = tf.matmul(reg_layer_output,filter_y)
        w = tf.matmul(reg_layer_output,filter_w)
        h = tf.matmul(reg_layer_output,filter_h)

        anchor_Use = np.reshape(self.anchors, (-1,4))
        anchor_tensor = tf.convert_to_tensor(anchor_Use, dtype=tf.float32)

        x_a = tf.matmul(anchor_tensor,filter_x)
        y_a = tf.matmul(anchor_tensor,filter_y)
        w_a = tf.matmul(anchor_tensor,filter_w)
        h_a = tf.matmul(anchor_tensor,filter_h)

        x_star = tf.matmul(tensor_reg_label,filter_x)
        y_star = tf.matmul(tensor_reg_label,filter_y)
        w_star = tf.matmul(tensor_reg_label,filter_w)
        h_star = tf.matmul(tensor_reg_label,filter_h)

        denominator = tf.log(tf.constant(10, dtype=tf.float32)) # 텐서 로그는 ln밖에 없어서 ln10을 구한 뒤 나누는 방식으로 log10을 구한다(로그의 밑변환 공식)

        # 4개만 떼서 계산하니까 잘됨
        t_x = tf.math.divide(tf.subtract(x, x_a), w_a)
        t_y = tf.math.divide(tf.subtract(y, y_a), h_a)
        t_w = tf.math.divide(tf.math.log(tf.math.divide(w, w_a)), denominator)
        t_w = tf.math.divide(tf.math.log(tf.math.divide(h, h_a)), denominator)

        t_x_star = tf.math.divide(tf.math.subtract(x_star, x_a), w_a)
        t_y_star = tf.math.divide(tf.math.subtract(y_star, y_a), h_a)
        t_w_star = tf.math.devide(tf.math.log(tf.divide(w_star, w_a)), denominator)
        t_h_star = tf.math.devide(tf.math.log(tf.divide(h_star, h_a)), denominator)

        # (1764,1)에 해당하는 t_x, t_y...을 구했다. 여기서 미니배치에 해당되는 애들만 걸러낸다. 

        # 미니배치에 해당되는 애들만 0이 아닌 값으로 만들기. 미니배치 리스트는 미니배치에 해당되는 인덱스는 1이고 나머지는 다 0이니까 tf.math.multiply를 사용해 원소별 곱을 해주면 미니배치에 해당되는 값들만 얻을 수 있다. 
        minibatch_index_list = np.reshape(minibatch_index_list, (1764,1)) # (1764,1)로 reshape해주기
        minibatch_index_tensor = np.reshape(minibatch_index_list, dtype=tf.float32) # 텐서로 변환

        # 다 곱해서 미니배치 성분만 남기기
        t_x_minibatch = tf.math.multiply(t_x, minibatch_index_tensor)
        t_y_minibatch = tf.math.multiply(t_x, minibatch_index_tensor)
        t_w_minibatch = tf.math.multiply(t_x, minibatch_index_tensor)
        t_h_minibatch = tf.math.multiply(t_x, minibatch_index_tensor)

        t_x_star_minibatch = tf.math.multiply(t_x_star, minibatch_index_tensor)
        t_y_star_minibatch = tf.math.multiply(t_y_star, minibatch_index_tensor)
        t_w_star_minibatch = tf.math.multiply(t_w_star, minibatch_index_tensor)
        t_h_star_minibatch = tf.math.multiply(t_h_star, minibatch_index_tensor)

        Cls_Loss_minibatch = tf.math.multiply(Cls_Loss, minibatch_index_tensor)

        # 각 성분별로 1764개분 Loss를 다 합친 4개의 값이 나왔다
        # X성질에 대한 Smooth L1. huber_loss에서 delta = 1로 하면 smooth L1과 같다.
        # 미니배치 성분만 뽑아내서 미니배치가 아닌 인덱스의 값은 0인데 Smooth L1에서 |x| < 1이면 0.5*x^2니까 0이 나오며 이는 loss에 어떠한 영향을 미치지 않는다. 
        x_huber_loss = tf.compat.v1.losses.huber_loss(t_x_star_minibatch, t_x_minibatch) 
        y_huber_loss = tf.compat.v1.losses.huber_loss(t_y_star_minibatch, t_y_minibatch)
        w_huber_loss = tf.compat.v1.losses.huber_loss(t_w_star_minibatch, t_w_minibatch)
        h_huber_loss = tf.compat.v1.losses.huber_loss(t_h_star_minibatch, t_h_minibatch)

        # 한 번에 더하니까 에러가 발생해 tf.math.add로 두개씩 더한다.
        Reg_Loss = tf.math.add(x_huber_loss, y_huber_loss)   
        Reg_Loss = tf.math.add(Reg_Loss, w_huber_loss) # (x_huber_loss + y_huber_loss) + w_huber_loss
        Reg_Loss = tf.math.add(Reg_Loss, h_huber_loss) # (x_huber_loss + y_huber_loss + w_huber_loss) + h_huber_loss

        N_reg = tf.constant([1764.0])
        N_cls = tf.constant([10.0/256.0]) # lambda도 곱한 값

        loss_cls = tf.multiply(N_reg, tf.reduce_sum(Cls_Loss_minibatch))
        loss_reg = tf.multiply(N_cls, Reg_Loss)

        loss = tf.add(loss_cls, loss_reg)
        
        return loss

    def get_grad(self, Loss, cls_reg_boolean): # cls_reg_boolean = 0이면 cls, cls_reg_boolean = 1 이면 reg
        g = 0

        with tf.GradientTape() as tape:
            tape.watch(self.conv1_1.variables)
            tape.watch(self.conv1_2.variables)
            tape.watch(self.conv2_1.variables)
            tape.watch(self.conv2_2.variables)
            tape.watch(self.conv3_1.variables)
            tape.watch(self.conv3_2.variables)
            tape.watch(self.conv3_3.variables)
            tape.watch(self.conv4_1.variables)
            tape.watch(self.conv4_2.variables)
            tape.watch(self.conv4_3.variables)
            tape.watch(self.conv5_1.variables)
            tape.watch(self.conv5_2.variables)
            tape.watch(self.conv5_3.variables)
            tape.watch(self.intermediate_layer.variables)

            if cls_reg_boolean == 0:
                tape.watch(self.cls_Layer.variables)
            else:
                tape.watch(self.reg_layer.variables)

            if cls_reg_boolean == 0:
                g = tape.gradient(Loss, [self.conv1_1.variables[0], self.conv1_1.variables[1],self.conv1_2.variables[0], self.conv1_2.variables[1],self.conv2_1.variables[0], self.conv2_1.variables[1],self.conv2_2.variables[0], self.conv2_2.variables[1], self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1], self.intermediate_layer.variables[0],self.intermediate_layer.variables[1], self.cls_Layer.variables[0],self.cls_Layer.variables[1]])
            else:
                g = tape.gradient(Loss, [self.conv1_1.variables[0], self.conv1_1.variables[1],self.conv1_2.variables[0], self.conv1_2.variables[1],self.conv2_1.variables[0], self.conv2_1.variables[1],self.conv2_2.variables[0], self.conv2_2.variables[1], self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1], self.intermediate_layer.variables[0],self.intermediate_layer.variables[1], self.reg_layer.variables[0],self.reg_layer.variables[1]])
        return g
    
    def App_Gradient(self, Loss, training_step) :
        if training_step == 1:
            g_cls = self.get_grad(Loss, 0)
            self.Optimizers.apply_gradients(zip(g_cls, [self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1], self.intermediate_layer.variables[0],self.intermediate_layer.variables[1], self.cls_Layer.variables[0],self.cls_Layer.variables[1]]))

            g_reg = self.get_grad(Loss, 1)
            self.Optimizers.apply_gradients(zip(g_reg, [self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1], self.intermediate_layer.variables[0],self.intermediate_layer.variables[1], self.reg_layer.variables[0],self.reg_layer.variables[1]]))

        if training_step == 3:
            g_cls = self.get_grad(Loss, training_step, 0)
            self.Optimizers.apply_gradients(zip(g_cls, [self.intermediate_layer.variables[0],self.intermediate_layer.variables[1], self.cls_Layer.variables[0],self.cls_Layer.variables[1]]))

            g_reg = self.get_grad(Loss, training_step, 1)
            self.Optimizers.apply_gradients(zip(g_reg, [self.intermediate_layer.variables[0],self.intermediate_layer.variables[1], self.reg_layer.variables[0],self.reg_layer.variables[1]]))
        
    def Training_model(self, image_list, cls_layer_ouptut_label_list, reg_layer_ouptut_label_list, training_step):

        for i in tqdm(range(0, len(image_list)), desc = "training RPN"):
            image = np.expand_dims(image_list[i], axis = 0) # (1,224,224,3)으로 제작
            Loss = self.multi_task_loss(image, cls_layer_ouptut_label_list[i], reg_layer_ouptut_label_list[i])
            self.App_Gradient(Loss, training_step)

## Fast R-CNN

### 한 이미지에 대한 출력값 중 object score가 0.7 이상인 앵커들을 선별한다
### 앞서 가공한 출력값을 사용한다. 여기서 cls_output은 (1764,2)고 reg_output은 (1764,4)다.

### Fast R-CNN을 위한 클래스 라벨 데이터를 생성
#### 이미지별 Ground Truth Box와 Classes 정보를 얻는다 

In [18]:
# 데이터셋에 존재하는 클래스가 얼마나 있는지 알아낸다
def get_Classes_inImage(xml_file_list):
    Classes_inDataSet = []

    for xml_file_path in xml_file_list: 

        f = open(xml_file_path)
        xml_file = xmltodict.parse(f.read())
        # 사진에 객체가 여러개 있을 경우
        try: 
            for obj in xml_file['annotation']['object']:
                classes.append(obj['name'].lower()) # 들어있는 객체 종류를 알아낸다
        # 사진에 객체가 하나만 있을 경우
        except TypeError as e: 
            classes.append(xml_file['annotation']['object']['name'].lower()) 
        f.close()

    Classes_inDataSet = list(set(Classes_inDataSet)) # set은 중복된걸 다 제거하고 유니크한? 아무튼 하나만 가져온다. 그걸 리스트로 만든다
    Classes_inDataSet.sort() # 정렬

    return Classes_inDataSet

In [29]:
def make_Cls_DataSet_forFastRCNN(xml_file_list, Classes_inDataSet):
    # Label List를 받아 데이터셋에 어떤 클래스가 있는지 알아내고 클래스 종류를 받아 이미지별 어떤 클래스가 있는지 Ground Truth Box별로 one-hot encoding을 해서 반환한다
    # 이미지별 GroundTruthBox도 반환한다
    num_Classes = len(Classes_List) # 데이터셋에 클래스가 몇종류인가?

    # 클래스 리스트를 알았으니 데이터셋을 만들어보자
    # 훈련 이미지 5011개 분의 데이터를 얻어야한다
    Cls_labels_for_FastRCNN = []
    Reg_labels_for_FastRCNN = []

    for i in tqdm(range(0, len(xml_file_list)), desc = "get_dataset_forFASTRCNN"):
        GroundTruthBoxes_inImage = get_Ground_Truth_Box_fromImage(xml_file_list[i]) # 이미지별 Ground Truth Box 리스트. (n, 4)크기의 리스트 받음

        classes = []
        f = open(xml_file_list[i])
        xml_file = xmltodict.parse(f.read())
        # 사진에 객체가 여러개 있을 경우
        try: 
            for obj in xml_file['annotation']['object']:
                classes.append(obj['name'].lower()) # 들어있는 객체 종류를 알아낸다
        # 사진에 객체가 하나만 있을 경우
        except TypeError as e: 
            classes.append(xml_file['annotation']['object']['name'].lower()) 

        # 한 이미지에서 얻은 클래스 리스트에서 각 클래스가 Classes_List 내에서 어떤 인덱스 번호를 갖고 있는지 알아낸다.
        # 그 인덱스 번호로 원-핫 인코딩 수행
        cls_index_list = []
        for class_val in classes :
            cls_index = Classes_inDataSet.index(class_val) # 클래스가 Classes_inDataSet 내에서 어떤 인덱스 번호를 갖고 있는가?
            cls_index_list.append(cls_index)# 한 이미지 내에 있는 Ground Truth Box별로 갖고 있는 클래스 인덱스를 저장
        cls_onehot_inImage = np.eye(len(Classes_inDataSet))[cls_index_list] # (n,21) 크기의 리스트 받음. 여기서 n은 한 이미지 내에 있는 객체 숫자

        # 저장
        Reg_labels_for_FastRCNN.append(GroundTruthBoxes_inImage)
        Cls_labels_for_FastRCNN.append(cls_onehot_inImage)

    return Reg_labels_for_FastRCNN, Cls_labels_for_FastRCNN # 이미지별 Ground Truth Box와 Classes 리스트

### RPN에서 뽑아낸 1761(224*224 이미지를 입력으로 받는 VGG16 기준)개의 Box 중 object score가 0.7보다 큰 것들을 RoI로 선정한다.  

In [63]:
def nms(cls_layer_output, reg_layer_output): # 한 이미지 안에 있는 RoI를 선별. (1764,2), (1764,4) 텐서를 받음
    # 넘파이 배열로 변환
    cls_layer_output = cls_layer_output.numpy()
    reg_layer_output = reg_layer_output.numpy()

    nms_RoI_inImage = [] # 한 이미지에 들어있는 RoI 리스트

    for i in range(0, len(cls_layer_output)) :
        if cls_layer_output[i][0] > 0.7 : # 해당 앵커의 object score가 0.7을 넘겼으면 RoI로 선정. 테스트를 위해 0.6으로
            nms_RoI_inImage.append(reg_layer_output[i].tolist()) # RoI 추가

    return nms_RoI_inImage # RoI 반환

In [64]:
def get_nms_list(RPN_Model, image_list) :
    # Detector 훈련에 필요한 데이터를 얻는 곳이다

    NMS_RoIs_List = [] # 전체 입력 이미지의 RoI를 이미지별로 저장(리스트 안에 리스트)
    NMS_GroundTruthBoxes_List = []
    NMS_Classes_List = []

    for i in tqdm(range(0, len(cls_layer_output_list)), desc = "get_RoI"): # 5011개에 대한 nms 구한다
        cls_layer_output, reg_layer_output = RPN_Model(image_list) # output을 얻는다

        nms_RoI_inImage = nms(cls_layer_output, reg_layer_output) # # 각 이미지에서 RoI들 구하기
        NMS_RoIs_List.append(nms_RoI_inImage) # 각 이미지에서 얻은 RoI를 넣기

    return NMS_RoIs_List # (5011, list) 리스트를 반환

## Detector 모델 생성

### RoI Pooling Layer 제작

In [80]:
# Pooling 작업을 RoI 지역에 대해 한다. 14*14*512를 7*7*512로
class RoiPoolingLayer(tf.keras.layers.Layer):
    def __init__(self, pool_size):
        super(RoiPoolingLayer, self).__init__(name='RoI_Pooling_Layer')
        self.pool_size = pool_size # VGG16에서는 7*7
        
    def build(self, input_shape): # input shape로 (1,14,14,512)와 같이 받으니까 3번 원소 자리의 값인 512를 가져간다. 
        self.nb_channels = input_shape[3] # 채널 조정
        # 맨처음 입력받을 때 채널 숫자를 받는다. 풀링이라 채널 개수를 유지해야하기 때문

    def compute_output_shape(self, input_shape): # If the layer has not been built, this method will call build on the layer. 
        return None, self.num_rois, self.pool_size, self.pool_size, self.nb_channels

    def call(self, image, RoI_inImage): # 정방향 연산. shared FeatureMap (1,14,14,512)와 입력 이미지에 있는 RoI 리스트를 받는다. RoI_inImage는 리스트다
        # 해야될거
        # 1. RoiPooling을 위해 RoI 영역을 (14,14)에 맞게 변형하기

        RoiPooling_outputs_List = [] # RoI 부근을 잘라낸 뒤 7*7로 만들어낸 것들을 여기에 모은다. 그러면 (n,1,7,7,512)가 되겠지

        for i in range(0, len(RoI_inImage)): # 이미지 당 RoI 갯수만큼 for문 반복 -> RoI 갯수만큼 특성맵 얻으려고
            # 224 -> 14로 16배 줄어들었으니 이에 맞춰 RoI도 줄인다. 
            # RoI 양식이(x,y,w,h)였는데 이를 (r,c,w,h)로 바꿔준다. r,c는 왼쪽 위 좌표(min x, min y)고 w,h,는 RoI의 너비, 높이다. 

            r = RoI_inImage[i][0] - RoI_inImage[i][2]/2
            c = RoI_inImage[i][1] - RoI_inImage[i][3]/2
            w = RoI_inImage[i][2]
            h = RoI_inImage[i][3]
            
            # 1/16배로 만들기
            r = round(r / 16.0)
            c = round(c / 16.0)
            w = round(w / 16.0)
            h = round(h / 16.0)

            # 제일 큰 앵커 사이즈가 128*128인데 이는 (14,14)에서 (8,8)이 된다. 
            # 제일 작은 앵커는 (2,2)이다. 그래서 나는 'by copying 7 times each cell and then max-pooling back to 7x7', 즉 이미지의 각 셀을 7*7로 복사한 뒤 (7*7)로 max Pooling하는 방식을 사용하고자 했다.
            # 아이디어 출처 : https://stackoverflow.com/questions/48163961/how-do-you-do-roi-pooling-on-areas-smaller-than-the-target-size
            # 그러나 이 방식은 너무 까다로워 max pooling의 대체지인 resize를 사용하기로 했다. 픽셀간 경계부분 말고는 7*7 출력값이 달라지는게 없어서 큰 차이는 없을것으로 예상된다.
            image_inRoI = image[:, c:c+h, r:r+w, :] # RoI에 해당되는 부분을 추출한다.
            image_resize = tf.image.resize(image_inRoI, (self.pool_size, self.pool_size)) # 7*7로 resize
            RoiPooling_outputs_List.append(image_resize)

        # RoiPooling_outputs_List는 (1,7,7,512) 텐서들로 이루어진 리스트다
        final_Pooling_output = tf.concat(RoiPooling_outputs_List, axis=0) # [resize_RoI, resize_RoI, resize_RoI]...리스트를 하나의 텐서로 통합. 밑에 붙히는 방식으로 쫘라락 붙힌다

        final_Pooling_output = tf.reshape(final_Pooling_output, (1, len(RoI_inImage), self.pool_size, self.pool_size, self.nb_channels)) # 통합한걸 (1,RoI 개수,7,7,512)로 reshape

        return final_Pooling_output # (1,RoI 개수,7,7,512) 텐서를 반환
    
    def get_config(self): # 구성요소 반환
        config = {'pool_size': self.pool_size,
                  'num_rois': self.num_rois}
        base_config = super(RoiPoolingLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

In [112]:
class Detector(tf.keras.Model):
    def __init__(self, initializer, regularizer, SharedConvNet):
        super(Detector, self).__init__(name='Detector')
        # 레이어 
        # 공용 레이어(14*14*512)를 받아서 RoI Pooling Layer에 넣어 7*7*512를 만들고 FCs를 거쳐 두가지 Output을 생성한다. 

        # 클래스 분류 레이어와 박스 위치 회귀 레이어의 초기화를 다르게 해야한다. 
        Classify_layer_initializer = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None)
        Box_regression_layer_initializer = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.001, seed=None)

        # 공용 컨볼루전 레이어들 추가
        self.conv1_1 = SharedConvNet.layers[0]
        self.conv1_2 = SharedConvNet.layers[1]
        self.pooling_1 = SharedConvNet.layers[2]

        self.conv2_1 = SharedConvNet.layers[3]
        self.conv2_2 = SharedConvNet.layers[4]
        self.pooling_2 = SharedConvNet.layers[5]

        self.conv3_1 = SharedConvNet.layers[6]
        self.conv3_2 = SharedConvNet.layers[7]
        self.conv3_3 = SharedConvNet.layers[8]
        self.pooling_3 = SharedConvNet.layers[9]

        self.conv4_1 = SharedConvNet.layers[10]
        self.conv4_2 = SharedConvNet.layers[11]
        self.conv4_3 = SharedConvNet.layers[12]
        self.pooling_4 = SharedConvNet.layers[13]

        self.conv5_1 = SharedConvNet.layers[14]
        self.conv5_2 = SharedConvNet.layers[15]
        self.conv5_3 = SharedConvNet.layers[16]

        # RoI Pooling : H*W(7*7)에 맞게 입력 특성맵을 pooling. RoI에 해당하는 영역을 7*7로 Pooling한다. 
        self.RoI_Pooling_Layer = RoiPoolingLayer(7) # Pooling 이후 크기를 7*7*512로 만든다. -> (1,num_roi,7,7,512)
        self.Flatten_layer = tf.keras.layers.Flatten() # num_roi*7*7*512개의 텐서가 일렬로 나열됨
        self.Fully_Connected = tf.keras.layers.Dense(4096, activation='relu') # 별 말이 없으니 기본적으로 지정된 kernel_initializer를 사용하자. 여기선 RoI별 [1, 7*7*512] 텐서를 넣는다.
        self.Classify_layer = tf.keras.layers.Dense(21, activation='softmax', kernel_initializer = Classify_layer_initializer, name = "output_1")
        self.Reg_layer = tf.keras.layers.Dense(84, activation= None, kernel_initializer = Box_regression_layer_initializer, name = "output_2")
    
    def call(self, Image, RoI_list): # input으로 (224,224,3)과 RoI를 받는다. RoI는 앞서 RPN에서 뽑아낸걸 쓴다. 

        output = self.conv1_1(Image)
        output = self.conv1_2(output)
        output = self.pooling_1(output)

        output = self.conv2_1(output)
        output = self.conv2_2(output)
        output = self.pooling_2(output)

        output = self.conv3_1(output)
        output = self.conv3_2(output)
        output = self.conv3_3(output)
        output = self.pooling_3(output)

        output = self.conv4_1(output)
        output = self.conv4_2(output)
        output = self.conv4_3(output)
        output = self.pooling_4(output)

        output = self.conv5_1(output)
        output = self.conv5_2(output)
        shared_output = self.conv5_3(output)
        
        final_Pooling_output = self.RoI_Pooling_Layer(shared_output, RoI_list) # 공용 레이어와 RoI를 넣어 (1, RoI 개수, 7,7,512) 휙득

        flatten_ouput_forAllRoI = self.Flatten_layer(final_Pooling_output) # Flatten으로 한줄 세우기 = (7*7*512) * RoI 개수

        flatten_perRoI = tf.split(flatten_ouput_forAllRoI, num_or_size_splits = len(RoI_list)) # (1, 7*7*512)텐서가 모인 리스트로 만들었다.

        Classify_layer_output = []
        Reg_layer_output = []

        for i in range(0, len(flatten_perRoI)):
            flatten_output = flatten_perRoI[i] # flatten된걸 하나씩 꺼냄
            Fully_Connected_output = self.Fully_Connected(flatten_output) # FCs로 만들기
            # 객체 분류 레이어, 박스 회귀 레이어
            cls_output = self.Classify_layer(Fully_Connected_output) 
            reg_output = self.Reg_layer(Fully_Connected_output)

            Classify_layer_output.append(cls_output)
            Reg_layer_output.append(reg_output)

        return Classify_layer_output, Reg_layer_output # Classify_layer_output : [1,21] 텐서가 len(RoI_list)개 모인 리스트, Reg_layer_output : [1, 84] 텐서가 len(RoI_list)개 모인 리스트

    # 필요한거 : multi task loss, gradient 계산, 적용
    def multi_task_loss(self, image, RoI_list, Ground_Truth_Box_list, Cls_label_list):
        # image는 (1,224,224,3), RoI_list는 (x,y,w,h)양식인 박스들 64개, Ground_Truth_Box_list는 (x_min, y_min, x_max, y_max)인 애들이 64개, Cls_label_list는 (,21)인 원-핫 인코딩된 애들이 64개
        # 여기서는 64개의 RoI에 해당하는 Loss 64개를 구한다. 
        Classify_layer_output, Reg_layer_output = self.call(image, RoI_list) # tensor들이 64개씩 모인 리스트 2개 휙득. Reg_layer_output은 (x,y,w,h)인 텐서들이 21개 일렬로 있다 -> (1, 84) 텐서

        loss_list = []

        for i in range(0, 64) : # index 0~15는 IoU가 0.5이상인 RoI들, 16~63은 IoU가 0.1~0.49999...인 RoI들
            # 각 RoI별 리스트 하나씩 꺼냄
            cls_output = Classify_layer_output[i]
            reg_output = Reg_layer_output[i]
            # 라벨값도 하나씩 꺼냄
            ground_truth_box = Ground_Truth_Box_list[i]
            cls_label = Cls_label_list[i]

            # loss 계산
            cls_loss = tf.nn.softmax_cross_entropy_with_logits(labels=cls_label, logits=cls_output)

            reg_loss = 0
            if i < 16:
                # (1,84)에서 해당 클래스에 해당하는 값을 얻어야한다(예 : '자동차'객체에 대한 박스 위치 추측값)
                # 논문에서 (x,y,w,h)에 대한 smooth l1을 구하라길래 ground_truth_box를 (x,y,w,h)로 바꿔주고자 한다
                gtb = tf.constant([ground_truth_box[0] + ground_truth_box[2]/2, ground_truth_box[1] + ground_truth_box[3]/2, ground_truth_box[2] - ground_truth_box[0], ground_truth_box[3] - ground_truth_box[1]])
                class_index = tf.argmax(cls_label) # 라벨값의 원-핫 인코딩에서 가장 큰 값의 인덱스 = 클래스의 인덱스에 해당. 
                pred_box = reg_output[4*class_index:4*class_index + 4] # 예측값에서 해당 클래스에 해당되는 박스 좌표를 불러온다. 
                reg_loss = tf.compat.v1.losses.huber_loss(gtb, pred_box) # (x,y,w,h) 각 성분에 대해 smoothL1(ti −vi)한 값을 다 더한게 나온다. 
        
            loss = tf.add(cls_loss, reg_loss)
            loss_list.append(loss) # loss list에 loss를 넣는다.

        return loss_list # 64개의 loss로 이루어진 리스트를 반환

    def get_grad(self, Loss, cls_reg_boolean): # Loss 하나씩 그래디언트 구하기
        g = 0
        with tf.GradientTape() as tape:
            tape.watch(self.conv1_1.variables)
            tape.watch(self.conv1_2.variables)
            tape.watch(self.conv2_1.variables)
            tape.watch(self.conv2_2.variables)
            tape.watch(self.conv3_1.variables)
            tape.watch(self.conv3_2.variables)
            tape.watch(self.conv3_3.variables)
            tape.watch(self.conv4_1.variables)
            tape.watch(self.conv4_2.variables)
            tape.watch(self.conv4_3.variables)
            tape.watch(self.conv5_1.variables)
            tape.watch(self.conv5_2.variables)
            tape.watch(self.conv5_3.variables)
            tape.watch(self.Fully_Connected.variables)

            if cls_reg_boolean == 0:
                tape.watch(self.Classify_layer.variables)
            else:
                tape.watch(self.Reg_layer.variables)

            
            if cls_reg_boolean == 0:
                g = tape.gradient(Loss, [self.conv1_1.variables[0], self.conv1_1.variables[1],self.conv1_2.variables[0], self.conv1_2.variables[1],self.conv2_1.variables[0], self.conv2_1.variables[1],self.conv2_2.variables[0], self.conv2_2.variables[1], self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1], self.Fully_Connected.variables[0],self.Fully_Connected.variables[1], self.Classify_layer.variables[0],self.Classify_layer.variables[1]])
            else:
                g = tape.gradient(Loss, [self.conv1_1.variables[0], self.conv1_1.variables[1],self.conv1_2.variables[0], self.conv1_2.variables[1],self.conv2_1.variables[0], self.conv2_1.variables[1],self.conv2_2.variables[0], self.conv2_2.variables[1], self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1], self.Fully_Connected.variables[0],self.Fully_Connected.variables[1], self.Reg_layer.variables[0],self.Reg_layer.variables[1]])

        return g

    def App_Gradient(self, Loss_list, training_step) : # 64개의 loss로 이루어진 리스트를 받는다. training_step은 2 아니면 4다. 
        if training_step == 2:
            grad_acc_cls = tf.Variable(0.0)
            grad_acc_reg = tf.Variable(0.0)
            for i in range (0, 64) :
                Loss = Loss_list[i]
                # Detector는 로스 하나하나 적용함
                g_cls = self.get_grad(Loss, 0)
                self.Optimizers.apply_gradients(zip(g_cls, [self.Fully_Connected.variables[0],self.Fully_Connected.variables[1], self.Classify_layer.variables[0],self.Classify_layer.variables[1]]))
                g_reg = self.get_grad(Loss, training_step, 1)
                self.Optimizers.apply_gradients(zip(g_reg, [self.Fully_Connected.variables[0],self.Fully_Connected.variables[1], self.Classify_layer.variables[0],self.Reg_layer.variables[1]]))
                
                # grad를 다 더해서 VGG16을 훈련시킬거다.
                grad_acc_cls = tf.add(grad_acc_cls, g_cls)
                grad_acc_reg = tf.add(grad_acc_cls, g_reg)

            # 한 이미지에 대한 전체 grad를 얻는다. 
            total_grad = tf.add(grad_acc_cls, grad_acc_reg)
            # vgg16 훈련(conv3_1부터 conv5_3까지)
            self.Optimizers.apply_gradients(zip(total_grad, [self.conv3_1.variables[0], self.conv3_1.variables[1], self.conv3_2.variables[0],self.conv3_2.variables[1], self.conv3_3.variables[0],self.conv3_3.variables[1], self.conv4_1.variables[0],self.conv4_1.variables[1], self.conv4_2.variables[0],self.conv4_2.variables[1], self.conv4_3.variables[0],self.conv4_3.variables[1], self.conv5_1.variables[0],self.conv5_2.variables[1], self.conv5_3.variables[0],self.conv5_3.variables[1]]))
            
        # 4번 째 단계에선 Detector만 훈련시키면 된다.
        if training_step == 4:
            for i in range (0, 64) :
                Loss = Loss_list[i]
                # Detector는 로스 하나하나 적용함
                g_cls = self.get_grad(Loss, 0)
                self.Optimizers.apply_gradients(zip(g_cls, [self.Fully_Connected.variables[0],self.Fully_Connected.variables[1], self.Classify_layer.variables[0],self.Classify_layer.variables[1]]))
                g_reg = self.get_grad(Loss, training_step, 1)
                self.Optimizers.apply_gradients(zip(g_reg, [self.Fully_Connected.variables[0],self.Fully_Connected.variables[1], self.Classify_layer.variables[0],self.Reg_layer.variables[1]]))

    def get_minibatch(self, RoI_list, Ground_Truth_Box_list, Cls_label_list): # 64개의 RoI 추출 + RoI에 맞는 라벨들 추출
        # Ground_Truth_Box_list : (x_min, y_min, x_max, y_max)
        RoI_object_presume_group = []
        Ground_Truth_Box_object_presume_group = []
        Cls_label_object_presume_group = []

        RoI_background_presume_group = []
        Ground_Truth_Box_background_presume_group = []
        Cls_label_background_presume_group = []

        for i in range(0, len(RoI_list)):
            # IoU가 0.5 이상인 그룹, 0.1~0.499999...인 그룹 두개로 나눔
            RoI_x_min = RoI_list[i][0] - RoI_list[i][2]/2
            RoI_y_min = RoI_list[i][1] - RoI_list[i][3]/2
            RoI_x_max = RoI_list[i][0] + RoI_list[i][2]/2
            RoI_y_max = RoI_list[i][1] + RoI_list[i][3]/2

            max_IoU = 0 # RoI가 가진 최대 IoU
            ground_truth_box_Highest_IoU = 0 # 어떤 ground truth box와 가장 높은 IoU를 이뤄냈나
            cls_Higest_IoU = 0 # ground_truth_box_Highest_IoU는 어떤 클래스의 ground truth box와 IoU가 가장 높았나. 원핫 인코딩 양식임

            # RoI의 IoU를 구한다.
            for j in range(0, len(Ground_Truth_Box_list)):

                ground_truth_box = Ground_Truth_Box_list[j]
                cls_label = Cls_label_list[j]

                InterSection_min_x = max(RoI_x_min, ground_truth_box[0])
                InterSection_min_y = max(RoI_y_min, ground_truth_box[1])

                InterSection_max_x = min(RoI_x_max, ground_truth_box[2])
                InterSection_max_y = min(RoI_y_max, ground_truth_box[3])

                InterSection_Area = 0

                if (InterSection_max_x - InterSection_min_x + 1) >= 0 and (InterSection_max_y - InterSection_min_y + 1) >= 0 :
                    InterSection_Area = (InterSection_max_x - InterSection_min_x + 1) * (InterSection_max_y - InterSection_min_y + 1)

                box1_area = RoI_list[i][2] * RoI_list[i][3]
                box2_area = (ground_truth_box[2] - ground_truth_box[0]) * (ground_truth_box[3] - ground_truth_box[1])
                Union_Area = box1_area + box2_area - InterSection_Area

                IoU = (InterSection_Area/Union_Area)
                if IoU > max_IoU :
                    max_IoU = IoU
                    ground_truth_box_Highest_IoU = ground_truth_box
                    cls_Higest_IoU = cls_label

            # 두 그룹(IoU가 0.5이상인 애들, IoU가 0.1~0.499...인 애들)으로 나눔
            if max_IoU >= 0.5 :
                RoI_object_presume_group.append(RoI_list[i])
                Ground_Truth_Box_object_presume_group = ground_truth_box_Highest_IoU
                Cls_label_object_presume_group = cls_Higest_IoU

            elif max_IoU >= 0.1 and max_IoU < 0.5 :
                RoI_background_presume_group.append(RoI_list[i])
                Ground_Truth_Box_background_presume_group.append(ground_truth_box_Highest_IoU)
                Cls_label_background_presume_group.append(cls_Higest_IoU)

        # 나눠진 애들 중 각각 16, 32개를 선별
        # 인덱스를 랜덤으로 각각 16, 32개 선발
        RoI_minibatch = random.choice(RoI_object_presume_group, 16)
        Ground_Truth_Box_minibatch = random.choice(Ground_Truth_Box_object_presume_group, 16)
        Cls_label_minibatch = random.choice(Cls_label_object_presume_group, 16)

        RoI_minibatch.extend(random.choice(RoI_background_presume_group, 32))
        Ground_Truth_Box_minibatch.extend(random.choice(Ground_Truth_Box_background_presume_group, 32))
        Cls_label_minibatch.extend(random.choice(Cls_label_background_presume_group, 32))

        return RoI_minibatch, Ground_Truth_Box_minibatch, Cls_label_minibatch


    def Training_model(self, image_list, RoI_list_forAllImage, Reg_labels_for_FastRCNN, Cls_labels_for_FastRCNN, training_step):
        for i in tqdm(range(0, len(image_list)), desc = "training"):
            image = np.expand_dims(image_list[i], axis = 0)
            RoI_minibatch, Ground_Truth_Box_minibatch, Cls_label_minibatch = self.get_minibatch(self, RoI_list_forAllImage, Reg_labels_for_FastRCNN, Cls_labels_for_FastRCNN) # 이미지 당 64개의 미니배치 선별 (128/2 = 64)
            
            Loss_list = self.multi_task_loss(image, RoI_minibatch, Ground_Truth_Box_minibatch, Cls_label_minibatch) # RoI 64개에 대한 64개의 loss로 이루어진 리스트 반환
            self.App_Gradient(Loss_list, training_step)

### 4-step training

In [115]:
def four_Step_Alternating_Training(RPN_Model, Detector_Model, image_list, xml_file_list, cls_layer_label_list, reg_layer_label_list, Classes_inDataSet, EPOCH): # 두 모델을 받아 훈련시킴
    # 각자 독립된 상태에서 훈련

    for i in range(0, EPOCH) : # RPN 훈련
        RPN_Model.Training_model(image_list, cls_layer_label_list, reg_layer_label_list, 1)

    # 훈련시킨 RPN에서 Detector훈련에 필요한 데이터 휙득
    NMS_RoIs_List = get_nms_list(RPN_Model, image_list) # 입력 데이터
    Reg_labels_for_FastRCNN, Cls_labels_for_FastRCNN = make_Cls_DataSet_forFastRCNN(xml_file_list, Classes_inDataSet) # 라벨 데이터

    for i in range(0, EPOCH) : # Detector 훈련
        Detector_Model.Training_model(image_list, NMS_RoIs_List, Reg_labels_for_FastRCNN, Cls_labels_for_FastRCNN, 2)

    # Detector_Model의 VGG를 RPN에 이식(레이어 공유 시작)
    RPN_Model.conv1_1 = Detector_Model.conv1_1
    RPN_Model.conv1_2 = Detector_Model.conv1_2
    RPN_Model.conv2_1 = Detector_Model.conv2_1
    RPN_Model.conv2_2 = Detector_Model.conv2_2
    RPN_Model.conv3_1 = Detector_Model.conv3_1
    RPN_Model.conv3_2 = Detector_Model.conv3_2
    RPN_Model.conv3_3 = Detector_Model.conv3_3
    RPN_Model.conv4_1 = Detector_Model.conv4_1
    RPN_Model.conv4_2 = Detector_Model.conv4_2
    RPN_Model.conv4_3 = Detector_Model.conv4_3
    RPN_Model.conv5_1 = Detector_Model.conv5_1
    RPN_Model.conv5_2 = Detector_Model.conv5_2
    RPN_Model.conv5_3 = Detector_Model.conv5_3

    for i in range(0, EPOCH) : # RPN 훈련
        RPN_Model.Training_model(image_list, cls_layer_label_list, reg_layer_label_list, 3)

    NMS_RoIs_List = get_nms_list(RPN_Model, image_list) # 입력 데이터. 새로 훈련한 RPN에서 RoI를 선별한다

    # RPN의 VGG16을 Detector의 VGG16 부분에 이식
    Detector_Model.conv1_1 = RPN_Model.conv1_1
    Detector_Model.conv1_2 = RPN_Model.conv1_2
    Detector_Model.conv2_1 = RPN_Model.conv2_1
    Detector_Model.conv2_2 = RPN_Model.conv2_2
    Detector_Model.conv3_1 = RPN_Model.conv3_1
    Detector_Model.conv3_2 = RPN_Model.conv3_2
    Detector_Model.conv3_3 = RPN_Model.conv3_3
    Detector_Model.conv4_1 = RPN_Model.conv4_1
    Detector_Model.conv4_2 = RPN_Model.conv4_2
    Detector_Model.conv4_3 = RPN_Model.conv4_3
    Detector_Model.conv5_1 = RPN_Model.conv5_1
    Detector_Model.conv5_2 = RPN_Model.conv5_2
    Detector_Model.conv5_3 = RPN_Model.conv5_3

    for i in range(0, EPOCH) : # Detector 훈련
        Detector_Model.Training_model(image_list, NMS_RoIs_List, Reg_labels_for_FastRCNN, Cls_labels_for_FastRCNN, 4)

    return RPN_Model, Detector_Model

### Faster RCNN 출력값 휙득

In [None]:
def get_FasterRCNN_output(RPN_Model = RPN_Model, Detector_Model = Detector_Model, Classes_inDataSet = Classes_inDataSet, Image) : # Image : 이미지 경로
    image_cv = cv2.imread(Image)
    image_size = [image_cv[1], image_cv[0]] # 이미지 원래 사이즈를 얻는다. [w, h]

    image_cv = cv2.resize(image_cv, (224, 224))/255
    image_cv = np.expand_dims(image_cv, axis = 0)
    
    cls_output, reg_output = RPN_Model(Image) # (1764, 2), (1764, 4) 휙득
    nms_RoI_inImage = nms(cls_output, reg_output)

    cls_output, reg_layer = Detector_Model(Image, nms_RoI_inImage) # [1,21] 텐서 여러개, [1,84] 텐서 여러개
    # 넘파이 배열로 변환
    cls_output = cls_output.numpy()
    # softmax된 21개의 값 중 가장 큰거 = 예측한 객체 index
    obj_index = np.argmax(cls_output)
    reg_layer = (reg_layer[4*obj_index:4*obj_index+3]).numpy() # 해당 클래스의 Box정보만 얻고 넘파이 배열로 만든다
    # 얻은 박스를 원래 이미지 비율에 맞게 바꿔준다. 
    reg_layer[0] = reg_layer[0] * (image_size[0] / 224)
    reg_layer[1] = reg_layer[1] * (image_size[1] / 224)
    reg_layer[2] = reg_layer[2] * (image_size[0] / 224)
    reg_layer[3] = reg_layer[3] * (image_size[1] / 224)

    # rectangle함수를 위해 필요한 '박스의 최소 x,y 좌표'와 '박스의 최대 x,y좌표'리스트를 생성한다. 
    min_box = (round(reg_layer[0] - reg_layer[2]/2), round(reg_layer[1] - reg_layer[3]/2))
    max_box = (round(reg_layer[0] + reg_layer[2]/2), round(reg_layer[1] + reg_layer[3]/2)) 

    # 출력하기
    im_read = cv2.read(Image)
    cv2.rectangle(im_read, min_box, max_box, (255, 0, 0), -1) # 박스 그리기

    cv2.imshow("output", im_read)
    cv2.waitKey()
    cv2.destroyAllWindows()

# 실행

## 데이터셋 생성

In [None]:
image_file_list = sorted([x for x in glob.glob(train_x_path + '/**')])
xml_file_list = sorted([x for x in glob.glob(train_y_path + '/**')])

anchor_size = [32, 64, 128] # 이미지 크기가 224*224라 32, 64, 128로 지정
anchor_aspect_ratio = [[1,1],[1,0.5], [0.5,1]] # W*L기준 
anchors, anchors_state = make_anchor(anchor_size, anchor_aspect_ratio) # 앵커 생성 + 유효한 앵커 인덱스 휙득

image_list, cls_layer_label_list, reg_layer_label_list = make_dataset_forRPN([image_file_list, xml_file_list, anchors, anchors_state])
Classes_inDataSet = get_Classes_inImage(xml_file_list)

## 모델 생성

In [None]:
max_num = len(tf.keras.applications.VGG16(weights='imagenet', include_top=False,  input_shape=(224, 224, 3)).layers) # 레이어 최대 개수

SharedConvNet = tf.keras.models.Sequential()
for i in range(0, max_num-1):
    SharedConvNet.add(tf.keras.applications.VGG16(weights='imagenet', include_top=False,  input_shape=(224, 224, 3)).layers[i])

initializer = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None)
regularizer = tf.keras.regularizers.l2(0.0005)

for layer in SharedConvNet.layers:
    # 'kernel_regularizer' 속성이 있는 인스턴스를 찾아 regularizer를 추가
    if hasattr(layer, 'kernel_regularizer'):
        setattr(layer, 'kernel_regularizer', regularizer)

RPN_Model = RPN(initializer, regularizer, SharedConvNet, anchors, anchor_size, anchor_aspect_ratio)
Detector_Model = Detector(initializer, regularizer, SharedConvNet)

## 훈련

In [None]:
RPN_Model, Detector_Model = four_Step_Alternating_Training(RPN_Model, Detector_Model, image_list, cls_layer_label_list, reg_layer_label_list, Classes_inDataSet, EPOCH)

### 출력

In [None]:
# 출력 함수의 예
get_FasterRCNN_output("이미지 경로")