In [2]:
import pandas as pd
import os
from tqdm import tqdm
import pydicom
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torchvision.transforms as T
import segmentation_models_pytorch as smp
from torch.cuda.amp import autocast
from torch.utils.data import Dataset, DataLoader

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
df = pd.read_excel('/data_vault/hexai02/CarpalTunnel/AI Project Data Clean 10-8.xlsx')

In [4]:
df.head(100)

Unnamed: 0,Number,WristImage Segmentation for AI Analysis,Arm (L/R),Sex,Race,Ethnicity,Age,Clinical Signs of CTS,CTS-6 Score,Measurement at Wrist
0,21,Yes,R,M,White,Non-Hispanic,43.0,Y,12.5,12.16
1,22,Yes,R,M,White,Unreported/Chose not to disclose,74.0,Y,16.5,20.10
2,23,Yes,L,M,White,Unreported/Chose not to disclose,74.0,Y,11.5,13.83
3,24,Yes,R,F,Black,Non-Hispanic,63.0,Y,21.0,16.55
4,25,Yes,R,M,White,Non-Hispanic,49.0,Y,9.0,19.21
...,...,...,...,...,...,...,...,...,...,...
95,163,Yes,R,F,White,Non-Hispanic,74.0,?,,12.78
96,164,Yes,R,F,Black,Non-Hispanic,67.0,N,3.5,7.57
97,165,yes,R,F,White,Non-Hispanic,65.0,N,0.0,16.10
98,166,Yes,L,F,White,Non-Hispanic,51.0,Y,16.5,9.95


In [5]:
df['Clinical Signs of CTS'].unique()

array(['Y', 'N', 'Yes', '?'], dtype=object)

In [6]:
df['Clinical Signs of CTS'] = df['Clinical Signs of CTS'].replace('Yes', 'Y')

In [7]:
df['Clinical Signs of CTS'].unique()

array(['Y', 'N', '?'], dtype=object)

In [7]:
count_question_mark = (df['Clinical Signs of CTS'] == '?').sum()
print(count_question_mark)

1


In [8]:
df = df[df['Clinical Signs of CTS'] != '?']

In [23]:
df['Number'] = df['Number'].astype(str)

In [9]:
restructured_root = 'data_yolo'
image_dir = os.path.join(restructured_root, 'images')
image_train_dir = os.path.join(image_dir, 'train')
image_val_dir = os.path.join(image_dir, 'val')
image_test_dir = os.path.join(image_dir, 'test')
label_dir = os.path.join(restructured_root, 'labels')
label_train_dir = os.path.join(label_dir, 'train')
label_val_dir = os.path.join(label_dir, 'val')

In [10]:
anno_dir = '/data_vault/hexai02/CarpalTunnel/Annotations'
dicom_dir = '/data_vault/hexai02/CarpalTunnel/Images'

In [11]:
def load_mask(mask_path):
    mask = sitk.ReadImage(mask_path)
    return sitk.GetArrayFromImage(mask)[0][: 450, 200: 1300]

def load_dicom(dicom_path):
    dicom_data = pydicom.dcmread(dicom_path)
    return dicom_data.pixel_array[: 450, 200: 1300]

In [None]:
model = YOLO("runs/detect/train17/weights/best.pt")
model.eval()

In [13]:
config = {
    'epochs': 40,
    'in_channels': 1,
    'dropout': 0.2,
    'decoder_attention_type': 'scse',
    'init_lr': 5e-4,
    'weight_decay': 0.05,
    'T_max': 10,
    'eta_min': 3e-5
}

In [14]:
def load_model(model, optimizer=None, scheduler=None, path='./checkpoint.pth'):
    checkpoint = torch.load(path)
    model.load_state_dict(checkpoint['model_state_dict'])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    else:
        optimizer = None
    if scheduler is not None:
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    else:
        scheduler = None
    epoch = checkpoint['epoch']
    return model, optimizer, scheduler, epoch

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

unet_model = smp.Unet(encoder_name="resnet18",in_channels=1,dropout=config['in_channels'],decoder_attention_type=config['decoder_attention_type']).to(device)

# unet_model, _, _, _ = load_model(unet_model, optimizer=None, scheduler=None, path=f'unet_runs/epoch_40.pth')
checkpoint = torch.load('unet_runs/epoch_40.pth', map_location="cuda")
unet_model.load_state_dict(checkpoint, strict=True)
unet_model.eval()

In [16]:
df.columns

