In [None]:
import os
import time
from datetime import datetime

import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL import Image
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')

In [None]:
# Function to generate bounding boxes and labels
def generate_boxes_and_labels(label_path):
    """Parse an annotation text file into box and label tensors.

    Every non-blank line is read as five whitespace-separated numbers in the
    order ``label x_min y_min w h``. Each box is converted to corner form
    ``[x_min, y_min, x_min + w, y_min + h]``.

    Args:
        label_path: Path of the annotation text file.

    Returns:
        A ``(boxes, labels)`` tuple: a float32 tensor of shape ``(N, 4)`` and
        an int64 tensor of shape ``(N,)``. A file with no annotations yields
        shapes ``(0, 4)`` and ``(0,)``.

    Raises:
        ValueError: If a non-blank line does not hold exactly five numeric
            fields.
    """
    boxes, labels = [], []
    with open(label_path, 'r') as file:
        for line in file:
            # split() with no argument tolerates tabs, repeated spaces and
            # trailing newlines; blank lines produce no fields and are skipped.
            fields = line.split()
            if not fields:
                continue
            label, x_min, y_min, w, h = map(float, fields)
            boxes.append([x_min, y_min, x_min + w, y_min + h])
            labels.append(int(label))

    # reshape keeps a well-formed (0, 4) tensor when the file has no boxes,
    # so downstream per-column scaling still broadcasts.
    boxes_tensor = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
    labels_tensor = torch.as_tensor(labels, dtype=torch.int64)
    return boxes_tensor, labels_tensor

def check_ratio(old_size, target_size=224):
    """Compute the scale factors that map an image size onto a target size.

    Args:
        old_size: Indexable whose first two entries are the original
            ``(height, width)``; any further entries are ignored.
        target_size: Either a single number used for both height and width
            (default 224), or a ``(height, width)`` pair.

    Returns:
        ``(h_ratio, w_ratio)``: target height / old height and
        target width / old width.
    """
    if isinstance(target_size, (int, float)):
        target_h = target_w = target_size
    else:
        target_h, target_w = target_size
    return target_h / old_size[0], target_w / old_size[1]

In [None]:
# Custom Dataset
class PidrayDataset(Dataset):
    """Detection dataset pairing each image file with a same-named ``.txt`` label file.

    Args:
        transforms: Callable applied to the RGB PIL image, or a falsy value
            to return the PIL image unchanged.
        image_dir: Directory holding the image files.
        annotation_dir: Directory holding one ``<image stem>.txt`` per image.

    Each item is ``(image, target)`` where ``target`` has:
        * ``'boxes'``: float32 tensor ``(N, 4)`` in ``[x_min, y_min, x_max, y_max]``
          form, scaled by the ratios returned from ``check_ratio``.
        * ``'labels'``: int64 tensor ``(N,)``.
        * ``'image_id'``: int64 tensor ``[idx]``, the index into the sorted
          file list. NOTE(review): confirm these indices match the image ids
          used by the ground-truth COCO annotation file.
    """

    def __init__(self, transforms, image_dir, annotation_dir):
        self.transforms = transforms
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        # os.listdir returns entries in arbitrary order and also lists
        # sub-directories (e.g. .ipynb_checkpoints); keep regular files only
        # and sort them so that an index always refers to the same image.
        self.img_names = sorted(
            name for name in os.listdir(image_dir)
            if os.path.isfile(os.path.join(image_dir, name))
        )

    def __getitem__(self, idx):
        img_name = self.img_names[idx]
        img_path = os.path.join(self.image_dir, img_name)
        # splitext swaps only the real extension, whatever it is.
        label_path = os.path.join(
            self.annotation_dir, os.path.splitext(img_name)[0] + '.txt'
        )

        # The context manager closes the file handle once the pixels are copied.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")

        # PIL reports (width, height); check_ratio expects (height, width).
        width, height = image.size
        h_ratio, w_ratio = check_ratio((height, width))

        if self.transforms:
            image = self.transforms(image)

        boxes, labels = generate_boxes_and_labels(label_path)
        # reshape guards the broadcast when an image has no annotations.
        # NOTE(review): the ratios assume `transforms` resizes to the size
        # check_ratio targets — confirm when changing the transform.
        boxes = boxes.reshape(-1, 4) * torch.tensor([w_ratio, h_ratio, w_ratio, h_ratio])

        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor([idx]),
        }

        return image, target

    def __len__(self):
        return len(self.img_names)

# Data transformations: convert the PIL image to a tensor, then resize it to 224x224
trans = transforms.Compose([transforms.ToTensor(), transforms.Resize((224, 224))])

