In [1]:
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, Subset
import cv2
import json
import numpy as np
from PIL import Image
import os
import glob
import random
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms.functional as TF
from sklearn.model_selection import train_test_split
import albumentations as A
from albumentations.core.transforms_interface import ImageOnlyTransform
from albumentations.pytorch import ToTensorV2

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
# Candidate gamma values; the training dataset draws one of these at random
# for its gamma-correction augmentation.
gamma_values = [
    0.2, 0.4, 0.6,
    0.8, 1.0, 1.2,
    1.4, 1.6, 1.8,
]

def transform(img):
    """Identity placeholder transform: hand ``img`` back exactly as received."""
    unchanged = img
    return unchanged

def gamma_correction(img, gamma):
    """Return ``img`` gamma-adjusted by ``gamma`` via ``TF.adjust_gamma``."""
    corrected = TF.adjust_gamma(img, gamma)
    return corrected

# Flip modes accepted by random_flip; "none" means the input is left as-is.
flip_types = [
    "horizontal",
    "vertical",
    "both",
    "none",
]

def random_flip(img, flip_type):
    """Flip ``img`` according to ``flip_type``.

    ``"horizontal"`` and ``"vertical"`` apply ``TF.hflip`` / ``TF.vflip``;
    ``"both"`` applies the vertical flip first and the horizontal flip second.
    Any other value (including ``"none"``) returns ``img`` untouched.
    """
    flipped = img
    if flip_type in ("vertical", "both"):
        flipped = TF.vflip(flipped)
    if flip_type in ("horizontal", "both"):
        flipped = TF.hflip(flipped)
    return flipped

# Label spellings listed for this dataset (presumably collected from the annotation files — confirm):
# {'Instrument', 'Care', 'Bubble', 'unkown', 'unknown', 'Fat', 'SoftTIssue', 'Dura', 'BF', 'SoftTissue', 'vessel', 'Vessel', 'Bone', 'LF', 'SofrTissue'}
# Index 0 is background; the remaining ten indices follow the order below.
num_classes = 11  # Background, BF, Vessel, Instrument, Care, Bubble, Fat, Bone, LF, Dura, SoftTissue

# Label string -> class index. Misspelled / differently-cased variants share the
# index of their canonical label; 'unkown' / 'unknown' are intentionally absent.
class_map = {
    "BF": 1,
    "Vessel": 2,
    "vessel": 2,
    "Instrument": 3,
    "Care": 4,
    "Bubble": 5,
    "Fat": 6,
    "Bone": 7,
    "LF": 8,
    "Dura": 9,
    "SoftTissue": 10,
    "SofrTissue": 10,
    "SoftTIssue": 10,
}

