In [4]:
pip install torch transformers pytorch-lightning datasets roboflow

[0mNote: you may need to restart the kernel to use updated packages.


In [5]:
pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Downloading evaluate-0.4.3-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: evaluate
Successfully installed evaluate-0.4.3
[0mNote: you may need to restart the kernel to use updated packages.


In [5]:
import pytorch_lightning as pl
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint
from pytorch_lightning.loggers import CSVLogger
from transformers import SegformerFeatureExtractor, SegformerForSemanticSegmentation
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import os
from PIL import Image
import numpy as np
import random
import evaluate

metric = evaluate.load("accuracy")


2024-10-14 03:07:15.924831: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-10-14 03:07:15.928792: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-10-14 03:07:15.937105: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-14 03:07:15.950390: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-14 03:07:15.954380: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attemptin

In [6]:
import roboflow

from roboflow import Roboflow
rf = Roboflow(api_key="rWhzxDa8oLFF7F6zw3cG")
project = rf.workspace("cd-pq7yy").project("building-defects-xpjmz")
version = project.version(2)
dataset = version.download("png-mask-semantic")
                

loading Roboflow workspace...
loading Roboflow project...


In [7]:
class SemanticSegmentationDataset(Dataset):
    """Image (semantic) segmentation dataset."""

    def __init__(self, root_dir, feature_extractor):
        """
        Args:
            root_dir (string): Root directory of the dataset containing the images + annotations.
            feature_extractor (SegFormerFeatureExtractor): feature extractor to prepare images + segmentation maps.
        """
        self.root_dir = root_dir
        self.feature_extractor = feature_extractor
        
        # Load class mapping from CSV file (e.g., _classes.csv)
        self.classes_csv_file = os.path.join(self.root_dir, "_classes.csv")
        with open(self.classes_csv_file, 'r') as fid:
            data = [l.split(',') for i, l in enumerate(fid) if i != 0]
        self.id2label = {x[0]: x[1] for x in data}
        
        image_file_names = [f for f in os.listdir(self.root_dir) if '.jpg' in f]
        mask_file_names = [f for f in os.listdir(self.root_dir) if '.png' in f]
        
        self.images = sorted(image_file_names)
        self.masks = sorted(mask_file_names)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(os.path.join(self.root_dir, self.images[idx]))
        segmentation_map = Image.open(os.path.join(self.root_dir, self.masks[idx]))

        # Convert segmentation map to numpy array (without ignoring any labels)
        segmentation_map = np.array(segmentation_map)

        # Apply feature extractor to both image and segmentation map
        encoded_inputs = self.feature_extractor(image, segmentation_map, return_tensors="pt")

        # Remove batch dimension
        for k, v in encoded_inputs.items():
            encoded_inputs[k].squeeze_()

        return encoded_inputs
    

In [8]:
import torch
import torch.nn.functional as F

class SegformerFinetuner(pl.LightningModule):
    
    def __init__(self, id2label, train_dataloader=None, val_dataloader=None, test_dataloader=None, metrics_interval=100):
        super(SegformerFinetuner, self).__init__()
        self.id2label = id2label
        self.metrics_interval = metrics_interval
        self.train_dl = train_dataloader
        self.val_dl = val_dataloader
        self.test_dl = test_dataloader
        
        self.num_classes = len(id2label.keys())
        self.label2id = {v:k for k,v in self.id2label.items()}
        
        # SegFormer 모델 로드
        self.model = SegformerForSemanticSegmentation.from_pretrained(
            "nvidia/segformer-b5-finetuned-cityscapes-1024-1024", 
            return_dict=False, 
            num_labels=self.num_classes,
            id2label=self.id2label,
            label2id=self.label2id,
            ignore_mismatched_sizes=True,
        )
        
        # 클래스 가중치 (4번 클래스에 높은 가중치 설정)
        self.class_weights = torch.ones(self.num_classes)  # 기본 가중치는 모두 1
        self.class_weights[4] = 15.0  # 4번 클래스에 가중치 10을 적용
        
        self.train_mean_iou = load_metric("mean_iou")
        self.val_mean_iou = load_metric("mean_iou")
        self.test_mean_iou = load_metric("mean_iou")
        
    def forward(self, images, masks):
        outputs = self.model(pixel_values=images, labels=masks)
        return outputs
    
    def training_step(self, batch, batch_nb):
        images, masks = batch['pixel_values'], batch['labels']
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]
        
        # Logits 업샘플링하여 원본 크기와 맞추기
        upsampled_logits = nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )

        predicted = upsampled_logits.argmax(dim=1)

        # 클래스 가중치를 적용한 cross-entropy 손실 계산
        loss = F.cross_entropy(upsampled_logits, masks, weight=self.class_weights.to(self.device))

        # IoU 계산
        self.train_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(), 
            references=masks.detach().cpu().numpy()
        )
        
        if batch_nb % self.metrics_interval == 0:
            metrics = self.train_mean_iou.compute(
                num_labels=self.num_classes, 
                ignore_index=255, 
                reduce_labels=False,
            )
            metrics = {'loss': loss, "mean_iou": metrics["mean_iou"], "mean_accuracy": metrics["mean_accuracy"]}
            
            for k,v in metrics.items():
                self.log(k, v)
            
            return metrics
        else:
            return {'loss': loss}
    
    def validation_step(self, batch, batch_nb):
        images, masks = batch['pixel_values'], batch['labels']
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]
        
        upsampled_logits = nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)

        # Validation에서도 가중치를 적용한 손실 계산
        loss = F.cross_entropy(upsampled_logits, masks, weight=self.class_weights.to(self.device))

        # IoU 계산
        self.val_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(), 
            references=masks.detach().cpu().numpy()
        )
        
        return {'val_loss': loss}

    def validation_epoch_end(self, outputs):
        metrics = self.val_mean_iou.compute(
              num_labels=self.num_classes, 
              ignore_index=255, 
              reduce_labels=False,
          )
        
        avg_val_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        val_mean_iou = metrics["mean_iou"]
        val_mean_accuracy = metrics["mean_accuracy"]
        
        metrics = {"val_loss": avg_val_loss, "val_mean_iou": val_mean_iou, "val_mean_accuracy": val_mean_accuracy}
        for k,v in metrics.items():
            self.log(k, v)

        return metrics
    
    def test_step(self, batch, batch_nb):
        images, masks = batch['pixel_values'], batch['labels']
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]
        
        upsampled_logits = nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)

        # Test에서도 가중치를 적용한 손실 계산
        loss = F.cross_entropy(upsampled_logits, masks, weight=self.class_weights.to(self.device))

        # IoU 계산
        self.test_mean_iou.add_batch(
            predictions=predicted.detach().cpu().numpy(), 
            references=masks.detach().cpu().numpy()
        )
            
        return {'test_loss': loss}
    
    def test_epoch_end(self, outputs):
        metrics = self.test_mean_iou.compute(
              num_labels=self.num_classes, 
              ignore_index=255, 
              reduce_labels=False,
          )
       
        avg_test_loss = torch.stack([x["test_loss"] for x in outputs]).mean()
        test_mean_iou = metrics["mean_iou"]
        test_mean_accuracy = metrics["mean_accuracy"]

        metrics = {"test_loss": avg_test_loss, "test_mean_iou": test_mean_iou, "test_mean_accuracy": test_mean_accuracy}
        for k,v in metrics.items():
            self.log(k,v)
        
        return metrics
    
    def configure_optimizers(self):
        return torch.optim.Adam([p for p in self.parameters() if p.requires_grad], lr=2e-05, eps=1e-08)
    
    def train_dataloader(self):
        return self.train_dl
    
    def val_dataloader(self):
        return self.val_dl
    
    def test_dataloader(self):
        return self.test_dl


