In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from DEEPKNN_RESNET50 import Bottleneck, ResNet, ResNet50

from transform_data import trans_data

from deepknn_loss_func import DeepKNNLoss
#from RESNET50 import Bottleneck, ResNet, ResNet50

In [2]:
import torch
import torch.nn as  nn
import torch.nn.functional as F


class Bottleneck(nn.Module):
    expansion = 4
    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Bottleneck, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)
        
        self.conv3 = nn.Conv2d(out_channels, out_channels*self.expansion, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(out_channels*self.expansion)
        
        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()
        
    def forward(self, x):
        identity = x.clone()
        x = self.relu(self.batch_norm1(self.conv1(x)))
        
        x = self.relu(self.batch_norm2(self.conv2(x)))
        
        x = self.conv3(x)
        x = self.batch_norm3(x)
        
        #downsample if needed
        if self.i_downsample is not None:
            identity = self.i_downsample(identity)
        #add identity
        x+=identity
        x=self.relu(x)
        
        return x

class Block(nn.Module):
    expansion = 1
    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Block, self).__init__()
       

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
      identity = x.clone()

      x = self.relu(self.batch_norm2(self.conv1(x)))
      x = self.batch_norm2(self.conv2(x))

      if self.i_downsample is not None:
          identity = self.i_downsample(identity)
      print(x.shape)
      print(identity.shape)
      x += identity
      x = self.relu(x)
      return x


        
        
class ResNet(nn.Module):
    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super(ResNet, self).__init__()
        self.in_channels = 64
        
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size = 3, stride=2, padding=1)
        
        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512*ResBlock.expansion, num_classes)
        
    def forward(self, x):
        x = self.relu(self.batch_norm1(self.conv1(x)))
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        
        return x
        
    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        ii_downsample = None
        layers = []
        
        if stride != 1 or self.in_channels != planes*ResBlock.expansion:
            ii_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, planes*ResBlock.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes*ResBlock.expansion)
            )
            
        layers.append(ResBlock(self.in_channels, planes, i_downsample=ii_downsample, stride=stride))
        self.in_channels = planes*ResBlock.expansion
        
        for i in range(blocks-1):
            layers.append(ResBlock(self.in_channels, planes))
            
        return nn.Sequential(*layers)

        
        
def ResNet50(num_classes, channels=3):
    return ResNet(Bottleneck, [3,4,6,3], num_classes, channels)
    
def ResNet101(num_classes, channels=3):
    return ResNet(Bottleneck, [3,4,23,3], num_classes, channels)

def ResNet152(num_classes, channels=3):
    return ResNet(Bottleneck, [3,8,36,3], num_classes, channels)


In [3]:
EPOC=30
LR=0.001

In [None]:
train_data_path = r"C:\Users\joaog\Deep_Learning\Dataset\chest_xray\train"
test_data_path = r"C:\Users\joaog\Deep_Learning\Dataset\chest_xray\test"
trainloader,testloader = trans_data(train_data_path,test_data_path,1000,5)

In [5]:
net = ResNet50(10).to('cuda')

criterion = DeepKNNLoss()
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9, weight_decay=0.0001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor = 0.1, patience=5)

In [6]:
from tqdm import tqdm # Recommended for better progress bars
import time
start_time = time.time()
# --- 3. The Adapted Training Loop ---
EPOCHS = EPOC # Starting with fewer epochs for demonstration
for epoch in range(EPOCHS):
    net.train() # Set model to training mode
    
    running_loss = 0.0
    # Use tqdm for a nice progress bar
    for i, (inputs, labels) in enumerate(tqdm(trainloader, desc=f"Epoch {epoch+1}/{EPOCHS}")):
        inputs, labels = inputs.to('cuda'), labels.to('cuda')
        
        optimizer.zero_grad()
        
        # 1. Get embeddings for the entire batch
        embeddings = net(inputs)
        # The output of our net is [batch_size, 2048, 1, 1], so we flatten it
        embeddings = embeddings.flatten(start_dim=1)

        # 2. Triplet Mining: Find anchor, positive, and negative samples
        # This is a simple but effective "in-batch" mining strategy
        anchors, positives, negatives = [], [], []
        for j in range(len(labels)):
            anchor_label = labels[j]
            anchor_embedding = embeddings[j]
            
            # Find indices of positive and negative samples in the batch
            positive_indices = (labels == anchor_label) & (torch.arange(len(labels)).cuda() != j)
            negative_indices = (labels != anchor_label)
            
            # If we have at least one positive and one negative...
            if torch.any(positive_indices) and torch.any(negative_indices):
                # Select one positive and one negative
                positive_idx = torch.where(positive_indices)[0][0]
                negative_idx = torch.where(negative_indices)[0][0]
                
                anchors.append(anchor_embedding)
                positives.append(embeddings[positive_idx])
                negatives.append(embeddings[negative_idx])
        
        # 3. If we found any valid triplets, calculate the loss
        if len(anchors) > 0:
            # Convert lists to tensors
            anchor_tensor = torch.stack(anchors)
            positive_tensor = torch.stack(positives)
            negative_tensor = torch.stack(negatives)
            
            # Calculate the triplet loss
            loss = criterion(anchor_tensor, positive_tensor, negative_tensor)
            
            # Backpropagation
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()

    # Calculate and print average loss for the epoch
    avg_loss = running_loss / (i + 1)
    print(f"\nEnd of Epoch {epoch+1}, Average Triplet Loss: {avg_loss:.4f}")
    
    # Step the scheduler
    scheduler.step(avg_loss)

