In [None]:
# Mount Google Drive into the Colab VM at /content/drive.
# This cell only works inside Google Colab (google.colab is not available elsewhere).
from google.colab import drive
drive.mount('/content/drive')

# Build the metadata: drop rows whose image file is missing on disk

In [None]:
import os
import pandas as pd

# Location of the raw metadata CSV and the root folder that holds the images.
# Both are left blank in this notebook and must be filled in before running.
CSV_FILE_PATH = ''
IMAGE_DIR = ''

# Raw metadata table; the cleaning function below reads its "job_no" and
# "image_name" columns.
df = pd.read_csv(CSV_FILE_PATH)

def clean_and_check_image_data(df, image_dir):
    """Keep only the metadata rows whose image file exists on disk.

    For every row the expected file location is
    ``<image_dir>/<job_no>/<image_name>``. Rows whose file is absent, or whose
    ``image_name`` is null, are reported with ``print`` and dropped.

    Args:
        df: DataFrame with at least the columns ``job_no`` and ``image_name``.
        image_dir: Root directory containing one sub-folder per job number.

    Returns:
        A new DataFrame with the surviving rows. Columns, dtypes and the
        original index labels are preserved, also when no row survives.
    """
    keep = []
    for job_no, image_name in zip(df["job_no"], df["image_name"]):
        # A null file name cannot be joined into a path; treat it as missing.
        if pd.isna(image_name):
            print(f"Image name missing for job: {job_no}")
            keep.append(False)
            continue

        image_path = os.path.join(image_dir, str(job_no), str(image_name))
        exists = os.path.exists(image_path)
        if not exists:
            print(f"Image missing: {image_path}")
        keep.append(exists)

    # Boolean-mask selection (instead of rebuilding a frame from row Series)
    # keeps column dtypes intact and keeps the header even for an empty result.
    mask = pd.Series(keep, index=df.index, dtype=bool)
    return df[mask].copy()


In [None]:

# df2 holds only the rows of df whose image file was found under IMAGE_DIR.
df2 = clean_and_check_image_data(df, IMAGE_DIR)

In [None]:
# Row count before vs. after cleaning; the difference is the number of dropped rows.
print(len(df),len(df2))

In [None]:
# Rebind df to the cleaned table and write it out as CSV without the index column.
# The output path is left blank here and must be filled in.
# NOTE(review): rebinding df means re-running the cleaning cell afterwards
# operates on the already-cleaned frame.
df = df2
df.to_csv('', index = False)

#import library

In [None]:
import cv2
import os
import pandas as pd
import torch
import numpy as np
import random
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from tqdm import tqdm  # Import tqdm for the progress bar
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, balanced_accuracy_score
from sklearn.metrics import average_precision_score



# Ask the PyTorch CUDA caching allocator to use expandable segments, then
# release any cached blocks.
# NOTE(review): presumably intended to reduce memory fragmentation; confirm the
# variable still takes effect when set after `import torch`.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
torch.cuda.empty_cache()
from torch.utils.data import WeightedRandomSampler # sampler for class-imbalanced data; not used by the cells shown in this notebook


#Common.py: including configuration, seed_everything

In [None]:
# Paths to the cleaned metadata CSV and the image root folder.
# Both are left blank in this notebook and must be filled in before running.
CSV_FILE_PATH = ''
IMAGE_DIR = ''

# Side length (pixels) every image is resized to by the transforms below.
# NOTE(review): 456 appears to match EfficientNet-B5's reference input size - confirm.
IMG_SIZE = 456

# DataLoader settings
BATCH_SIZE = 16
# Training settings
NUM_EPOCHS = 10
LEARNING_RATE = 1e-3 # other values noted by the author: 3e-4 (i.e. 0.001 -> 0.0003)

# Per-channel RGB mean/std used to normalise inputs for the pretrained backbone
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

# Run on the GPU when one is visible, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
SEED = 42

def seed_everything(seed):
    """Seed every random number generator used by the notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices) and
    switches cuDNN to deterministic mode.

    Args:
        seed: Integer seed shared by all generators.
    """
    random.seed(seed)
    # Only affects interpreters started after this point (e.g. worker
    # processes); the hash seed of the running interpreter is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # cover every GPU, not only the current one
    torch.backends.cudnn.deterministic = True
    # The autotuner may pick a different convolution algorithm on each run,
    # which defeats deterministic mode, so it has to be switched off.
    torch.backends.cudnn.benchmark = False

