# Initializing Directory

In [None]:
import os
import shutil

In [None]:
# ROOT PATH
# Every dataset location below is derived from ROOT_PATH, so the notebook can
# be relocated by changing this single value.
ROOT_PATH = '.'
DATA_PATH = f'{ROOT_PATH}/data'
PROCESSED_PATH = f'{DATA_PATH}/processed'

# Create the processed-data directory (and any missing parents) up front;
# exist_ok=True makes re-running this cell harmless.
os.makedirs(PROCESSED_PATH, exist_ok=True)

# Setup for Kaggle

In [None]:
# Run this if you are on Kaggle and have attached the dataset as an inline Kaggle dataset:

# if os.environ.get('MPLBACKEND') == 'agg':
#     source_path = '/kaggle/input/tuberculosis-tb-chest-x-ray-cleaned-database/TB Chest Radiography Database/TB Chest Radiography Database/Cleaned Data'
#     destination_path = f'{DATA_PATH}/clean'
#     external_dataset_path = '/kaggle/input/tuberculosis-chest-xrays-shenzhen/images/images'
#     destination_external_dataset = f'{DATA_PATH}/external_test'
#     shutil.copytree(source_path, destination_path)
#     print(f"File copied from {source_path} to {destination_path}")


external_dataset_path = '/kaggle/input/tuberculosis-chest-xrays-shenzhen/images/images'
destination_external_dataset = f'{DATA_PATH}/external_test'

# shutil.copytree raises FileExistsError when the destination already exists,
# so re-running this cell used to crash. It also raises when the Kaggle input
# directory is absent (e.g. when running locally). Copy only when it is both
# needed and possible, and say what happened otherwise.
if os.path.exists(destination_external_dataset):
    print(f"External dataset already present at {destination_external_dataset}; skipping copy")
elif os.path.isdir(external_dataset_path):
    shutil.copytree(external_dataset_path, destination_external_dataset)
    print(f"File copied from {external_dataset_path} to {destination_external_dataset}")
else:
    print(f"External dataset not found at {external_dataset_path}; skipping copy")

In [None]:
import uuid
import pandas as pd
from PIL import Image

In [None]:
def create_image_dataframe(directory, class_name, class_id):
    """Describe every readable image found under ``directory``.

    The directory is walked recursively. Each file that PIL can open yields
    one row; files that cannot be opened as images are skipped.

    Args:
        directory: Root folder to scan.
        class_name: Label name stored in the ``CLASS_NAME`` column.
        class_id: Label id stored in the ``CLASS_ID`` column.

    Returns:
        A DataFrame with the columns ``FILE NAME`` (file name without its
        extension), ``SIZE`` (``"<width>x<height>"``), ``CLASS_ID``,
        ``CLASS_NAME`` and ``FORMAT`` (upper-cased extension). The frame is
        empty, but still has these columns, when no image is found.

    Raises:
        ValueError: If ``class_name`` or ``class_id`` is ``None``.
    """
    if class_name is None or class_id is None:
        # The original returned the exception object instead of raising it,
        # so callers silently received an Exception in place of a DataFrame.
        raise ValueError('Please insert class name and class id')

    columns = ['FILE NAME', 'SIZE', 'CLASS_ID', 'CLASS_NAME', 'FORMAT']
    data = []  # One dictionary of image properties per readable image

    for root, _dirs, files in os.walk(directory):
        for filename in files:
            file_path = os.path.join(root, filename)
            try:
                # Opening the image is enough to read its dimensions.
                with Image.open(file_path) as img:
                    width, height = img.size
            except IOError:
                # Skip files that are not images
                continue

            # splitext keeps dots that are part of the name ("a.b.png" ->
            # "a.b"); splitting on the first dot would truncate it to "a".
            stem, extension = os.path.splitext(filename)
            data.append({
                'FILE NAME': stem,
                'SIZE': f"{width}x{height}",
                'CLASS_ID': class_id,
                'CLASS_NAME': class_name,
                'FORMAT': extension.lstrip('.').upper(),
            })

    return pd.DataFrame(data, columns=columns)


def prepare_clean_dataset():
    """Index the cleaned Normal and Tuberculosis images and export the indexes.

    Scans ``{DATA_PATH}/clean/Normal`` and ``{DATA_PATH}/clean/Tuberculosis``,
    writes one CSV per class next to the images, and copies both CSVs into
    ``PROCESSED_PATH``.

    Returns:
        A ``(tb_df, normal_df)`` tuple of the two per-class DataFrames.
    """
    # Specify the directories containing the images
    normal_directory = f'{DATA_PATH}/clean/Normal'
    tb_directory = f'{DATA_PATH}/clean/Tuberculosis'

    # Create the DataFrames
    normal_df_from_files = create_image_dataframe(normal_directory, class_name='Normal', class_id=0)
    tb_df_from_files = create_image_dataframe(tb_directory, class_name='Tuberculosis', class_id=1)

    tb_csv_path = f'{DATA_PATH}/clean/tuberculosis.csv'
    normal_csv_path = f'{DATA_PATH}/clean/normal.csv'
    normal_df_from_files.to_csv(normal_csv_path)
    # The tuberculosis CSV previously received the Normal frame by mistake.
    tb_df_from_files.to_csv(tb_csv_path)

    shutil.copy(normal_csv_path, f'{PROCESSED_PATH}/normal.csv')
    shutil.copy(tb_csv_path, f'{PROCESSED_PATH}/tuberculosis.csv')

    print(
        f'\n\n✅ Cleaned Dataset CSV is now available in the following path :\nNormal\t\t:\t\t{PROCESSED_PATH}/normal.csv\nTuberculosis \t:\t\t{PROCESSED_PATH}/tuberculosis.csv\n\n')
    return tb_df_from_files, normal_df_from_files

In [None]:
tb_df, normal_df = prepare_clean_dataset();

In [None]:
tb_df.head()

In [None]:
normal_df.head()

In [None]:
def convert_to_csv(file_path, output_path=f'{PROCESSED_PATH}', file_name=None):
    """Convert an Excel workbook to a CSV file.

    Args:
        file_path: Path of the Excel file to read.
        output_path: Directory that receives the CSV; created if missing.
        file_name: Name of the CSV without extension. When omitted, a fresh
            random UUID is generated for this call.
    """
    # A default of f'{uuid.uuid4()}' in the signature is evaluated once, when
    # the function is defined, so every call without file_name wrote to the
    # same file. Generating it here gives each call its own name.
    if file_name is None:
        file_name = str(uuid.uuid4())

    os.makedirs(output_path, exist_ok=True)
    out = f'{output_path}/{file_name}.csv'
    df = pd.read_excel(file_path)
    df.to_csv(out, index=False)
    print(f'File converted and saved as {out}')