In [9]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from transformers import SegformerForSemanticSegmentation

# IoU 계산 함수
def compute_iou(predictions, targets, num_classes):
    ious = []
    for cls in range(num_classes):
        pred_inds = predictions == cls
        target_inds = targets == cls
        intersection = torch.sum(pred_inds & target_inds)
        union = torch.sum(pred_inds | target_inds)
        if union == 0:
            ious.append(float('nan'))  # NaN 대신 기본값을 설정 가능
        else:
            ious.append(float(intersection) / float(union))
    return torch.tensor(ious).nanmean()  # NaN 제외 평균 IoU

# SegFormer fine-tuning 클래스
class SegformerFinetuner(pl.LightningModule):
    def __init__(self, id2label, train_dataloader=None, val_dataloader=None, test_dataloader=None, metrics_interval=100):
        super(SegformerFinetuner, self).__init__()
        self.id2label = id2label
        self.metrics_interval = metrics_interval
        self.train_dl = train_dataloader
        self.val_dl = val_dataloader
        self.test_dl = test_dataloader
        
        self.num_classes = len(id2label.keys())
        self.label2id = {v: k for k, v in self.id2label.items()}
        
        # Pretrained SegFormer 모델 로드
        self.model = SegformerForSemanticSegmentation.from_pretrained(
            "nvidia/segformer-b5-finetuned-cityscapes-1024-1024",
            num_labels=self.num_classes,
            id2label=self.id2label,
            label2id=self.label2id,
            ignore_mismatched_sizes=True
        )
        
        # 클래스 가중치 (4번 클래스에 가중치 부여)
        self.class_weights = torch.ones(self.num_classes)
        self.class_weights[4] = 15.0  # 클래스 4에 높은 가중치 부여

    def forward(self, images, masks):
        outputs = self.model(pixel_values=images, labels=masks)
        return outputs

    def training_step(self, batch, batch_nb):
        images, masks = batch['pixel_values'], batch['labels']
        
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]

        # Logits upsampling to match mask size
        upsampled_logits = torch.nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)

        # 가중치를 적용한 cross-entropy 손실 계산
        loss = F.cross_entropy(upsampled_logits, masks, weight=self.class_weights.to(self.device))
        
        # IoU 계산
        mean_iou = compute_iou(predicted, masks, self.num_classes)
        
        metrics = {'loss': loss, 'mean_iou': mean_iou}
        
        if batch_nb % self.metrics_interval == 0:
            for k, v in metrics.items():
                self.log(k, v)

        return metrics

    def validation_step(self, batch, batch_nb):
        images, masks = batch['pixel_values'], batch['labels']
        
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]

        # Logits upsampling to match mask size
        upsampled_logits = torch.nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)
        
        # Validation에서도 가중치 적용
        loss = F.cross_entropy(upsampled_logits, masks, weight=self.class_weights.to(self.device))
        
        # IoU 계산
        mean_iou = compute_iou(predicted, masks, self.num_classes)
        
        return {'val_loss': loss, 'val_mean_iou': mean_iou}

    def validation_epoch_end(self, outputs):
        avg_val_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        avg_val_mean_iou = torch.stack([x["val_mean_iou"] for x in outputs]).mean()

        metrics = {"val_loss": avg_val_loss, "val_mean_iou": avg_val_mean_iou}
        for k, v in metrics.items():
            self.log(k, v)

        return metrics

    def test_step(self, batch, batch_nb):
        images, masks = batch['pixel_values'], batch['labels']
        
        outputs = self(images, masks)
        loss, logits = outputs[0], outputs[1]

        # Logits upsampling to match mask size
        upsampled_logits = torch.nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted = upsampled_logits.argmax(dim=1)

        # Test에서도 가중치 적용
        loss = F.cross_entropy(upsampled_logits, masks, weight=self.class_weights.to(self.device))
        
        # IoU 계산
        mean_iou = compute_iou(predicted, masks, self.num_classes)
        
        return {'test_loss': loss, 'test_mean_iou': mean_iou}

    def test_epoch_end(self, outputs):
        avg_test_loss = torch.stack([x["test_loss"] for x in outputs]).mean()
        avg_test_mean_iou = torch.stack([x["test_mean_iou"] for x in outputs]).mean()

        metrics = {"test_loss": avg_test_loss, "test_mean_iou": avg_test_mean_iou}
        for k, v in metrics.items():
            self.log(k, v)

        return metrics

    def configure_optimizers(self):
        return torch.optim.Adam([p for p in self.parameters() if p.requires_grad], lr=2e-05, eps=1e-08)

    def train_dataloader(self):
        return self.train_dl

    def val_dataloader(self):
        return self.val_dl

    def test_dataloader(self):
        return self.test_dl