#data.py

In [None]:
# Define a PyTorch Dataset
class SmokeAlarmDataset(Dataset):
    """Dataset yielding ``(4-channel image, binary label)`` pairs.

    Each row of ``df`` must provide ``image_name`` and ``image_status``
    ("Approved" or "Declined"). The image is read from
    ``<image_dir>/approved/<image_name>`` or ``<image_dir>/declined/<image_name>``
    depending on the status, converted to RGB, and extended with a constant
    fourth "image type" channel.

    Args:
        df: Metadata frame, indexed positionally via ``iloc``.
        image_dir: Root folder containing the ``approved`` / ``declined`` folders.
        transform: Optional albumentations-style callable invoked as
            ``transform(image=array)["image"]``.
    """

    # Sub-folder that holds the images of each status.
    _FOLDER_BY_STATUS = {"Approved": "approved", "Declined": "declined"}

    def __init__(self, df, image_dir, transform=None):
        self.df = df
        self.image_dir = image_dir
        self.transform = transform
        self.label_map = {"Approved": 1, "Declined": 0}
        self.type_map = {"ExpiryImages": 1, "Images": 0}

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        Raises:
            ValueError: If ``image_status`` is neither "Approved" nor "Declined".
            FileNotFoundError: If the image file cannot be read.
        """
        # Fetch the row once instead of once per column.
        row = self.df.iloc[idx]
        image_name = row["image_name"]
        image_status = row["image_status"]

        # Validate the status before anything is derived from it.
        if image_status not in self._FOLDER_BY_STATUS:
            raise ValueError(f"Unexpected image_status: {image_status}")
        folder = self._FOLDER_BY_STATUS[image_status]
        label = self.label_map.get(image_status, 0)

        # NOTE(review): type_map is keyed by "ExpiryImages"/"Images" but is
        # looked up with image_status, so this value is always 0. It was
        # presumably meant to read a separate image-type column; the column
        # name is not visible here, so the behaviour is left unchanged.
        type_channel_value = self.type_map.get(image_status, 0)

        image_path = os.path.join(self.image_dir, folder, image_name)
        image = cv2.imread(image_path)
        # cv2.imread signals failure by returning None rather than raising.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads as BGR

        # Constant plane carrying the image-type flag, stacked as 4th channel.
        type_channel = np.full(
            (image.shape[0], image.shape[1], 1), type_channel_value, dtype=np.uint8
        )
        combined_image = np.concatenate((image, type_channel), axis=-1)  # (H, W, 4)

        if self.transform:
            combined_image = self.transform(image=combined_image)["image"]

        return combined_image, label


#model.py

In [None]:
class BaseModel(nn.Module):
    """EfficientNet-B5 binary classifier that accepts 4-channel input.

    The backbone's first convolution is replaced by a 4-input-channel
    convolution with the same geometry, and the classifier head by
    ``Dropout(0.3) -> Linear(in_features, 1)`` producing a single logit.

    Args:
        pretrained: Load ImageNet weights into the backbone (default). Pass
            ``False`` when a checkpoint is loaded immediately afterwards, so
            the pretrained weights are not downloaded needlessly.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; `weights=` is the
        # supported way to request the ImageNet checkpoint.
        weights = models.EfficientNet_B5_Weights.DEFAULT if pretrained else None
        self.backbone = models.efficientnet_b5(weights=weights)

        # Build the 4-channel stem from the existing one so its geometry
        # (output channels, kernel, stride, padding, bias) always matches.
        rgb_stem = self.backbone.features[0][0]
        stem = nn.Conv2d(
            in_channels=4,
            out_channels=rgb_stem.out_channels,
            kernel_size=rgb_stem.kernel_size,
            stride=rgb_stem.stride,
            padding=rgb_stem.padding,
            bias=rgb_stem.bias is not None,
        )
        if pretrained:
            # Keep the pretrained RGB filters instead of discarding them; the
            # extra channel starts from the mean of the RGB filters.
            with torch.no_grad():
                stem.weight[:, :3] = rgb_stem.weight
                stem.weight[:, 3:] = rgb_stem.weight.mean(dim=1, keepdim=True)
        self.backbone.features[0][0] = stem

        # Replace the ImageNet head with a single-logit head for binary output.
        in_features = self.backbone.classifier[1].in_features
        self.backbone.classifier = nn.Sequential(
            nn.Dropout(p=0.3, inplace=True),
            nn.Linear(in_features, 1),
        )

    def forward(self, x):
        """Return the backbone output (one logit per sample) for ``x``."""
        return self.backbone(x)

