In [2]:


import pandas as pd 
import numpy as np 
from glob import glob
import cv2 
import matplotlib.pylab as plt 
import random
import os
import re

# view sample images
categories = ['benign', 'malignant', 'normal']
base_path = 'breast-ultrasound-images-dataset/'
for category in categories:
    # Get all image paths for the current category
    images_path = glob(os.path.join(base_path, category, "*.png"))
    if images_path:
        
        sample_path = images_path[0]
        sample_img = cv2.imread(sample_path)
        
        if sample_img is not None:
            sample_img_rgb = cv2.cvtColor(sample_img, cv2.COLOR_BGR2RGB)
            
            # Find corresponding mask
            mask_path = sample_path.replace('.png', '_mask.png')
            
            plt.figure(figsize=(12, 6))
            plt.subplot(1, 2, 1)
            plt.imshow(sample_img_rgb)
            print(sample_img_rgb.shape)
            plt.title(f"{category}")
            if os.path.exists(mask_path):
                mask_img = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
                plt.subplot(1, 2, 2)
                plt.imshow(mask_img, cmap='gray')
                print(mask_img.shape)
                plt.title(f"Sample {category} Mask")
            plt.axis('off')
            plt.show()
        else:
            print(f"Error: Unable to load image at {sample_path}")
    else:
        print(f"No images found in category: {category}")




In [None]:
# count number images
for category in categories:
    print(f'sample path: {len(glob(os.path.join(base_path, category, "*.png")))} images in {category}')
    images_path = glob(os.path.join(base_path, category, "*.png"))
    print(images_path[0]) 

In [None]:
# Resize images
for category in categories:
    
    images_path = glob(os.path.join(base_path, category, "*.png"))
    
    for i in images_path:
        original = cv2.imread(i, 0)  
        if original is not None:  
            resized = cv2.resize(original, (256, 256))  
            cv2.imwrite(i, resized)  
            
        else:
            print(f"Error: Unable to read image at {i}")

    print(f"Resized images in category: {category}")

In [None]:
# verify all images are 256x256 

for category in categories:

    images = glob(os.path.join(base_path, category, '*.png'))
    
    for image in images:
        current_image = cv2.imread(image, cv2.IMREAD_UNCHANGED)  
        
        if current_image is not None: 
            current_shape = current_image.shape

            if current_shape != (256, 256) and current_shape != (256, 256, 3):
                print(f"Image Path: {image}")
                print(f"Shape: {current_shape}")
        else:
            print(f"Error: Unable to read image at {image}")

In [None]:
# Run thistraining code as separate .ipynb file 

from tqdm import tqdm
import sys
import torch 
import torch.nn as nn 
import torch.optim as optim 
from torch.utils.data import Dataset, DataLoader 
import torchvision 
from torchvision import models, transforms 
from sklearn.model_selection import train_test_split 
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score 




# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


device = torch.device("cpu")



DATASET_PATH = "breast-ultrasound-images-dataset"
BENIGN_PATH = os.path.join(DATASET_PATH, "benign")
MALIGNANT_PATH = os.path.join(DATASET_PATH, "malignant")
NORMAL_PATH = os.path.join(DATASET_PATH, "normal")


BATCH_SIZE = 16
EPOCHS = 30
LEARNING_RATE = 0.0001
IMAGE_SIZE = 224  
NUM_CLASSES = 3
VALIDATION_SPLIT = 0.2


class_map = {
    'benign': 0,
    'malignant': 1,
    'normal': 2
}
idx_to_class = {v: k for k, v in class_map.items()}

# Check dataset structure
def inspect_dataset():
    benign_imgs = glob(os.path.join(BENIGN_PATH, "benign (*).png"))
    benign_masks = glob(os.path.join(BENIGN_PATH, "benign (*)_mask.png"))
    malignant_imgs = glob(os.path.join(MALIGNANT_PATH, "malignant (*).png"))
    malignant_masks = glob(os.path.join(MALIGNANT_PATH, "malignant (*)_mask.png"))
    normal_imgs = glob(os.path.join(NORMAL_PATH, "normal (*).png"))
    normal_masks = glob(os.path.join(NORMAL_PATH, "normal (*)_mask.png"))

    print(f"Benign images: {len(benign_imgs)}")
    print(f"Benign masks: {len(benign_masks)}")
    print(f"Malignant images: {len(malignant_imgs)}")
    print(f"Malignant masks: {len(malignant_masks)}")
    print(f"Normal images: {len(normal_imgs)}")
    print(f"Normal masks: {len(normal_masks)}")
    return {
        'benign': [img for img in benign_imgs if '_mask' not in img],
        'malignant': [img for img in malignant_imgs if '_mask' not in img],
        'normal': normal_imgs
    }


class BreastUltrasoundDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None, use_masks=False):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.use_masks = use_masks
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        if self.use_masks:
            mask_path = img_path.replace('.png', '_mask.png')
            if os.path.exists(mask_path):
                mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
                mask = mask / 255.0  
                img = img * mask[..., None]  
        
        if self.transform:
            img = self.transform(img)
        
        return img, self.labels[idx]

