## Installing and importing libraries

In [None]:
!pip install evaluate
!pip install accelerate

In [1]:
import os
import torch
import random
import evaluate
import numpy as np
from PIL import ImageOps, ImageFilter
from torchvision import datasets, models, transforms
from transformers import AutoImageProcessor, AutoModelForImageClassification, TrainingArguments, Trainer


## Training on training data and Evaluation on validation data

### model checkpoint

In [2]:
# Identifier of the pretrained checkpoint passed to from_pretrained() for both the image processor and the model.
checkpoint = "microsoft/swin-tiny-patch4-window7-224"

### image processor 

In [3]:
# Image processor of the checkpoint; its size, image_mean and image_std are read when the torchvision transforms are built.
image_processor = AutoImageProcessor.from_pretrained(checkpoint, use_fast=True)

### transforming data for model

In [4]:

# Normalise with the channel statistics stored in the checkpoint's image processor.
normalize = transforms.Normalize(mean=image_processor.image_mean, std=image_processor.image_std)

# Target size: a single int when the processor specifies "shortest_edge",
# otherwise an explicit (height, width) pair.
size = (
    image_processor.size["shortest_edge"]
    if "shortest_edge" in image_processor.size
    else (image_processor.size["height"], image_processor.size["width"])
)

# Resize(int) only fixes the shorter side and keeps the aspect ratio, so
# non-square images would come out with different shapes and torch.stack in
# collate_fn would fail. A centre crop makes every tensor size x size in that
# case; Resize((h, w)) already yields a fixed shape.
resize_steps = (
    [transforms.Resize(size), transforms.CenterCrop(size)]
    if isinstance(size, int)
    else [transforms.Resize(size)]
)

# Both splits use the same deterministic pipeline (no augmentation).
_transforms = {
    split: transforms.Compose([*resize_steps, transforms.ToTensor(), normalize])
    for split in ('training', 'validation')
}

### dataset loading

In [None]:

# Root folder that contains one sub-folder per split.
data_dir = "Dataset"

# One ImageFolder per split, each with the transform registered for that split.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), _transforms[split])
    for split in ['training', 'validation']
}

dataset_sizes = {
    split: len(image_datasets[split])
    for split in ['training', 'validation']
}

# Class names are taken from the validation split.
class_names = image_datasets['validation'].classes
print("classes ============ ",class_names)

# Label <-> id maps handed to the model config; ids are stored as strings.
label2id = {label: str(idx) for idx, label in enumerate(class_names)}
id2label = {str(idx): label for idx, label in enumerate(class_names)}


### sampling 8% of the Normal class (sample_fraction = 0.08)

In [None]:

# Function to sample the dataset
def sample_normal_class(dataset, class_name, sample_fraction=0.1, seed=None):
    """Keep every sample of the other classes but only a random fraction of one class.

    Args:
        dataset: ``ImageFolder``-style dataset exposing ``samples`` (a list of
            ``(path, class_index)`` pairs) and ``class_to_idx``.
        class_name: Name of the class to down-sample.
        sample_fraction: Fraction of that class to keep. The number kept is
            ``int(class_size * sample_fraction)``, capped at the class size.
        seed: Optional seed for a private ``random.Random``. When ``None`` the
            module-level ``random`` state is used.

    Returns:
        list[int]: indices of all other-class samples (in dataset order)
        followed by the randomly chosen indices of ``class_name``.
    """
    # The labels stored in dataset.samples are defined by the dataset's own
    # class_to_idx, so look the index up there rather than in a mapping built
    # from a different split. A class missing from the dataset yields None,
    # which matches no sample.
    target_idx = dataset.class_to_idx.get(class_name)

    # Single pass: split the positions into "class to sample" and "everything else".
    class_indices, other_class_indices = [], []
    for i, (_, label) in enumerate(dataset.samples):
        (class_indices if label == target_idx else other_class_indices).append(i)
    print(f"Class {class_name} length:", len(class_indices))  # Should be > 0 if the class exists

    if class_indices:
        # Cap at the class size so a fraction above 1 keeps the whole class
        # instead of making random.sample raise.
        sample_size = min(int(len(class_indices) * sample_fraction), len(class_indices))
        rng = random if seed is None else random.Random(seed)
        sampled_indices = rng.sample(class_indices, sample_size)
        print("Sample length:", len(sampled_indices))  # Should be > 0
    else:
        sampled_indices = []
        print("No samples to draw from the specified class.")

    return other_class_indices + sampled_indices

