In [1]:
import torch

if torch.cuda.is_available():    
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

There are 2 GPU(s) available.
We will use the GPU: NVIDIA GeForce RTX 3090


In [2]:
import os
import random
import numpy as np
import shutil

# print(len(os.listdir('annotations')))
# print(len(os.listdir('images')))

# !mkdir test_images
# !mkdir test_annotations


# random.seed(1234)
# idx = random.sample(range(853), 170)

# for img in np.array(sorted(os.listdir('images')))[idx]:
#     shutil.move('images/'+img, 'test_images/'+img)

# for annot in np.array(sorted(os.listdir('annotations')))[idx]:
#     shutil.move('annotations/'+annot, 'test_annotations/'+annot)

# print(len(os.listdir('annotations')))
# print(len(os.listdir('images')))
# print(len(os.listdir('test_annotations')))
# print(len(os.listdir('test_images')))

In [3]:
import os
import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from PIL import Image
import torchvision
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import time

In [4]:
def generate_box(obj):
    
    xmin = float(obj.find('xmin').text)
    ymin = float(obj.find('ymin').text)
    xmax = float(obj.find('xmax').text)
    ymax = float(obj.find('ymax').text)
    
    return [xmin, ymin, xmax, ymax]

adjust_label = 1

def generate_label(obj):
    if obj.find('name').text == 'W':
        return 0

    elif obj.find('name').text == "WH":
        return 1
    
    elif obj.find('name').text == "WHV":
        return 2
    
    elif obj.find('name').text == "WV":
        return 3

def generate_target(file): 
    with open(file) as f:
        data = f.read()
        soup = BeautifulSoup(data, "html.parser")
        objects = soup.find_all("object")

        num_objs = len(objects)

        boxes = []
        labels = []
        for i in objects:
            boxes.append(generate_box(i))
            labels.append(generate_label(i))

        boxes = torch.as_tensor(boxes, dtype=torch.float32) 
        labels = torch.as_tensor(labels, dtype=torch.int64) 
        
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        
        return target

def plot_image_from_output(img, annotation):
    
    img = img.cpu().permute(1,2,0)
    
    fig,ax = plt.subplots(1)
    ax.imshow(img)
    
    for idx in range(len(annotation["boxes"])):
        xmin, ymin, xmax, ymax = annotation["boxes"][idx].cpu()

        if annotation['labels'][idx] == 1 :
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor='r',facecolor='none')
        
        elif annotation['labels'][idx] == 2 :
            
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor='g',facecolor='none')
            
        else :
        
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor='orange',facecolor='none')

        ax.add_patch(rect)

    plt.show()

In [5]:

class MaskDataset(object):
    def __init__(self, transforms, path):
        '''
        path: path to train folder or test folder
        '''
        # transform module과 img path 경로를 정의
        self.transforms = transforms
        self.path = path
        self.imgs = list(sorted(os.listdir(self.path)))


    def __getitem__(self, idx): #special method
        # load images ad masks
        file_image = self.imgs[idx]
        file_label = self.imgs[idx][:-3] + 'xml'
        img_path = os.path.join(self.path, file_image)
        
        if 'test' in self.path:
            label_path = f"C:/Users/LGS/Desktop/STEP_7/ppe_det/faster_rcnn/worker-safety-1/test_anntations/{file_label}"
        else:
            label_path = f"C:/Users/LGS/Desktop/STEP_7/ppe_det/faster_rcnn/worker-safety-1/train_annotations/{file_label}"

        img = Image.open(img_path).convert("RGB")
        #Generate Label
        target = generate_target(label_path)
        
        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self): 
        return len(self.imgs)

data_transform = transforms.Compose([  # transforms.Compose : list 내의 작업을 연달아 할 수 있게 호출하는 클래스
        transforms.ToTensor() # ToTensor : numpy 이미지에서 torch 이미지로 변경
    ])

def collate_fn(batch):
    return tuple(zip(*batch))