In [10]:
feature_extractor = SegformerFeatureExtractor.from_pretrained("nvidia/segformer-b5-finetuned-cityscapes-1024-1024")
feature_extractor.do_reduce_labels = False
feature_extractor.size = 512

train_dataset = SemanticSegmentationDataset(f"{dataset.location}/train/", feature_extractor)
val_dataset = SemanticSegmentationDataset(f"{dataset.location}/valid/", feature_extractor)
test_dataset = SemanticSegmentationDataset(f"{dataset.location}/test/", feature_extractor)

batch_size = 4
num_workers = 0
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, num_workers=0)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, num_workers=0)

segformer_finetuner = SegformerFinetuner(
    train_dataset.id2label, 
    train_dataloader=train_dataloader, 
    val_dataloader=val_dataloader, 
    test_dataloader=test_dataloader, 
    metrics_interval=10,
)

The class SegformerFeatureExtractor is deprecated and will be removed in version 5 of Transformers. Please use SegformerImageProcessor instead.
The following named arguments are not valid for `SegformerFeatureExtractor.__init__` and were ignored: 'feature_extractor_type'
Some weights of SegformerForSemanticSegmentation were not initialized from the model checkpoint at nvidia/segformer-b5-finetuned-cityscapes-1024-1024 and are newly initialized because the shapes did not match:
- decode_head.classifier.weight: found shape torch.Size([19, 768, 1, 1]) in the checkpoint and torch.Size([5, 768, 1, 1]) in the model instantiated
- decode_head.classifier.bias: found shape torch.Size([19]) in the checkpoint and torch.Size([5]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
early_stop_callback = EarlyStopping(
    monitor="val_loss", 
    min_delta=0.00, 
    patience=, 
    verbose=False, 
    mode="min",
)

checkpoint_callback = ModelCheckpoint(save_top_k=1, monitor="val_loss")

trainer = pl.Trainer(
    gpus=[1], 
    callbacks=[early_stop_callback, checkpoint_callback],
    max_epochs=500,
    val_check_interval=len(train_dataloader),
)
trainer.fit(segformer_finetuner)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3]