# Sample the "Normal" class and create a new dataset.
# Seed Python's RNG first so the random subset, and every metric computed on
# it, is the same on each Restart & Run All.
random.seed(42)
sampled_training_indices = sample_normal_class(image_datasets['training'], 'Normal', sample_fraction=0.08)
sampled_validation_indices = sample_normal_class(image_datasets['validation'], 'Normal', sample_fraction=0.08)


# Create a new dataset based on sampled indices
class SampledImageFolder(datasets.ImageFolder):
    """ImageFolder restricted to a chosen subset of another ImageFolder's samples.

    Args:
        dataset: Source ``ImageFolder``; its ``root`` and ``transform`` are reused.
        indices: Positions in ``dataset.samples`` / ``dataset.targets`` to keep,
            in the order given.
    """

    def __init__(self, dataset, indices):
        # The parent constructor scans dataset.root again; the lists it builds
        # are replaced by the requested subset right after.
        super().__init__(dataset.root, dataset.transform)
        self.samples = [dataset.samples[i] for i in indices]
        self.targets = [dataset.targets[i] for i in indices]
        # ImageFolder also exposes the sample list as `imgs`; re-point it so it
        # does not keep referring to the full, unsampled scan.
        self.imgs = self.samples

# Instantiate the sampled datasets: for each split, the indices returned by
# sample_normal_class (all non-"Normal" samples plus the sampled "Normal" subset).
sampled_training_dataset = SampledImageFolder(image_datasets['training'], sampled_training_indices)
sampled_validation_dataset = SampledImageFolder(image_datasets['validation'], sampled_validation_indices)



### collate function for data per batch

In [9]:

def collate_fn(batch):
    """Turn a list of ``(image_tensor, label)`` items into one batch dict.

    Returns:
        dict with ``"pixel_values"`` (the image tensors stacked along a new
        leading dimension) and ``"labels"`` (a tensor built from the labels).
    """
    images, targets = [], []
    for item in batch:
        images.append(item[0])
        targets.append(item[1])

    return {
        "pixel_values": torch.stack(images),
        "labels": torch.tensor(targets),
    }

### metrics

In [10]:
# Load the evaluation metrics through the `evaluate` library.
# NOTE(review): these are generic notebook-level names, and later cells rebind
# several of them: `from sklearn.metrics import ..., f1_score, ...` replaces
# `f1_score`, and the unpacking
# `conf_matrix, accuracy, precision, recall, f1 = evaluate_classification(...)`
# replaces `accuracy`, `precision` and `recall` with plain numbers.
accuracy = evaluate.load("accuracy")
precision = evaluate.load("precision")
recall = evaluate.load("recall")
f1_score = evaluate.load("f1")
balanced_accuracy = evaluate.load("hyperml/balanced_accuracy")


In [11]:
# Metric ids passed to evaluate.load, keyed by the name each value is reported under.
_METRIC_IDS = {
    "accuracy": "accuracy",
    "precision": "precision",
    "recall": "recall",
    "f1": "f1",
    "balanced_accuracy": "hyperml/balanced_accuracy",
}
_metric_cache = {}


def _get_metric(name):
    """Return the `evaluate` metric registered under ``name``, loading it on first use."""
    if name not in _metric_cache:
        _metric_cache[name] = evaluate.load(_METRIC_IDS[name])
    return _metric_cache[name]