Index(['Number', 'WristImage Segmentation for AI Analysis', 'Arm (L/R)', 'Sex',
       'Race', 'Ethnicity', 'Age', 'Clinical Signs of CTS', 'CTS-6 Score',
       'Measurement at Wrist'],
      dtype='object')

In [31]:
train_images = []
train_labels = []
val_images = []
val_labels = []


image_transform = T.Compose([
    T.Resize((512, 512), interpolation=T.InterpolationMode.BILINEAR),  # ✅ Resize first
    T.Normalize(mean=[0.5], std=[0.5]),  # ✅ Normalize after
])

def fill_arr(img_dir, img_bucket, mask_bucket):
    for jpg_name in tqdm(os.listdir(img_dir)): 
        entry_name = jpg_name.split('.')[0]
        img_filename = entry_name + '.dcm'

        # df['Clinical Signs of CTS'].unique()
        clinical_signs = df.loc[df['Number'] == entry_name, 'Clinical Signs of CTS'].values
        if len(clinical_signs) == 0:
            print(f"{entry_name}")
            continue
        classification_tensor = torch.tensor([1 if clinical_signs[0] == 'Y' else 0], dtype=torch.float32)
        train_labels.append(classification_tensor)
        # Load DICOM image
        img = load_dicom(os.path.join(dicom_dir, img_filename))

        # Get bounding box from YOLO
        result = model(img, save=False, verbose=False)
        box = result[0].boxes
        
        if len(box.xyxy.tolist()) == 0:
            continue  


        x1, y1, x2, y2 = map(int, box.xyxy.tolist()[0])
        img_slice = img[y1:y2, x1:x2, 0]  # Crop the region


        img_tensor = torch.from_numpy(img_slice).float()  # Convert to float


        img_tensor = img_tensor / 255.0  


        if img_tensor.dim() == 2:
            img_tensor = img_tensor.unsqueeze(0).unsqueeze(0)  # Convert (H, W) → (1, H, W)


        img_tensor = image_transform(img_tensor)  
        

        img_tensor = img_tensor.to(device)


        with torch.no_grad():
            mask_gpu = unet_model(img_tensor)  
        

        img_resized = img_tensor.squeeze().cpu().detach().numpy()  
        mask = mask_gpu.squeeze().cpu().detach().numpy()  


        img_2channel = np.stack((img_resized, mask), axis=0)
        img_bucket.append(img_2channel)

        

fill_arr(image_train_dir, train_images, train_labels)
fill_arr(image_val_dir, val_images, val_labels)

 91%|███████████████████████████████████████    | 79/87 [00:08<00:00, 11.74it/s]

163


100%|███████████████████████████████████████████| 87/87 [00:09<00:00,  8.78it/s]
100%|███████████████████████████████████████████| 33/33 [00:03<00:00, 10.01it/s]


In [32]:
val_images[0].shape

(2, 512, 512)

In [35]:
import torch
import random
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T

class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images  # List of image arrays
        self.labels = labels  # List of label tensors
        self.transform = transform  # Augmentations
    
    def threshold_mask(self, mask):
        """Binarizes mask: pixel > 0.5 → 1, else 0"""
        return torch.where(mask > 0.5, torch.tensor(1.0), torch.tensor(0.0))
    
    def apply_random_cutout(self, image):
        """Applies a small random cutout (1-5% of image size)"""
        c, h, w = image.shape
        cutout_size = random.randint(int(0.01 * h * w), int(0.05 * h * w))  # 1-5% area
        cut_h = int(np.sqrt(cutout_size))
        cut_w = cut_h

        x = random.randint(0, w - cut_w)
        y = random.randint(0, h - cut_h)

        image[:, y:y+cut_h, x:x+cut_w] = 0  # Black-out region
        return image
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = self.images[idx]  # Still an array
        label = self.labels[idx]  # Already a tensor

        # Convert image to tensor if it's not already
        if not isinstance(image, torch.Tensor):
            image = torch.tensor(image, dtype=torch.float32)

        # Ensure correct shape (C, H, W)
        if len(image.shape) == 2:
            image = image.unsqueeze(0)  # Convert grayscale image to (1, H, W)

        # Apply mask thresholding (only to the 2nd channel)
        image[1] = self.threshold_mask(image[1])

        # Apply random cutout
        # image = self.apply_random_cutout(image)

        # Apply optional transforms (if provided)
        # if self.transform:
        #     image = self.transform(image)

        return image, label  # Label remains a tensor