In [11]:
import torch
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# IoU 계산 함수
def compute_iou(predictions, targets, num_classes):
    ious = []
    for cls in range(num_classes):
        pred_inds = predictions == cls
        target_inds = targets == cls
        intersection = np.sum(pred_inds & target_inds)
        union = np.sum(pred_inds | target_inds)
        if union == 0:
            ious.append(float('nan'))  # NaN 대신 처리할 값을 넣을 수 있음
        else:
            ious.append(float(intersection) / float(union))
    return np.nanmean(ious)  # NaN을 제외한 평균 IoU

def compute_bbox(predictions, targets, num_classes):
    bbox = []
    for cls in range(num_classegs):
        pred_inds = predictions == cls
        target_inds = targets == cls
        intersection = np.sum

# 클래스별로 정밀도, 재현율, F1-score, 정확도 계산 함수
def compute_classwise_metrics(predictions, targets, num_classes):
    precision = precision_score(targets.flatten(), predictions.flatten(), average=None, labels=range(num_classes))
    recall = recall_score(targets.flatten(), predictions.flatten(), average=None, labels=range(num_classes))
    f1 = f1_score(targets.flatten(), predictions.flatten(), average=None, labels=range(num_classes))
    accuracy = accuracy_score(targets.flatten(), predictions.flatten())
    
    return precision, recall, f1, accuracy