print('Training Done')
end_time = time.time()

Epoch 1/30: 100%|██████████| 200/200 [00:14<00:00, 13.80it/s]



End of Epoch 1, Average Triplet Loss: 0.2378


Epoch 2/30: 100%|██████████| 200/200 [00:14<00:00, 14.20it/s]



End of Epoch 2, Average Triplet Loss: 0.2362


Epoch 3/30: 100%|██████████| 200/200 [00:23<00:00,  8.41it/s]



End of Epoch 3, Average Triplet Loss: 0.1854


Epoch 4/30: 100%|██████████| 200/200 [00:16<00:00, 11.92it/s]



End of Epoch 4, Average Triplet Loss: 0.1617


Epoch 5/30: 100%|██████████| 200/200 [00:14<00:00, 14.16it/s]



End of Epoch 5, Average Triplet Loss: 0.1736


Epoch 6/30: 100%|██████████| 200/200 [00:18<00:00, 10.61it/s]



End of Epoch 6, Average Triplet Loss: 0.1548


Epoch 7/30: 100%|██████████| 200/200 [00:23<00:00,  8.41it/s]



End of Epoch 7, Average Triplet Loss: 0.1428


Epoch 8/30: 100%|██████████| 200/200 [00:25<00:00,  7.79it/s]



End of Epoch 8, Average Triplet Loss: 0.1302


Epoch 9/30: 100%|██████████| 200/200 [00:21<00:00,  9.45it/s]



End of Epoch 9, Average Triplet Loss: 0.1246


Epoch 10/30: 100%|██████████| 200/200 [00:14<00:00, 13.95it/s]



End of Epoch 10, Average Triplet Loss: 0.1347


Epoch 11/30: 100%|██████████| 200/200 [00:14<00:00, 13.49it/s]



End of Epoch 11, Average Triplet Loss: 0.1345


Epoch 12/30: 100%|██████████| 200/200 [00:14<00:00, 13.68it/s]



End of Epoch 12, Average Triplet Loss: 0.1272


Epoch 13/30: 100%|██████████| 200/200 [00:14<00:00, 13.37it/s]



End of Epoch 13, Average Triplet Loss: 0.1091


Epoch 14/30: 100%|██████████| 200/200 [00:14<00:00, 13.73it/s]



End of Epoch 14, Average Triplet Loss: 0.1206


Epoch 15/30: 100%|██████████| 200/200 [00:15<00:00, 12.89it/s]



End of Epoch 15, Average Triplet Loss: 0.1146


Epoch 16/30: 100%|██████████| 200/200 [00:14<00:00, 13.63it/s]



End of Epoch 16, Average Triplet Loss: 0.1084


Epoch 17/30: 100%|██████████| 200/200 [00:15<00:00, 13.32it/s]



End of Epoch 17, Average Triplet Loss: 0.1064


Epoch 18/30: 100%|██████████| 200/200 [00:23<00:00,  8.50it/s]



End of Epoch 18, Average Triplet Loss: 0.1176


Epoch 19/30: 100%|██████████| 200/200 [00:25<00:00,  7.91it/s]



End of Epoch 19, Average Triplet Loss: 0.1129


Epoch 20/30: 100%|██████████| 200/200 [00:25<00:00,  7.74it/s]



End of Epoch 20, Average Triplet Loss: 0.1134


Epoch 21/30: 100%|██████████| 200/200 [00:23<00:00,  8.60it/s]



End of Epoch 21, Average Triplet Loss: 0.1038


Epoch 22/30: 100%|██████████| 200/200 [00:15<00:00, 13.18it/s]