def read_csv(file_path):
    """Load the CSV stored at ``file_path`` into a pandas DataFrame."""
    frame = pd.read_csv(file_path)
    return frame

In [None]:
# convert_to_csv(f'{DATA_PATH}/org/Normal.metadata.xlsx', file_name='normal')
# convert_to_csv(f'{DATA_PATH}/org/Tuberculosis.metadata.xlsx', file_name='tuberculosis')

In [None]:
# NORMAL_CSV_PATH = f'{PROCESSED_PATH}/normal.csv'
# TUBERCULOSIS_CSV_PATH = f'{PROCESSED_PATH}/tuberculosis.csv'
# 
# # Load Normal Data CSV
# normal_df = read_csv(file_path=NORMAL_CSV_PATH)
# normal_df = normal_df.drop(columns=['URL'])
# normal_df['CLASS_ID'] = 0
# normal_df['CLASS_NAME'] = 'NORMAL'
# 
# # Load Tuberculosis Data CSV
# tb_df = read_csv(file_path=TUBERCULOSIS_CSV_PATH)
# tb_df = tb_df.drop(columns=['URL'])
# tb_df['CLASS_ID'] = 1
# tb_df['CLASS_NAME'] = 'TUBERCULOSIS'

In [None]:
# Rebuild the on-disk path of every tuberculosis image from its FILE NAME.
# The '.png' extension is hard-coded here rather than read from FORMAT.
tb_images_path_list = tb_df['FILE NAME'].apply(lambda x: f'{DATA_PATH}/clean/Tuberculosis/{x}.png')
# Preview the first four paths.
tb_images_path_list[0:4]

# Augmentation

In [None]:
# Number of extra tuberculosis images needed for the tuberculosis set to match
# the Normal set in size (negative if tuberculosis is already the larger set).
augmentation_required = len(normal_df) - len(tb_df);
print(
    f"\n\nData Info :\nTuberculosis\t:\t\t{len(tb_df)} [REQUIRED AUGMENTATION : {augmentation_required} ]\nNormal\t\t\t:\t\t{len(normal_df)}")

In [None]:
import albumentations as A
from albumentations.core.composition import Compose
import random

In [None]:
# Define your augmentation pipeline
# Both transforms use p=1, so each is applied on every call.
# NOTE(review): RandomScale presumably changes the output resolution, so
# augmented files may differ in size from their source image; confirm.
AT = Compose([
    A.Rotate(limit=(-5, 5), p=1),  # Rotation between -5 and 5 degrees
    A.RandomScale(scale_limit=(0.01, 0.02), p=1)  # Small scaling
])


In [None]:
# Function to perform augmentation and save the augmented images
import numpy as np
from tqdm import  tqdm


def augment_image(save_dir, num_augmented_images=5):
    """Create augmented copies of randomly chosen tuberculosis images.

    For each requested copy, one path is drawn at random from
    ``tb_images_path_list``, passed through the ``AT`` augmentation pipeline
    and saved as a PNG in ``save_dir``.

    Args:
        save_dir: Directory that receives the augmented PNG files.
        num_augmented_images: Number of augmented images to generate.

    Returns:
        A list with one dictionary per generated image, holding its
        ``FILE NAME`` (without extension), ``FORMAT``, ``SIZE``, ``CLASS_ID``
        and ``CLASS_NAME``.
    """
    extension = 'png'
    properties = []
    for i in tqdm(range(num_augmented_images), desc="Augmenting Images"):
        image_path = random.choice(tb_images_path_list)
        # The context manager releases the file handle as soon as the pixels
        # are copied; the original left one open handle per iteration.
        with Image.open(image_path) as image:
            image_np = np.array(image)

        # Perform augmentation
        augmented = AT(image=image_np)
        augmented_image = Image.fromarray(augmented['image'])

        # The loop index keeps names unique when one source is drawn twice.
        filename = f"augmented_{i}_{os.path.basename(image_path).split('.')[0]}"
        augmented_image_path = os.path.join(save_dir, f'{filename}.{extension}')
        augmented_image.save(augmented_image_path)

        width, height = augmented_image.size

        properties.append({
            'FILE NAME': filename,
            'FORMAT': extension.upper(),
            'SIZE': f"{width}x{height}",
            # Integer id, matching the class_id=1 used for the original
            # tuberculosis rows (the string "1" was inconsistent with them).
            'CLASS_ID': 1,
            'CLASS_NAME': "TUBERCULOSIS",
        })

    return properties


def augment_and_save():
    """Generate the missing tuberculosis images and record them in a CSV.

    Produces ``augmentation_required`` augmented images in
    ``{DATA_PATH}/clean/Tuberculosis`` and writes their properties to
    ``{PROCESSED_PATH}/tb_augmented.csv``.
    """
    save_dir = f'{DATA_PATH}/clean/Tuberculosis'
    # Ensure the save directory exists
    os.makedirs(save_dir, exist_ok=True)

    image_properties = augment_image(save_dir, num_augmented_images=augmentation_required)

    # Build the frame directly from the records. Passing the columns keeps
    # the header even when no image was generated, and avoids concatenating
    # with an empty DataFrame (deprecated behaviour in recent pandas).
    df_properties = pd.DataFrame(
        image_properties,
        columns=['FILE NAME', 'FORMAT', 'SIZE', 'CLASS_ID', 'CLASS_NAME'],
    )

    # Export the DataFrame to CSV
    csv_file_path = f'{PROCESSED_PATH}/tb_augmented.csv'
    df_properties.to_csv(csv_file_path, index=False)

    print(f"✅ Augmentation completed and properties saved to CSV. \n📁Output directory : {save_dir}")


In [None]:
augment_and_save()

In [None]:
augmented_tb_df = read_csv(f'{PROCESSED_PATH}/tb_augmented.csv')
augmented_tb_df.head()

In [None]:
tb_final_df = pd.concat([tb_df, augmented_tb_df])

In [None]:
print(
    f"[ ✅ Data set ready for processing ]\nNormal Data Count \t\t\t:\t\t\t{len(normal_df)}\nTuberculosis Data Count\t\t:\t\t\t{len(tb_final_df)}")

In [None]:
# Merge the balanced tuberculosis set (originals + augmented images) with the
# Normal set. The original merged `tb_df`, so the augmented images generated
# above never reached the train/validation/test splits.
merged_df = pd.concat([tb_final_df, normal_df]).sample(frac=1).reset_index(drop=True)
merged_df.to_csv(f'{PROCESSED_PATH}/merged_dataset.csv')
merged_df.head()

In [None]:

# Define a function that constructs the path based on the class
def construct_path(row):
    """Return the PNG path for ``row`` based on its ``CLASS_ID``.

    Class 0 maps to the ``Normal`` folder and class 1 to the ``Tuberculosis``
    folder under ``{DATA_PATH}/clean``; any other class id yields ``None``.
    """
    label = row['CLASS_ID']
    if label == 0:
        return f'{DATA_PATH}/clean/Normal/{row["FILE NAME"]}.png'
    if label == 1:
        return f'{DATA_PATH}/clean/Tuberculosis/{row["FILE NAME"]}.png'
    # Unknown class: no path can be derived
    return None


# Apply the function to each row of the DataFrame to generate the paths
merged_df['IMAGE PATH'] = merged_df.apply(construct_path, axis=1)
# The identity lambda yields a separate Series holding the same path values.
merged_images_path_list = merged_df['IMAGE PATH'].apply(lambda x: x)
# Preview the first five paths.
merged_images_path_list[0:5]

# Data preparation for Tuberculosis Classification

## Train, test and validation split

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Hold out 20% for testing, then take 12.5% of the remaining 80% for
# validation, giving an overall 70% / 10% / 20% train / val / test split.
# No `stratify` argument is passed, so class proportions in each split are
# left to the random shuffle (fixed by random_state=42).
train_val_df, test_df = train_test_split(merged_df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.125, random_state=42)  # 0.125 x 0.8 = 0.1

## Preparing Image Dataset

In [None]:
import re

def to_sentence_case(text):
    """Return ``text`` with each sentence capitalised.

    Sentences are split on whitespace that follows ``.`` or ``?``, except
    after patterns such as ``e.g.`` or ``Mr.``. ``str.capitalize`` is applied
    to each sentence, so the first character is upper-cased and all remaining
    characters are lower-cased (``"TUBERCULOSIS"`` -> ``"Tuberculosis"``).

    Args:
        text: The string to convert.

    Returns:
        The converted string, with sentences separated by one space.
    """
    # Raw string: '\w' and '\.' are regex escapes, not Python string escapes.
    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)

    # Each sentence keeps its own terminator, so join with a plain space;
    # joining with '. ' produced doubled periods ("Hello.. World.").
    return ' '.join(sentence.capitalize() for sentence in sentences)

In [None]:
from torch.utils.data import Dataset


class ImageDataset(Dataset):
    """Dataset that reads images listed in a DataFrame.

    Each row must provide ``FILE NAME``, ``CLASS_NAME``, ``FORMAT`` and
    ``CLASS_ID``. Images are read from
    ``<data_path>/clean/<Class name>/<FILE NAME>.<format>``.
    """

    def __init__(self, dataframe, data_path, transform=None):
        self.transform = transform
        self.data_path = data_path
        self.dataframe = dataframe

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        record = self.dataframe.iloc[idx]
        stem = record['FILE NAME']
        class_label = record['CLASS_NAME']
        suffix = record['FORMAT'].lower()

        # The class folder is the sentence-cased class name.
        img_path = os.path.join(
            self.data_path, 'clean', to_sentence_case(class_label), f'{stem}.{suffix}'
        )
        image = Image.open(img_path)
        target = record['CLASS_ID']

        if self.transform:
            image = self.transform(image)

        return {
            'image': image,
            'label': target,
            'img_path': img_path,
            'attributes': record.to_dict(),
        }


In [None]:
import matplotlib.pyplot as plt

def plot_samples(dataset, name, num_samples=5):
    """Plot the first few images of ``dataset`` in a single column.

    Args:
        dataset: Indexable dataset whose items are dictionaries with an
            ``'image'`` entry. The image is indexed as channel-first
            (``shape[0]`` is treated as the channel axis).
        name: Label printed above the plots.
        num_samples: Maximum number of samples to show. Fewer are shown when
            the dataset is smaller.
    """
    print(f"Samples from {name} dataset:")

    # Never index past the end of a small dataset.
    count = min(num_samples, len(dataset))
    if count <= 0:
        print("\n")
        return

    plt.figure(figsize=(10, count * 2))  # Set the figure size

    for i in range(count):
        image = dataset[i]['image']

        # Adjust the image for plotting
        if image.shape[0] == 1:  # Single-channel image
            image = image.squeeze(0)  # Remove the channel dimension

        ax = plt.subplot(count, 1, i + 1)  # One column, one row per sample
        if image.ndim == 2:  # Grayscale image
            ax.imshow(image, cmap='gray')
        else:
            # Move channels last, as imshow expects (H, W, C)
            ax.imshow(image.permute(1, 2, 0))
        ax.axis('off')  # Hide the axes

    plt.tight_layout()
    plt.show()
    print("\n")


In [None]:
IMAGE_SIZE = (256, 256)

In [None]:
import torch
from torch.utils.data import DataLoader

## Pre transformation for normalization values

In [None]:
from torchvision import transforms
import os


# Assuming you have a list of image paths

def calculate_mean_and_std():
    """Compute the global pixel mean and standard deviation of the dataset.

    Every image in ``merged_images_path_list`` is resized to ``IMAGE_SIZE``
    and converted to a tensor. The statistics are taken over all pixel values
    of all images and channels together.

    Returns:
        A ``(mean, std)`` tuple of 0-dimensional float32 tensors. ``std`` is
        the unbiased (n - 1) estimate, matching ``torch.Tensor.std``.

    Raises:
        ValueError: If there are no images to process.
    """
    # Initial transformations (without normalization)
    pre_transforms = transforms.Compose([
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
    ])

    # Accumulate running sums instead of keeping every pixel of every image
    # in memory, which grows with the dataset and can exhaust RAM.
    pixel_count = 0
    pixel_sum = 0.0
    pixel_sq_sum = 0.0

    for image_path in tqdm(merged_images_path_list, desc="Calculating ... "):
        # The context manager closes the file once the tensor is built.
        with Image.open(image_path) as img:
            values = pre_transforms(img).double().reshape(-1)

        pixel_count += values.numel()
        pixel_sum += values.sum().item()
        pixel_sq_sum += (values * values).sum().item()

    if pixel_count == 0:
        raise ValueError('No images available to compute mean and std')

    mean = pixel_sum / pixel_count
    # Sum of squared deviations = sum(x^2) - n * mean^2; divide by n - 1.
    variance = (pixel_sq_sum - pixel_count * mean * mean) / max(pixel_count - 1, 1)
    std = max(variance, 0.0) ** 0.5  # Guard against tiny negative round-off

    return torch.tensor(mean, dtype=torch.float32), torch.tensor(std, dtype=torch.float32)


In [None]:
mean, std = calculate_mean_and_std()

In [None]:
import cv2


