In [5]:
import pandas as pd
import os
data=pd.read_csv("Data_Entry_2017.csv")

In [25]:
data

Unnamed: 0,Image Index,Finding Labels,Follow-up #,Patient ID,Patient Age,Patient Gender,View Position,OriginalImage[Width,Height],OriginalImagePixelSpacing[x,y],Unnamed: 11,new labels
0,00000001_000.png,Cardiomegaly,0,1,58,M,PA,2682,2749,0.143,0.143,,abnormalities
1,00000001_001.png,Cardiomegaly|Emphysema,1,1,58,M,PA,2894,2729,0.143,0.143,,abnormalities
2,00000001_002.png,Cardiomegaly|Effusion,2,1,58,M,PA,2500,2048,0.168,0.168,,abnormalities
3,00000002_000.png,No Finding,0,2,81,M,PA,2500,2048,0.171,0.171,,Normal
4,00000003_000.png,Hernia,0,3,81,F,PA,2582,2991,0.143,0.143,,abnormalities
...,...,...,...,...,...,...,...,...,...,...,...,...,...
112115,00030801_001.png,Mass|Pneumonia,1,30801,39,M,PA,2048,2500,0.168,0.168,,Pneumonia
112116,00030802_000.png,No Finding,0,30802,29,M,PA,2048,2500,0.168,0.168,,Normal
112117,00030803_000.png,No Finding,0,30803,42,F,PA,2048,2500,0.168,0.168,,Normal
112118,00030804_000.png,No Finding,0,30804,30,F,PA,2048,2500,0.168,0.168,,Normal


In [6]:
data['Finding Labels']

0                   Cardiomegaly
1         Cardiomegaly|Emphysema
2          Cardiomegaly|Effusion
3                     No Finding
4                         Hernia
                   ...          
112115            Mass|Pneumonia
112116                No Finding
112117                No Finding
112118                No Finding
112119                No Finding
Name: Finding Labels, Length: 112120, dtype: object

In [8]:
available_files=set(os.listdir("images"))

In [12]:
def convert_labels(data):
    labels=str(data).split("|")
    
    if "Pneumonia" in labels:
        return "Pneumonia"
    elif "No Finding" in labels:
        return "Normal"
    else:
        return "abnormalities"

In [15]:
data["new labels"]=data['Finding Labels'].apply(convert_labels)

In [16]:
data["new labels"]

0         abnormalities
1         abnormalities
2         abnormalities
3                Normal
4         abnormalities
              ...      
112115        Pneumonia
112116           Normal
112117           Normal
112118           Normal
112119           Normal
Name: new labels, Length: 112120, dtype: object

In [32]:
((data["new labels"] == "Normal") & (data["Image Index"].isin(available_files))).sum()


2754

In [33]:
import os
import shutil

In [35]:
image_folder="images"
for _,row in data.iterrows():
    filename=row["Image Index"]
    label=row["new labels"]
    
    src_path=os.path.join(image_folder,filename)
    dst_folder=os.path.join(image_folder,label)
    os.makedirs(dst_folder,exist_ok=True)
    dst_path=os.path.join(dst_folder,filename)
    if os.path.exists(src_path):
        shutil.move(src_path,dst_path)

In [1]:
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader
import torch

In [2]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),       
    transforms.RandomVerticalFlip(p=0.2),         
    transforms.RandomRotation(degrees=15),      
    transforms.RandomAffine(
        degrees=0,                           
        scale=(0.9, 1.1),                    
        translate=(0.1, 0.1),                      
        shear=5                               
    ),
    transforms.ToTensor(),
])

In [3]:
dataset=ImageFolder(root="images",transform=None)
len(dataset)

6775

In [4]:
n_total = len(dataset)
n_train = int(0.7 * n_total)
n_val = int(0.15 * n_total)
n_test = n_total - n_val - n_train
train_indices, val_indices, test_indices = torch.utils.data.random_split(
    range(n_total), [n_train, n_val, n_test]
)

# apply augmentation for train_dataset
train_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.2),
        transforms.RandomRotation(degrees=15),
        transforms.RandomAffine(
            degrees=0, scale=(0.9, 1.1), translate=(0.1, 0.1), shear=5
        ),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
    ]
)
# test


In [5]:
import torch
class TransformSubset(torch.utils.data.Dataset):
    def __init__(self,subset,transform):
        self.subset=subset
        self.transform=transform

    def __getitem__(self,index):
        image,label=self.subset[index]
        if self.transform:
            image=self.transform(image)
        return image,label
    
    def __len__(self):
        return len(self.subset)

In [6]:
train_dataset=TransformSubset(torch.utils.data.Subset(dataset,train_indices),train_transform)

In [41]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange


class VisionConfig:
    def __init__(self, in_channels, layers, out_channels, heads, n_embd, tokens,classes):
        self.in_channels = in_channels
        self.layers = layers
        self.out_channels = out_channels
        self.heads = heads
        self.n_embd = n_embd
        self.tokens = tokens
        self.classes=classes