#train.py

In [None]:
from albumentations.core.transforms_interface import ImageOnlyTransform

class ToTensorV2Custom(ImageOnlyTransform):
    """Albumentations step that converts an HWC numpy image to a CHW torch tensor."""

    def __init__(self, always_apply=False, p=1.0):
        super().__init__(always_apply, p)

    def apply(self, img, **params):
        # Move the channel axis to the front: (H, W, C) -> (C, H, W).
        channels_first = np.transpose(img, (2, 0, 1))
        return torch.from_numpy(channels_first)

    def get_transform_init_args_names(self):
        # No constructor arguments need to be serialised.
        return tuple()


# The dataset yields 4-channel images (RGB + image-type flag), so the
# normalisation statistics need a fourth entry; three-element mean/std cannot
# be broadcast against an (H, W, 4) array.
# With mean 0 and std 1/255 (and max_pixel_value 255) the flag channel passes
# through unchanged: (v - 0 * 255) / ((1 / 255) * 255) == v.
_MEAN_4CH = list(NORMALIZE_MEAN) + [0.0]
_STD_4CH = list(NORMALIZE_STD) + [1.0 / 255.0]


def _build_transform():
    """Resize to IMG_SIZE x IMG_SIZE, normalise the 4 channels, convert to CHW tensor."""
    return A.Compose([
        A.Resize(IMG_SIZE, IMG_SIZE),
        A.Normalize(mean=_MEAN_4CH, std=_STD_4CH, max_pixel_value=255.0, p=1.0),
        ToTensorV2Custom(),
    ])


# Training and evaluation currently share the same (augmentation-free) pipeline.
train_transform = _build_transform()
test_transform = _build_transform()

df = pd.read_csv(CSV_FILE_PATH)

# Class counts used to weight the loss.
num_declined = int((df['image_status'] == 'Declined').sum())  # negative class (label 0)
num_approved = int((df['image_status'] == 'Approved').sum())  # positive class (label 1)
if num_declined == 0 or num_approved == 0:
    raise ValueError(
        f"Both classes are required (Approved={num_approved}, Declined={num_declined})"
    )

# BCEWithLogitsLoss expects pos_weight = (#negatives / #positives). "Approved"
# is the positive class, so the ratio is declined / approved; the inverse
# would amplify whichever class is already the majority.
pos_weight = torch.tensor([num_declined / num_approved], dtype=torch.float).to(DEVICE)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

# No sampler is used here: class imbalance is handled through the loss weight (pos_weight) instead.
def load_data(df, image_dir, train_transform, test_transform, batch_size):
    """Split the metadata into train/val/test parts and wrap each in a DataLoader.

    The split is 70 % / 15 % / 15 %, stratified on ``image_status`` with a
    fixed random state. Only the training loader shuffles; validation and test
    both use ``test_transform``.

    Returns:
        ``(train_loader, val_loader, test_loader, train_df, val_df, test_df)``.
    """
    # 70 % for training; the remaining 30 % is halved into validation and test.
    train_df, holdout_df = train_test_split(
        df, test_size=0.3, random_state=42, stratify=df['image_status']
    )
    val_df, test_df = train_test_split(
        holdout_df, test_size=0.5, random_state=42, stratify=holdout_df['image_status']
    )

    def make_loader(frame, transform, shuffle):
        dataset = SmokeAlarmDataset(frame, image_dir, transform=transform)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    train_loader = make_loader(train_df, train_transform, True)
    val_loader = make_loader(val_df, test_transform, False)
    test_loader = make_loader(test_df, test_transform, False)

    return train_loader, val_loader, test_loader, train_df, val_df, test_df