def collate_fn(batch):
    """Collate ``(image, target)`` samples into a stacked image tensor and a list of target dicts.

    Args:
        batch: Sequence of ``(image_tensor, target_dict)`` pairs.

    Returns:
        ``(images, targets)``: the images stacked along a new leading
        dimension, and a list holding a shallow copy of every target dict.
    """
    image_seq, target_seq = zip(*batch)
    stacked_images = torch.stack(image_seq)
    copied_targets = [dict(target.items()) for target in target_seq]
    return stacked_images, copied_targets

In [None]:
# Build the train/test datasets and wrap each one in a DataLoader
train_dataset = PidrayDataset(
    transforms=trans,
    image_dir='../pidray/train/images',
    annotation_dir='../pidray/train/labels',
)
test_dataset = PidrayDataset(
    transforms=trans,
    image_dir='../pidray/test/images',
    annotation_dir='../pidray/test/labels',
)

# Only the training loader shuffles; both use the custom collate function.
train_data_loader = DataLoader(
    dataset=train_dataset, batch_size=12, shuffle=True, collate_fn=collate_fn
)
test_data_loader = DataLoader(
    dataset=test_dataset, batch_size=12, shuffle=False, collate_fn=collate_fn
)

In [None]:
# Function to get the model
def get_model_instance_segmentation(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a replaced box-predictor head.

    Args:
        num_classes: Number of object classes; the new head is created with
            ``num_classes + 1`` outputs.

    Returns:
        The torchvision detection model, loaded with ``weights="DEFAULT"``
        and carrying a fresh ``FastRCNNPredictor`` head.
    """
    detector = models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
    roi_heads = detector.roi_heads
    # Reuse the input width of the existing classification layer for the new head.
    feature_dim = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(feature_dim, num_classes + 1)
    return detector

# Class names, model construction and device placement
classes = [
    "Baton", "Pliers", "Hammer", "Powerbank", "Scissors", "Wrench",
    "Gun", "Bullet", "Sprayer", "Handcuffs", "Knife", "Lighter",
]
num_classes = len(classes)

# nn.Module.to() returns the module itself, so construction and placement chain.
model = get_model_instance_segmentation(num_classes).to(device)

# SGD over the parameters that require gradients
params = list(filter(lambda param: param.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)

# Training: run num_epochs passes over the training loader, saving a checkpoint per epoch
num_epochs = 10
print('---------------------- Training Start --------------------------')
print(f"Training started at : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
train_start = time.time()

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0  # running sum of the per-batch total loss
    for batch_num, (imgs, annotations) in enumerate(train_data_loader):
        # Move every image and every target tensor to the training device.
        imgs = [image.to(device) for image in imgs]
        annotations = [
            {key: value.to(device) for key, value in target.items()}
            for target in annotations
        ]

        # With targets supplied, the model returns a dict of losses; optimise their sum.
        loss_dict = model(imgs, annotations)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        epoch_loss += losses.item()

        if not batch_num % 10:
            print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_num}], Loss: {losses.item():.4f}')

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')
    torch.save(model.state_dict(), f'model_epoch_{epoch+1}.pth')

print(f"Training completed at : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Total time taken : {time.time() - train_start}")

In [None]:
# Load the final model
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU still loads on a CPU-only kernel.
model.load_state_dict(
    torch.load(f'model_epoch_{num_epochs}.pth', map_location=device)
)

In [None]:
# Function for making predictions
def make_prediction(model, img, threshold):
    """Run the model on one image and cut the result at the first low score.

    The model is switched to eval mode and called without gradient tracking.
    'boxes', 'labels' and 'scores' are truncated just before the first entry
    whose score is below ``threshold``; entries after that position are
    dropped regardless of their score.

    Args:
        model: Detection model called with a one-element list of images.
        img: The image passed to the model.
        threshold: Score cut-off.

    Returns:
        The (possibly truncated) prediction dict for the image.
    """
    model.eval()
    with torch.no_grad():
        prediction = model([img])[0]
        below = (prediction['scores'] < threshold).nonzero()
        if below.numel() > 0:
            cutoff = int(below[0])
            for key in ('boxes', 'labels', 'scores'):
                prediction[key] = prediction[key][:cutoff]
    return prediction

# Evaluate model
def evaluate_model(model, data_loader, device, threshold=0.5):
    """Run inference over a loader and collect detections as COCO-style result dicts.

    Args:
        model: Detection model; called with a list of image tensors and
            expected to return one dict per image with 'boxes'
            (``[xmin, ymin, xmax, ymax]``), 'scores' and 'labels'.
        data_loader: Iterable yielding ``(images, targets)`` batches. Every
            target must provide an ``"image_id"`` entry (a one-element tensor
            or a plain number).
        device: Device the images are moved to before inference.
        threshold: Detections scoring below this value are discarded.

    Returns:
        A list of dicts with keys ``image_id``, ``category_id``, ``bbox``
        (``[x, y, width, height]``) and ``score``. All values are native
        Python numbers, so the list can be serialised to JSON.

    Raises:
        KeyError: If a target has no ``"image_id"`` entry.
    """
    model.eval()
    coco_results = []
    # Inference only: disabling autograd avoids building graphs that would
    # otherwise hold memory for every batch.
    with torch.no_grad():
        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for target, output in zip(targets, outputs):
                if "image_id" not in target:
                    raise KeyError(
                        'target is missing "image_id"; the dataset must '
                        'provide it for COCO evaluation'
                    )
                image_id = target["image_id"]
                if hasattr(image_id, "item"):
                    image_id = image_id.item()

                # tolist() yields plain Python floats/ints instead of numpy scalars.
                boxes = output["boxes"].cpu().tolist()
                scores = output["scores"].cpu().tolist()
                labels = output["labels"].cpu().tolist()

                for (xmin, ymin, xmax, ymax), score, label in zip(boxes, scores, labels):
                    if score < threshold:
                        continue
                    coco_results.append({
                        "image_id": image_id,
                        "category_id": label,
                        # COCO boxes are [x, y, width, height].
                        "bbox": [xmin, ymin, xmax - xmin, ymax - ymin],
                        "score": score,
                    })

    return coco_results

# COCO evaluation function
def coco_evaluation(coco_true, coco_results):
    """Score detection results against ground truth with COCO bbox metrics.

    Args:
        coco_true: Ground-truth object exposing ``loadRes``; also passed to
            ``COCOeval`` as the reference set.
        coco_results: Detection results handed to ``coco_true.loadRes``.

    Returns:
        ``coco_eval.stats`` after evaluate / accumulate / summarize.

    Raises:
        ValueError: If ``coco_results`` is an empty list.
    """
    # loadRes indexes the first result, so an empty list would otherwise
    # surface as an unhelpful IndexError from inside pycocotools.
    if isinstance(coco_results, list) and not coco_results:
        raise ValueError(
            "coco_results is empty: no detections to evaluate "
            "(try a lower score threshold)"
        )

    coco_dt = coco_true.loadRes(coco_results)
    coco_eval = COCOeval(coco_true, coco_dt, 'bbox')
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()

    return coco_eval.stats

In [None]:
# Prepare the ground-truth file (this example assumes the GT file is in COCO format)
coco_true = COCO('path_to_your_coco_annotations.json')

# Collect detections on the test set, then score them against the ground truth
coco_results = evaluate_model(model=model, data_loader=test_data_loader, device=device)
evaluation_stats = coco_evaluation(coco_true=coco_true, coco_results=coco_results)

print(f"COCO Evaluation Results: {evaluation_stats}")

# Function to plot image from output
def plot_image_from_output(img, annotation):
    """Display an image tensor with its bounding boxes drawn as red rectangles.

    Args:
        img: Image tensor; its axes are permuted from (0, 1, 2) to (1, 2, 0)
            before display (assumes channel-first layout — TODO confirm).
        annotation: Mapping with a ``"boxes"`` entry, an iterable of
            ``[xmin, ymin, xmax, ymax]`` boxes (tensor rows or sequences).
    """
    img = img.detach().cpu().permute(1, 2, 0).numpy()
    _, ax = plt.subplots(1)
    ax.imshow(img)

    for box in annotation["boxes"]:
        # Convert to plain floats so tensor elements (on any device) are
        # handed to matplotlib as ordinary numbers.
        xmin, ymin, xmax, ymax = (float(coord) for coord in box)
        rect = patches.Rectangle(
            (xmin, ymin), xmax - xmin, ymax - ymin,
            linewidth=2, edgecolor='r', facecolor='none',
        )
        ax.add_patch(rect)

    plt.show()

In [None]:
# Example plot: ground truth and model prediction for one test image
_idx = 1
# Load the sample once; every dataset access re-reads the image from disk.
sample_img, sample_target = test_data_loader.dataset[_idx]
print("Target : ", sample_target['labels'])
plot_image_from_output(sample_img, sample_target)

# coco_results is a flat list of single detections (with 'bbox', not 'boxes'),
# so it cannot be indexed by image; predict for this image directly instead.
sample_pred = make_prediction(model, sample_img.to(device), 0.5)
sample_pred = {key: value.cpu() for key, value in sample_pred.items()}
print("Prediction : ", sample_pred['labels'])
plot_image_from_output(sample_img, sample_pred)