dataset = MaskDataset(data_transform, 'worker-safety-1/train_images/')
test_dataset = MaskDataset(data_transform, 'worker-safety-1/test_images/')

data_loader = torch.utils.data.DataLoader(dataset, batch_size=32, collate_fn=collate_fn)
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, collate_fn=collate_fn)

In [6]:
def get_model_instance_segmentation(num_classes):
  
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

In [8]:
model = get_model_instance_segmentation(4)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') 
model.to(device)



FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [10]:
num_epochs = 100
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)

In [39]:
print('----------------------train start--------------------------')
for epoch in range(num_epochs):
    start = time.time()
    model.train()
    i = 0    
    epoch_loss = 0
    for imgs_dum, annotations_dum in data_loader:
        i += 1
        imgs = []
        annotations = []
        for img, anno in zip(imgs_dum, annotations_dum):
            if len(anno['boxes']) == 0:
                continue
            imgs.append(img.to(device))
            annotations.append({k: v.to(device) for k, v in anno.items()})
        loss_dict = model(imgs, annotations) 
        losses = sum(loss for loss in loss_dict.values())        

        optimizer.zero_grad()
        losses.backward()
        optimizer.step() 
        epoch_loss += losses
    print(f'epoch : {epoch+1}, Loss : {epoch_loss}, time : {time.time() - start}')

----------------------train start--------------------------
epoch : 1, Loss : 17.276485443115234, time : 194.27614665031433
epoch : 2, Loss : 11.476853370666504, time : 150.18771648406982
epoch : 3, Loss : 9.484040260314941, time : 154.77302813529968
epoch : 4, Loss : 8.941046714782715, time : 158.03654956817627
epoch : 5, Loss : 8.269207000732422, time : 158.7150056362152
epoch : 6, Loss : 8.0880708694458, time : 157.63674139976501
epoch : 7, Loss : 7.602746963500977, time : 156.92012357711792
epoch : 8, Loss : 7.300926208496094, time : 157.69312596321106
epoch : 9, Loss : 6.7668280601501465, time : 158.93270087242126
epoch : 10, Loss : 6.66456937789917, time : 159.4765181541443
epoch : 11, Loss : 6.155863285064697, time : 158.34260630607605
epoch : 12, Loss : 5.9710612297058105, time : 158.37443113327026
epoch : 13, Loss : 5.614724159240723, time : 157.86891269683838
epoch : 14, Loss : 5.324803352355957, time : 156.5039975643158
epoch : 15, Loss : 5.137792110443115, time : 158.855011

In [40]:
torch.save(model.state_dict(),f'model_{num_epochs}.pt')


In [11]:
model.load_state_dict(torch.load(f'model_{num_epochs}.pt'))


<All keys matched successfully>

In [12]:
def make_prediction(model, img, threshold):
    model.eval()
    preds = model(img)
    for id in range(len(preds)) :
        idx_list = []

        for idx, score in enumerate(preds[id]['scores']) :
            if score > threshold : 
                idx_list.append(idx)

        preds[id]['boxes'] = preds[id]['boxes'][idx_list]
        preds[id]['labels'] = preds[id]['labels'][idx_list]
        preds[id]['scores'] = preds[id]['scores'][idx_list]

    return preds

In [20]:
import cv2
import timeit

model = model.to('cuda')
model.eval() 
video_path = "C:/Users/LGS/Desktop/STEP_7/ppe_det/data_set/test_worker.mp4"
cap = cv2.VideoCapture(video_path)

fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('multiple_box.mp4', fourcc, cap.get(5), (int(cap.get(3)), int(cap.get(4))))
with torch.no_grad(): 

    while cap.isOpened():
        success, frame = cap.read()

        if success:
            start_time = timeit.default_timer()
            frame_tensor = frame