def train(model, train_loader, val_loader, criterion, optimizer, scheduler, DEVICE):
    """Train ``model`` for ``NUM_EPOCHS`` epochs and keep the best weights.

    After every epoch the model is evaluated with ``validation``; the weights
    of the epoch with the highest validation mAP are snapshotted.

    Args:
        model: Network producing one logit per sample.
        train_loader: Loader yielding ``(image, label)`` training batches.
        val_loader: Loader yielding ``(image, label)`` validation batches.
        criterion: Loss taking ``(logits, targets)`` of shape ``(batch, 1)``.
        optimizer: Optimizer over ``model``'s parameters.
        scheduler: Optional LR scheduler stepped once per epoch (may be None).
        DEVICE: Device the model and batches are moved to.

    Returns:
        ``(train_losses, train_mAPs, val_losses, val_mAPs, all_train_labels,
        all_train_probs, all_val_labels, all_val_probs, best_model)``. The
        first eight are lists with one entry per epoch; ``best_model`` is a
        state dict copied to CPU (``None`` if no epoch was recorded).
    """
    model.to(DEVICE)
    # -inf so the first epoch is always recorded, even with a validation mAP of 0.
    best_score = float("-inf")
    best_model = None
    train_losses = []
    train_mAPs = []
    val_losses = []
    val_mAPs = []
    all_train_labels = []  # true labels of the training data, per epoch
    all_train_probs = []   # predicted probabilities on the training data, per epoch
    all_val_labels = []    # true labels of the validation data, per epoch
    all_val_probs = []     # predicted probabilities on the validation data, per epoch

    for epoch in range(NUM_EPOCHS):
        model.train()  # validation() leaves the model in eval mode
        batch_losses = []
        true_labels = []
        predictions_prob = []

        # Passing the loader itself (not iter(loader)) lets tqdm show a total.
        for img, label in tqdm(train_loader, desc=f"Training Epoch {epoch + 1}/{NUM_EPOCHS}"):
            img = img.float().to(DEVICE)
            label = label.float().to(DEVICE)

            optimizer.zero_grad()
            pred = model(img)
            # Targets need shape (batch, 1) to match the logits.
            loss = criterion(pred, label.unsqueeze(1))
            loss.backward()
            optimizer.step()

            batch_losses.append(loss.item())

            true_labels += label.int().cpu().numpy().flatten().tolist()
            predictions_prob += pred.sigmoid().detach().cpu().numpy().flatten().tolist()

        avg_train_loss = np.mean(batch_losses)
        train_losses.append(avg_train_loss)

        train_mAP = average_precision_score(true_labels, predictions_prob)
        train_mAPs.append(train_mAP)

        all_train_labels.append(true_labels)
        all_train_probs.append(predictions_prob)

        val_results = validation(model, criterion, val_loader, DEVICE)
        val_losses.append(val_results["val_loss"])
        val_mAPs.append(val_results["val_mAP"])

        all_val_labels.append(val_results["true_labels"])
        all_val_probs.append(val_results["predictions_prob"])

        print(f"Epoch [{epoch + 1}/{NUM_EPOCHS}], Train Loss: {avg_train_loss:.5f}, Train mAP: {train_mAP:.5f}, "
              f"Val Loss: {val_results['val_loss']:.5f}, Val mAP: {val_results['val_mAP']:.5f}")

        if val_results["val_mAP"] > best_score:
            best_score = val_results["val_mAP"]
            # state_dict() returns references to the live parameters, which
            # later epochs keep updating; clone them to freeze this epoch.
            best_model = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

        if scheduler:
            scheduler.step()

    return train_losses, train_mAPs, val_losses, val_mAPs, all_train_labels, all_train_probs, all_val_labels, all_val_probs, best_model