class CLAHETransform:
    """Apply OpenCV CLAHE (adaptive histogram equalisation) to a PIL image.

    Args:
        clip_limit: Passed to ``cv2.createCLAHE`` as ``clipLimit``.
        tile_grid_size: Passed to ``cv2.createCLAHE`` as ``tileGridSize``.
    """

    def __init__(self, clip_limit=2.0, tile_grid_size=(8, 8)):
        self.clip_limit = clip_limit
        self.tile_grid_size = tile_grid_size

    def __call__(self, img):
        """Return a new PIL image with CLAHE applied to every channel.

        A single-channel input gives a single-channel output; a
        multi-channel input is equalised channel by channel.
        """
        # Convert PIL image to numpy array. This assignment was commented
        # out, so every call failed with NameError on `img_np`.
        img_np = np.array(img)

        clahe = cv2.createCLAHE(clipLimit=self.clip_limit, tileGridSize=self.tile_grid_size)

        if img_np.ndim == 2:
            # Grayscale: equalise the single plane directly. The original
            # expanded to three channels and then never converted back,
            # because its second shape check ran on the expanded array.
            clahe_img = clahe.apply(img_np)
        else:
            # Apply CLAHE to each channel and merge the channels back
            clahe_img = cv2.merge([clahe.apply(channel) for channel in cv2.split(img_np)])

        # Convert numpy array back to PIL Image
        return Image.fromarray(clahe_img)

In [None]:
std.item()

In [None]:
# Preprocessing shared by the train, validation and test datasets.
T = transforms.Compose([
#     CLAHETransform(),  # Uncomment or add your custom transforms as needed
    transforms.Grayscale(num_output_channels=3),  # Convert images to 3-channel grayscale
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    # The same dataset-wide mean and std (from calculate_mean_and_std) are
    # used for all three channels.
    transforms.Normalize(mean=[mean.item(), mean.item(), mean.item()], std=[std.item(), std.item(), std.item()]),
])


In [None]:

# Initialize your custom dataset with transformations
# All three splits share the same transform pipeline T.
train_dataset = ImageDataset(train_df, DATA_PATH, transform=T)
val_dataset = ImageDataset(val_df, DATA_PATH, transform=T)
test_dataset = ImageDataset(test_df, DATA_PATH, transform=T)

# Create DataLoader for each set
# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Print a few samples from each dataset
plot_samples(train_dataset, 'Train')
plot_samples(val_dataset, 'Validation')
plot_samples(test_dataset, 'Test')

# Tuberculosis Classification
The model training involves the following steps:
## 1. Preparing the model

In [None]:
import torch.nn as nn

IMAGE_INPUT_CHANNEL = 'RGB'
KERNEL_SIZE = 3
POOL_KERNEL_SIZE = 2