# 모델 추론 및 성능 지표 계산 함수
def evaluate_model_on_test_data(dataloader, model, num_classes):
    iou_list = []
    precision_list, recall_list, f1_list = [], [], []
    
    for batch in dataloader:
        images, masks = batch['pixel_values'], batch['labels']
        
        with torch.no_grad():
            outputs = model(pixel_values=images)
        
        logits = outputs.logits
        upsampled_logits = torch.nn.functional.interpolate(
            logits, 
            size=masks.shape[-2:], 
            mode="bilinear", 
            align_corners=False
        )
        
        predicted_mask = upsampled_logits.argmax(dim=1).cpu().numpy()
        masks = masks.cpu().numpy()

        # IoU 계산
        iou = compute_iou(predicted_mask, masks, num_classes)
        iou_list.append(iou)

        # 정밀도, 재현율, F1-score 계산
        precision, recall, f1, accuracy = compute_classwise_metrics(predicted_mask, masks, num_classes)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_list.append(f1)

    avg_iou = np.mean(iou_list)
    avg_precision = np.mean(precision_list, axis=0)
    avg_recall = np.mean(recall_list, axis=0)
    avg_f1 = np.mean(f1_list, axis=0)

    return avg_iou, avg_precision, avg_recall, avg_f1, accuracy

# 모델 성능 평가
num_classes = 3  # 클래스 개수
avg_iou, avg_precision, avg_recall, avg_f1, accuracy = evaluate_model_on_test_data(test_dataloader, segformer_finetuner.model, num_classes)

# 결과 출력
print(f"Average IoU: {avg_iou}")
for cls in range(num_classes):
    print(f"Class {cls}: Precision={avg_precision[cls]}, Recall={avg_recall[cls]}, F1-score={avg_f1[cls]}")
print(f"Overall Accuracy: {accuracy}")


Average IoU: 0.08172360687864229
Class 0: Precision=0.4699793903165759, Recall=0.18847458954341678, F1-score=0.2689630662652155
Class 1: Precision=0.18301535205015532, Recall=0.1141222009772243, F1-score=0.14048916045689816
Class 2: Precision=0.014977487904955343, Recall=0.12255718092009951, F1-score=0.026683486146758906
Overall Accuracy: 0.1688995361328125


In [1]:
import torch
import torch.nn.functional as F
from scipy.ndimage import binary_erosion
# Dice Coefficient 계산 함수
def compute_dice_coefficient(predictions, targets, num_classes):
    dice_scores = []
    for cls in range(num_classes):
        pred_inds = predictions == cls
        target_inds = targets == cls
        intersection = torch.sum(pred_inds & target_inds)
        total = torch.sum(pred_inds) + torch.sum(target_inds)
        if total == 0:
            dice_scores.append(float('nan'))  # NaN 대신 기본값을 설정 가능
        else:
            dice_scores.append((2.0 * float(intersection)) / float(total))
    return torch.tensor(dice_scores).nanmean()  # NaN 제외 평균 Dice Coefficient
# Boundary IoU 계산 함수
def compute_boundary_iou(predictions, targets, num_classes, boundary_width=1):
    ious = []
    for cls in range(num_classes):
        pred_inds = predictions == cls
        target_inds = targets == cls
        # 경계 추출: erosion을 이용하여 경계 영역을 얻음
        pred_boundary = pred_inds ^ binary_erosion(pred_inds, structure=np.ones((boundary_width, boundary_width)))
        target_boundary = target_inds ^ binary_erosion(target_inds, structure=np.ones((boundary_width, boundary_width)))
        intersection = torch.sum(pred_boundary & target_boundary)
        union = torch.sum(pred_boundary | target_boundary)
        if union == 0:
            ious.append(float('nan'))  # NaN 대신 기본값을 설정 가능
        else:
            ious.append(float(intersection) / float(union))
    return torch.tensor(ious).nanmean()  # NaN 제외 평균 Boundary IoU