# Updated validation function with DEVICE applied
def validation(model, criterion, val_loader, DEVICE):
    """Evaluate ``model`` on ``val_loader`` without tracking gradients.

    Puts the model in eval mode (it is not switched back afterwards).

    Args:
        model: Network producing one logit per sample.
        criterion: Loss taking ``(logits, targets)`` of shape ``(batch, 1)``.
        val_loader: Loader yielding ``(image, label)`` batches.
        DEVICE: Device the batches are moved to.

    Returns:
        Dict with ``val_loss`` (mean batch loss), ``val_mAP`` (average
        precision), ``true_labels`` and ``predictions_prob`` (flat lists, for
        plotting a PR curve).
    """
    model.eval()
    true_labels = []
    val_loss = []
    predictions_prob = []

    with torch.no_grad():
        for img, label in tqdm(val_loader):
            img = img.float().to(DEVICE)
            label = label.float().to(DEVICE)

            pred = model(img)
            loss = criterion(pred, label.unsqueeze(1))
            val_loss.append(loss.item())

            predictions_prob += pred.sigmoid().cpu().numpy().flatten().tolist()
            true_labels += label.cpu().numpy().tolist()

    avg_val_loss = np.mean(val_loss)
    val_mAP = average_precision_score(true_labels, predictions_prob)

    return {
        "val_loss": avg_val_loss,
        "val_mAP": val_mAP,
        "true_labels": true_labels,
        "predictions_prob": predictions_prob,
    }



#train.py

In [None]:

# Build the three loaders (and keep the split frames) from the metadata loaded above.
train_loader, val_loader, test_loader, train_df, val_df, test_df = load_data(df,IMAGE_DIR,train_transform,test_transform,BATCH_SIZE)

# Seed the RNGs before the model is created so weight initialisation is repeatable.
# NOTE(review): the literal 42 duplicates the SEED constant defined in the config cell.
seed_everything(seed= 42)

model = BaseModel()

optimizer = torch.optim.Adam(params = model.parameters(), lr=LEARNING_RATE)#lr=1e-3

scheduler = None # no LR scheduler is used; train() skips the step when this is None

# Run training; the last returned value is the state dict selected by validation mAP.
train_losses, train_mAPs, val_losses, val_mAPs, all_train_labels, all_train_probs, all_val_labels, all_val_probs, best_model_weights = train(
    model, train_loader, val_loader, criterion, optimizer, scheduler, DEVICE
)

# Persist the selected weights to the working directory; the test cell reloads this file.
torch.save(best_model_weights, 'model.pt')

#train/validation curve, loss, mAP

In [None]:
# torch.save(best_model_weights,'/content/drive/MyDrive/PhotoQA/model.pt')

import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_recall_curve

# 1. Train/Validation Loss Curve
def plot_loss_curve(train_losses, val_losses):
    """Plot per-epoch training and validation loss on one figure."""
    _, ax = plt.subplots(figsize=(10, 5))
    for series, name in ((train_losses, 'Train Loss'), (val_losses, 'Validation Loss')):
        ax.plot(series, label=name)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.set_title('Train and Validation Loss Curve')
    ax.legend()
    plt.show()

# 2. Train/Validation mAP Curve
def plot_map_curve(train_mAPs, val_mAPs):
    """Plot per-epoch training and validation mAP on one figure."""
    _, ax = plt.subplots(figsize=(10, 5))
    for series, name in ((train_mAPs, 'Train mAP'), (val_mAPs, 'Validation mAP')):
        ax.plot(series, label=name)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('mAP')
    ax.set_title('Train and Validation mAP Curve')
    ax.legend()
    plt.show()

# 3. Confusion Matrix
def plot_confusion_matrix(true_labels, predictions, class_names):
    """Draw the confusion matrix of ``predictions`` against ``true_labels``.

    ``class_names`` supplies the tick labels shown on both axes.
    """
    matrix = confusion_matrix(true_labels, predictions)
    display = ConfusionMatrixDisplay(
        confusion_matrix=matrix,
        display_labels=class_names,
    )
    display.plot(cmap='Blues')
    plt.title('Confusion Matrix')
    plt.show()

# 4. PR Curve
def plot_pr_curve(true_labels, predictions_prob):
    """Plot the precision-recall curve for the given labels and probabilities."""
    precision, recall, _ = precision_recall_curve(true_labels, predictions_prob)
    _, ax = plt.subplots(figsize=(10, 5))
    ax.plot(recall, precision, marker='.')
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision-Recall Curve')
    plt.show()