def compute_metrics(eval_pred):
    """Compute the classification metrics reported after each evaluation.

    The metric objects are resolved through a private cache instead of the
    notebook-level names ``accuracy``, ``precision``, ``recall`` and
    ``f1_score``. Later cells rebind those names (sklearn's ``f1_score``
    import, and unpacking float scores into ``accuracy`` / ``precision`` /
    ``recall``), which would make ``.compute`` fail when a Trainer calls this
    function after those cells have run.

    Args:
        eval_pred: Pair ``(predictions, labels)``; ``predictions`` holds
            per-class scores and is reduced with ``argmax`` over axis 1.

    Returns:
        dict with ``accuracy``, weighted ``precision`` / ``recall`` / ``f1``,
        and ``balanced_accuracy`` (``None`` if the metric does not return
        that key).
    """
    scores, labels = eval_pred
    predictions = np.argmax(scores, axis=1)

    def weighted(name):
        # Class-frequency-weighted average of a per-class metric.
        result = _get_metric(name).compute(
            predictions=predictions, references=labels, average="weighted"
        )
        return result[name]

    acc = _get_metric("accuracy").compute(predictions=predictions, references=labels)
    bal_acc = _get_metric("balanced_accuracy").compute(predictions=predictions, references=labels)

    return {
        "accuracy": acc["accuracy"],
        "precision": weighted("precision"),
        "recall": weighted("recall"),
        "f1": weighted("f1"),
        "balanced_accuracy": bal_acc.get("balanced_accuracy", None),
    }


### model loading

In [12]:

# Load the pretrained checkpoint as an image classifier with one output per
# entry of class_names and the label maps built from the dataset folders.
model = AutoModelForImageClassification.from_pretrained(
    checkpoint,
    num_labels=len(class_names),
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes = True  # presumably needed because the checkpoint's classification head has a different number of outputs - confirm
)

### training arguments

In [19]:
# Training configuration for the first stage. Evaluation and checkpointing both
# happen once per epoch, and the checkpoint with the highest
# "balanced_accuracy" (a key returned by compute_metrics) is reloaded at the end.
training_args = TrainingArguments(
    output_dir="SWIN_MEDICAL_6",
    remove_unused_columns=False,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=1,
    per_device_eval_batch_size=4,
    num_train_epochs=30,
    warmup_ratio=0.1,
    greater_is_better=True,  # higher balanced_accuracy is better
    load_best_model_at_end=True,
    save_total_limit=2,
    metric_for_best_model="balanced_accuracy",
    report_to="none"
)


### setting up trainer

In [None]:
# First stage: fit on the sampled training split and evaluate on the sampled
# validation split, so the per-epoch metrics and best-checkpoint selection are
# measured on data the model was not trained on.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    train_dataset=sampled_training_dataset,
    eval_dataset=sampled_validation_dataset,
    tokenizer=image_processor,
    compute_metrics=compute_metrics,
)


### model training and evaluation

In [None]:

# Run fine-tuning, then save the model and record the training metrics and
# trainer state through the Trainer's own helpers.
train_results = trainer.train()

trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()

In [None]:
# Evaluate on the Trainer's eval_dataset and log / save the result under the "eval" split name.
metrics = trainer.evaluate()

trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)

### zip the model folder and download 

In [None]:
!zip -r "/kaggle/working/SWIN_MEDICAL_6.zip" "/kaggle/working/SWIN_BASE_MEDICAL_6"

### Evaluation per class and mean AUC

In [None]:
from transformers import AutoImageProcessor, AutoModelForImageClassification
import torch
import cv2
import os
import time
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import label_binarize

def swin_infer(image, model, device):
    """Classify a single image and return its label and class probabilities.

    The image is preprocessed with the notebook-level ``image_processor``.

    Args:
        image: Image accepted by ``image_processor``.
        model: Classification model exposing ``.logits`` on its output and
            ``config.id2label``.
        device: Device the preprocessed inputs are moved to.

    Returns:
        tuple: ``(label, probabilities)`` where ``label`` is the
        ``model.config.id2label`` entry of the arg-max logit and
        ``probabilities`` is the softmax output as a NumPy array.
    """
    with torch.no_grad():
        batch = image_processor(image, return_tensors="pt").to(device)
        scores = model(**batch).logits
        probs = torch.softmax(scores, dim=-1).cpu().numpy()
        best_idx = scores.argmax(-1).item()

    name = model.config.id2label[best_idx]
    # A flat vector is reshaped into a single row so callers can stack results.
    if probs.ndim == 1:
        probs = probs.reshape(1, -1)
    return name, probs

