# Import

In [None]:
import pandas as pd 
import glob
import cv2 as cv
import random
import os

import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
import PIL.ImageOps    

import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torchvision.utils
import torch
from torch.autograd import Variable
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from sklearn.metrics import f1_score, accuracy_score

from tqdm.auto import tqdm
import timm
import math
from sklearn.model_selection import train_test_split

import segmentation_models_pytorch as smp
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# Utils

In [None]:
# Run-length encoding helper
def rle_encode(mask):
    """Return the run-length encoding of a binary mask as a string.

    The mask is flattened, padded with a zero on both ends, and every
    position where the value changes is recorded.  The result alternates
    1-based run start positions and run lengths, separated by spaces.
    A mask without any non-zero pixel yields an empty string.
    """
    padded = np.concatenate(([0], mask.flatten(), [0]))
    # Indices (1-based w.r.t. the flattened mask) where a run starts or ends.
    edges = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Turn every "end" position into a run length relative to its start.
    edges[1::2] -= edges[::2]
    return ' '.join(map(str, edges))

def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and switches cuDNN to deterministic mode.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # The environment variable must carry the actual seed value as a string.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds every visible GPU; silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode auto-tunes kernels per run and can pick different
    # algorithms each time, which defeats reproducibility.
    torch.backends.cudnn.benchmark = False
    
# Hyper-parameters shared by the cells below.
CFG = dict(
    IMG_SIZE=224,
    EPOCHS=20,
    LEARNING_RATE=3e-5,
    BATCH_SIZE=8,
    SEED=41,
)

# Fix all random seeds up front.
seed_everything(CFG['SEED'])

In [None]:
# NOTE: these relative paths may need adjusting if the notebook is moved.
val_source = glob.glob("../Data/val_source_image/*")
val_gt = glob.glob("../Data/val_source_gt/*")

# The validation files are merged into the training lists.  glob gives no
# ordering guarantee, so both lists are sorted so that each source image lines
# up with its ground-truth mask (assumes matching file names - TODO confirm).
train_source = sorted(glob.glob("../Data/train_source_image/*") + val_source)
train_gt = sorted(glob.glob("../Data/train_source_gt/*") + val_gt)

print(train_source)

In [None]:
# Build the table pairing each source image path with its mask path.
df = pd.DataFrame({'source': train_source, 'gt': train_gt})
df

# Custom Dataset

In [None]:
class CustomDataset(Dataset):
    """Dataset yielding ``(image, mask)`` pairs, or ``(image, label)`` for inference.

    Args:
        source: Sequence of image file paths.
        gt: Sequence of mask file paths; when ``infer`` is true, a sequence of
            per-image labels that are returned untouched.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            (``transform(image=...)`` in inference mode) that returns a dict.
        t2: Optional second callable invoked the same way after ``transform``;
            only its ``'mask'`` output is kept, the image is left as it was.
        infer: When true, no mask is loaded and ``gt[idx]`` is returned as-is.
    """

    def __init__(self, source, gt, transform=None,t2= None, infer=False):
        self.source = source
        self.gt = gt
        self.transform = transform
        self.t2 = t2
        self.infer = infer

    @staticmethod
    def _read(path, *flags):
        """Read an image with OpenCV, raising if the file cannot be decoded."""
        loaded = cv.imread(path, *flags)
        if loaded is None:
            # cv.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image file: {path}")
        return loaded

    def _apply_transforms(self, image, mask):
        """Run ``transform`` and then ``t2`` on an image/mask pair."""
        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            # Keep the mask produced by the same call so that any spatial
            # change applied to the image is applied to the mask as well.
            mask = augmented['mask']

        if self.t2:
            # Only the mask is taken from the second pipeline; the image keeps
            # the form produced above.
            mask = self.t2(image=image, mask=mask)['mask']

        return image, mask

    def __getitem__(self, idx):
        # NOTE(review): cv.imread yields BGR channel order - confirm that the
        # downstream feature extractor is meant to receive BGR rather than RGB.
        image = self._read(self.source[idx])

        if self.infer:
            label = self.gt[idx]
            if self.transform:
                image = self.transform(image=image)['image']
            return image, label

        mask = self._read(self.gt[idx], cv.IMREAD_GRAYSCALE)
        # Pixel value 255 is treated as background and remapped to class id 12
        # (an id that does not occur in the raw masks, per the original note).
        mask[mask == 255] = 12

        return self._apply_transforms(image, mask)

    def __len__(self):
        return len(self.source)
    


