In [1]:
# Fetch the bone-fracture image dataset from Kaggle via kagglehub
import kagglehub

# Kaggle "<owner>/<dataset>" handle of the dataset to download (latest version).
DATASET_HANDLE = "pkdarabi/bone-break-classification-image-dataset"

# `path` is the local location of the downloaded dataset files.
path = kagglehub.dataset_download(DATASET_HANDLE)

print("Path to dataset files:", path)

  from .autonotebook import tqdm as notebook_tqdm


Downloading from https://www.kaggle.com/api/v1/datasets/download/pkdarabi/bone-break-classification-image-dataset?dataset_version_number=4...


100%|██████████| 28.4M/28.4M [00:02<00:00, 12.2MB/s]

Extracting files...





Path to dataset files: C:\Users\toshi\.cache\kagglehub\datasets\pkdarabi\bone-break-classification-image-dataset\versions\4


In [47]:
# imports

import os
import shutil

import matplotlib.pyplot as plt

from PIL import Image

import timm
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import random_split, DataLoader, Dataset

In [9]:
# Working copy of the dataset, relative to the notebook's working directory.
local_folder = "./dataset2"

# One-off setup steps, currently commented out (kept for reference):
# print(os.listdir(path))
# os.mkdir(local_folder)
# shutil.copytree(path, local_folder, dirs_exist_ok=True)

# Source / destination names for renaming the extracted class-folder root.
old_name = f"{local_folder}/fractures/Bone Break CLassification"
new_name = f"{local_folder}/fractures2"

# os.rename(old_name, new_name)

# Root holding one folder per class, and the train/test trees built from it.
folder_name = new_name
train_target = f"{local_folder}/train"
test_target = f"{local_folder}/test"

In [10]:
# Re-arrange <class>/{Train,Test}/* into train/<class>/* and test/<class>/*
for class_name in os.listdir(folder_name):
    class_path = os.path.join(folder_name, class_name)
    if not os.path.isdir(class_path):
        # Entries that are not class folders are ignored
        continue

    # Pair each source split folder with its destination class folder
    split_dirs = [
        (os.path.join(class_path, "Train"), os.path.join(train_target, class_name)),
        (os.path.join(class_path, "Test"), os.path.join(test_target, class_name)),
    ]

    # Destination folders are created even when the matching source split is missing
    for _, dst_dir in split_dirs:
        os.makedirs(dst_dir, exist_ok=True)

    # Copy the regular files of every split that exists (sub-folders are skipped)
    for src_dir, dst_dir in split_dirs:
        if not os.path.exists(src_dir):
            continue
        for file_name in os.listdir(src_dir):
            src_file = os.path.join(src_dir, file_name)
            if os.path.isfile(src_file):
                shutil.copy(src_file, os.path.join(dst_dir, file_name))

In [31]:
# Define transforms for DeiT (224x224 input size)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # (x - 0.5) / 0.5 per channel
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Set paths
# Read from the folders the copy step writes to (`train_target` / `test_target`)
# rather than a machine-specific absolute path, so the notebook runs from any
# checkout location. `abspath` only resolves them against the working directory.
train_dir = os.path.abspath(train_target)
test_dir = os.path.abspath(test_target)


class FractureDataset(Dataset):
    """Image-classification dataset read from a ``root_dir/<class_name>/<image>`` tree.

    Every sub-directory of ``root_dir`` is one class. Classes are numbered
    0..N-1 in sorted name order; the mapping is exposed as ``class_to_idx``.

    Args:
        root_dir: Directory containing one sub-directory per class.
        transform: Optional callable applied to each PIL image in
            ``__getitem__``.

    Attributes:
        image_paths: Paths of all discovered image files.
        labels: Class index for each entry of ``image_paths``.
        class_to_idx: Mapping from class folder name to class index.
    """

    # Lower-case file extensions that are treated as images.
    IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg')

    def __init__(self, root_dir, transform=None):
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.class_to_idx = {}

        # Filter to directories *before* numbering: a stray file in root_dir
        # must not consume an index, otherwise labels get gaps and can exceed
        # the number of classes.
        class_names = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )

        for idx, class_name in enumerate(class_names):
            class_path = os.path.join(root_dir, class_name)
            self.class_to_idx[class_name] = idx
            # Sorted so sample order does not depend on filesystem listing order
            for img_file in sorted(os.listdir(class_path)):
                if img_file.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.image_paths.append(os.path.join(class_path, img_file))
                    self.labels.append(idx)

    def __len__(self):
        """Return the number of images found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB and passed through ``transform`` if one
        was given.
        """
        # The context manager closes the underlying file; `convert` returns a
        # separate, fully loaded image that stays valid afterwards.
        with Image.open(self.image_paths[idx]) as img:
            image = img.convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Create data loaders