def evaluate_classification(actual_dict, predicted_dict, class_names, predicted_probs):
    """Score per-class counts of actual vs. predicted labels.

    Each count dict is expanded into an array of class indices (in
    ``class_names`` order) and the two arrays are compared position by position.
    NOTE(review): because the arrays are built independently, the pairing is
    only meaningful when all actual samples belong to a single class, as in the
    per-class/per-folder loop that calls this - confirm for other callers.

    Args:
        actual_dict: class name -> number of true samples.
        predicted_dict: class name -> number of predictions.
        class_names: Ordered class names; the position is the class index.
        predicted_probs: Accepted for interface compatibility; not read here.

    Returns:
        tuple: ``(conf_matrix, accuracy, precision, recall, f1)`` with
        weighted-average precision / recall / F1.
    """
    index_of = {name: position for position, name in enumerate(class_names)}

    # Repeat every class index as many times as its count says.
    true_idx = np.array(
        [index_of[name] for name in class_names for _ in range(actual_dict.get(name, 0))]
    )
    pred_idx = np.array(
        [index_of[name] for name in class_names for _ in range(predicted_dict.get(name, 0))]
    )

    matrix = confusion_matrix(true_idx, pred_idx, labels=list(index_of.values()))
    acc = accuracy_score(true_idx, pred_idx)
    prec = precision_score(true_idx, pred_idx, average='weighted', zero_division=0)
    rec = recall_score(true_idx, pred_idx, average='weighted', zero_division=0)
    f1_value = f1_score(true_idx, pred_idx, average='weighted', zero_division=0)

    return matrix, acc, prec, rec, f1_value

# Class names and folder structure
# Hard-coded class list (rebinds class_names from the dataset-loading cell) and
# the per-class sub-folders that are looked up under val_folder.
class_names = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
sub_folders = ["KID", "KVASIR", "SEE-AI", "AIIMS"]
val_folder = "/kaggle/input/misahub-capsule-vision-training-challenge-2024/Dataset/validation"

# Device setup
# The processor and model are reloaded from the saved directory; this rebinds
# the notebook-level image_processor and model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_processor = AutoImageProcessor.from_pretrained("/kaggle/working/SWIN_MEDICAL_6")
model = AutoModelForImageClassification.from_pretrained("/kaggle/working/SWIN_MEDICAL_6")
model.to(device)

# Timing and results storage
time_for_all_image = []
final_confusion_matrix = np.zeros((len(class_names), len(class_names)), dtype=int)
all_predicted_probs = []
all_true_labels = []