class AttentionPooling_2d(nn.Module):
    def __init__(
        self, tokens: int, n_embd: int, num_heads: int, output_dim: int = None
    ):
        super().__init__()
        self.positional_embdding = nn.Parameter(
            torch.randn(tokens + 1, n_embd) / n_embd**0.5
        )
        self.k_proj = nn.Linear(n_embd, n_embd)
        self.q_proj = nn.Linear(n_embd, n_embd)
        self.v_proj = nn.Linear(n_embd, n_embd)
        self.c_proj = nn.Linear(n_embd, output_dim or n_embd)
        self.num_heads = num_heads

    def forward(self, x):
        x = rearrange(x, "b c h w -> (h w) b c")
        x = torch.cat([x.mean(dim=0, keepdim=True), x], dim=0)
        x = x + self.positional_embdding[:, None, :]
        x, _ = F.multi_head_attention_forward(
            query=x[:1],
            key=x,
            value=x,
            embed_dim_to_check=x.shape[-1],
            num_heads=self.num_heads,
            q_proj_weight=self.q_proj.weight,
            k_proj_weight=self.k_proj.weight,
            v_proj_weight=self.v_proj.weight,
            in_proj_weight=None,
            in_proj_bias=torch.cat(
                [self.q_proj.bias, self.k_proj.bias, self.v_proj.bias]
            ),
            bias_k=None,
            bias_v=None,
            add_zero_attn=False,
            dropout_p=0,
            out_proj_weight=self.c_proj.weight,
            out_proj_bias=self.c_proj.bias,
            use_separate_proj_weight=True,
            training=self.training,
            need_weights=False,
        )
        return x.squeeze(0)


class Conv3x3(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        self.conv2d_1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride,padding=1,bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2d_2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.stride = stride
        if stride > 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(
                    in_channels, out_channels, stride=stride, kernel_size=1, bias=False
                ),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.downsample = None

        self.acv1 = nn.ReLU()
        self.acv2 = nn.ReLU()

    def forward(self, x):
        identity = x
        x = self.conv2d_1(x)
        x = self.bn1(x)
        x = self.acv1(x)
        x = self.conv2d_2(x)
        x = self.bn2(x)
        if self.downsample is not None:
            identity = self.downsample(identity)
        x = identity + x
        return self.acv2(x)


class Block2d(nn.Module):
    def __init__(self, in_channels, out_channels, stride=2):
        super().__init__()
        self.conv2d_1 = Conv3x3(in_channels, out_channels, stride=stride)
        self.conv2d_2 = Conv3x3(out_channels, out_channels, stride=1)

    def forward(self, x):
        x = self.conv2d_1(x)
        x = self.conv2d_2(x)
        return x


class ResNet2d_18(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels=config.in_channels,
            out_channels=config.layers[0],
            kernel_size=7,
            stride=1,
            padding=3,
        )
        self.layer1 = Block2d(
            in_channels=config.layers[0],
            out_channels=config.layers[0],
            stride=2,
        )
        self.layer2 = Block2d(
            in_channels=config.layers[0],
            out_channels=config.layers[1],
            stride=2,
        )
        self.layer3 = Block2d(
            in_channels=config.layers[1],
            out_channels=config.layers[2],
            stride=2,
        )
        self.layer4 = Block2d(
            in_channels=config.layers[2],
            out_channels=config.layers[3],
            stride=2,
        )
        self.bn1 = nn.BatchNorm2d(config.layers[0])
        self.acv_fn = nn.ReLU()
        self.globalPooling = AttentionPooling_2d(
            tokens=config.tokens,
            n_embd=config.n_embd,
            num_heads=config.heads,
        )

        self.config = config
        self.to_out=nn.Linear(self.config.out_channels,self.config.classes)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Conv2d):
            nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
        elif isinstance(module, nn.BatchNorm2d):
            nn.init.constant_(module.weight, 1)
            nn.init.constant_(module.bias, 0)
        elif isinstance(module, AttentionPooling_2d):
            std = self.config.in_channels**-0.5
            nn.init.normal_(module.q_proj.weight, std=std)
            nn.init.normal_(module.k_proj.weight, std=std)
            nn.init.normal_(module.v_proj.weight, std=std)
            nn.init.normal_(module.c_proj.weight, std=std)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.acv_fn(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.globalPooling(x)
        output=self.to_out(x)
        return output


In [42]:
config=VisionConfig(in_channels=1,layers=[64, 128, 256, 512],out_channels=512,heads=8,n_embd=512,tokens=196,classes=3)

In [43]:
model=ResNet2d_18(config)

In [44]:
x = torch.randn(5, config.in_channels, 224, 224)


In [45]:
output=model(x)
output

tensor([[  236.2921,    19.2939,   -42.5735],
        [   55.0410,  -418.7348,  -168.3798],
        [  -71.9583, -1081.2875,    -4.6140],
        [  129.8367,  -258.3860,   150.4920],
        [  -10.1414,  -211.5168,  -401.0446]], grad_fn=<AddmmBackward0>)

In [57]:
out=F.softmax(output,dim=1)
labels=torch.tensor([[0],[1],[0],[2],[2]])
labels=labels.t()


In [58]:
torch.sum(torch.argmax(out,dim=1)==labels)

tensor(2)

In [59]:
labels.shape

torch.Size([1, 5])