# Inputs come from the train() call above: train_losses, train_mAPs, val_losses,
# val_mAPs (one value per epoch). The per-epoch label/probability lists and
# best_model_weights returned alongside them are not plotted here.
plot_loss_curve(train_losses, val_losses)
plot_map_curve(train_mAPs, val_mAPs)


#test.py


In [None]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect predictions.

    Batches may be ``(images, labels)``, ``(images,)`` or a bare image tensor;
    labels are collected only when present. Probabilities are the sigmoid of
    the logits and the predicted class is ``probability > 0.5``.

    Returns:
        ``(model_pred, predictions_prob, true_labels, samples)`` where
        ``samples`` holds one ``(image, predicted class, probability)`` tuple
        per input image, for later visualisation.
    """
    model.eval()
    model_pred, predictions_prob, true_labels, samples = [], [], [], []

    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            if not isinstance(batch, (list, tuple)):
                img = batch
            else:
                # First element is the image tensor, second (if any) the labels.
                img = batch[0]
                if len(batch) > 1:
                    true_labels.extend(batch[1].cpu().numpy().tolist())

            img = img.float().to(device)
            logits = model(img)

            # Sigmoid turns logits into probabilities; 0.5 is the decision threshold.
            prob = logits.sigmoid().detach().cpu().numpy().flatten()
            class_pred = (prob > 0.5).astype(int).tolist()

            predictions_prob.extend(prob)
            model_pred.extend(class_pred)

            # Keep the CPU copy of every image together with its prediction.
            samples.extend(zip(img.cpu(), class_pred, prob))

    return model_pred, predictions_prob, true_labels, samples


In [None]:
# Set the random seed
seed_everything(42)

# Rebuild the architecture and load the weights saved by the training cell.
# NOTE(review): BaseModel() requests pretrained backbone weights that are
# immediately overwritten by the checkpoint loaded on the next line.
model = BaseModel().to(DEVICE)
model.load_state_dict(torch.load('model.pt', map_location=DEVICE))

# Run inference on the held-out test loader created by load_data()
model_pred, predictions_prob, true_labels, samples = inference(model, test_loader, DEVICE)

from sklearn.metrics import f1_score, precision_recall_curve, confusion_matrix

# precision/recall are arrays over all candidate thresholds, so f1_scores is
# also an array (one F1 per threshold), not a single number.
# The 1e-10 term guards against division by zero.
precision, recall, thresholds = precision_recall_curve(true_labels, predictions_prob)
f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)

print("f1 score: ", f1_scores)
print("precision: ", precision)
print("recall: ", recall)

# PR curve from the probabilities; confusion matrix from the 0.5-threshold
# predictions. Class order [0, 1] corresponds to ["Declined", "Approved"].
plot_pr_curve(true_labels, predictions_prob)
plot_confusion_matrix(true_labels, model_pred, ["Declined", "Approved"])


In [None]:
def denormalize(image, mean, std):
    """Undo mean/std normalisation on the first three channels of a CHW image.

    The image is modified in place and also returned; any channel beyond the
    third is left untouched.
    """
    for channel in (0, 1, 2):  # R, G, B planes
        scale, shift = std[channel], mean[channel]
        image[channel] = image[channel] * scale + shift
    return image

# ImageNet statistics used to undo the input normalisation for display
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Show every misclassified test image with its predicted class and probability.
for (image, prediction, probability), actual in zip(samples, true_labels):
    misclassified = (prediction == 1 and actual == 0) or (prediction == 0 and actual == 1)
    if not misclassified:
        continue

    status = "False Positive" if prediction == 1 else "False Negative"

    # Keep only the RGB planes: the 4th plane is the image-type flag, and
    # imshow would interpret it as an alpha channel (a flag of 0 renders the
    # picture fully transparent). The slice is cloned so `samples` stays intact.
    rgb = denormalize(image[:3].clone(), imagenet_mean, imagenet_std)

    # (C, H, W) -> (H, W, C), then clip to the 0-1 range imshow expects for floats.
    rgb = rgb.permute(1, 2, 0).detach().cpu().numpy().clip(0, 1)

    plt.imshow(rgb)
    plt.title(f"{status}: Prediction: {'Declined' if prediction == 0 else 'Approved'} ({probability:.2f})")
    plt.axis("off")
    plt.show()