# Evaluation loop: one report per (class, source sub-folder) that exists on disk.
for cls in class_names:
    true_index = class_names.index(cls)
    total_samples_classes = {cls: 0}
    pred_samples_classes = {cls: 0}
    for fldr in sub_folders:
        folder = os.path.join(val_folder, cls, fldr)
        if not os.path.exists(folder):
            continue

        orig_classes = {c: 0 for c in class_names}
        classes = {c: 0 for c in class_names}
        for name in os.listdir(folder):
            start = time.process_time()
            path = os.path.join(folder, name)
            image = cv2.imread(path)
            if image is None:
                # cv2.imread returns None for unreadable or non-image files.
                print(f"Failed to load image: {path}")
                continue
            # OpenCV decodes to BGR, while the model was fine-tuned on RGB
            # images (loaded through torchvision's ImageFolder), so convert
            # before preprocessing.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            lbl, probabilities = swin_infer(image, model, device)
            all_predicted_probs.append(probabilities)
            all_true_labels.append(true_index)  # Store the true class index

            # Count only images that were actually classified so the actual and
            # predicted totals passed to evaluate_classification stay equal.
            orig_classes[cls] += 1
            total_samples_classes[cls] += 1
            classes[lbl] = classes.get(lbl, 0) + 1
            pred_samples_classes[lbl] = pred_samples_classes.get(lbl, 0) + 1

            time_for_all_image.append(time.process_time() - start)

        if orig_classes[cls] == 0:
            print(f"No readable images in {folder}; skipping.")
            continue

        print(f"Result per class ----- {cls} subfolder ------- {fldr}")
        print("Actual    ------", orig_classes)
        print("Predicted ------", classes)
        print(f"Total samples = {orig_classes[cls]} Predicted Samples = {classes[cls]}")
        # The scores are unpacked into subset_* names so they do not rebind the
        # notebook-level `accuracy`, `precision` and `recall` metric objects
        # loaded earlier.
        conf_matrix, subset_accuracy, subset_precision, subset_recall, subset_f1 = evaluate_classification(
            orig_classes,
            classes,
            class_names,
            np.vstack(all_predicted_probs)
        )
        print("Confusion Matrix:")
        print(conf_matrix)
        print(f"Accuracy: {subset_accuracy:.4f}")
        print(f"Precision: {subset_precision:.4f}")
        print(f"Recall: {subset_recall:.4f}")
        print(f"F1 Score: {subset_f1:.4f}")
        final_confusion_matrix += conf_matrix

# Calculate Mean AUC only once at the end
if len(all_predicted_probs) == 0:
    # Without this check the cell fails further down with an unrelated-looking
    # error (stacking an empty list / dividing by zero).
    raise RuntimeError(
        f"No images were evaluated under {val_folder}; check the path and the sub-folder names."
    )

y_true_bin = label_binarize(all_true_labels, classes=list(range(len(class_names))))
all_predicted_probs = np.vstack(all_predicted_probs)  # Convert list to array

# One-vs-rest AUC per class, only where the class has both positive and
# negative samples (roc_auc_score is undefined otherwise).
per_class_auc = []
for i in range(len(class_names)):
    positives = np.sum(y_true_bin[:, i])
    if 0 < positives < len(y_true_bin):
        per_class_auc.append(roc_auc_score(y_true_bin[:, i], all_predicted_probs[:, i]))

valid_classes = len(per_class_auc)
if valid_classes > 0:
    mean_auc = sum(per_class_auc) / valid_classes  # Average AUC across valid classes
else:
    # Report "not a number" instead of a misleading 0.0.
    mean_auc = float("nan")
    print("No valid classes for AUC calculation.")

# Final results
print(" --------------------- Full Result --------------------- ")
print("Total Confusion Matrix:")
print(final_confusion_matrix)
print("Total images:", len(time_for_all_image))
print("Total time taken:", sum(time_for_all_image))
print("Time taken per image:", sum(time_for_all_image) / len(time_for_all_image))
print(f"Final Mean AUC: {mean_auc:.4f}")


## Retraining on Validation data

In [None]:

# NOTE(review): this relative path looks like the layout produced by extracting
# SWIN_MEDICAL_6.zip (which was built from the absolute /kaggle/working/... path)
# into a SWIN_MEDICAL_6 folder - confirm it exists where this section is run.
checkpoint = "SWIN_MEDICAL_6/kaggle/working/SWIN_MEDICAL_6"

In [None]:
# Reload the image processor from the first-stage checkpoint directory (rebinds the notebook-level image_processor).
image_processor = AutoImageProcessor.from_pretrained(checkpoint, use_fast=True)

In [None]:

# Reload the first-stage model as the starting point for retraining.
# class_names is the hard-coded list from the evaluation cell at this point,
# while id2label / label2id still come from the dataset-loading cell.
model = AutoModelForImageClassification.from_pretrained(
    checkpoint,
    num_labels=len(class_names),
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes = True
)

