# BoneawareAI

By: Karthik Subramanian, Charles Green, Sai Anurag Pichika, Saarang Prabhuram


## Setup

In [None]:
# Mount Google Drive at /content/drive so the project folder stored there is reachable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Install the packages this notebook relies on into the current runtime.
# NOTE(review): the recorded output of this cell shows the pyyaml==5.4.1 wheel
# build failing ("Getting requirements to build wheel did not run successfully"),
# so this pin likely needs revisiting — TODO confirm which dependency requires it.
!pip install pyyaml==5.4.1
!pip install boto3
!pip install configparser
!pip install torch

Collecting pyyaml==5.4.1
  Using cached PyYAML-5.4.1.tar.gz (175 kB)
  Installing build dependencies ... done
  error: subprocess-exited-with-error
  
  × Getting requirements to build wheel did not run successfully.
  │ exit code: 1
  ╰─> See above for output.
  
  note: This error originates from a subprocess, and is likely not a problem with pip.
  Getting requirements to build wheel ... error
error: subprocess-exited-with-error

× Getting requirements to build wheel did not run successfully.
│ exit code: 1
╰─> See above for output.

note: This error originates from a subprocess, and is likely not a problem with pip.


In [None]:
import os

# Switch the working directory to the project folder on the mounted Drive so
# that relative paths used later in the notebook resolve inside the project.
PROJECT_PATH = 'BoneawareAI'
GOOGLE_DRIVE_PATH = '/'.join(('/content/drive/MyDrive', PROJECT_PATH))
os.chdir(GOOGLE_DRIVE_PATH)
os.getcwd()

'/content/drive/MyDrive/BoneawareAI'

In [None]:
# Run this cell before importing any project code: it registers the project
# root (needed by the imports inside the .py files) and its src/ folder on
# sys.path so the BoneawareAI modules can be found.
import sys
sys.path.extend([
    GOOGLE_DRIVE_PATH,
    os.path.join(GOOGLE_DRIVE_PATH, 'src'),
])


## Data Preprocessing
Download the MURA dataset, then apply data augmentation to build the final training and validation sets.

In [None]:
# # Downloading MURA dataset and unzipping the file (this one takes time)
# from src.data_loader import download_dataset
# from src.constants import DATASETS_FOLDER, MURA_DATASET
# from src.helpers.utils import unzip_file
# download_dataset(MURA_DATASET, DATASETS_FOLDER)
# unzip_file(os.path.join(os.getcwd(), DATASETS_FOLDER, MURA_DATASET))

File downloaded successfully to datasets/MURA-v1.1.zip
successfully unzipped the file at path /content/drive/MyDrive/BoneawareAI/datasets/MURA-v1.1.zip


In [None]:
import os
import torch
from torch import nn, optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score


In [None]:
# Data augmentation / preprocessing pipelines.
# The normalization constants are presumably the ImageNet channel statistics
# expected by the pretrained torchvision backbone — TODO confirm.

# Training: resize, random flip and small random rotation, then tensor + normalize.
train_transforms = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation: no random augmentation, only resize, tensor conversion and normalize.
valid_transforms = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# # Datasets
# train_dataset = datasets.ImageFolder('/content/drive/MyDrive/BoneawareAI/datasets/MURA-v1.1/train', transform=train_transforms)
# valid_dataset = datasets.ImageFolder('/content/drive/MyDrive/BoneawareAI/datasets/MURA-v1.1/valid', transform=valid_transforms)

# # Data loaders
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)



import os
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset

class MURABinaryDataset(Dataset):
    """Image-level binary dataset built from a MURA labeled-studies CSV.

    The CSV must provide a ``path`` column (study folder, relative to
    ``root_dir``) and a ``label`` column. Every ``.png`` image found directly
    inside a study folder becomes one sample carrying that study's label.

    Hidden files (names starting with ``.``) are skipped. This matters for
    archives created on macOS, which contain ``._image1.png`` sidecar files
    that end in ``.png`` but are not images and make ``Image.open`` raise
    ``UnidentifiedImageError``.

    Args:
        csv_file: Path to the labeled-studies CSV.
        transform: Optional callable applied to each loaded PIL image.
        root_dir: Folder the CSV's ``path`` values are relative to. Defaults
            to ``DEFAULT_ROOT_DIR`` (the notebook's Google Drive location).

    Raises:
        KeyError: If the CSV lacks a ``path`` or ``label`` column.
        FileNotFoundError: If a listed study folder does not exist.
    """

    # Default dataset root used when ``root_dir`` is not supplied.
    DEFAULT_ROOT_DIR = '/content/drive/MyDrive/BoneawareAI/datasets/'
    # Only files with this extension are treated as images.
    IMAGE_SUFFIX = '.png'

    def __init__(self, csv_file, transform=None, root_dir=None):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
        self.root_dir = self.DEFAULT_ROOT_DIR if root_dir is None else root_dir
        self.image_paths = []
        self.labels = []

        # Expand each study folder into its individual images.
        for rel_path, label in zip(self.data['path'], self.data['label']):
            study_path = os.path.join(self.root_dir, rel_path)
            # Sort so the sample order does not depend on filesystem listing order.
            for image_file in sorted(os.listdir(study_path)):
                if image_file.startswith('.'):
                    # Hidden/sidecar file (e.g. macOS '._image1.png'), not a real image.
                    continue
                if not image_file.endswith(self.IMAGE_SUFFIX):
                    continue
                self.image_paths.append(os.path.join(study_path, image_file))
                self.labels.append(label)

    def __len__(self):
        """Return the number of images (not studies) in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; the image is RGB and transformed if a transform is set."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # Close the underlying file promptly; convert() returns a loaded 3-channel copy.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

# Build the training and validation datasets from the labeled-studies CSVs.
train_dataset = MURABinaryDataset(
    csv_file='/content/drive/MyDrive/BoneawareAI/datasets/MURA-v1.1/train_labeled_studies.csv',
    transform=train_transforms,
)
valid_dataset = MURABinaryDataset(
    csv_file='/content/drive/MyDrive/BoneawareAI/datasets/MURA-v1.1/valid_labeled_studies.csv',
    transform=valid_transforms,
)

# Batches of 32; only the training data is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=32, shuffle=False)



In [None]:
from torchvision import models
import torch.nn as nn

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Start from torchvision's DenseNet-121 with pretrained weights.
model = models.densenet121(pretrained=True)

# Swap the classifier head for a single sigmoid unit so the network outputs
# one probability per image (binary classification).
num_features = model.classifier.in_features
model.classifier = nn.Sequential(nn.Linear(num_features, 1), nn.Sigmoid())

# Move the whole model onto the selected device.
model = model.to(device)


In [None]:
# Binary cross-entropy on probabilities (the model's head already ends in a sigmoid).
criterion = nn.BCELoss()

# Adam over every model parameter with a learning rate of 1e-3.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
def train_model(model, criterion, optimizer, num_epochs=25):
    """Train ``model`` and report loss/accuracy on both splits every epoch.

    Each epoch runs a 'train' phase followed by a 'valid' phase. The loaders
    and the target device are read from the module-level names
    ``train_loader``, ``valid_loader`` and ``device``.

    The model is expected to emit one probability per sample; predictions are
    thresholded at 0.5 to compute accuracy.

    Args:
        model: Network to optimise (updated in place).
        criterion: Loss taking ``(outputs, labels)`` as 1-D float tensors.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Number of full passes over both loaders.

    Returns:
        The same ``model`` instance after training.
    """
    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print('-' * 10)

        for phase in ['train', 'valid']:
            is_train = phase == 'train'
            loader = train_loader if is_train else valid_loader
            # train(True) enables training mode, train(False) is eval mode.
            model.train(is_train)

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = labels.float().to(device)

                optimizer.zero_grad()

                # Only track gradients while training.
                with torch.set_grad_enabled(is_train):
                    # reshape(-1) instead of squeeze(): squeeze() turns a batch
                    # of one into a 0-d tensor that no longer matches `labels`.
                    outputs = model(inputs).reshape(-1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Convert probabilities to hard 0/1 predictions.
                preds = (outputs > 0.5).float()
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                samples_seen += batch_size

            if samples_seen == 0:
                # Nothing to average over; avoid a division by zero.
                print(f"{phase} set is empty; skipping")
                continue

            # Average over the samples actually processed this phase.
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen

            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

    return model

# Train the model for 10 epochs. train_model reads train_loader, valid_loader
# and device from the notebook's global scope, so those cells must run first.
model = train_model(model, criterion, optimizer, num_epochs=10)


Epoch 1/10
----------


UnidentifiedImageError: cannot identify image file '/content/drive/MyDrive/BoneawareAI/datasets/MURA-v1.1/train/XR_WRIST/patient07840/study1_negative/._image1.png'

In [None]:
# Save only the model's weights (state dict) to the current working directory.
torch.save(model.state_dict(), 'densenet_mura.pth')

# Reload the weights. map_location=device remaps the stored tensors onto the
# active device, so a checkpoint written on a GPU runtime still loads on a
# CPU-only runtime instead of raising a CUDA deserialization error.
model.load_state_dict(torch.load('densenet_mura.pth', map_location=device))
model.eval()


In [None]:
from sklearn.metrics import classification_report, roc_auc_score

def evaluate_model(model, loader):
    """Print a classification report and the AUC-ROC of ``model`` on ``loader``.

    The model is expected to emit one probability per sample. Hard predictions
    (threshold 0.5) feed the classification report, while the AUC-ROC is
    computed from the raw probabilities: a ROC curve needs ranking scores, and
    feeding it thresholded 0/1 values collapses the curve to a single point.

    Inputs are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Trained network; switched to eval mode by this call.
        loader: Iterable yielding ``(inputs, labels)`` batches.
    """
    model.eval()
    all_probs = []
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            # reshape(-1) keeps a 1-D tensor even for a batch of one, where
            # squeeze() would give a 0-d tensor that cannot be iterated.
            probs = model(inputs).reshape(-1).cpu()

            all_probs.extend(probs.tolist())
            all_preds.extend((probs > 0.5).float().tolist())
            all_labels.extend(labels.float().reshape(-1).tolist())

    print("Classification Report:")
    print(classification_report(all_labels, all_preds))
    print(f"AUC-ROC: {roc_auc_score(all_labels, all_probs):.4f}")

# Evaluate on the validation set (prints the classification report and AUC-ROC).
evaluate_model(model, valid_loader)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DenseLayer(nn.Module):
    """Single dense unit: BatchNorm -> ReLU -> 3x3 convolution.

    The convolution produces ``growth_rate`` new feature maps, which are
    concatenated onto the input along the channel axis, so the output has
    ``in_channels + growth_rate`` channels and the same spatial size.
    """

    def __init__(self, in_channels, growth_rate):
        super().__init__()
        self.bn = nn.BatchNorm2d(num_features=in_channels)
        self.relu = nn.ReLU(inplace=True)
        # padding=1 with a 3x3 kernel keeps height and width unchanged.
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=growth_rate,
            kernel_size=3,
            padding=1,
            bias=False,
        )

    def forward(self, x):
        """Return ``x`` with the newly computed feature maps appended on dim 1."""
        normalized = self.bn(x)
        activated = self.relu(normalized)
        new_features = self.conv(activated)
        return torch.cat((x, new_features), dim=1)


class DenseBlock(nn.Module):
    """Stack of ``num_layers`` DenseLayer modules applied one after another.

    Every layer appends ``growth_rate`` channels to its input, so layer ``k``
    receives ``in_channels + k * growth_rate`` channels and the block outputs
    ``in_channels + num_layers * growth_rate`` channels.
    """

    def __init__(self, num_layers, in_channels, growth_rate):
        super().__init__()
        self.layers = nn.ModuleList()
        for layer_idx in range(num_layers):
            # Input width grows by growth_rate for each preceding layer.
            layer_in_channels = in_channels + layer_idx * growth_rate
            self.layers.append(DenseLayer(layer_in_channels, growth_rate))

    def forward(self, x):
        """Feed ``x`` through every dense layer in order and return the result."""
        features = x
        for dense_layer in self.layers:
            features = dense_layer(features)
        return features


class TransitionLayer(nn.Module):
    """Down-sampling stage: BatchNorm -> ReLU -> 1x1 convolution -> 2x2 average pool.

    The 1x1 convolution maps ``in_channels`` to ``out_channels``; the
    stride-2 average pool then halves the spatial resolution.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.bn = nn.BatchNorm2d(num_features=in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=1,
            bias=False,
        )
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return the channel-projected, spatially pooled version of ``x``."""
        normalized = self.bn(x)
        activated = self.relu(normalized)
        projected = self.conv(activated)
        return self.pool(projected)


class DenseNet(nn.Module):
    """DenseNet-style classifier assembled from DenseBlock and TransitionLayer.

    Layout: a 7x7 stride-2 convolution stem with BatchNorm, ReLU and max-pool,
    then ``num_blocks`` dense blocks with a transition layer between
    consecutive blocks (none after the last), then BatchNorm, global average
    pooling and a linear layer. ``forward`` returns the linear layer's raw
    outputs; no softmax or sigmoid is applied.

    Args:
        num_blocks: Number of dense blocks.
        num_layers_per_block: Indexable of layer counts; entries
            ``0 .. num_blocks - 1`` are used.
        growth_rate: Channels added by every dense layer; the stem emits
            ``2 * growth_rate`` channels.
        num_classes: Size of the final linear layer's output.
    """

    def __init__(self, num_blocks, num_layers_per_block, growth_rate, num_classes):
        super().__init__()
        self.growth_rate = growth_rate
        stem_channels = growth_rate * 2

        # Stem: initial convolution followed by max pooling.
        self.conv1 = nn.Conv2d(
            in_channels=3,
            out_channels=stem_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False,
        )
        self.bn1 = nn.BatchNorm2d(num_features=stem_channels)
        self.relu = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Dense blocks, with a transition layer after every block but the last.
        self.blocks = nn.ModuleList()
        self.transitions = nn.ModuleList()

        channels = stem_channels
        last_block_idx = num_blocks - 1
        for block_idx in range(num_blocks):
            depth = num_layers_per_block[block_idx]
            self.blocks.append(DenseBlock(depth, channels, growth_rate))
            channels = channels + depth * growth_rate

            if block_idx == last_block_idx:
                # The final block feeds the classifier head directly.
                continue

            # Each transition halves the channel count (integer division).
            reduced_channels = channels // 2
            self.transitions.append(TransitionLayer(channels, reduced_channels))
            channels = reduced_channels

        # Head: final batch norm and the linear classification layer.
        self.bn2 = nn.BatchNorm2d(num_features=channels)
        self.fc = nn.Linear(in_features=channels, out_features=num_classes)

    def forward(self, x):
        """Return the linear layer's outputs, one row of ``num_classes`` values per input."""
        features = self.conv1(x)
        features = self.bn1(features)
        features = self.relu(features)
        features = self.pool1(features)

        transition_count = len(self.transitions)
        for block_idx, block in enumerate(self.blocks):
            features = block(features)
            if block_idx < transition_count:
                features = self.transitions[block_idx](features)

        # Global average pool down to 1x1, then flatten to (batch, channels).
        pooled = F.adaptive_avg_pool2d(self.bn2(features), (1, 1))
        flattened = torch.flatten(pooled, 1)
        return self.fc(flattened)