# Dataset definition: image paired with a multi-class segmentation mask
class BleedingDataset(Dataset):
    """Pairs each image with a label mask rasterised from its sibling JSON file.

    Each JSON file is expected to provide ``imageHeight``, ``imageWidth`` and a
    ``shapes`` list whose entries carry a ``label`` and polygon ``points``.

    Args:
        image_files: Sequence of image file paths. The annotation for each image
            is looked up at the same path with the extension replaced by ``.json``.
        transform: Stored on the instance but not applied in ``__getitem__``.
        augmentation: When true, a random gamma correction is applied to the
            image and one random flip is applied to both image and mask.

    Note:
        The module-level ``transform_image`` / ``transform_mask`` pipelines are
        looked up when an instance is constructed, so they must be defined by then.
    """

    def __init__(self, image_files, transform=None, augmentation=False):
        self.image_paths = image_files
        self.json_paths = [self._json_path_for(f) for f in self.image_paths]
        self.transform = transform
        self.augmentation = augmentation
        self.transform_image = transform_image
        self.transform_mask = transform_mask
        self.toTensor = transforms.ToTensor()

    @staticmethod
    def _json_path_for(image_path):
        """Return ``image_path`` with its file extension swapped for ``.json``."""
        # splitext only touches the final extension, so a directory name that
        # happens to contain ".png" / ".jpeg" is left alone.
        root, _ext = os.path.splitext(image_path)
        return root + ".json"

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, mask)`` after the transforms.

        ``image`` is the output of ``transform_image`` and ``mask`` the output of
        ``transform_mask``; mask pixels hold ``class_map`` indices, with 0 for
        anything not covered by a known label.
        """
        # Load the source image and keep only the (420, 0)-(1500, 1080) region
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert("RGB")
        image = image.crop((420, 0, 1500, 1080))

        # Load the annotation JSON
        json_path = self.json_paths[idx]
        with open(json_path, 'r') as f:
            data = json.load(f)

        # Start from an all-background mask at the annotated image size
        mask = np.zeros((data["imageHeight"], data["imageWidth"]), dtype=np.uint8)

        # Rasterise every polygon with a known label; shapes are painted in file
        # order, so a later polygon overwrites an earlier one where they overlap.
        for shape in data["shapes"]:
            label = shape["label"]
            if label in class_map:
                points = np.array(shape["points"], dtype=np.int32)
                cv2.fillPoly(mask, [points], class_map[label])

        # Convert to PIL and crop with the same box as the image so they stay aligned
        mask = Image.fromarray(mask)
        mask = mask.crop((420, 0, 1500, 1080))

        image = self.transform_image(image)
        mask = self.transform_mask(mask)

        if self.augmentation:
            # Photometric change: image only, the mask must keep its class indices.
            gamma = random.choice(gamma_values)
            image = gamma_correction(image, gamma)
            # Geometric change: the SAME flip mode goes to image and mask.
            # (Previously the function object `random_flip` was passed as the
            # mode, which matched no branch, so no flip was ever applied.)
            flip_type = random.choice(flip_types)
            image = random_flip(image, flip_type)
            mask = random_flip(mask, flip_type)

        return image, mask

# Dataset definition: images only, no masks
class BleedingDatasetTest(Dataset):
    """Inference dataset that yields one preprocessed image per file path.

    ``transform`` and ``augmentation`` are stored but not used when loading;
    preprocessing always goes through the module-level ``transform_image``.
    """

    def __init__(self, image_files, transform=None, augmentation=False):
        self.image_paths = image_files
        self.transform = transform
        self.augmentation = augmentation
        self.transform_image = transform_image
        self.toTensor = transforms.ToTensor()

    def __len__(self):
        """Number of image paths held by the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Open image ``idx``, crop it, and return the ``transform_image`` result."""
        # Read the source image as RGB
        frame = Image.open(self.image_paths[idx]).convert("RGB")

        # Keep the (left, top, right, bottom) = (420, 0, 1500, 1080) region
        crop_box = (420, 0, 1500, 1080)
        frame = frame.crop(crop_box)

        return self.transform_image(frame)

# Dataset definition: extracted video frames, no masks
class BleedingDatasetTestVideo(Dataset):
    """Inference dataset for video frames, cropped to (840, 0)-(3000, 2160).

    ``transform`` and ``augmentation`` are stored but not used when loading;
    preprocessing always goes through the module-level ``transform_image``.
    """

    def __init__(self, image_files, transform=None, augmentation=False):
        self.image_paths = image_files
        self.transform = transform
        self.augmentation = augmentation
        self.transform_image = transform_image
        self.toTensor = transforms.ToTensor()

    def __len__(self):
        """Number of frame paths held by the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Open frame ``idx``, crop it, and return the ``transform_image`` result."""
        # Read the source frame as RGB
        frame = Image.open(self.image_paths[idx]).convert("RGB")

        # Keep the (left, top, right, bottom) = (840, 0, 3000, 2160) region
        crop_box = (840, 0, 3000, 2160)
        frame = frame.crop(crop_box)

        return self.transform_image(frame)

# Preprocessing pipeline for input images
transform_image = transforms.Compose(
    [
        # Ordinary images: bilinear resampling to 512x512
        transforms.Resize(
            (512, 512),
            interpolation=transforms.InterpolationMode.BILINEAR,
        ),
        # PIL image -> tensor
        transforms.ToTensor(),
    ]
)

# Preprocessing pipeline for label masks: the output holds integer class indices.
transform_mask = transforms.Compose([
    # Masks: nearest-neighbour resampling so no in-between label values are invented
    transforms.Resize((512, 512), interpolation=transforms.InterpolationMode.NEAREST),
    transforms.ToTensor(),
    # Multiply by 255 to get back to the (0, 255) value range, then convert to
    # integer labels. Round before casting: `.long()` truncates toward zero, and
    # the float round trip is not guaranteed to reproduce each integer exactly,
    # so a value landing just below a class index would silently drop by one.
    lambda x: (x * 255).round().long(),
])

# Dice loss
def dice_loss(pred, target, smooth=1e-6):
    """Multi-class soft Dice loss.

    Args:
        pred: Raw class scores with the class axis at dim 1 and two trailing
            spatial axes, i.e. ``(B, C, H, W)``.
        target: Integer class indices of shape ``(B, H, W)``.
        smooth: Constant added to numerator and denominator of the Dice ratio.

    Returns:
        Scalar tensor: one minus the Dice coefficient averaged over batch and class.
    """
    n_classes = pred.shape[1]
    spatial_dims = (2, 3)

    # Per-class probabilities and a channel-first one-hot view of the target
    probs = F.softmax(pred, dim=1)
    onehot = F.one_hot(target, num_classes=n_classes).permute(0, 3, 1, 2)

    overlap = (probs * onehot).sum(dim=spatial_dims)
    denominator = probs.sum(dim=spatial_dims) + onehot.sum(dim=spatial_dims)
    per_class_dice = (2. * overlap + smooth) / (denominator + smooth)

    return 1 - per_class_dice.mean()

def focal_loss(pred, target, gamma=2.0):
    """Multi-class focal loss averaged over every element of the score tensor.

    Args:
        pred: Raw class scores, ``(B, C, H, W)`` with classes at dim 1.
        target: Integer class indices of shape ``(B, H, W)``.
        gamma: Exponent of the ``(1 - p)`` modulating factor.

    Returns:
        Scalar tensor. The mean runs over all ``B * C * H * W`` entries, so the
        zero entries of non-target classes are included in the average.
    """
    probs = F.softmax(pred, dim=1)  # class probabilities
    onehot = F.one_hot(target, num_classes=probs.shape[1]).permute(0, 3, 1, 2)

    # Cross-entropy term; the 1e-6 offset keeps log() finite at probability 0
    log_probs = torch.log(probs + 1e-6)
    ce_term = -(onehot * log_probs)

    # Down-weight entries the model already predicts with high probability
    modulating = (1 - probs) ** gamma
    weighted = modulating * ce_term
    return weighted.mean()

def iou_loss(pred, target, smooth=1e-6):
    """Multi-class soft IoU (Jaccard) loss.

    Args:
        pred: Raw class scores, ``(B, C, H, W)`` with classes at dim 1.
        target: Integer class indices of shape ``(B, H, W)``.
        smooth: Constant added to numerator and denominator of the IoU ratio.

    Returns:
        Scalar tensor: one minus the IoU averaged over batch and class.
    """
    n_classes = pred.shape[1]
    spatial_dims = (2, 3)

    probs = F.softmax(pred, dim=1)
    onehot = F.one_hot(target, num_classes=n_classes).permute(0, 3, 1, 2)

    overlap = (probs * onehot).sum(dim=spatial_dims)
    # |A| + |B| - |A ∩ B|
    combined = probs.sum(dim=spatial_dims) + onehot.sum(dim=spatial_dims) - overlap
    per_class_iou = (overlap + smooth) / (combined + smooth)

    return 1 - per_class_iou.mean()

def loss_fn(pred, target):
    """Training objective: the plain average of the Dice, focal and IoU losses."""
    dice_term = dice_loss(pred, target)
    focal_term = focal_loss(pred, target)
    iou_term = iou_loss(pred, target)
    combined = dice_term + focal_term + iou_term
    return combined / 3


In [3]:
image_dir = "0014_spine_endoscope_data/"
# Every .jpeg / .png file directly inside image_dir, as sorted full paths
image_files = sorted(
    os.path.join(image_dir, name)
    for name in os.listdir(image_dir)
    if name.endswith(('.jpeg', '.png'))
)

# Train / test split (85:15), seeded so the split is reproducible
train_images, test_images = train_test_split(
    image_files,
    test_size=0.15,
    random_state=42,
)

# Training dataset and loader: augmentation on, batches reshuffled each epoch
train_dataset = BleedingDataset(train_images, transform=transform, augmentation=True)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)

# Test dataset and loader: no augmentation, fixed order
test_dataset = BleedingDataset(test_images, transform=transform, augmentation=False)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [7]:

# Choose the device first so the checkpoint can be mapped onto it while loading
# (use the GPU when one is available, otherwise fall back to the CPU).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model (DeepLabV3 with a ResNet-50 backbone)
model = torchvision.models.segmentation.deeplabv3_resnet50(pretrained=True)

# Replace the final classifier layer so it outputs num_classes channels
# instead of the pretrained label set
model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)

# Continue from the saved checkpoint. map_location keeps the load working on a
# CPU-only machine even when the checkpoint was written from a CUDA run.
model.load_state_dict(
    torch.load("deeplabv3_bleeding_multiclass_crop.pth", map_location=device)
)
model = model.to(device)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for images, masks in train_loader:
        # squeeze(1) removes the mask's singleton dim 1 so the losses receive
        # plain class-index maps
        images, masks = images.to(device), masks.squeeze(1).to(device)

        optimizer.zero_grad()
        outputs = model(images)["out"]  # DeepLabV3 returns a dict; "out" holds the main head's scores
        loss = loss_fn(outputs, masks)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean batch loss for this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}")

# Save the model weights back to the same checkpoint file
torch.save(model.state_dict(), "deeplabv3_bleeding_multiclass_crop.pth")
print("모델 저장 완료!")


Epoch [1/10], Loss: 0.4018
Epoch [2/10], Loss: 0.3655
Epoch [3/10], Loss: 0.3620
Epoch [4/10], Loss: 0.3587
Epoch [5/10], Loss: 0.3541
Epoch [6/10], Loss: 0.3529
Epoch [7/10], Loss: 0.3504
Epoch [8/10], Loss: 0.3477
Epoch [9/10], Loss: 0.3449
Epoch [10/10], Loss: 0.3427
모델 저장 완료!


In [8]:

# Choose the device first so the checkpoint can be mapped onto it while loading
# (use the GPU when one is available, otherwise fall back to the CPU).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model (DeepLabV3 with a ResNet-50 backbone)
model = torchvision.models.segmentation.deeplabv3_resnet50(pretrained=True)

# Replace the final classifier layer so it outputs num_classes channels
# instead of the pretrained label set
model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)

# map_location keeps the load working on a CPU-only machine even when the
# checkpoint was written from a CUDA run.
model.load_state_dict(
    torch.load("deeplabv3_bleeding_multiclass_crop.pth", map_location=device)
)
model = model.to(device)
model.eval()

output_dir = "test_results/"
os.makedirs(output_dir, exist_ok=True)

# Overlay colours, in the BGR channel order the overlay uses after cvtColor
green = [0, 255, 0]  # ground truth (GT)
red = [0, 0, 255]  # prediction
yellow = [0, 255, 255]  # GT and prediction overlap

total_num = 0
for images, masks in test_dataloader:
    images, masks = images.to(device), masks.to(device)

    with torch.no_grad():
        preds = model(images)["out"]  # DeepLabV3 returns a dict; "out" holds the main head's scores

    for image, mask, pred in zip(images, masks, preds):
        total_num += 1

        # Post-processing: per-pixel argmax over the class axis gives the predicted class map
        pred_mask = torch.argmax(pred, dim=0).cpu().numpy()
        original_mask = mask.squeeze().cpu().numpy()  # ground-truth class map

        # Back to an HxWx3 uint8 image (undo the [0, 1] scaling)
        original_image = image.cpu().numpy().transpose(1, 2, 0)
        original_image = (original_image * 255).astype(np.uint8)

        # OpenCV writes BGR, so convert before drawing and saving
        overlay = cv2.cvtColor(original_image, cv2.COLOR_RGB2BGR)

        # Only class 1 (BF in class_map) is visualised:
        # GT = green, prediction = red, overlap = yellow
        mask_layer = np.zeros_like(overlay, dtype=np.uint8)
        mask_layer[original_mask == 1] = green
        mask_layer[pred_mask == 1] = red
        mask_layer[(original_mask == 1) & (pred_mask == 1)] = yellow

        # Blend the colour layer over the image
        blended = cv2.addWeighted(overlay, 0.7, mask_layer, 0.5, 0)

        # blended = cv2.resize(blended, (1920, 1080), interpolation=cv2.INTER_LANCZOS4)

        # Save
        filename = f"output_{total_num}.png"
        output_path = os.path.join(output_dir, filename)
        cv2.imwrite(output_path, blended)

print("test completed")

test completed


In [13]:
# video test

def extract_frames(video_path, output_folder, fps=20):
    """Save frames of a video as ``frame_00000.png``, ``frame_00001.png``, ...

    Frames are subsampled so that roughly ``fps`` frames are kept per second of
    video: every ``video_fps // fps``-th frame is written. When the video's own
    frame rate is lower than ``fps`` (or is reported as 0), every frame is kept.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory to write PNG frames into; created if missing.
        fps: Target number of saved frames per second; must be positive.

    Returns:
        None. Prints an error and returns early if the video cannot be opened.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    # Open the video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        return

    saved_count = 0
    try:
        # Frame rate reported by the source video
        video_fps = int(cap.get(cv2.CAP_PROP_FPS))

        # Create the output folder
        os.makedirs(output_folder, exist_ok=True)

        # Save one frame out of every `frame_interval`. Clamp to 1: the integer
        # division yields 0 when video_fps < fps, which would otherwise make the
        # modulo below divide by zero.
        frame_interval = max(1, video_fps // fps)
        frame_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                frame_filename = os.path.join(output_folder, f"frame_{saved_count:05d}.png")
                cv2.imwrite(frame_filename, frame)
                saved_count += 1

            frame_count += 1
    finally:
        # Release the capture even if reading or writing a frame fails
        cap.release()

    print(f"Extracted {saved_count} frames and saved to {output_folder}")

import subprocess  # used only by the ffmpeg call at the end of this cell

# Example usage
video_file = "bleeding_test_1.mp4"  # path of the MP4 file
video_image_dir = "bleeding_test_3/"  # folder holding the extracted frames
# extract_frames(video_file, video_image_dir)

output_dir = "bleeding_test_result_3/"

# Every .jpeg / .png frame directly inside video_image_dir, as sorted full paths
video_image_files = sorted(
    os.path.join(video_image_dir, f)
    for f in os.listdir(video_image_dir)
    if f.endswith(('.jpeg', '.png'))
)

# Frame dataset and loader (fixed order so output numbering follows frame order)
video_test_dataset = BleedingDatasetTestVideo(video_image_files, transform=transform, augmentation=False)
video_test_dataloader = DataLoader(video_test_dataset, batch_size=8, shuffle=False)

# Choose the device first so the checkpoint can be mapped onto it while loading
# (use the GPU when one is available, otherwise fall back to the CPU).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model (DeepLabV3 with a ResNet-50 backbone)
model = torchvision.models.segmentation.deeplabv3_resnet50(pretrained=True)

# Replace the final classifier layer so it outputs num_classes channels
# instead of the pretrained label set
model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)