In [None]:
# Training configuration for the retraining stage: same settings as the first
# stage except for the output directory and the number of epochs (20).
training_args = TrainingArguments(
    output_dir="/kaggle/working/SWIN_MEDICAL_6_validation",
    remove_unused_columns=False,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=1,
    per_device_eval_batch_size=4,
    num_train_epochs=20,
    warmup_ratio=0.1,
    greater_is_better=True,
    load_best_model_at_end=True,
    save_total_limit=2,
    # The best checkpoint is chosen by balanced accuracy (plain "accuracy" was the earlier choice).
    metric_for_best_model="balanced_accuracy",
    report_to="none"
)


In [None]:
# Retraining stage: the model is both trained and evaluated on
# sampled_validation_dataset, so the per-epoch metrics (and therefore the
# best-checkpoint selection) are measured on data the model is being trained on.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    train_dataset=sampled_validation_dataset,
    eval_dataset=sampled_validation_dataset,
    tokenizer=image_processor,
    compute_metrics=compute_metrics,
)


In [None]:

# Run the retraining, then save the model and record the training metrics and
# trainer state through the Trainer's own helpers.
train_results = trainer.train()

trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()

In [None]:
# Evaluate on the Trainer's eval_dataset (the same sampled validation data used
# for training in this stage) and log / save the result under "eval".
metrics = trainer.evaluate()

trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)

In [None]:
# Archive the retraining output directory (output_dir of the second TrainingArguments).
!zip -r "/kaggle/working/SWIN_MEDICAL_6_validation.zip" "/kaggle/working/SWIN_MEDICAL_6_validation"


In [None]:
from transformers import AutoImageProcessor, AutoModelForImageClassification
import torch
import cv2
import os
import time
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import label_binarize

def swin_infer(image, model, device):
    """Classify a single image and return its label and class probabilities.

    The image is preprocessed with the notebook-level ``image_processor``.

    Args:
        image: Image accepted by ``image_processor``.
        model: Classification model exposing ``.logits`` on its output and
            ``config.id2label``.
        device: Device the preprocessed inputs are moved to.

    Returns:
        tuple: ``(label, probabilities)`` where ``label`` is the
        ``model.config.id2label`` entry of the arg-max logit and
        ``probabilities`` is the softmax output as a NumPy array.
    """
    with torch.no_grad():
        batch = image_processor(image, return_tensors="pt").to(device)
        scores = model(**batch).logits
        probs = torch.softmax(scores, dim=-1).cpu().numpy()
        best_idx = scores.argmax(-1).item()

    name = model.config.id2label[best_idx]
    # A flat vector is reshaped into a single row so callers can stack results.
    if probs.ndim == 1:
        probs = probs.reshape(1, -1)
    return name, probs

def evaluate_classification(actual_dict, predicted_dict, class_names, predicted_probs):
    """Score per-class counts of actual vs. predicted labels.

    Each count dict is expanded into an array of class indices (in
    ``class_names`` order) and the two arrays are compared position by position.
    NOTE(review): because the arrays are built independently, the pairing is
    only meaningful when all actual samples belong to a single class, as in the
    per-class/per-folder loop that calls this - confirm for other callers.

    Args:
        actual_dict: class name -> number of true samples.
        predicted_dict: class name -> number of predictions.
        class_names: Ordered class names; the position is the class index.
        predicted_probs: Accepted for interface compatibility; not read here.

    Returns:
        tuple: ``(conf_matrix, accuracy, precision, recall, f1)`` with
        weighted-average precision / recall / F1.
    """
    index_of = {name: position for position, name in enumerate(class_names)}

    # Repeat every class index as many times as its count says.
    true_idx = np.array(
        [index_of[name] for name in class_names for _ in range(actual_dict.get(name, 0))]
    )
    pred_idx = np.array(
        [index_of[name] for name in class_names for _ in range(predicted_dict.get(name, 0))]
    )

    matrix = confusion_matrix(true_idx, pred_idx, labels=list(index_of.values()))
    acc = accuracy_score(true_idx, pred_idx)
    prec = precision_score(true_idx, pred_idx, average='weighted', zero_division=0)
    rec = recall_score(true_idx, pred_idx, average='weighted', zero_division=0)
    f1_value = f1_score(true_idx, pred_idx, average='weighted', zero_division=0)

    return matrix, acc, prec, rec, f1_value