# Transform - Data Augmentation

In [None]:
# Image pipeline: resize and normalise only; tensor conversion is left out.
transform = A.Compose([
    A.Resize(540, 960),
    A.Normalize(),
])

# Pipeline used for the ground-truth side: resize, normalise, convert to tensor.
# The 128 x 256 argument order was flagged as uncertain by the original
# author (it may be the other way round) - TODO confirm.
transform_gt = A.Compose([
    A.Resize(128, 256),
    A.Normalize(),
    ToTensorV2(),
])


# Data Loader

In [None]:
# Hold out 30% of the rows for validation.  Only `df` is split: the previous
# version passed the undefined name `_` as a second array, which merely
# happened to work in IPython (where `_` is the last displayed output) and
# raises NameError anywhere else.
train, val = train_test_split(df, test_size=0.3, random_state=CFG['SEED'])

In [None]:
# Training split: raw images (no image transform), masks go through transform_gt.
train_dataset = CustomDataset(
    source=train['source'].values,
    gt=train['gt'].values,
    transform=None,
    t2=transform_gt,
    infer=False,
)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)

# Validation split, built the same way.
val_dataset = CustomDataset(
    source=val['source'].values,
    gt=val['gt'].values,
    transform=None,
    t2=transform_gt,
    infer=False,
)
# No shuffling for validation: the score is averaged per batch, so a fixed
# batch composition keeps it comparable from one epoch to the next.
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=0)

# Model

### SMP API

- model.encoder - pretrained backbone that extracts features at different spatial resolutions
- model.decoder - depends on the model architecture (Unet/Linknet/PSPNet/FPN)
- model.segmentation_head - last block, which produces the required number of mask channels (also includes optional upsampling and activation)
- model.classification_head - optional block which creates a classification head on top of the encoder
- model.forward(x) - sequentially passes x through the model's encoder, decoder and segmentation head (and classification head if specified)

### Model Param
 - Docs - https://www.kaggle.com/code/ligtfeather/semantic-segmentation-is-easy-with-pytorch

In [None]:
from transformers import SegformerFeatureExtractor, SegformerForSemanticSegmentation
from PIL import Image
import requests
import cv2 as cv

# Pre-processor taken from the Cityscapes-finetuned SegFormer-B0 checkpoint,
# with its target size overridden to 512 x 1024 (height x width).
feature_extractor = SegformerFeatureExtractor.from_pretrained(
    "nvidia/segformer-b0-finetuned-cityscapes-512-1024",
    size={"height": 512, "width": 1024},
)

In [None]:
# class BaseModel(nn.Module):
#     def __init__(self):
#         super(BaseModel, self).__init__()
#         self.backbone = model
#         self.conv1 = nn.Conv2d(in_channels=19, out_channels=13, kernel_size=1, padding=0)
        
#     def forward(self, x):
#         x = self.backbone(**x).logits
#         x = self.conv1(torch.tensor(x))
        
#         return x
    
# model = BaseModel()
    

In [None]:
import torch
import torch.nn as nn
from transformers import SegformerForSemanticSegmentation, SegformerConfig

# Load the SegFormer checkpoint configuration and adjust it.
config = SegformerConfig.from_pretrained(
    "nvidia/segformer-b0-finetuned-cityscapes-512-1024"
)
# Experiment: keep 19 labels here (instead of 13) and map down to the 13
# target classes with an extra conv layer in the wrapper model.
config.num_labels = 19

segformer_model = SegformerForSemanticSegmentation.from_pretrained(
    "nvidia/segformer-b0-finetuned-cityscapes-512-1024",
    config=config,
    ignore_mismatched_sizes=True,
)

# Segmentation model: SegFormer backbone followed by a 1x1 projection conv.
class SegmentationModel(nn.Module):
    """Wrap the module-level ``segformer_model`` and project its logits to 13 channels."""

    def __init__(self):
        super().__init__()
        self.backbone = segformer_model
        # 1x1 convolution mapping the backbone's label channels to 13 classes.
        self.conv1 = nn.Conv2d(config.num_labels, 13, kernel_size=1, padding=0)

    def forward(self, x):
        """Run the backbone on the keyword inputs in ``x`` and project its logits."""
        logits = self.backbone(**x).logits
        return self.conv1(logits)