# `transform` is the Resize -> ToTensor -> Normalize pipeline defined at the top
# of this cell; it is reused here instead of being defined a second time.
train_dataset = FractureDataset(train_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = FractureDataset(test_dir, transform=transform)
# Shuffling adds nothing at evaluation time; a fixed order keeps test runs repeatable
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [37]:
# Split the training data 80/20 into training and validation subsets
# Size is taken from the dataset itself: random_split raises if the requested
# lengths do not add up to len(train_dataset).
total_size = len(train_dataset)
train_size = int(0.8 * total_size)
val_size = total_size - train_size

# Seeded generator so the same images land in each subset on every run
split_generator = torch.Generator().manual_seed(42)
train_subset, val_subset = random_split(
    train_dataset, [train_size, val_size], generator=split_generator
)

train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

In [46]:
def output_to_label(z):
    """Turn a batch of class scores into hard class predictions.

    Args:
        z: Tensor of scores whose class axis is dimension 1.

    Returns:
        Tensor with, for each row, the index of its largest score.
    """
    return z.argmax(dim=1)

# Progressive-unfreezing schedule: epoch -> (number of trailing transformer
# blocks made trainable, learning rate of the optimizer rebuilt at that epoch).
# A further stage (all blocks, lr=7e-7 at epoch 35) was tried and is disabled.
_UNFREEZE_SCHEDULE = {
    15: (3, 7e-6),
    30: (7, 5e-6),
}


def _unfreeze_last_blocks(model, num_blocks, lr, weight_decay=0.05):
    """Make the last `num_blocks` blocks and the final norm trainable.

    Returns a new AdamW optimizer over every parameter of `model` that
    requires gradients after the change.
    """
    # Works for a wrapper that exposes the transformer as `.base` as well as
    # for a bare model that owns `.blocks` / `.norm` itself.
    backbone = getattr(model, "base", model)
    for param in backbone.blocks[-num_blocks:].parameters():
        param.requires_grad = True
    for param in backbone.norm.parameters():
        param.requires_grad = True
    return optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=lr,
        weight_decay=weight_decay,
    )