# Class names and folder structure
# Hard-coded class list and the per-class sub-folders that are looked up under val_folder.
class_names = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
sub_folders = ["KID", "KVASIR", "SEE-AI", "AIIMS"]
val_folder = "/kaggle/input/misahub-capsule-vision-training-challenge-2024/Dataset/validation"

# Device setup
# The processor and model are reloaded from the retraining output directory;
# this rebinds the notebook-level image_processor and model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_processor = AutoImageProcessor.from_pretrained("/kaggle/working/SWIN_MEDICAL_6_validation")
model = AutoModelForImageClassification.from_pretrained("/kaggle/working/SWIN_MEDICAL_6_validation")
model.to(device)

# Timing and results storage
time_for_all_image = []
final_confusion_matrix = np.zeros((len(class_names), len(class_names)), dtype=int)
all_predicted_probs = []
all_true_labels = []

# Evaluation loop: one report per (class, source sub-folder) that exists on disk.
for cls in class_names:
    true_index = class_names.index(cls)
    total_samples_classes = {cls: 0}
    pred_samples_classes = {cls: 0}
    for fldr in sub_folders:
        folder = os.path.join(val_folder, cls, fldr)
        if not os.path.exists(folder):
            continue

        orig_classes = {c: 0 for c in class_names}
        classes = {c: 0 for c in class_names}
        for name in os.listdir(folder):
            start = time.process_time()
            path = os.path.join(folder, name)
            image = cv2.imread(path)
            if image is None:
                # cv2.imread returns None for unreadable or non-image files.
                print(f"Failed to load image: {path}")
                continue
            # OpenCV decodes to BGR, while the model was fine-tuned on RGB
            # images (loaded through torchvision's ImageFolder), so convert
            # before preprocessing.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            lbl, probabilities = swin_infer(image, model, device)
            all_predicted_probs.append(probabilities)
            all_true_labels.append(true_index)  # Store the true class index

            # Count only images that were actually classified so the actual and
            # predicted totals passed to evaluate_classification stay equal.
            orig_classes[cls] += 1
            total_samples_classes[cls] += 1
            classes[lbl] = classes.get(lbl, 0) + 1
            pred_samples_classes[lbl] = pred_samples_classes.get(lbl, 0) + 1

            time_for_all_image.append(time.process_time() - start)

        if orig_classes[cls] == 0:
            print(f"No readable images in {folder}; skipping.")
            continue

        print(f"Result per class ----- {cls} subfolder ------- {fldr}")
        print("Actual    ------", orig_classes)
        print("Predicted ------", classes)
        print(f"Total samples = {orig_classes[cls]} Predicted Samples = {classes[cls]}")
        # The scores are unpacked into subset_* names so they do not rebind the
        # notebook-level `accuracy`, `precision` and `recall` metric objects
        # loaded earlier.
        conf_matrix, subset_accuracy, subset_precision, subset_recall, subset_f1 = evaluate_classification(
            orig_classes,
            classes,
            class_names,
            np.vstack(all_predicted_probs)
        )
        print("Confusion Matrix:")
        print(conf_matrix)
        print(f"Accuracy: {subset_accuracy:.4f}")
        print(f"Precision: {subset_precision:.4f}")
        print(f"Recall: {subset_recall:.4f}")
        print(f"F1 Score: {subset_f1:.4f}")
        final_confusion_matrix += conf_matrix

# Calculate Mean AUC only once at the end
if len(all_predicted_probs) == 0:
    # Without this check the cell fails further down with an unrelated-looking
    # error (stacking an empty list / dividing by zero).
    raise RuntimeError(
        f"No images were evaluated under {val_folder}; check the path and the sub-folder names."
    )

y_true_bin = label_binarize(all_true_labels, classes=list(range(len(class_names))))
all_predicted_probs = np.vstack(all_predicted_probs)  # Convert list to array

