<a href="https://colab.research.google.com/github/samuel0922/TEAM_SSAMJI/blob/main/disease_detection_FasterRCNN_00.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import glob
import json
import time
import tqdm
import random
import shutil
import pprint
import numpy as np
import pandas as pd

import cv2
import albumentations
from PIL import Image
import matplotlib.pyplot as plt  
import matplotlib.patches as patches

import torch
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# 레이블 인덱스 딕셔너리

label_idx_dict = {
    'ROI': 0, 'a11': 1, 'a12': 2,  'a5': 3,
    'a7' : 4, 'a9' : 5, 'b3' : 6,  'b4': 7,
    'b5' : 8, 'b6' : 9, 'b7' : 10, 'b8': 11
}

# GPU가 있으면 GPU로 DEVICE를 설정

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# 필요한 정보를 추출해 리턴해 주는 함수 정의

def get_data_dict(data_dir, data_idx):
    '''
    학습에 필요힌 정보는?
    1) 이미지 경로
    2) bbox(대상 작물 좌표) 좌표값
    3) pbox(병징들의 좌표) 좌표값
    4) part label(병해의 레이블) 
        => N개의 병징에 대한 레이블인데... 레이블은 병해 1개이어야 함   
    '''

    # 데이터셋 경로, 데이터 인덱스, 데이터인덱스에 해당하는 json 파일을 오픈함
    with open(os.path.join(data_dir, data_idx, f"{data_idx}.json"), "r") as f:

        # 파일을 불러와서 data_json에 할당
        data_json = json.load(f)

    # json에서 이미지 경로를 찾아서 image_path에 할당    
    image_path = os.path.join(data_dir, data_idx, data_json["description"]["image"])

    # json에서 bbox 좌표 찾아서 할당
    bbox = data_json["annotations"]["bbox"]

    # json에서 pbox 좌표 찾아서 할당
    bbox_part = data_json["annotations"]["part"]

    
    '''
    이 부분이 pbox(병징에 대한 bbox)에 대한 처리 부분임
    '''
    # part(병징)에 대한 좌표값이 없다면 병징 label은 None으로
    if len(bbox_part) == 0:
        bbox_part_label = None

    # N개가 있다면 bbox_part_label(병징 레이블)에 할당     
    else:
        bbox_part_label = data_json["annotations"]["disease"]

    # 산출한 값들을 반환함.    
    return {
        "image_path": image_path,
        "bbox": bbox,
        "bbox_part": bbox_part,
        "bbox_part_label": bbox_part_label
    }

# 바운딩 박스 좌표를 만들어 주는 함수
def generate_box(bbox):
    xmin = float(bbox['x'])
    ymin = float(bbox['y'])
    xmax = xmin + float(bbox['w'])
    ymax = ymin + float(bbox['h'])
    return [xmin, ymin, xmax, ymax]

# label을 만들어 주는 함수
def generate_label(label):
    return label_idx_dict[label]

# target값을 만들어 주는 함수 
def generate_target(bboxes, bbox_part_label):
    '''
    bboxes(작물에 대한 bbox데이터)와 bbox_part_label(병징에 대한 레이블)을 받아
    boxes(bbox 좌표값들)와 labels(레이블 값)을 반환해 줌
    ??? 확인 => label은 무엇에 대한 레이블이어야 하는가?
    '''
    num_objs = len(bboxes)  # boxes의 갯수를 객체의 갯수로 할당
    boxes = []   # 좌표값을 넣어줄 리스트를 선언

    # 매개변수로 받은 bboxes(좌표값들) 리스트에서 하나씩 꺼내서
    for bbox in bboxes:
        boxes.append(generate_box(bbox))  # 좌표를 생성해 추가해줌

    '''
    label에 대한 처리 부분
    ??? 확인 => "ROI"의 역할은??
    '''

    # 레이블인덱스사전 에서 "ROI"에 해당하는 값을 labels 리스트에 초기화       
    labels = [generate_label("ROI")]

    # 매개변수로 받은 병징에 대한 label이 있는 경우.. 즉 병징이 있는 경우 
    if bbox_part_label is not None:
        '''
        ??? 확인 => label 값에 (레이블인덱스사전에서 추출한 코드)*(객체수-1)을 더해주는게 무슨 의미?    
        '''
        labels += [generate_label(bbox_part_label)] * (num_objs - 1)
    
    # boxes, labels 
    boxes = torch.as_tensor(boxes, dtype=torch.float32)
    labels = torch.as_tensor(labels, dtype=torch.int64) 
    
    return {
        "boxes": boxes,
        "labels": labels
    }