def training_loop(model, optimizer, loss_fn, train_loader, val_loader, num_epochs, print_every):
    """Train `model` for `num_epochs` epochs, validating after each one.

    At the epochs listed in `_UNFREEZE_SCHEDULE` more of the backbone is made
    trainable and `optimizer` is replaced by a new AdamW instance with the
    stage's learning rate (the previous optimizer's state is not carried over).

    Args:
        model: Network to train; moved to CUDA when available, else CPU.
        optimizer: Optimizer used until the first unfreezing stage.
        loss_fn: Loss callable taking ``(outputs, labels)``.
        train_loader: Iterable of training batches ``(inputs, labels)``.
        val_loader: Iterable of validation batches ``(inputs, labels)``.
        num_epochs: Number of epochs to run.
        print_every: Forwarded to `train_epoch`; batches between intra-epoch
            status prints, or None to print once per epoch only.

    Returns:
        ``(model, train_losses, train_accs, val_losses, val_accs)`` where the
        train lists hold one value per batch and the validation lists hold
        one value per epoch.
    """
    print("Starting training")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    train_losses, train_accs, val_losses, val_accs = [], [], [], []

    for epoch in range(1, num_epochs + 1):
        if epoch in _UNFREEZE_SCHEDULE:
            num_blocks, stage_lr = _UNFREEZE_SCHEDULE[epoch]
            # The count in the message is taken from the schedule so it always
            # matches what is actually unfrozen.
            print(f"Unfreezing last {num_blocks} DeiT blocks for fine-tuning...")
            optimizer = _unfreeze_last_blocks(model, num_blocks, stage_lr)

        model, train_loss, train_acc = train_epoch(
            model, optimizer, loss_fn, train_loader, val_loader, device, print_every
        )
        val_loss, val_acc = validate(model, loss_fn, val_loader, device)
        print(
            f"Epoch {epoch}/{num_epochs}: "
            f"Train loss: {sum(train_loss)/len(train_loss):.3f}, "
            f"Train acc.: {sum(train_acc)/len(train_acc):.3f}, "
            f"Val. loss: {val_loss:.3f}, "
            f"Val. acc.: {val_acc:.3f}, "
        )
        train_losses.extend(train_loss)
        train_accs.extend(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
    return model, train_losses, train_accs, val_losses, val_accs


def train_epoch(model, optimizer, loss_fn, train_loader, val_loader, device, print_every):
    """Run one training pass over `train_loader`.

    Args:
        model: Network to train; put into training mode here.
        optimizer: Optimizer stepped once per batch.
        loss_fn: Loss callable taking ``(outputs, labels)``.
        train_loader: Iterable of ``(inputs, labels)`` batches; must support
            ``len()``.
        val_loader: Validation batches, used only for intra-epoch reports.
        device: Device the batches are moved to.
        print_every: Number of batches between status prints, or None to
            disable them.

    Returns:
        ``(model, train_loss_batches, train_acc_batches)`` with one loss and
        one accuracy value per training batch.
    """
    model.train()
    train_loss_batches, train_acc_batches = [], []
    num_batches = len(train_loader)
    for batch_index, (x, y) in enumerate(train_loader, 1):
        inputs = x.to(device)
        # `.long()` returns a new tensor, so its result has to be kept
        labels = y.to(device).long()
        optimizer.zero_grad()

        # Call the module rather than `.forward` so registered hooks run
        z = model(inputs)
        loss = loss_fn(z, labels)
        loss.backward()
        optimizer.step()
        train_loss_batches.append(loss.item())

        hard_preds = output_to_label(z)
        acc_batch_avg = (hard_preds == labels).float().mean().item()
        train_acc_batches.append(acc_batch_avg)

        # If you want to print your progress more often than every epoch you can
        # set `print_every` to the number of batches you want between every status update.
        # Note that the print out will trigger a full validation on the full val. set => slows down training
        if print_every is not None and batch_index % print_every == 0:
            val_loss, val_acc = validate(model, loss_fn, val_loader, device)
            # `validate` switches to eval mode; switch back before continuing
            model.train()
            print(
                f"\tBatch {batch_index}/{num_batches}: "
                f"\tTrain loss: {sum(train_loss_batches[-print_every:])/print_every:.3f}, "
                f"\tTrain acc.: {sum(train_acc_batches[-print_every:])/print_every:.3f}, "
                f"\tVal. loss: {val_loss:.3f}, "
                f"\tVal. acc.: {val_acc:.3f}"
            )

    return model, train_loss_batches, train_acc_batches


def validate(model, loss_fn, val_loader, device):
    """Evaluate `model` on `val_loader` without tracking gradients.

    Loss and accuracy are averaged over samples, not over batches, so a
    smaller final batch does not get the same weight as a full one. The loss
    average assumes `loss_fn` returns a per-batch mean.

    Args:
        model: Network to evaluate; left in eval mode on return.
        loss_fn: Loss callable taking ``(outputs, labels)``.
        val_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)`` as floats.

    Raises:
        ValueError: If `val_loader` yields no samples.
    """
    loss_sum = 0.0
    num_correct = 0
    num_samples = 0
    model.eval()
    with torch.no_grad():
        for x, y in val_loader:
            inputs = x.to(device)
            # `.long()` returns a new tensor, so its result has to be kept
            labels = y.to(device).long()
            z = model(inputs)

            batch_size = labels.size(0)
            # Undo the per-batch mean so every sample counts equally
            loss_sum += loss_fn(z, labels).item() * batch_size
            num_correct += (output_to_label(z) == labels).sum().item()
            num_samples += batch_size

    if num_samples == 0:
        raise ValueError("val_loader yielded no samples; cannot compute validation metrics")
    return loss_sum / num_samples, num_correct / num_samples

In [None]:
# Number of output classes for the replacement classification head
NUM_CLASSES = 10

# Load the pretrained DeiT-Base model (patch size 16, 224x224 input)
model = timm.create_model('deit_base_patch16_224', pretrained=True)

# Replace the pretrained head with a linear layer sized to NUM_CLASSES,
# keeping the head's original input width
head_in_features = model.head.in_features
model.head = nn.Linear(head_in_features, NUM_CLASSES)

25


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Inverse-frequency class weights computed from the training subset.
# CrossEntropyLoss expects a 1-D float tensor with one weight per model output.
num_classes = model.head.out_features
train_labels = torch.tensor(
    [train_dataset.labels[i] for i in train_subset.indices], dtype=torch.long
)
class_counts = torch.bincount(train_labels, minlength=num_classes).float()
total = class_counts.sum()
# clamp avoids a division by zero for a class that is absent from the subset
class_weights = total / class_counts.clamp(min=1)  # inverse frequency

loss = nn.CrossEntropyLoss(weight=class_weights.to(device))

lr = 8e-6
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=0.05)
num_epochs = 40
print_every = None

model, train_losses, train_accs, val_losses, val_accs = training_loop(model, optimizer, loss, train_loader, val_loader, num_epochs, print_every)