# Augmentations for classification
transform = T.Compose([
    T.RandomHorizontalFlip(p=0.5),
    T.RandomRotation(degrees=10),
    T.ColorJitter(brightness=0.2, contrast=0.2),
    T.GaussianBlur(kernel_size=3),
])

# Creating Dataset
train_dataset = CustomDataset(train_images, train_labels, transform=transform)
val_dataset = CustomDataset(val_images, val_labels, transform=None)  # No augmentation for validation

# Creating DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=2)



In [36]:
for image, label in train_loader:
    print(image.shape, label.shape)
    break

torch.Size([8, 2, 512, 512]) torch.Size([8, 1])


In [None]:
import torchvision.models as models
from torchsummary import summary

summary(models.convnext_tiny(input_channels=2, num_classes=2), (3,224,224), device="cpu") # 96
# summary(models.resnet34(), (3,224,224), device="cpu") # 64
# summary(models.mobilenet_v3_small(), (3,224,224), device="cpu") # 64

### MobileNetV2

In [82]:
class MobileNetV2Block(nn.Module):
    def __init__(self, in_channels, out_channels, expansion_factor=6, stride=1, **kwargs):
        super(MobileNetV2Block, self).__init__()
        self.expansion_factor = expansion_factor
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.stride = stride
        self.expansion_channels = in_channels * expansion_factor
        
        self.c1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=self.expansion_channels,
            kernel_size=1,
            bias=False
        )

        self.act1 = nn.ReLU6()

        self.bn1 = nn.BatchNorm2d(self.expansion_channels)

        self.d1 = nn.Conv2d(
            in_channels=self.expansion_channels,
            out_channels=self.expansion_channels,
            kernel_size=3,
            stride=self.stride,
            padding=1,
            groups=self.expansion_channels,
            bias=False
        )

        self.act2 = nn.ReLU6()

        self.bn2 = nn.BatchNorm2d(self.expansion_channels)

        self.c2= nn.Conv2d(
            in_channels=self.expansion_channels,
            out_channels=out_channels,
            kernel_size=1,
            bias=False
        )

        self.bn3 = nn.BatchNorm2d(self.out_channels)

        self.residual = (self.stride == 1 and self.in_channels == self.out_channels)

    def forward(self, x):
        out = x
        x = self.c1(x)
        x = self.act1(x)
        x = self.bn1(x)
        x = self.d1(x)
        x = self.act2(x)
        x = self.bn2(x)
        x = self.c2(x)
        x = self.bn3(x)

        if self.residual:
            x += out

        return x                

In [83]:
class MobileNetV2Layer(nn.Module):
    def __init__(self, in_channels, out_channels, n, **kwargs):
        super(MobileNetV2Layer, self).__init__()
        layers = [MobileNetV2Block(in_channels, out_channels, **kwargs)] + [MobileNetV2Block(out_channels, out_channels, **kwargs) for _ in range(n-1)]

        self.layer = nn.Sequential(*layers)

    def forward(self, x):
        x = self.layer(x)
        return x        

In [84]:
class MobileNetV2(nn.Module):
    def __init__(self, in_channels=3, 
                 classes=1000, 
                 expansion_factor_list = [-1,1,6,6,6,6,6,6], 
                 out_channel_list = [32,16,24,32,64,96,160,320,1280], 
                 layer_list = [1,1,2,3,4,3,3,1], 
                 stride_list = [2,1,2,2,2,1,2,1]):
        
        super(MobileNetV2, self).__init__()
        
        self.c1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channel_list[0],
            kernel_size=3,
            stride=stride_list[0],
            padding=1,
            bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channel_list[0])
        self.act1 = nn.ReLU6()
        
        layers = []
        for i in range(1, len(expansion_factor_list)):
            expansion_factor = expansion_factor_list[i]
            in_channels = out_channel_list[i-1]
            out_channels = out_channel_list[i]
            n = layer_list[i]
            stride = stride_list[i]

            layers.append(MobileNetV2Layer(in_channels, 
                                           out_channels, 
                                           n,
                                           expansion_factor=expansion_factor,
                                           stride=stride))

        self.model = nn.Sequential(*layers)

        self.c2 = nn.Conv2d(
            in_channels=out_channel_list[-2],
            out_channels=out_channel_list[-1],
            kernel_size=1,
            stride=1,
            bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channel_list[-1])
        self.act2 = nn.ReLU6()

        self.gap = nn.AdaptiveAvgPool2d(1)

        # self.c3 = nn.Conv2d(
        #     in_channels=out_channel_list[-1],
        #     out_channels=classes,
        #     kernel_size=1,
        #     bias=False
        # )

    def forward(self, x):
        x = self.c1(x)
        x = self.act1(x)
        x = self.bn1(x)

        x = self.model(x)

        x = self.c2(x)
        x = self.act2(x)
        x = self.bn2(x)

        x = self.gap(x)

        # x = self.c3(x)
        return x
        

