In [None]:
import zipfile
import os

# Locations used by this cell: the downloaded RBC archive, the directory its
# top level is unpacked into, and (below) the directory that receives one
# sub-folder per nested class archive.
main_zip_path = "data/RBCdataset.zip"
extracted_root = "data/RBC_extracted"

# Step 1: Extract main zip (RBCdataset.zip)
os.makedirs(extracted_root, exist_ok=True)

with zipfile.ZipFile(main_zip_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_root)
    print("✅ Extracted main RBCdataset.zip")

# Step 2: Extract each nested zip file (1 Elliptocyte 1211.zip, etc.)
rbc_types_dir = os.path.join(extracted_root, "RBCdataset")  # this folder is inside the main zip
final_dataset_dir = os.path.join("data", "RBC_final")
os.makedirs(final_dataset_dir, exist_ok=True)

# sorted() keeps the extraction order (and therefore the log) deterministic.
for file_name in sorted(os.listdir(rbc_types_dir)):
    # Split off the real extension instead of str.replace(".zip", ""): the
    # match below is case-insensitive, so the stripping has to be as well, and
    # ".zip" appearing in the middle of a name must not be removed.
    stem, extension = os.path.splitext(file_name)
    if extension.lower() != ".zip":
        continue

    file_path = os.path.join(rbc_types_dir, file_name)
    folder_name = stem.replace(" ", "_")
    extract_path = os.path.join(final_dataset_dir, folder_name)
    os.makedirs(extract_path, exist_ok=True)

    with zipfile.ZipFile(file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print(f"✅ Extracted: {file_name}")


In [None]:
import zipfile
import os

# Locations used by this cell: the downloaded peripheral-blood-smear archive,
# the directory its top level is unpacked into, and the directory that
# receives one sub-folder per nested archive.
peripheral_zip_path = "data/PeripheralBloodSmear.zip"
peripheral_extract_root = "data/Peripheral_extracted"
final_pbs_dir = "data/PBS_final"

# Make sure output folders exist
os.makedirs(peripheral_extract_root, exist_ok=True)
os.makedirs(final_pbs_dir, exist_ok=True)

# Step 1: Extract the main PeripheralBloodSmear.zip
with zipfile.ZipFile(peripheral_zip_path, 'r') as zip_ref:
    zip_ref.extractall(peripheral_extract_root)
    print("✅ Extracted PeripheralBloodSmear.zip")

# Step 2: Extract all nested zip files inside Peripheral_extracted
for root, dirs, files in os.walk(peripheral_extract_root):
    dirs.sort()  # in-place sort makes os.walk descend in a deterministic order
    for file_name in sorted(files):
        # Split off the real extension instead of str.replace(".zip", ""): the
        # match below is case-insensitive, so the stripping has to be as well,
        # and ".zip" in the middle of a name must not be removed.
        stem, extension = os.path.splitext(file_name)
        if extension.lower() != ".zip":
            continue

        file_path = os.path.join(root, file_name)
        folder_name = stem.replace(" ", "_")
        extract_path = os.path.join(final_pbs_dir, folder_name)
        os.makedirs(extract_path, exist_ok=True)

        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_path)
        print(f"✅ Extracted: {file_name}")


In [1]:
!pip install opencv-python


Defaulting to user installation because normal site-packages is not writeable


In [2]:
!pip install pillow


Defaulting to user installation because normal site-packages is not writeable


In [3]:
!pip install tqdm


Defaulting to user installation because normal site-packages is not writeable


In [4]:
!pip install matplotlib

Defaulting to user installation because normal site-packages is not writeable


In [5]:
!pip install scikit-learn

Defaulting to user installation because normal site-packages is not writeable


In [6]:
!pip install torchvision

Defaulting to user installation because normal site-packages is not writeable


In [7]:
pip install timm


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import timm
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split



  from .autonotebook import tqdm as notebook_tqdm


In [1]:
import os
import zipfile
import numpy as np
import cv2
from tqdm import tqdm
from sklearn.utils import shuffle, resample, class_weight
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torchvision.models import convnext_tiny

# ⚙️ Device setup: use the GPU when PyTorch can see one, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 🎲 Reproducibility: seed the NumPy and PyTorch global generators once, up
# front. 42 matches the random_state passed to the scikit-learn helpers later
# in the notebook.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# 📐 Size every image is resized to during preprocessing.
TARGET_SIZE = (224, 224)

# 📂 Paths: root folders of the two extracted datasets.
RBC_ROOT = "data/RBC_final"
PBS_ROOT = "data/PBS_final"

In [2]:

# 🔧 Image preprocessing
def preprocess_image(img_path, target_size=(224, 224)):
    """Load one image from disk, resize it, denoise it and return it as RGB.

    The file is read with ``cv2.IMREAD_UNCHANGED`` and resized by passing
    ``target_size`` to ``cv2.resize``. Single-channel images go through
    ``fastNlMeansDenoising`` and a GRAY->RGB conversion; everything else goes
    through ``fastNlMeansDenoisingColored`` and a BGR->RGB conversion.

    Args:
        img_path: Path of the image file to read.
        target_size: Size tuple handed to ``cv2.resize``.

    Returns:
        The processed image array, or ``None`` when any step fails (the
        failure is printed, not raised).
    """
    try:
        raw = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
        if raw is None:
            # Routed through the handler below so every failure is reported
            # the same way.
            raise Exception("Image could not be loaded")

        resized = cv2.resize(raw, target_size)
        single_channel = resized.ndim == 2 or (resized.ndim == 3 and resized.shape[2] == 1)

        if not single_channel:
            denoised = cv2.fastNlMeansDenoisingColored(
                resized, None, h=10, hColor=10, templateWindowSize=7, searchWindowSize=21
            )
            return cv2.cvtColor(denoised, cv2.COLOR_BGR2RGB)

        denoised = cv2.fastNlMeansDenoising(
            resized, None, h=10, templateWindowSize=7, searchWindowSize=21
        )
        return cv2.cvtColor(denoised, cv2.COLOR_GRAY2RGB)
    except Exception as err:
        print(f"❌ Error processing {img_path}: {err}")
        return None

# 🧹 Dataset loading
def load_dataset_from_folder(root_folder, label):
    """Preprocess every PNG/JPG/JPEG image found under ``root_folder``.

    The folder is walked recursively, each matching file is passed to
    ``preprocess_image`` with the module-level ``TARGET_SIZE``, and images
    that fail preprocessing (``None`` result) are left out.

    Args:
        root_folder: Directory to search recursively.
        label: Label value recorded once per successfully loaded image.

    Returns:
        A ``(data, labels)`` pair of equally long lists: the processed images
        and the repeated ``label``.
    """
    image_paths = []
    for subdir, _, files in os.walk(root_folder):
        for file in files:
            if file.lower().endswith(('.png', '.jpg', '.jpeg')):
                image_paths.append(os.path.join(subdir, file))

    # os.walk yields entries in a filesystem-dependent order; sorting makes
    # the sample order reproducible, so the fixed random_state used for the
    # later shuffle and split selects the same images on every machine.
    image_paths.sort()

    print(f"📸 Found {len(image_paths)} images in {root_folder}")
    data, labels = [], []
    for img_path in tqdm(image_paths, desc=f"Processing {os.path.basename(root_folder)}"):
        img = preprocess_image(img_path, TARGET_SIZE)

        if img is not None:
            data.append(img)
            labels.append(label)

    skipped = len(image_paths) - len(data)
    if skipped:
        # Surface dropped files in one line so they are not lost among the
        # per-file error messages.
        print(f"⚠️ Skipped {skipped} unreadable images in {root_folder}")
    return data, labels

# 🧪 Load both datasets: RBC images get label 0, PBS images get label 1.
_loaded_sets = {}
for _set_name, _set_root, _set_label in (("RBC", RBC_ROOT, 0), ("PBS", PBS_ROOT, 1)):
    print(f"🔄 Loading {_set_name} dataset...")
    _loaded_sets[_set_name] = load_dataset_from_folder(_set_root, label=_set_label)

rbc_data, rbc_labels = _loaded_sets["RBC"]
pbs_data, pbs_labels = _loaded_sets["PBS"]



🔄 Loading RBC dataset...
📸 Found 7108 images in data/RBC_final


Processing RBC_final: 100%|████████████████████████████████████████████████████████| 7108/7108 [46:02<00:00,  2.57it/s]


🔄 Loading PBS dataset...
📸 Found 80 images in data/PBS_final


Processing PBS_final: 100%|████████████████████████████████████████████████████████████| 80/80 [00:51<00:00,  1.57it/s]


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models
from sklearn.model_selection import train_test_split
from sklearn.utils import resample, shuffle
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
import cv2
from tqdm import tqdm
from collections import Counter