# One-vs-rest AUC per class, only where the class has both positive and
# negative samples (roc_auc_score is undefined otherwise).
per_class_auc = []
for i in range(len(class_names)):
    positives = np.sum(y_true_bin[:, i])
    if 0 < positives < len(y_true_bin):
        per_class_auc.append(roc_auc_score(y_true_bin[:, i], all_predicted_probs[:, i]))

valid_classes = len(per_class_auc)
if valid_classes > 0:
    mean_auc = sum(per_class_auc) / valid_classes  # Average AUC across valid classes
else:
    # Report "not a number" instead of a misleading 0.0.
    mean_auc = float("nan")
    print("No valid classes for AUC calculation.")

# Final results
print(" --------------------- Full Result --------------------- ")
print("Total Confusion Matrix:")
print(final_confusion_matrix)
print("Total images:", len(time_for_all_image))
print("Total time taken:", sum(time_for_all_image))
print("Time taken per image:", sum(time_for_all_image) / len(time_for_all_image))
print(f"Final Mean AUC: {mean_auc:.4f}")


## Final Output

In [None]:
from transformers import AutoImageProcessor, AutoModelForImageClassification
import torch
import cv2 
import os
import numpy as np
import pandas as pd

def swin_infer(image, model, device):
    """Classify a single image and return its label and class probabilities.

    The image is preprocessed with the notebook-level ``image_processor``.

    Returns:
        tuple: ``(label, probabilities)`` where ``probabilities`` is the softmax
        of the logits as a torch tensor (still on the model's device) and
        ``label`` is the ``model.config.id2label`` entry of its arg-max.
    """
    with torch.no_grad():
        encoded = image_processor(image, return_tensors="pt").to(device)
        raw_scores = model(**encoded).logits
        probabilities = torch.softmax(raw_scores, dim=-1)  # Apply softmax to logits
        top_index = probabilities.argmax(-1).item()
    return model.config.id2label[top_index], probabilities

# Class names used as the probability column headers of the output table.
class_names = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 
               'Foreign Body', 'Lymphangiectasia', 'Normal', 'Polyp', 
               'Ulcer', 'Worms']

# Load the retrained processor and model and move the model to the available device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_processor = AutoImageProcessor.from_pretrained("/kaggle/working/SWIN_MEDICAL_6_validation")
model = AutoModelForImageClassification.from_pretrained("/kaggle/working/SWIN_MEDICAL_6_validation")
model.to(device)

# Folder with the images to predict; every directory entry is treated as a candidate image.
folder = "/kaggle/input/misahub-capsule-vision-challenge-2024/Testing set/Images"
images = os.listdir(folder)

# Prepare lists to collect results
# Despite the name, each row holds softmax probabilities (not raw logits):
# [image name, one probability per class, predicted label].
logits_results = []

for image_name in images:
    image_path = os.path.join(folder, image_name)
    image = cv2.imread(image_path)

    if image is None:
        # cv2.imread returns None for unreadable or non-image files.
        print(f"Failed to load image: {image_path}")
        continue

    # OpenCV decodes to BGR, while the model was fine-tuned on RGB images
    # (loaded through torchvision's ImageFolder), so convert before preprocessing.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    label, probabilities = swin_infer(image, model, device)
    probabilities = probabilities.cpu().numpy().flatten()  # Flatten the probabilities

    logits_results.append([image_name] + probabilities.tolist() + [label])

# Create DataFrame
# Columns: image name, one probability column per entry of class_names, predicted label.
# NOTE(review): assumes class_names is in the same order as the model's output indices - confirm against model.config.id2label.
logits_df = pd.DataFrame(logits_results, columns=['image_path'] + class_names + ['predicted_class'])

# Save DataFrame to Excel
logits_output_file = "/kaggle/working/SWIN_MEDICAL_6_validation_trained_output.xlsx"
logits_df.to_excel(logits_output_file, index=False)

print(f"Logits saved to {logits_output_file}")