# Instantiate the model.
model = SegmentationModel() 


In [None]:
# Print the module tree to inspect the assembled architecture.
print(model)

# Validation

In [None]:
# mIoU used as the validation score.  Predictions and labels are flattened
# across the whole batch, so the IoU is computed over all pixels of the batch.
def mIoU(pred_mask, mask, smooth=1e-10, n_classes=13):
    """Compute the mean intersection-over-union between logits and labels.

    Args:
        pred_mask: Tensor of per-class scores with the class axis at dim 1.
        mask: Integer label tensor with as many elements as ``pred_mask`` has
            after the argmax over dim 1.
        smooth: Small constant added to numerator and denominator.
        n_classes: Number of class ids (``0 .. n_classes - 1``) to evaluate.

    Returns:
        A tuple ``(mean_iou, iou_per_class)``.  ``iou_per_class`` holds one
        entry per class, ``nan`` for classes absent from ``mask``; ``mean_iou``
        is the NaN-ignoring mean of that list (``nan`` if every class is absent).
    """
    with torch.no_grad():
        # Softmax is monotonic, so the argmax of the raw scores is the same
        # as the argmax of the probabilities - no need to compute it.
        pred_flat = torch.argmax(pred_mask, dim=1).reshape(-1)
        mask_flat = mask.reshape(-1)

        iou_per_class = []
        for clas in range(n_classes):
            pred_is_class = pred_flat == clas
            label_is_class = mask_flat == clas

            if not label_is_class.any():
                # Class does not occur in the labels: excluded from the mean.
                iou_per_class.append(np.nan)
                continue

            intersect = (pred_is_class & label_is_class).sum().item()
            union = (pred_is_class | label_is_class).sum().item()
            iou_per_class.append((intersect + smooth) / (union + smooth))

    return np.nanmean(iou_per_class), iou_per_class