# ----------------------
# Custom Dataset
# ----------------------
class CustomImageDataset(Dataset):
    """In-memory image dataset that yields normalised, channel-first tensors.

    Each item is converted to float32, divided by 255, optionally augmented
    with random flips and quarter-turn rotations, rescaled with
    ``(x - 0.5) / 0.5`` and transposed from H x W x C to C x H x W.

    Random decisions are drawn from PyTorch's RNG rather than ``np.random``.
    The PyTorch DataLoader documentation warns that seeds of other libraries
    may be duplicated across worker processes, which would make workers apply
    identical "random" augmentations; PyTorch's own generator is seeded per
    worker. It also means ``torch.manual_seed`` controls the augmentation.

    Args:
        data: Indexable collection of H x W x C arrays; values are expected
            to lie in 0-255 because they are divided by 255.
        labels: Indexable collection of integer-convertible class labels.
        augment: When true, apply the random flips/rotations.
    """

    def __init__(self, data, labels, augment=False):
        self.data = data
        self.labels = labels
        self.augment = augment

    def __len__(self):
        """Return the number of samples (taken from ``labels``)."""
        return len(self.labels)

    @staticmethod
    def _coin_flip():
        """Return True with probability 0.5 using PyTorch's RNG."""
        return torch.rand(1).item() > 0.5

    def __getitem__(self, idx):
        """Return ``(image, label)`` as a float32 C x H x W tensor and a long tensor."""
        image = self.data[idx].astype(np.float32) / 255.0
        label = int(self.labels[idx])

        if self.augment:
            if self._coin_flip():
                image = np.fliplr(image)
            if self._coin_flip():
                image = np.flipud(image)
            if self._coin_flip():
                # One, two or three quarter turns in the image plane, i.e. a
                # rotation by 90, 180 or 270 degrees.
                quarter_turns = int(torch.randint(1, 4, (1,)).item())
                image = np.rot90(image, k=quarter_turns)

        # Normalize to [-1, 1]
        image = (image - 0.5) / 0.5
        # Flips and rotations return strided views; make the data contiguous
        # before handing it to torch.
        chw = np.ascontiguousarray(image.transpose(2, 0, 1))
        image = torch.tensor(chw, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long)

        return image, label

# ----------------------
# Step 1: Prepare Data
# ----------------------
# Assume rbc_data, rbc_labels, pbs_data, pbs_labels already defined
from sklearn.utils import resample

# Merge the ORIGINAL images only. Oversampling must not happen before the
# split: duplicating the PBS images first puts copies of the same picture in
# both the training and the validation set, so the validation score would
# mostly measure memorisation (data leakage).
all_data = np.array(list(rbc_data) + list(pbs_data))
all_labels = np.array(list(rbc_labels) + list(pbs_labels))
all_data, all_labels = shuffle(all_data, all_labels, random_state=42)
print(f"✅ Combined dataset: {len(all_data)} images (before oversampling)")

# ----------------------
# Step 2: Split + Balance Train
# ----------------------
# Stratify so the minority class is represented in both splits.
X_train, X_val, y_train, y_val = train_test_split(
    all_data, all_labels,
    test_size=0.2,
    stratify=all_labels,
    random_state=42
)

rbc_train = X_train[y_train == 0]
pbs_train = X_train[y_train == 1]
rbc_labels_train = y_train[y_train == 0]
pbs_labels_train = y_train[y_train == 1]

if len(rbc_train) == 0 or len(pbs_train) == 0:
    raise ValueError("Both classes must be present in the training split to balance it.")

# Balance the TRAINING split only; the validation split keeps its natural
# class ratio and contains no image that is also used for training.
print("🔁 Oversampling PBS to balance...")
pbs_upsampled, pbs_labels_upsampled = resample(
    pbs_train, pbs_labels_train,
    replace=True,
    n_samples=len(rbc_train),
    random_state=42
)

X_train_bal = np.concatenate([rbc_train, pbs_upsampled])
y_train_bal = np.concatenate([rbc_labels_train, pbs_labels_upsampled])
X_train_bal, y_train_bal = shuffle(X_train_bal, y_train_bal, random_state=42)

# Check distribution. The validation split is expected to be imbalanced, so
# per-class precision/recall are more informative than its raw accuracy.
print("🔍 Train label distribution:", Counter(y_train_bal))
print("🔍 Val label distribution:", Counter(y_val))

# ----------------------
# Step 3: Dataloaders
# ----------------------
# Settings shared by both loaders.
_loader_kwargs = dict(batch_size=64, num_workers=2)

# Training: balanced split, random augmentation, reshuffled every epoch.
train_dataset = CustomImageDataset(X_train_bal, y_train_bal, augment=True)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)

# Validation: no augmentation and a fixed order.
val_dataset = CustomImageDataset(X_val, y_val, augment=False)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)