# map_location keeps the load working on a CPU-only machine even when the
# checkpoint was written from a CUDA run.
model.load_state_dict(
    torch.load("deeplabv3_bleeding_multiclass_crop.pth", map_location=device)
)
model = model.to(device)
model.eval()

os.makedirs(output_dir, exist_ok=True)

# Overlay colour for the prediction, in the BGR order used after cvtColor
green = [0, 255, 0]

total_num = 0
for images in video_test_dataloader:
    images = images.to(device)

    with torch.no_grad():
        preds = model(images)["out"]  # DeepLabV3 returns a dict; "out" holds the main head's scores

    for image, pred in zip(images, preds):
        total_num += 1

        # Post-processing: per-pixel argmax over the class axis gives the predicted class map
        pred_mask = torch.argmax(pred, dim=0).cpu().numpy()

        # Back to an HxWx3 uint8 image (undo the [0, 1] scaling)
        original_image = image.cpu().numpy().transpose(1, 2, 0)
        original_image = (original_image * 255).astype(np.uint8)

        # OpenCV writes BGR, so convert before drawing and saving
        overlay = cv2.cvtColor(original_image, cv2.COLOR_RGB2BGR)

        # No ground truth for video frames: paint predicted class 1 (BF in class_map) green
        mask_layer = np.zeros_like(overlay, dtype=np.uint8)
        mask_layer[pred_mask == 1] = green

        # Blend the colour layer over the frame
        blended = cv2.addWeighted(overlay, 0.7, mask_layer, 0.5, 0)

        # blended = cv2.resize(blended, (1920, 1080), interpolation=cv2.INTER_LANCZOS4)

        # Save with zero-padded numbering so ffmpeg's %04d pattern picks the frames up in order
        filename = f"output_{total_num:04d}.png"
        output_path = os.path.join(output_dir, filename)
        cv2.imwrite(output_path, blended)