def prepare_data(use_masks=False):
    
    class_images = inspect_dataset()
    
    all_images = []
    all_labels = []
    
    for class_name, img_paths in class_images.items():
        for img_path in img_paths:
            all_images.append(img_path)
            all_labels.append(class_map[class_name])
    
    
    train_imgs, val_imgs, train_labels, val_labels = train_test_split(
        all_images, all_labels, test_size=VALIDATION_SPLIT, 
        random_state=42, stratify=all_labels
    )
    
    print(f"Training set: {len(train_imgs)} images")
    print(f"Validation set: {len(val_imgs)} images")
    
    # Define transformations
    train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                         std=[0.229, 0.224, 0.225])
    ])
    
    val_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                             std=[0.229, 0.224, 0.225])
    ])
    
    # Create datasets
    train_dataset = BreastUltrasoundDataset(
        train_imgs, train_labels, transform=train_transform, use_masks=use_masks
    )
    val_dataset = BreastUltrasoundDataset(
        val_imgs, val_labels, transform=val_transform, use_masks=use_masks
    )
    
    
    train_loader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0
    )
    val_loader = DataLoader(
        val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )
    
    return train_loader, val_loader


def create_model(use_masks=False):
    
    model = models.densenet121(weights='IMAGENET1K_V1')
    

    
    
    num_ftrs = model.classifier.in_features
    model.classifier = nn.Sequential(
        nn.Dropout(0.2),
        nn.Linear(num_ftrs, NUM_CLASSES)
    )
    
    return model