#             frame_tensor = cv2.resize(frame, (640,640))
            
            results = model([data_transform(frame_tensor).to(device)])
            boxes = results[0]['boxes'][results[0]['scores'] > 0.7]
            boxes = boxes.cpu().numpy()

            # 프레임에 사각형 그리기
            for box, label, score in zip(boxes, results[0]['labels'], results[0]['scores']):
                xmin, ymin, xmax, ymax = box
                cv2.rectangle(frame, (int(xmin), int(ymin)), (int(xmax), int(ymax)), (0, 255, 0), 2)

                # 레이블 정보와 스코어를 텍스트로 생성
                if label == 0:
                    label_text = f" W {score:.2f}"
                if label == 1:
                    label_text = f" WH {score:.2f}"
                if label == 2:
                    label_text = f" WHV {score:.2f}"
                if label == 3:
                    label_text = f" WV {score:.2f}"
                # 텍스트를 프레임에 추가
                cv2.putText(frame, label_text, (int(xmin), int(ymin) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)


            terminate_time = timeit.default_timer()
            fps = str(int(1. / (terminate_time - start_time)))

            cv2.putText(frame, "fps = {} ".format(fps), (0, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0))
            cv2.imshow("YOLOv8 Inference", frame)

            out.write(frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        else:
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()


In [18]:
results

[{'boxes': tensor([], device='cuda:0', size=(0, 4), grad_fn=<StackBackward0>),
  'labels': tensor([], device='cuda:0', dtype=torch.int64),
  'scores': tensor([], device='cuda:0', grad_fn=<IndexBackward0>)}]

In [None]:
_idx = 1
print("Target : ", annotations[_idx]['labels'])
plot_image_from_output(imgs[_idx], annotations[_idx])
print("Prediction : ", pred[_idx]['labels'])
plot_image_from_output(imgs[_idx], pred[_idx])

In [None]:
_idx = 1
print("Target : ", annotations[_idx]['labels'])
plot_image_from_output(imgs[_idx], annotations[_idx])
print("Prediction : ", pred[_idx]['labels'])
plot_image_from_output(imgs[_idx], pred[_idx])

In [None]:
def plot_image_from_output(img, annotation):
    
    img = img.cpu().permute(1,2,0)
    
    fig,ax = plt.subplots(1)
    ax.imshow(img)
    
    for idx in range(len(annotation["boxes"])):
        xmin, ymin, xmax, ymax = annotation["boxes"][idx].cpu()

        if annotation['labels'][idx] == 1 :
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor='r',facecolor='none')
        
        elif annotation['labels'][idx] == 2 :
            
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor='g',facecolor='none')
            
        else :
        
            rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor='orange',facecolor='none')

        ax.add_patch(rect)

    plt.show()

In [None]:
from tqdm import tqdm

labels = []
preds_adj_all = []
annot_all = []

for im, annot in tqdm(test_data_loader, position = 0, leave = True):
    im = list(img.to(device) for img in im)
    #annot = [{k: v.to(device) for k, v in t.items()} for t in annot]

    for t in annot:
        labels += t['labels']

    with torch.no_grad():
        preds_adj = make_prediction(model, im, 0.5)
        preds_adj = [{k: v.to(torch.device('cpu')) for k, v in t.items()} for t in preds_adj]
        preds_adj_all.append(preds_adj)
        annot_all.append(annot)

In [None]:
%cd Tutorial-Book-Utils/
import utils_ObjectDetection as utils

In [None]:
sample_metrics = []
for batch_i in range(len(preds_adj_all)):
    sample_metrics += utils.get_batch_statistics(preds_adj_all[batch_i], annot_all[batch_i], iou_threshold=0.5) 

true_positives, pred_scores, pred_labels = [torch.cat(x, 0) for x in list(zip(*sample_metrics))]  # 배치가 전부 합쳐짐
precision, recall, AP, f1, ap_class = utils.ap_per_class(true_positives, pred_scores, pred_labels, torch.tensor(labels))
mAP = torch.mean(AP)
print(f'mAP : {mAP}')
print(f'AP : {AP}')

In [None]:
+