# SegformerFinetuner 클래스에 성능 평가 메서드 추가
class SegformerFinetuner(pl.LightningModule):
    # 기존 코드 유지...
    def evaluate_metrics(self, dataloader):
        self.model.eval()  # 모델을 평가 모드로 전환
        dice_scores = []
        boundary_ious = []
        with torch.no_grad():
            for batch in dataloader:
                images, masks = batch['pixel_values'], batch['labels']
                outputs = self(images, masks)
                logits = outputs[1]
                # Logits upsampling to match mask size
                upsampled_logits = torch.nn.functional.interpolate(
                    logits,
                    size=masks.shape[-2:],
                    mode="bilinear",
                    align_corners=False
                )
                predicted = upsampled_logits.argmax(dim=1)
                # Dice Coefficient 및 Boundary IoU 계산
                dice_score = compute_dice_coefficient(predicted, masks, self.num_classes)
                boundary_iou = compute_boundary_iou(predicted, masks, self.num_classes)
                dice_scores.append(dice_score)
                boundary_ious.append(boundary_iou)
        avg_dice = torch.stack(dice_scores).mean()
        avg_boundary_iou = torch.stack(boundary_ious).mean()
        print(f"Average Dice Coefficient: {avg_dice:.4f}")
        print(f"Average Boundary IoU: {avg_boundary_iou:.4f}")
        return {'avg_dice': avg_dice, 'avg_boundary_iou': avg_boundary_iou}
# 학습된 모델에 대해 테스트 데이터셋을 사용하여 평가 수행
evaluator = segformer_finetuner.evaluate_metrics(test_dataloader)

NameError: name 'pl' is not defined

In [39]:
import os
from PIL import Image
import torch
import numpy as np

# Color map 설정 (배경, 건물, 크랙 등)
color_map = {
    0: (0, 0, 0),      # 배경
    1: (255, 255, 255),  # 건물
    2: (255, 0, 0),     # 크랙
}

# 예측된 마스크를 시각화하는 함수
def prediction_to_vis(prediction):
    vis_shape = prediction.shape + (3,)
    vis = np.zeros(vis_shape)
    for i, c in color_map.items():
        vis[prediction == i] = color_map[i]
    return Image.fromarray(vis.astype(np.uint8))

# 이미지 전처리 함수
def preprocess_image(image_path):
    image = Image.open(image_path).convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")
    return inputs, image

# 폴더 내 모든 이미지에 대해 inference를 수행하고 결과를 시각화하는 함수
def run_inference_on_folder(folder_path, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)  # 결과를 저장할 폴더 생성

    for file_name in os.listdir(folder_path):
        if file_name.endswith(('.png', '.jpg', '.jpeg')):  # 이미지 파일만 처리
            image_path = os.path.join(folder_path, file_name)
            inputs, input_image = preprocess_image(image_path)

            # 모델 추론
            with torch.no_grad():
                outputs = segformer_finetuner.model(pixel_values=inputs['pixel_values'])

            logits = outputs.logits
            upsampled_logits = torch.nn.functional.interpolate(
                logits,
                size=(input_image.size[1], input_image.size[0]),
                mode="bilinear",
                align_corners=False
            )

            # 각 픽셀에서 가장 가능성 높은 클래스를 선택
            predicted_mask = upsampled_logits.argmax(dim=1).cpu().numpy()[0]

            # 시각화
            mask = prediction_to_vis(predicted_mask)
            mask = mask.resize(input_image.size)
            mask = mask.convert("RGBA")

            # 원본 이미지와 예측된 마스크를 오버레이
            input_image = input_image.convert("RGBA")
            overlay_img = Image.blend(input_image, mask, 0.5)

            # 결과 이미지 저장
            output_path = os.path.join(output_folder, f"segmented_{file_name}")
            overlay_img.save(output_path)
            print(f"Saved segmented image with overlay: {output_path}")

# 사용 예시
input_folder = "sv_data"  # 추론할 이미지가 있는 폴더 경로
output_folder = "ft_v7_model_infer_result"  # 결과를 저장할 폴더 경로
run_inference_on_folder(input_folder, output_folder)


Saved segmented image with overlay: ft_v7_model_infer_result/segmented_18.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_95.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_69.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_98.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_56.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_58.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_11.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_87.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_20.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_14.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_66.png
Saved segmented image with overlay: ft_v7_model_infer_result/segmented_13.png
Saved segmented image with overlay: ft_v7_model_infer_result/seg