In [None]:
summary(MobileNetV2(),(3,224,224),device="cpu")

### ConvNext

In [77]:
class myLayerNorm(nn.Module):
  def __init__(self, dims, eps=1e-6):
    super().__init__()
    self.norm = nn.LayerNorm(dims, eps)

  def forward(self, x):
    x = x.permute(0, 2, 3, 1)
    x = self.norm(x)
    x = x.permute(0, 3, 1, 2)

    return x

In [78]:
class ConvNeXtBlock2D(nn.Module):
  def __init__(self, dim, layer_scale_init_value=1e-6, drop=0.2):
    super().__init__()

    self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)  # depthwise conv

    self.norm = nn.LayerNorm(dim)

    self.pwconv1 = nn.Linear(dim, 4 * dim)
    self.act = nn.GELU()
    self.pwconv2 = nn.Linear(4 * dim, dim)

    self.dropout = nn.Dropout2d(p=drop)

    self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((dim,)), requires_grad=True) if layer_scale_init_value > 0 else None


  def forward(self, x):
    residual = x
    x = self.dwconv(x)

    # Transpose for LayerNorm
    x = x.permute(0, 2, 3, 1)
    x = self.norm(x)

    x = self.pwconv1(x)
    x = self.act(x)
    x = self.pwconv2(x)

    x = self.dropout(x)

    if self.gamma is not None:
        x = self.gamma * x

    # Transpose back to (B, C, H, W)
    x = x.permute(0, 3, 1, 2)
    # no drop path y
    return residual + x


In [None]:
summary(ConvNeXtBlock2D(96), (96,112,112), device="cpu")

In [80]:
import torch
import torch.nn as nn

class ConvNext(nn.Module):
    def __init__(self, in_chans=1, dims=[32, 64, 128, 256], stages=[1, 1, 3, 1]):
        super().__init__()

        self.in_chans = in_chans
        self.dims = dims
        self.stages = stages

        self.downsample_layers = nn.ModuleList()  # stem and 3 intermediate downsampling conv layers
        stem = nn.Sequential(
            nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
            myLayerNorm(dims[0], eps=1e-6)
        )
        self.downsample_layers.append(stem)
        for i in range(3):
            downsample_layer = nn.Sequential(
                myLayerNorm(dims[i], eps=1e-6),
                nn.Conv2d(dims[i], dims[i + 1], kernel_size=2, stride=2),
            )
            self.downsample_layers.append(downsample_layer)

        self.model_layers = nn.ModuleList()
        for i, stage_length in enumerate(stages):
            stage = nn.ModuleList([ConvNeXtBlock2D(dims[i]) for _ in range(stage_length)])
            self.model_layers.append(stage)

        self.pooling = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        self.final_norm = nn.LayerNorm(dims[-1])

    def forward(self, x):
        for i in range(len(self.dims)):
            x = self.downsample_layers[i](x)
            for layer in self.model_layers[i]:
                x = layer(x)

        x = self.pooling(x)
        x = self.flatten(x)
        x = self.final_norm(x)  # Final normalization
        return x

In [None]:
summary(ConvNext(in_chans=2,dims=[96,192,384,768],stages=[1,1,3,1]), (2,512,512), device="cpu")

In [88]:
class ClassificationTail(nn.Module):
    def __init__(self, in_features, num_classes=2, dropout_rate=0.5):
        super(ClassificationTail, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(in_features, in_features // 2),
            nn.BatchNorm1d(in_features // 2),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(in_features // 2, in_features // 4),
            nn.BatchNorm1d(in_features // 4),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(in_features // 4, num_classes)
        )

    def forward(self, x):
        return self.fc(x)

In [86]:
class FullModel(nn.Module):
    def __init__(self, backbone, tail):
        super(FullModel, self).__init__()
        self.backbone = backbone
        self.tail = tail

    def forward(self, x):
        x = self.backbone(x)
        x = self.tail(x)
        return x

In [None]:
summary(FullModel(ConvNext(in_chans=2,dims=[96,192,384,768],stages=[1,1,3,1]), 
                  ClassificationTail(768,num_classes=2,dropout_rate=0.2)), (2,512,512),device="cpu")