class TuberculosisCNNReduced(nn.Module):
    """Small CNN classifier: four conv blocks followed by three linear layers.

    Each conv block is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d. The layer
    order inside ``features`` and ``classifier`` is fixed, so state-dict keys
    (``features.0.weight`` ... ``classifier.5.bias``) stay stable across
    constructor arguments.

    Args:
        image_size: ``(height, width)`` of the input images. It sizes the
            first linear layer, which used to be hard-coded for 256x256.
        in_channels: Number of input channels.
        num_classes: Number of output logits.
    """

    _CONV_PADDING = 1
    _POOL_STRIDE = 2
    _BLOCK_CHANNELS = (32, 64, 128, 256)

    def __init__(self, image_size=(256, 256), in_channels=3, num_classes=2):
        super().__init__()

        channels = (in_channels,) + self._BLOCK_CHANNELS
        layers = []
        for c_in, c_out in zip(channels[:-1], channels[1:]):
            layers += [
                nn.Conv2d(c_in, c_out, kernel_size=KERNEL_SIZE, stride=1, padding=self._CONV_PADDING),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=POOL_KERNEL_SIZE, stride=self._POOL_STRIDE),
            ]
        self.features = nn.Sequential(*layers)

        # Follow the spatial size through every block so the classifier
        # input matches whatever image size the model is built for.
        height, width = image_size
        for _ in self._BLOCK_CHANNELS:
            height = self._block_output_size(height)
            width = self._block_output_size(width)
        flattened = self._BLOCK_CHANNELS[-1] * height * width

        self.classifier = nn.Sequential(
            nn.Linear(flattened, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(1024, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, num_classes),
        )

    @classmethod
    def _block_output_size(cls, size):
        """Return the spatial size after one conv (stride 1) + max-pool block."""
        conv_out = size + 2 * cls._CONV_PADDING - KERNEL_SIZE + 1
        return (conv_out - POOL_KERNEL_SIZE) // cls._POOL_STRIDE + 1

    def forward(self, x):
        x = self.features(x)  # Pass input through feature extractor
        x = torch.flatten(x, 1)  # Flatten the features for the classifier
        x = self.classifier(x)  # Pass through the classifier
        return x

# Initialize the reduced model
tb_model = TuberculosisCNNReduced()


In [None]:
tb_model

## Plotting Functions

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, confusion_matrix
import seaborn as sns

def plot_metrics(train_accuracies, val_accuracies, test_accuracies, true_labels, predicted_labels):
    """Plot accuracy curves, a precision-recall curve and a confusion matrix.

    The 2x2 figure is saved to ``{DATA_PATH}/logs/training_metrics.png`` and
    then displayed.

    Args:
        train_accuracies: Per-epoch training accuracies.
        val_accuracies: Per-epoch validation accuracies.
        test_accuracies: Per-epoch test accuracies.
        true_labels: Ground-truth labels.
        predicted_labels: Predicted labels; also passed to
            ``precision_recall_curve`` as the score argument.
    """
    # Set up the matplotlib figure
    plt.figure(figsize=(12, 10))

    # Train vs Test Accuracy
    plt.subplot(2, 2, 1)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(test_accuracies, label='Test Accuracy')
    plt.title('Train vs Test Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    # Train vs Validation Accuracy
    plt.subplot(2, 2, 2)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Train vs Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    # Precision-Recall Curve
    plt.subplot(2, 2, 3)
    precision, recall, _ = precision_recall_curve(true_labels, predicted_labels)
    plt.plot(recall, precision, marker='.')
    plt.title('Precision-Recall Curve')
    plt.xlabel('Recall')
    plt.ylabel('Precision')

    # Confusion Matrix
    plt.subplot(2, 2, 4)
    cm = confusion_matrix(true_labels, predicted_labels)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')

    plt.tight_layout()

    # Directory for saving the plots; created if it does not exist
    save_dir = f'{DATA_PATH}/logs'
    os.makedirs(save_dir, exist_ok=True)

    # Save before showing: once plt.show() has displayed the figure it may no
    # longer be the current figure, and savefig would write an empty image.
    plt.savefig(f'{save_dir}/training_metrics.png')
    plt.show()
    plt.close()  # Close the figure to free memory



## Training Classification Model

In [None]:
# Hyperparameters read by the optimizer setup and the training cells.
LEARNING_RATE = 0.0001
NUM_OF_EPOCHS = 50

# Supported values (see the optimizer setup cell): 'Adam', 'SGD', 'RMSprop', 'AdamW'.
OPTIMIZER = 'SGD'  # Other options include 'Adam', 'RMSprop', 'AdamW', etc.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import os

# Assumes TuberculosisCNNReduced and ImageDataset are defined in earlier cells

best_val_accuracy = 0

def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Batches are dictionaries with ``'image'`` and ``'label'`` entries.

    Returns:
        ``(loss, accuracy)``: the batch losses weighted by batch size and
        divided by the dataset length, and the fraction of correctly
        classified samples.
    """
    model.train()
    running_loss, n_correct, n_seen = 0.0, 0, 0

    for batch in tqdm(train_loader, desc="Training \t"):
        images, labels = batch['image'].to(device), batch['label'].to(device)

        # Standard step: reset gradients, forward, backward, update
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        predicted = torch.max(outputs.data, 1)[1]  # index of the largest logit
        n_seen += labels.size(0)
        n_correct += (predicted == labels).sum().item()

    train_accuracy = n_correct / n_seen
    mean_loss = running_loss / len(train_loader.dataset)
    return mean_loss, train_accuracy

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating its weights.

    Batches are dictionaries with ``'image'`` and ``'label'`` entries.

    Returns:
        ``(loss, accuracy)``: the batch losses weighted by batch size and
        divided by the dataset length, and the fraction of correctly
        classified samples.
    """
    model.eval()
    running_loss, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():  # no gradients needed for evaluation
        for batch in tqdm(val_loader, desc="Validating \t"):
            images, labels = batch['image'].to(device), batch['label'].to(device)

            outputs = model(images)
            running_loss += criterion(outputs, labels).item() * images.size(0)

            predicted = torch.max(outputs.data, 1)[1]  # index of the largest logit
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    val_accuracy = n_correct / n_seen
    mean_loss = running_loss / len(val_loader.dataset)
    return mean_loss, val_accuracy

def visualize_training_data(train_accuracies, val_accuracies, train_losses, val_losses, epoch=0, save_path='training_plots'):
    """Plot accuracy and loss histories side by side, save and show them.

    The figure is written to
    ``<save_path>/training_validation_plot_epoch_<OPTIMIZER>_<epoch + 1>.png``.
    """
    plt.figure(figsize=(12, 6))

    # (subplot position, training series, validation series, metric name)
    panels = (
        (1, train_accuracies, val_accuracies, 'Accuracy'),
        (2, train_losses, val_losses, 'Loss'),
    )
    for position, train_series, val_series, metric in panels:
        plt.subplot(1, 2, position)
        plt.plot(train_series, label=f'Train {metric}')
        plt.plot(val_series, label=f'Validation {metric}')
        plt.title(f'Train vs Validation {metric}')
        plt.xlabel('Epoch')
        plt.ylabel(metric)
        plt.legend()

    plt.tight_layout()

    # Create the output directory on first use
    if not os.path.exists(save_path):
        os.makedirs(save_path)

    # Save first, then display, then release the figure
    plot_file = os.path.join(save_path, f'training_validation_plot_epoch_{OPTIMIZER}_{epoch + 1}.png')
    plt.savefig(plot_file)
    plt.show()
    plt.close()

def save_and_log(model, epoch, train_loss, val_loss, val_accuracy, train_accuracy, logs_dir):
    """Persist the model weights and append this epoch's metrics to a CSV.

    Args:
        model: Model whose ``state_dict`` is saved.
        epoch: Zero-based epoch index; it is logged as ``epoch + 1``.
        train_loss: Training loss of the epoch.
        val_loss: Validation loss of the epoch.
        val_accuracy: Validation accuracy of the epoch.
        train_accuracy: Training accuracy of the epoch.
        logs_dir: Directory receiving ``last.pth``, the per-epoch checkpoints
            and ``training_data.csv``; created if missing.
    """
    os.makedirs(logs_dir, exist_ok=True)
    last_model_path = os.path.join(logs_dir, 'last.pth')
    torch.save(model.state_dict(), last_model_path)
    print(f'Last model saved to {last_model_path}')

    # Keep a separate checkpoint for every epoch after the first eleven
    # (epoch is zero-based, so the first file is check_point_12.pth). This is
    # unconditional; it does not depend on the validation accuracy.
    if epoch > 10:
        torch.save(model.state_dict(), f'{logs_dir}/check_point_{epoch+1}.pth')

    new_row = {
        'Epoch': epoch + 1,
        'Train Loss': train_loss,
        'Validation Loss': val_loss,
        'Train Accuracy': train_accuracy,
        'Validation Accuracy': val_accuracy
    }
    # Print each item in the dictionary on a new line with a tab indent
    for key, value in new_row.items():
        print(f"{key}\t\t:\t\t{value}")

    # Write the log next to the checkpoints. The path used to be hard-coded
    # to {DATA_PATH}/logs, which ignored logs_dir and could point at a
    # directory that this function never created.
    csv_path = f'{logs_dir}/training_data.csv'

    # Append to an existing log, otherwise start a new one
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
        new_df = pd.DataFrame([new_row])
        df = pd.concat([df, new_df], ignore_index=True)
    else:
        df = pd.DataFrame([new_row])

    df.to_csv(csv_path, index=False)
    print(new_row)
    print(f'Training data updated in {csv_path}')
    
    
def train(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    """Train ``model`` for ``num_epochs`` epochs, logging after each one.

    After every epoch the model is validated, its weights and metrics are
    stored under ``{DATA_PATH}/logs`` and the accuracy/loss history so far
    is plotted.

    Args:
        model: Model to train.
        train_loader: Loader of training batches.
        val_loader: Loader of validation batches.
        criterion: Loss function.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Number of epochs to run.
        device: Device the batches are moved to.
    """
    train_accuracies = []
    val_accuracies = []
    train_losses = []
    val_losses = []
    logs_dir = f'{DATA_PATH}/logs'

    for epoch in range(num_epochs):
        train_loss, train_accuracy = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_accuracy = validate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

        # Save the model that was actually trained. The original passed the
        # global `tb_model`, which is wrong for any other model argument.
        save_and_log(model, epoch, train_loss, val_loss, val_accuracy, train_accuracy, logs_dir)
        print("\n\n\nVisualization\n\n\n")
        visualize_training_data(train_accuracies, val_accuracies, train_losses, val_losses, epoch)
        # Report against the requested epoch count, not the global constant.
        print(f"\n\n\nEpoch [{epoch + 1} / {num_epochs}] completed ✅ \n\n\n")

    print('\n\n✅ Training completed.\n\n')



In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tb_model = TuberculosisCNNReduced().to(device)
criterion = nn.CrossEntropyLoss()

# Optimizer setup based on the OPTIMIZER constant. Each entry builds its
# optimizer only when selected; the original always created an Adam
# optimizer first and then immediately replaced it.
# Add more optimizers as needed
optimizer_factories = {
    'Adam': lambda params: optim.Adam(params, lr=LEARNING_RATE),
    'SGD': lambda params: optim.SGD(params, lr=LEARNING_RATE, momentum=0.9),
    'RMSprop': lambda params: optim.RMSprop(params, lr=LEARNING_RATE),
    'AdamW': lambda params: optim.AdamW(params, lr=LEARNING_RATE),
}

if OPTIMIZER not in optimizer_factories:
    raise ValueError("Unsupported optimizer")
optimizer = optimizer_factories[OPTIMIZER](tb_model.parameters())

In [None]:
# Remove all logs that consume most of the storage before training a new set
# !rm -r data/logs
# !rm -r /kaggle/working/data/output_2024_02_24_06_13_42.zip
# !rm -r /kaggle/working/training_plots

In [None]:
train(tb_model, train_loader, val_loader, criterion, optimizer, NUM_OF_EPOCHS, device)

In [None]:
# !rm -r /kaggle/working/training_plots_2024_03_04.zip
# !rm -r /kaggle/working/logs_2024_03_04.zip
# !rm -r /kaggle/working/training_plots_2024_03_04_14_29_34.zip

In [None]:
import shutil
import datetime

def zip_directories():
    """Archive the Kaggle training-plots directory into a timestamped zip.

    The archive is written to
    ``/kaggle/working/training_plots_<timestamp>.zip``. Archiving the logs
    directory is currently disabled.
    """
    # Source directories and the destination for the archives
    base_dir = '/kaggle/working'
    training_plots_dir = '/kaggle/working/training_plots'
    logs_dir = '/kaggle/working/data/logs'  # only needed once the logs archive is re-enabled

    # Timestamp in snake_case, used to keep archive names unique
    date_str = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')

    # Zip the training_plots directory
    plots_archive = f'{base_dir}/training_plots_{date_str}'
    shutil.make_archive(plots_archive, 'zip', training_plots_dir)
    print(f"Created zip: {plots_archive}.zip")

    # Disabled: zip the logs directory as well
    # shutil.make_archive(f'{base_dir}/logs_{date_str}', 'zip', logs_dir)
    # print(f"Created zip: {base_dir}/logs_{date_str}.zip")


In [None]:
# Call the function to create the zip files
zip_directories()

In [None]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import torch
import os
from tqdm import tqdm

from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
from itertools import cycle


def plot_roc_curve(fpr, tpr, roc_auc, num_classes):
    """Draw one precomputed ROC curve per class on a single figure.

    ``fpr``, ``tpr`` and ``roc_auc`` are indexed by class number
    (``0 .. num_classes - 1``).
    """
    plt.figure(figsize=(7, 7))

    palette = cycle(['aqua', 'darkorange', 'cornflowerblue'])
    for class_index in range(num_classes):
        curve_label = 'ROC curve of class {0} (area = {1:0.2f})'.format(class_index, roc_auc[class_index])
        plt.plot(fpr[class_index], tpr[class_index], color=next(palette), lw=2,
                 label=curve_label)

    # Diagonal reference line
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) - multi-class')
    plt.legend(loc="lower right")
    plt.show()

    
def plot_confusion_matrix(true_labels, predicted_labels):
    """Plot the confusion matrix as a heatmap, save it and show it.

    The image is written to ``training_plots/confusion_matrix.png``.
    """
    matrix = confusion_matrix(true_labels, predicted_labels)

    plt.figure(figsize=(8, 8))
    heatmap_options = dict(annot=True, fmt='d', cmap='Blues', square=True,
                           xticklabels=True, yticklabels=True)
    sns.heatmap(matrix, **heatmap_options)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')

    # Create the output directory on first use
    save_path = 'training_plots'
    if not os.path.exists(save_path):
        os.makedirs(save_path)

    # Save first, then display, then release the figure
    target_file = os.path.join(save_path, 'confusion_matrix.png')
    plt.savefig(target_file)
    plt.show()
    plt.close()

def prf1_table(true_labels, predicted_labels):
    """Print per-class precision, recall and F1 score as a table.

    Args:
        true_labels: Ground-truth class labels.
        predicted_labels: Predicted class labels.
    """
    # This name was never imported in the notebook, so the function failed
    # with NameError. Importing it here keeps the function self-contained.
    from sklearn.metrics import precision_recall_fscore_support

    precision, recall, f1_score, _ = precision_recall_fscore_support(true_labels, predicted_labels)
    metrics_df = pd.DataFrame({
        'Class': range(len(precision)),
        'Precision': precision,
        'Recall': recall,
        'F1 Score': f1_score
    })
    print(metrics_df)

    
def test(model, test_loader, criterion, device, checkpoint_path):
    """Evaluate a saved checkpoint on the test set and plot its ROC curve.

    Args:
        model: Model instance the checkpoint's state dict is loaded into.
        test_loader: Loader of test batches (dictionaries with ``'image'``
            and ``'label'`` entries).
        criterion: Loss function.
        device: Device used for evaluation.
        checkpoint_path: Path of the saved ``state_dict``.

    Returns:
        ``(all_true, all_preds, cm)``: the true labels, the predicted labels
        and the confusion matrix.
    """
    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only
    # machine; without it torch.load tries to restore to the saving device.
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.to(device)
    model.eval()

    test_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_true = []
    all_scores = []  # Model output for class index 1, one value per sample

    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Testing"):
            images = batch['image'].to(device)
            labels = batch['label'].to(device)
            outputs = model(images)

            loss = criterion(outputs, labels)
            test_loss += loss.item() * images.size(0)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_true.extend(labels.cpu().numpy())
            # Raw logit of class 1, used as the ROC ranking score
            all_scores.extend(outputs.cpu().numpy()[:, 1])

    test_accuracy = correct / total
    print(f'Test Loss: {test_loss / len(test_loader.dataset):.4f}, Test Accuracy: {test_accuracy:.4f}')

    # Convert true labels and scores to numpy arrays
    all_true_array = np.array(all_true)
    all_scores_array = np.array(all_scores)

    # Compute ROC curve and ROC area
    fpr, tpr, _ = roc_curve(all_true_array, all_scores_array)
    roc_auc = auc(fpr, tpr)

    cm = confusion_matrix(all_true, all_preds)  # Returned for further analysis

    # Plot ROC curve
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC)')
    plt.legend(loc="lower right")
    plt.show()

    return all_true, all_preds, cm


In [None]:
import pandas as pd
import os
import shutil

def zip_best_model_and_plot(csv_path, models_dir, plots_dir, output_zip_path):
    """Zip the checkpoint and plot of the epoch with the best validation accuracy.

    Args:
        csv_path: Training log CSV with ``Epoch`` and ``Validation Accuracy``
            columns.
        models_dir: Directory containing ``check_point_<epoch>.pth`` files.
        plots_dir: Directory containing the per-epoch training plots.
        output_zip_path: Archive path without the ``.zip`` extension.

    Returns:
        The best epoch number, as recorded in the CSV.

    Raises:
        FileNotFoundError: If the checkpoint or the plot of the best epoch
            does not exist.
    """
    import tempfile

    print("Zipping best epochs logs (.pth and corresponding log images) ....")
    df = pd.read_csv(csv_path)

    # Select the best epoch: here we use the highest validation accuracy
    # You can change it to use the lowest validation loss if you prefer
    best_epoch_row = df.loc[df['Validation Accuracy'].idxmax()]
    best_epoch = int(best_epoch_row['Epoch'])

    # Construct the paths for the best model and corresponding plot
    best_model_path = os.path.join(models_dir, f'check_point_{best_epoch}.pth')
    best_plot_path = os.path.join(
        plots_dir, f'training_validation_plot_epoch_{OPTIMIZER}_{best_epoch}.png')

    missing = [path for path in (best_model_path, best_plot_path) if not os.path.exists(path)]
    if missing:
        raise FileNotFoundError(f"The best model or plot file does not exist: {missing}")

    # Stage the files in a fresh temporary directory. A fixed /tmp path could
    # still hold files from an earlier run (they would end up in the zip) and
    # was left behind whenever copying or zipping failed.
    with tempfile.TemporaryDirectory() as temp_dir:
        shutil.copy(best_model_path, temp_dir)
        shutil.copy(best_plot_path, temp_dir)
        shutil.make_archive(output_zip_path, 'zip', temp_dir)

    print(f"Created zip file with the best model and plot: {output_zip_path}.zip")
    return best_epoch

# Example usage
csv_path = '/kaggle/working/data/logs/training_data.csv'
models_dir = '/kaggle/working/data/logs'
plots_dir = '/kaggle/working/training_plots'
output_zip_path = '/kaggle/working/best_model_and_plot'

# The call that assigns `best_epoch` was commented out while the print below
# still used it, so this cell always failed with NameError. Run the call and
# report a missing log, checkpoint or plot instead of crashing.
try:
    best_epoch = zip_best_model_and_plot(csv_path, models_dir, plots_dir, output_zip_path)
except FileNotFoundError as err:
    print(f"Could not package the best epoch: {err}")
else:
    print(f" 🟢 Best epoch : {best_epoch}\n ✅ Logs saved on : {output_zip_path}")


In [None]:
# Define the path to your saved model checkpoint
# The epoch number (20) is hard-coded; this file exists only if training
# reached that epoch and wrote its checkpoint to {DATA_PATH}/logs.
checkpoint_path = f'{DATA_PATH}/logs/check_point_20.pth'

# Call the test function
true_labels, predicted_labels,cm = test(tb_model, test_loader, criterion, device, checkpoint_path)

# Plot the confusion matrix
plot_confusion_matrix(true_labels, predicted_labels)


In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from itertools import cycle
import numpy as np

def plot_roc_curve(true_labels, scores, num_classes):
    """Plot one-vs-rest ROC curves, one per class, on a single figure.

    Args:
        true_labels: Array-like of class indices in ``[0, num_classes)``, one
            per sample. Lists, NumPy arrays and column vectors are accepted.
        scores: Array-like with one row per sample and at least
            ``num_classes`` columns; column ``i`` is the score for class ``i``.
        num_classes: Number of classes to draw a curve for.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # Normalise the inputs first: plain (nested) lists do not support the
    # ``[:, i]`` indexing used below, and float / column-shaped labels cannot
    # be used directly as row indices into the identity matrix.
    labels = np.asarray(true_labels).astype(int).ravel()
    score_matrix = np.asarray(scores)

    # One-hot encode the labels so each class is evaluated against the rest.
    true_labels_one_hot = np.eye(num_classes)[labels]

    # Compute ROC curve and ROC area for each class
    fpr, tpr, roc_auc = {}, {}, {}
    for i in range(num_classes):
        fpr[i], tpr[i], _ = roc_curve(true_labels_one_hot[:, i], score_matrix[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Plot all ROC curves; the colour list is cycled if there are more
    # classes than colours.
    plt.figure()
    colors = cycle(['blue', 'red', 'green', 'cyan', 'magenta', 'yellow', 'black', 'pink', 'lightblue', 'lightgreen', 'gray', 'indigo', 'orange', 'brown', 'purple'])
    for i, color in zip(range(num_classes), colors):
        plt.plot(fpr[i], tpr[i], color=color, lw=2,
                 label=f'ROC curve of class {i} (area = {roc_auc[i]:0.2f})')

    # Diagonal reference line from (0, 0) to (1, 1).
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) for multi-class')
    plt.legend(loc="lower right")
    plt.show()



In [None]:
# Summarise the labels currently bound to true_labels / predicted_labels.
# prf1_table is defined in another cell — presumably a precision/recall/F1
# table; confirm against its definition.
prf1_table(true_labels, predicted_labels)

## End of training

# External Testing

In [None]:
class ExternalTestDataset(Dataset):
    """Dataset that serves images listed in a metadata DataFrame.

    Every row of ``dataframe`` must provide an ``'IMAGE PATH'`` entry (the
    file opened with ``Image.open``) and a ``'CLASS_ID'`` entry (the label).
    Any other columns are passed through untouched in the ``'attributes'``
    dict of each sample.

    Args:
        dataframe: Metadata table, one row per image; rows are addressed by
            position (``iloc``), so the index labels do not matter.
        transform: Optional callable applied to the opened image.
    """

    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the metadata table."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the sample at position ``idx``.

        Returns:
            dict with keys ``'image'`` (opened, optionally transformed image),
            ``'label'`` (the row's CLASS_ID), ``'img_path'`` and
            ``'attributes'`` (the whole row as a dict).
        """
        row = self.dataframe.iloc[idx]

        # Only the path and the label are needed here. Other columns are not
        # read individually, so a row with a missing FORMAT / FILE NAME /
        # CLASS_NAME value no longer breaks loading.
        img_path = row['IMAGE PATH']
        image = Image.open(img_path)

        if self.transform:
            image = self.transform(image)

        return {
            'image': image,
            'label': row['CLASS_ID'],
            'img_path': img_path,
            'attributes': row.to_dict(),
        }


In [None]:
import pandas as pd

def convert_external_dataset(external_csv_path, output_data_path, image_format='PNG'):
    """Rewrite the external metadata CSV in this notebook's dataset layout.

    Reads ``external_csv_path``, keeps its ``study_id`` and ``findings``
    columns and writes ``converted_shenzhen_metadata.csv`` into
    ``output_data_path`` with the columns FILE NAME, SIZE, CLASS_ID,
    CLASS_NAME, FORMAT and IMAGE PATH (in that order).

    Args:
        external_csv_path: Path of the metadata CSV to read.
        output_data_path: Directory that receives the converted CSV.
        image_format: Value stored in the FORMAT column of every row.

    Returns:
        None. The converted table is only written to disk.
    """

    def to_image_path(study_id):
        # Keep the text before the first literal backslash + 'n' sequence
        # (two characters, not a newline) and place it under the
        # external_test image folder.
        stem, _, _ = study_id.partition('\\n')
        return f"{DATA_PATH}/external_test/{stem}"

    # Source column -> column name used by this notebook's own DataFrames.
    renamed_columns = {'study_id': 'FILE NAME', 'findings': 'CLASS_NAME'}
    column_order = ['FILE NAME', 'SIZE', 'CLASS_ID', 'CLASS_NAME', 'FORMAT', 'IMAGE PATH']

    metadata = pd.read_csv(external_csv_path)
    converted_df = metadata[list(renamed_columns)].rename(columns=renamed_columns)

    # The image size is not derived here, so a placeholder is stored.
    converted_df['SIZE'] = 'Unknown'
    # A finding of 'normal' (case-insensitive) is class 0; anything else is class 1.
    converted_df['CLASS_ID'] = converted_df['CLASS_NAME'].map(
        lambda finding: 0 if finding.lower() == 'normal' else 1
    )
    converted_df['IMAGE PATH'] = converted_df['FILE NAME'].map(to_image_path)
    converted_df['FORMAT'] = image_format

    # Match the column order of the notebook's own dataset, then persist.
    converted_df = converted_df[column_order]
    output_csv_path = f'{output_data_path}/converted_shenzhen_metadata.csv'
    converted_df.to_csv(output_csv_path, index=False)

    print(f'Converted DataFrame saved to {output_csv_path}')

In [None]:
# Define the paths
external_csv_path = '/kaggle/input/tuberculosis-chest-xrays-shenzhen/shenzhen_metadata.csv'
output_data_path = DATA_PATH  # Directory that receives converted_shenzhen_metadata.csv

# Call the function
convert_external_dataset(external_csv_path, output_data_path)

# Read the converted CSV back in.
# NOTE(review): this path is hard-coded, while the function writes to
# f'{output_data_path}/converted_shenzhen_metadata.csv'. With DATA_PATH set to
# './data' the two only coincide when the working directory is
# /kaggle/working — confirm, or build the path from output_data_path.
external_df = pd.read_csv('/kaggle/working/data/converted_shenzhen_metadata.csv')
external_df.head()

In [None]:
def get_random_external_samples(df, class_column, n_samples_per_class, random_state=42):
    """Draw a shuffled sample with up to ``n_samples_per_class`` rows per class.

    Only rows whose ``class_column`` value is 0 or 1 are considered. Rows are
    drawn WITHOUT replacement, so no row appears twice in the result; a class
    with fewer than ``n_samples_per_class`` rows contributes all of its rows.

    Args:
        df: Source DataFrame.
        class_column: Name of the column holding the class id (0 or 1).
        n_samples_per_class: Maximum number of rows to draw from each class.
        random_state: Seed used for both the per-class draw and the final
            shuffle, which makes the result reproducible.

    Returns:
        A new DataFrame with a fresh 0..N-1 index containing the sampled rows
        of both classes in shuffled order.
    """
    sampled_parts = []
    for class_id in (0, 1):
        class_rows = df[df[class_column] == class_id]
        # The request is capped at the class size, so replacement is never
        # needed. Sampling with replacement here would silently duplicate
        # rows and leave others out.
        take = min(len(class_rows), n_samples_per_class)
        sampled_parts.append(
            class_rows.sample(n=take, replace=False, random_state=random_state)
        )

    # Concatenate the per-class samples, then shuffle so the classes are mixed.
    balanced_df = pd.concat(sampled_parts, ignore_index=True)
    return balanced_df.sample(frac=1, random_state=random_state).reset_index(drop=True)

# Specify the number of samples you want for each class
n_samples = 70  # Adjust this number as needed

# Create a new balanced and shuffled DataFrame
balanced_shuffled_external_df = get_random_external_samples(external_df, 'CLASS_ID', n_samples)

# Keep the first len(test_df) - 2 * n_samples rows of test_df and append the
# external sample after them.
# NOTE(review): pd.concat is called without ignore_index, so index labels from
# the two parts can repeat; rows should therefore be addressed by position.
# NOTE(review): if a class has fewer than n_samples external rows, fewer rows
# are appended than were dropped from test_df.
final_test_df = pd.concat([test_df.head(len(test_df) - int(n_samples) * 2), balanced_shuffled_external_df])
final_test_df.tail()

In [None]:
# Wrap the combined test table in a dataset; T is the transform defined in
# another cell.
external_test_dataset = ExternalTestDataset(final_test_df, transform=T)
# Batches of 32 in table order (shuffle=False).
external_test_loader = DataLoader(external_test_dataset, batch_size=32, shuffle=False)

In [None]:
# Show how many rows the combined test table contains.
len(final_test_df)

In [None]:
# Define the path to your saved model checkpoint
# NOTE(review): the epoch (20) is hard-coded — confirm this is the checkpoint
# you intend to evaluate.
checkpoint_path = f'{DATA_PATH}/logs/check_point_20.pth'

# Call the test function
# Same call as the earlier test cell, but fed by external_test_loader. This
# rebinds true_labels / predicted_labels / cm; cm is not used in this cell.
true_labels, predicted_labels,cm = test(tb_model, external_test_loader, criterion, device, checkpoint_path)

# Plot the confusion matrix
plot_confusion_matrix(true_labels, predicted_labels)

In [None]:
# Summarise the labels currently bound to true_labels / predicted_labels
# (rebound by the external_test_loader run when cells execute in order).
# prf1_table is defined in another cell — presumably a precision/recall/F1
# table; confirm against its definition.
prf1_table(true_labels, predicted_labels)