# Encode the overlays into output.mp4 inside output_dir. The previous shell
# string used a single `&`, which pushed ffmpeg into the background so the
# completion message was printed before encoding had finished. Running it with
# an argument list and cwd= waits for ffmpeg and needs no shell or `cd`.
ffmpeg_cmd = [
    "ffmpeg",
    "-y",  # overwrite a stale output.mp4 from an earlier run instead of prompting
    "-framerate", "10",
    "-i", "output_%04d.png",
    "-c:v", "libx264",
    "-pix_fmt", "yuv420p",
    "output.mp4",
]
try:
    completed = subprocess.run(ffmpeg_cmd, cwd=output_dir, check=False)
except FileNotFoundError:
    # Keep the step best-effort, as os.system was, when ffmpeg is not installed
    print("ffmpeg not found; skipping video encoding")
else:
    if completed.returncode != 0:
        print(f"ffmpeg exited with code {completed.returncode}")
print("video test completed")

video test completed


ffmpeg version 6.1.1-3ubuntu5 Copyright (c) 2000-2023 the FFmpeg developers
  built with gcc 13 (Ubuntu 13.2.0-23ubuntu3)
  configuration: --prefix=/usr --extra-version=3ubuntu5 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --disable-omx --enable-gnutls --enable-libaom --enable-libass --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libglslang --enable-libgme --enable-libgsm --enable-libharfbuzz --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx265 --enable-libxml2 --enable-libxvid --enable-libzimg --ena