class MaskDataset(object):
    def __init__(self, data_dir, data_list, transforms):
        '''
        path: path to train folder or test folder
        '''
        self.data_dir = data_dir
        self.data_list = data_list
        self.transforms = transforms

    def __len__(self): 
        return len(self.data_list)

    def __getitem__(self, idx):
        data_dict = get_data_dict(self.data_dir, self.data_list[idx])
        
        image_path = data_dict['image_path']
        bboxes = data_dict["bbox"] + data_dict["bbox_part"]
        bbox_part_label = data_dict["bbox_part_label"]
        
        image = Image.open(image_path).convert("RGB")
        target = generate_target(bboxes, bbox_part_label)
        
        if self.transforms is not None:
            image = self.transforms(image)
        return image, target

In [None]:
data_dir = "./train"
data_list = sorted(os.listdir(data_dir))

train_data_list = data_list[:int(len(data_list) * 0.8)]
val_data_list = data_list[int(len(data_list) * 0.8):]

train_ds = MaskDataset(
    data_dir=data_dir,
    data_list=train_data_list,
    transforms = transforms.Compose([
        transforms.ToTensor()
    ]),
)
val_ds = MaskDataset(
    data_dir=data_dir,
    data_list=val_data_list,
    transforms = transforms.Compose([
        transforms.ToTensor()
    ]),
)
train_dl = DataLoader(
    train_ds, 
    batch_size=1,
    shuffle=True, 
    num_workers=8,
    collate_fn=lambda batch: tuple(zip(*batch))
)
val_dl = DataLoader(
    val_ds, 
    batch_size=1,
    shuffle=False,
    num_workers=8,
    collate_fn=lambda batch: tuple(zip(*batch))
)

In [None]:
n_classes = len(label_idx_dict) # 12

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.roi_heads.box_predictor = FastRCNNPredictor(
    model.roi_heads.box_predictor.cls_score.in_features,
    n_classes
)
model.to(device)

In [None]:
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=0.0001)
    
num_epochs = 50
print('----------------------train start--------------------------')
for epoch in range(num_epochs):
    start = time.time()
    model.train()
    i = 0    
    epoch_loss = 0
    for images, targets in tqdm.tqdm(train_dl):
        i += 1
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        loss_dict = model(images, targets) 
        losses = sum(loss for loss in loss_dict.values())        

        optimizer.zero_grad()
        losses.backward()
        optimizer.step() 
        epoch_loss += losses
    print(f'epoch : {epoch+1}, Loss : {epoch_loss}, time : {time.time() - start}')    
    
torch.save(model.state_dict(),f'model_{num_epochs}.pt')
model.load_state_dict(torch.load(f'model_{num_epochs}.pt'))
print(f"model saved: {f'model_{num_epochs}.pt'}")

In [None]:
torch.save(model.state_dict(),f'model_{num_epochs}.pt')
model.load_state_dict(torch.load(f'model_{num_epochs}.pt'))
print(f"model saved: {f'model_{num_epochs}.pt'}")

In [None]:
def make_prediction(model, images, threshold):
    model.eval()
    preds = model(images)
    for id in range(len(preds)) :
        idx_list = []

        for idx, score in enumerate(preds[id]['scores']) :
            if score > threshold : 
                idx_list.append(idx)

        preds[id]['boxes'] = preds[id]['boxes'][idx_list]
        preds[id]['labels'] = preds[id]['labels'][idx_list]
        preds[id]['scores'] = preds[id]['scores'][idx_list]

    return preds



label_idx_dict = {
    'ROI': 0, 'a11': 1, 'a12': 2,  'a5': 3,
    'a7' : 4, 'a9' : 5, 'b3' : 6,  'b4': 7,
    'b5' : 8, 'b6' : 9, 'b7' : 10, 'b8': 11
}



def plot_image_from_output(img, annotation):
    img = img.cpu().permute(1,2,0)
    fig,ax = plt.subplots(1)
    ax.imshow(img)
    for idx in range(len(annotation["boxes"])):
        xmin, ymin, xmax, ymax = annotation["boxes"][idx]
        if annotation['labels'][idx] == 0 :
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=2,edgecolor='r',facecolor='none')
        else :
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=2,edgecolor='g',facecolor='none')
        ax.add_patch(rect)
    plt.show()


with torch.no_grad(): 
    # 테스트셋 배치사이즈= 2
    for images, targets in val_dl:
        images = list(image.to(device) for image in images)
        preds = make_prediction(model, images, 0.5)
        print(preds)
        break
    
    
_idx = 0
print("Target : ", targets[_idx]['labels'])
plot_image_from_output(images[_idx], targets[_idx])
print("Prediction : ", preds[_idx]['labels'])
plot_image_from_output(images[_idx], targets[_idx])