# ----------------------
# Step 4: Model Setup
# ----------------------
# Run on the GPU when PyTorch can see one; fall back to the CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ConvNeXt-Tiny created with torchvision's DEFAULT weights enum.
model = models.convnext_tiny(weights=models.ConvNeXt_Tiny_Weights.DEFAULT)

# Optional: freeze the feature extractor for more stable training.
# for param in model.features.parameters():
#     param.requires_grad = False

# Swap the last classifier layer for dropout followed by a 2-output linear layer.
head_in_features = model.classifier[2].in_features
model.classifier[2] = nn.Sequential(
    nn.Dropout(0.3),
    nn.Linear(head_in_features, 2),
)
model.to(device)

# ----------------------
# Step 5: Loss & Optimizer
# ----------------------
# Unweighted cross-entropy, kept plain for stability.
criterion = nn.CrossEntropyLoss()

# AdamW over every model parameter.
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4)

# Halve the learning rate once the monitored value (the validation loss passed
# to scheduler.step) has not improved for more than 2 epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=2,
)

# ----------------------
# Step 6: Training Loop
# ----------------------
# Training budget and early-stopping state.
num_epochs = 20                 # upper bound on the number of epochs
best_val_loss = float('inf')    # lowest validation loss seen so far
patience = 3                    # epochs without improvement tolerated before stopping
epochs_no_improve = 0           # consecutive epochs without a new best validation loss

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss, correct = 0.0, 0

    for images, labels in tqdm(train_loader, desc=f"🔁 Epoch {epoch+1}/{num_epochs}"):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # loss.item() is a batch mean; weight it by the batch size so the
        # division below yields a per-sample average.
        train_loss += loss.item() * images.size(0)
        correct += (outputs.argmax(1) == labels).sum().item()

    train_loss /= len(train_dataset)
    train_acc = correct / len(train_dataset)
    print(f"✅ Train Loss: {train_loss:.4f} | Accuracy: {train_acc:.4f}")

    # ---- Validation pass ----
    model.eval()
    val_loss, val_correct = 0.0, 0
    # Reset every epoch: after the loop these lists hold the predictions of
    # the LAST epoch that ran, not of the best checkpoint.
    all_preds, all_targets = [], []

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * images.size(0)
            val_correct += (outputs.argmax(1) == labels).sum().item()
            all_preds.extend(outputs.argmax(1).cpu().numpy())
            all_targets.extend(labels.cpu().numpy())

    val_loss /= len(val_dataset)
    val_acc = val_correct / len(val_dataset)
    print(f"🧪 Val Loss: {val_loss:.4f} | Accuracy: {val_acc:.4f}")

    # ---- LR schedule, checkpointing and early stopping ----
    scheduler.step(val_loss)
    if val_loss < best_val_loss:
        # New best: remember it and overwrite the checkpoint on disk.
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save(model.state_dict(), "best_model.pth")
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("⛔ Early stopping triggered.")
            # Leaves the loop before the train report below, so the epoch
            # that triggers early stopping prints no train report.
            break

    # ---- Classification report for train set (optional) ----
    # This is a second full pass over train_loader in eval mode; the batches
    # come from the same loader that is used for training.
    print("\n📊 Detailed Train Report:")
    model.eval()
    train_preds, train_true = [], []
    with torch.no_grad():
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            train_preds.extend(outputs.argmax(1).cpu().numpy())
            train_true.extend(labels.cpu().numpy())
    print(classification_report(train_true, train_preds, target_names=["RBC", "PBS"]))

# ----------------------
# Step 7: Final Report
# ----------------------
# The training loop leaves the model with the weights of the last epoch that
# ran, which (especially after early stopping) is not the best epoch. Reload
# the checkpoint saved at the lowest validation loss and recompute the
# validation predictions so the report describes the model that was kept.
if os.path.isfile("best_model.pth"):
    model.load_state_dict(torch.load("best_model.pth", map_location=device))
else:
    # No checkpoint was written (e.g. the validation loss never improved on
    # its initial value); fall back to the current weights.
    print("⚠️ best_model.pth not found; reporting on the current weights.")

model.eval()
all_preds, all_targets = [], []
with torch.no_grad():
    for images, labels in val_loader:
        outputs = model(images.to(device))
        all_preds.extend(outputs.argmax(1).cpu().numpy())
        all_targets.extend(labels.numpy())

# labels=[0, 1] keeps both rows/columns in the output even when one class is
# absent from the predictions and the targets.
print("\n📊 Classification Report:")
print(classification_report(all_targets, all_preds, labels=[0, 1], target_names=["RBC", "PBS"]))

print("\n📉 Confusion Matrix:")
print(confusion_matrix(all_targets, all_preds, labels=[0, 1]))