In [None]:
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader``.

    Uses the module-level ``feature_extractor`` to prepare each batch and the
    module-level ``mIoU`` to score it.  Prints the NaN-ignoring per-class IoU
    averaged over batches.

    Returns:
        ``(mean_loss, mean_score)``, both averaged over the number of batches.
    """
    model.eval()
    total_loss = 0
    total_score = 0
    per_class_history = []

    with torch.no_grad():
        for source, gt in tqdm(iter(val_loader)):
            inputs = feature_extractor(images=source, return_tensors="pt").to(device)
            gt = gt.long().to(device)

            outputs = model(inputs)

            total_loss += criterion(outputs, gt.squeeze(1)).item()
            batch_score, batch_per_class = mIoU(outputs, gt)
            total_score += batch_score
            per_class_history.append(batch_per_class)

    # Per-class IoU across all batches (classes missing in a batch are NaN).
    print(np.nanmean(np.array(per_class_history), axis=0))

    n_batches = len(val_loader)
    return total_loss / n_batches, total_score / n_batches
    

# Train

In [None]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    """Train ``model`` for ``CFG['EPOCHS']`` epochs and return its best snapshot.

    Each epoch runs one pass over ``train_loader`` with cross-entropy loss and
    then scores the model with ``validation``.  Whenever the validation score
    improves, the weights are written to ``./models/NM.pt`` and a copy of the
    model is kept.

    Args:
        model: Module called as ``model(inputs)`` on the output of the
            module-level ``feature_extractor``.
        optimizer: Optimizer over ``model``'s parameters.
        train_loader: Iterable of ``(source, gt)`` training batches.
        val_loader: Iterable of ``(source, gt)`` validation batches.
        scheduler: Optional scheduler stepped with the validation score
            (``scheduler.step(score)``), or ``None``.
        device: Device on which to run the model and the batches.

    Returns:
        A deep copy of the model taken at the best-scoring epoch, or ``None``
        if no epoch scored above 0.
    """
    import copy  # local import keeps this block self-contained

    # nn.Module.to moves the parameters and returns the module itself.
    model = model.to(device)

    criterion = torch.nn.CrossEntropyLoss()

    checkpoint_path = "./models/NM.pt"
    # torch.save does not create missing directories.
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

    best_score = 0
    best_model = None

    for epoch in range(0, CFG['EPOCHS']):
        model.train()
        train_loss = 0

        for source, gt in tqdm(train_loader):
            inputs = feature_extractor(images=source, return_tensors="pt")
            inputs = inputs.to(device)
            gt = gt.long().to(device)

            # Gradients accumulate across backward() calls; reset them so this
            # step only uses the current batch.
            optimizer.zero_grad()
            outputs = model(inputs)

            # squeeze(1) only drops a singleton dim 1 if the masks carry one.
            loss = criterion(outputs, gt.squeeze(1))
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        _train_loss = train_loss/len(train_loader)

        # The reported validation score is the mean IoU returned by validation().
        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val accuracy score : [{_val_score:.5f}]')

        if scheduler is not None:
            scheduler.step(_val_score)

        if best_score < _val_score:
            best_score = _val_score
            # Snapshot the model: keeping a plain reference would silently
            # follow later epochs and return the last weights, not the best.
            best_model = copy.deepcopy(model)
            torch.save(model.state_dict(), checkpoint_path)

    return best_model

In [16]:
# Adam over all model parameters; the LR is halved when the validation score
# (mode='max') has not improved for more than `patience` epochs.
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=CFG["LEARNING_RATE"],
)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
    threshold_mode='abs',
    min_lr=1e-8,
    verbose=True,
)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

KeyboardInterrupt: 

# EVAL

In [None]:
# Load the test-set table; its 'source' and 'label' columns feed the test dataset.
df_test = pd.read_csv('../DataPreprocessing/front_OR_back.csv')
df_test

In [None]:
# Inference dataset: raw images plus the per-image label from the CSV.
test_dataset = CustomDataset(
    source=df_test['source'].values,
    gt=df_test['label'].values,
    transform=None,
    t2=None,
    infer=True,
)
# Order must be preserved so predictions line up with the submission rows.
test_loader = DataLoader(
    test_dataset,
    batch_size=4,
    shuffle=False,
    num_workers=0,
)

In [None]:
# Precomputed masks, one per test label (0 and otherwise); pixels where the
# mask is false are overwritten with class 12 at inference time.
# NOTE(review): presumably boolean arrays of shape (540, 960) - confirm.
mask_0 = np.load("../DataPreprocessing/mask0.npy")
mask_1 = np.load("../DataPreprocessing/mask1.npy")

In [None]:
# Run the model over the test set and collect, for every image and each of the
# 12 foreground classes, either an RLE string or -1 when the class is absent.
model = model.to(device)
# Inference must run in eval mode (no dropout, fixed normalisation statistics).
model.eval()

# Pixels outside the label-specific mask are forced to class 12.  Casting to
# bool first keeps `~` a logical NOT even if the arrays were saved as 0/1
# integers; the inversion is done once instead of once per image.
outside_mask_0 = ~mask_0.astype(bool)
outside_mask_1 = ~mask_1.astype(bool)

result = []
debug_dump_pending = True  # save only the very first prediction for inspection

with torch.no_grad():
    for images, label in tqdm(test_loader):
        inputs = feature_extractor(images=images, return_tensors="pt")
        inputs = inputs.to(device)
        outputs = model(inputs)

        # Upsample the logits to the 540 x 960 output resolution before argmax.
        outputs = nn.functional.interpolate(outputs, size=(540, 960), mode='bilinear', align_corners=False)
        outputs = outputs.argmax(dim=1).to("cpu").numpy()

        for pred, l in zip(outputs, label):
            new_pred = pred.astype(np.uint8)
            new_pred[outside_mask_0 if l == 0 else outside_mask_1] = 12

            if debug_dump_pending:
                np.save('./test_img.npy', new_pred)
                debug_dump_pending = False

            for class_id in range(12):
                class_mask = (new_pred == class_id).astype(np.uint8)
                if class_mask.any():
                    # Class present in this image: encode its mask.
                    result.append(rle_encode(class_mask))
                else:
                    # Class absent from this image: -1 placeholder.
                    result.append(-1)

# Submission

In [None]:
# Fill the sample submission with the collected results; `result` must contain
# exactly one entry per row of the sample file.
submit = pd.read_csv('../Data/sample_submission.csv')
submit['mask_rle'] = result
submit

In [None]:
# Write the submission file without the DataFrame index column.
submit.to_csv('./NM.csv', index=False)