End of Epoch 22, Average Triplet Loss: 0.1067


Epoch 23/30: 100%|██████████| 200/200 [00:14<00:00, 13.49it/s]



End of Epoch 23, Average Triplet Loss: 0.1051


Epoch 24/30: 100%|██████████| 200/200 [00:14<00:00, 14.06it/s]



End of Epoch 24, Average Triplet Loss: 0.0986


Epoch 25/30: 100%|██████████| 200/200 [00:14<00:00, 13.50it/s]



End of Epoch 25, Average Triplet Loss: 0.0985


Epoch 26/30: 100%|██████████| 200/200 [00:14<00:00, 13.62it/s]



End of Epoch 26, Average Triplet Loss: 0.1020


Epoch 27/30: 100%|██████████| 200/200 [00:14<00:00, 13.79it/s]



End of Epoch 27, Average Triplet Loss: 0.0973


Epoch 28/30: 100%|██████████| 200/200 [00:14<00:00, 13.80it/s]



End of Epoch 28, Average Triplet Loss: 0.1039


Epoch 29/30: 100%|██████████| 200/200 [00:14<00:00, 13.98it/s]



End of Epoch 29, Average Triplet Loss: 0.1009


Epoch 30/30: 100%|██████████| 200/200 [00:15<00:00, 12.87it/s]


End of Epoch 30, Average Triplet Loss: 0.0973
Training Done





In [None]:
net(images)


Predicted tensor shape: torch.Size([5])
Labels tensor shape:    torch.Size([5])
------------------------------
Predicted tensor dtype: torch.int64
Labels tensor dtype:    torch.int64
------------------------------
First 10 Predictions: tensor([8, 8, 6, 8, 6], device='cuda:0')
First 10 Labels:      tensor([0, 0, 0, 0, 0], device='cuda:0')
Accuracy on 624 test images: 0.0 %


In [20]:
net.eval()

# --- Lists to store true labels and predictions ---
all_labels = []
all_predictions = []

# --- No gradient calculation needed for inference ---
with torch.no_grad():
    # Iterate over the test data
    for data in testloader:
        inputs, labels = data
        inputs, labels = inputs.to('cuda'), labels.to('cuda')

        # Get model outputs
        outputs = net(inputs)
        
        # Get the predicted class (the one with the highest score)
        _, predicted = torch.max(outputs.data, 1)
        
        # Append batch predictions and labels to the lists
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
all_features = all_predictions

In [23]:
import torch
import numpy as np

net.eval() # Set model to evaluation mode

# --- Lists to store true labels and feature vectors ---
all_labels = []
all_features = []

# --- No gradient calculation needed ---
with torch.no_grad():
    for data in testloader:
        inputs, labels = data
        inputs, labels = inputs.to('cuda'), labels.to('cuda')

        # Get model outputs and features
        outputs, features = net(inputs)
        
        # Append batch features and labels to the lists
        all_features.append(features.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Concatenate all feature batches into a single numpy array
all_features = np.concatenate(all_features, axis=0)

print(f"Shape of extracted features: {all_features.shape}")
print(f"Number of labels: {len(all_labels)}")

ValueError: too many values to unpack (expected 2)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report

# 1. Split data into a training set and a testing set for the k-NN
# This is crucial to fairly evaluate the k-NN's performance.
X_train, X_test, y_train, y_test = train_test_split(
    all_features, 
    all_labels, 
    test_size=0.3,  # Use 30% of the data for testing the k-NN
    random_state=42,
    stratify=all_labels # Ensures proportional representation of labels in splits
)

# 2. Initialize the k-NN classifier
# 'k' is the number of neighbors. 5 is a common starting point.
k = 5
knn = KNeighborsClassifier(n_neighbors=k)

# 3. Train the k-NN model on the training data
print(f"\nTraining k-NN with k={k}...")
knn.fit(X_train, y_train)
print("Training complete.")

# 4. Make predictions on the test data
print("Making predictions on the test set...")
knn_predictions = knn.predict(X_test)

# 5. Evaluate the performance of the k-NN classifier
accuracy = accuracy_score(y_test, knn_predictions)
print(f"\nAccuracy of k-NN on CNN features: {accuracy * 100:.2f}%")

# You can also get a more detailed report
print("\nClassification Report:")
print(classification_report(y_test, knn_predictions))

In [18]:
print(all_predictions)

[np.int64(8), np.int64(8), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(8), np.int64(6), np.int64(8), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(8), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6), np.int64(8), np.int64(6), np.int64(6), np.int64(6), np.int64(6)