# Training function
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    best_val_acc = 0.0
    history = {
        'train_loss': [],
        'val_loss': [],
        'train_acc': [],
        'val_acc': [],
    }
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0
        
        for inputs, labels in tqdm(train_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            with torch.set_grad_enabled(True):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                
                # Backward pass + optimize
                loss.backward()
                optimizer.step()
            
            # Statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)
        
        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        
        history['train_loss'].append(epoch_loss)
        history['train_acc'].append(epoch_acc.item())
        if scheduler is not None:
            scheduler.step(epoch_loss)

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0
        
        for inputs, labels in tqdm(val_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # Forward pass
            with torch.no_grad():
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
            
            # Statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        
        epoch_loss = running_loss / len(val_loader.dataset)
        epoch_acc = running_corrects.double() / len(val_loader.dataset)
        
        print(f'Val Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        
        history['val_loss'].append(epoch_loss)
        history['val_acc'].append(epoch_acc.item())
        
        
        if epoch_acc > best_val_acc:
            best_val_acc = epoch_acc
            torch.save(model.state_dict(), 'best_densenet121_breast_ultrasound.pth')
            print(f'Best model saved with accuracy: {best_val_acc:.4f}')
    
    return model, history


def plot_training_history(history):
    plt.figure(figsize=(12, 5))
    
    
    plt.subplot(1, 2, 1)
    plt.plot(history['train_acc'])
    plt.plot(history['val_acc'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    
    
    plt.subplot(1, 2, 2)
    plt.plot(history['train_loss'])
    plt.plot(history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()


def evaluate_model(model, val_loader):
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(val_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    
    accuracy = accuracy_score(all_labels, all_preds)
    conf_matrix = confusion_matrix(all_labels, all_preds)
    class_report = classification_report(all_labels, all_preds, 
                                         target_names=list(class_map.keys()),
                                         output_dict=True)
    
    print(f'Validation Accuracy: {accuracy:.4f}')
    print('\nConfusion Matrix:')
    print(conf_matrix)
    print('\nClassification Report:')
    print(classification_report(all_labels, all_preds, target_names=list(class_map.keys())))
    
    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    tick_marks = np.arange(len(class_map))
    plt.xticks(tick_marks, class_map.keys(), rotation=45)
    plt.yticks(tick_marks, class_map.keys())
    
    fmt = 'd'
    thresh = conf_matrix.max() / 2.
    for i in range(conf_matrix.shape[0]):
        for j in range(conf_matrix.shape[1]):
            plt.text(j, i, format(conf_matrix[i, j], fmt),
                    ha="center", va="center",
                    color="white" if conf_matrix[i, j] > thresh else "black")
    
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.savefig('confusion_matrix.png')
    plt.show()
    
    return accuracy, conf_matrix, class_report


def predict_image(model, image_path, use_masks=False):
    model.eval()
    
    
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    
    if use_masks:
        mask_path = image_path.replace('.png', '_mask.png')
        if os.path.exists(mask_path):
            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
            img = np.concatenate([img, mask], axis=2)
    
    
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    img = transform(img).unsqueeze(0).to(device)
    
    # Make prediction
    with torch.no_grad():
        outputs = model(img)
        _, preds = torch.max(outputs, 1)
        probs = torch.nn.functional.softmax(outputs, dim=1)
    
    pred_class = idx_to_class[preds.item()]
    pred_prob = probs[0][preds].item()
    
    return pred_class, pred_prob, probs.cpu().numpy()[0]

# Main execution
if __name__ == "__main__":
    
    USE_MASKS = True  # Set to False if you don't want to use mask images
    
  
    train_loader, val_loader = prepare_data(use_masks=USE_MASKS)
    
    
    model = create_model(use_masks=USE_MASKS)
    model = model.to(device)
    
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    
    
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.1, patience=5
    )
    
    
    model, history = train_model(
        model, train_loader, val_loader, criterion, optimizer, EPOCHS
    )
    
    
    plot_training_history(history)
    
    
    model.load_state_dict(torch.load('best_densenet121_breast_ultrasound.pth'))
    
   
    accuracy, conf_matrix, class_report = evaluate_model(model, val_loader)
    
    
    
    print("Training and evaluation completed!")

In [1]:
# convert model to light format, optimized for edge user 

import os
import torch
import torch.nn as nn
import torchvision.models as models
import numpy as np
import onnx
import onnx_tf
import tensorflow as tf
from onnx_tf.backend import prepare

def create_model(num_classes=3, use_masks=False):
    model = models.densenet121(weights=None)
    
    if use_masks:
        original_conv = model.features.conv0
        new_conv = nn.Conv2d(
            6, original_conv.out_channels, 
            kernel_size=original_conv.kernel_size,
            stride=original_conv.stride,
            padding=original_conv.padding,
            bias=original_conv.bias is not None
        )
        
        model.features.conv0 = new_conv
    
    num_ftrs = model.classifier.in_features
    model.classifier = nn.Sequential(
        nn.Dropout(0.2),
        nn.Linear(num_ftrs, num_classes)
    )
    
    return model

def convert_pytorch_to_tflite(pth_model_path, output_dir, use_masks=False, num_classes=3):
    """
    Convert PyTorch model (.pth) to TensorFlow Lite (.tflite)
    
    Args:
        pth_model_path (str): Path to the .pth model file
        output_dir (str): Directory to save the output models
        use_masks (bool): Whether the model was trained with masks (6 channels input)
        num_classes (int): Number of output classes
    """
    os.makedirs(output_dir, exist_ok=True)
    
    print("Loading PyTorch model...")
    
    pytorch_model = create_model(num_classes=num_classes, use_masks=use_masks)
    
    pytorch_model.load_state_dict(torch.load(pth_model_path, map_location=torch.device('cpu')))
    pytorch_model.eval()
    
    print("Converting to ONNX format...")
    onnx_path = os.path.join(output_dir, "densenet121_model.onnx")
    
    input_channels = 6 if use_masks else 3
    dummy_input = torch.randn(1, input_channels, 224, 224)
    
    torch.onnx.export(
        pytorch_model,                     
        dummy_input,                       
        onnx_path,                         
        export_params=True,                
        opset_version=11,                  
        do_constant_folding=True,          
        input_names=['input'],            
        output_names=['output'],           
        dynamic_axes={'input': {0: 'batch_size'},    
                     'output': {0: 'batch_size'}}
    )
    
    print("Verifying ONNX model...")
    onnx_model = onnx.load(onnx_path)
    onnx.checker.check_model(onnx_model)
    print("ONNX model is valid!")
    
    print("Converting ONNX to TensorFlow...")
    tf_rep = prepare(onnx_model)
    tf_model_path = os.path.join(output_dir, "densenet121_tf_model")
    tf_rep.export_graph(tf_model_path)
    
    
    print("Converting to TensorFlow Lite...")
    converter = tf.lite.TFLiteConverter.from_saved_model(tf_model_path)
    
    
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.target_spec.supported_types = [tf.float16]  
    
    
    tflite_model = converter.convert()
    
    
    tflite_path = os.path.join(output_dir, "densenet121_model.tflite")
    with open(tflite_path, 'wb') as f:
        f.write(tflite_model)
    
    print(f"Conversion completed! TFLite model saved to {tflite_path}")
    
    # Step 6: Verify the TFLite model (optional)
    print("Verifying TFLite model...")
    interpreter = tf.lite.Interpreter(model_path=tflite_path)
    interpreter.allocate_tensors()
    
    # Get input and output details
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    
    print("Input details:", input_details)
    print("Output details:", output_details)
    
    # Return paths to the generated models
    return {
        'onnx_path': onnx_path,
        'tf_model_path': tf_model_path,
        'tflite_path': tflite_path
    }

if __name__ == "__main__":
    
    PTH_MODEL_PATH = "model.pth"  
    OUTPUT_DIR = "converted_models"  
    USE_MASKS = False  
    NUM_CLASSES = 3 
    
    
    output_paths = convert_pytorch_to_tflite(
        pth_model_path=PTH_MODEL_PATH,
        output_dir=OUTPUT_DIR,
        use_masks=USE_MASKS,
        num_classes=NUM_CLASSES
    )
    
    print("\nConversion Summary:")
    print(f"Original PyTorch model: {PTH_MODEL_PATH}")
    print(f"ONNX model: {output_paths['onnx_path']}")
    print(f"TensorFlow model: {output_paths['tf_model_path']}")
    print(f"TFLite model: {output_paths['tflite_path']}")

<class 'ModuleNotFoundError'>: No module named 'torch'