# Lung and Colon Cancer Classification
## About Dataset
This dataset contains 25,000 histopathological images with 5 classes. All images are 768 x 768 pixels in size and are in jpeg file format.
The images were generated from an original sample of HIPAA compliant and validated sources, consisting of 750 total images of lung tissue (250 benign lung tissue, 250 lung adenocarcinomas, and 250 lung squamous cell carcinomas) and 500 total images of colon tissue (250 benign colon tissue and 250 colon adenocarcinomas) and augmented to 25,000 using the Augmentor package.
There are five classes in the dataset, each with 5,000 images, being:

* Lung benign tissue
* Lung adenocarcinoma
* Lung squamous cell carcinoma
* Colon adenocarcinoma
* Colon benign tissue


How to Cite this Dataset
If you use in your research, please credit the author of the dataset:

Original Article
Borkowski AA, Bui MM, Thomas LB, Wilson CP, DeLand LA, Mastorides SM. Lung and Colon Cancer Histopathological Image Dataset (LC25000). arXiv:1912.12142v1 [eess.IV], 2019

Relevant Links
https://arxiv.org/abs/1912.12142v1
https://github.com/tampapath/lung_colon_image_set
Dataset BibTeX
@article{,
title= {LC25000 Lung and colon histopathological image dataset},
keywords= {cancer,histopathology},
author= {Andrew A. Borkowski, Marilyn M. Bui, L. Brannon Thomas, Catherine P. Wilson, Lauren A. DeLand, Stephen M. Mastorides},
url= {https://github.com/tampapath/lung_colon_image_set}
}


## Imports

In [None]:
import os
from pathlib import Path
from typing import Any

import hydra
import lightning as pl
import numpy as np
import opendatasets as od
import optuna
import pyrootutils
import torch
import torch.nn as nn
import torch.nn.functional as F
from hydra import compose, initialize
from omegaconf import OmegaConf
from optuna.integration import PyTorchLightningPruningCallback
from torch import Tensor
from torch.utils.data import DataLoader
from torchinfo import summary
from torchmetrics import Accuracy, F1Score
from torchvision.datasets import ImageFolder
from torchvision.transforms import v2

root = pyrootutils.setup_root(
    search_from=os.path.dirname(os.getcwd()),
    indicator=[".git", "pyproject.toml"],
    pythonpath=True,
    dotenv=True,
)

if os.getenv("DATA_ROOT") is None:
    os.environ["DATA_ROOT"] = f"{root}"
torch.set_float32_matmul_precision("medium")

In [3]:
#  Register a resolver for torch dtypes
OmegaConf.register_new_resolver("torch_dtype", lambda name: getattr(torch, name))

## Download datasets

In [4]:
%load_ext autoreload
%autoreload 2

In [5]:
# https://gist.github.com/bdsaglam/586704a98336a0cf0a65a6e7c247d248

with initialize(version_base="1.2", config_path="../configs"):
    cfg = compose(config_name="train")
    print(cfg.paths.train_processed_dir)

datasets/processed/train


In [6]:
DATASET_DIR = Path(root) / cfg.data.dataset_dir

In [7]:
DATASET_DIR.mkdir(exist_ok=True)
if len(list(DATASET_DIR.iterdir())) == 0:
    # Download the dataset
    od.download(dataset_id_or_url=cfg.data.dataset_url, data_dir=str(DATASET_DIR))

## Loading Images

In [None]:
CLASS_NAMES = [
    "colon-adenocarcinoma",
    "colon-benign-tissue",
    "lung-adenocarcinoma",
    "lung-benign-tissue",
    "lung-squamous-cell-carcinoma",
]

{0: 'colon-adenocarcinoma',
 1: 'colon-benign-tissue',
 2: 'lung-adenocarcinoma',
 3: 'lung-benign-tissue',
 4: 'lung-squamous-cell-carcinoma'}

In [None]:
class LungColonDataModule(pl.LightningDataModule):
    def __init__(
        self,
        train_processed_dir: str,
        valid_processed_dir: str,
        test_processed_dir: str,
        augmentations: Any,
        valid_transforms: Any,
        num_workers: int = 8,
        pin_memory: bool = True,
        persistent_workers: bool = True,
        batch_size: int = 32,
        subset_size: float | None = None,
    ):
        super().__init__()
        self.train_data_dir = train_processed_dir
        self.valid_data_dir = valid_processed_dir
        self.test_data_dir = test_processed_dir
        self.augmentations = None
        self.valid_transforms = None
        self.subset_size = subset_size
        if augmentations:
            aug = hydra.utils.instantiate(augmentations)
            self.augmentations = v2.Compose(aug)
        if valid_transforms:
            transforms = hydra.utils.instantiate(valid_transforms)
            self.valid_transforms = v2.Compose(transforms)

        self.kwargs = {
            "batch_size": batch_size,
            "num_workers": num_workers,
            "pin_memory": pin_memory,
            "persistent_workers": persistent_workers,
        }

    def prepare_data(self):
        pass

    def subset_indices(self, dataset, subset_size):
        train_ds_len = len(dataset)
        indices = np.arange(len(dataset))[: int(train_ds_len * self.subset_size)]
        return indices

    def setup(self, stage=None) -> None:
        # Set up the dataset for training and validation
        self.train_dataset = ImageFolder(root=self.train_data_dir, transform=self.augmentations)
        self.val_dataset = ImageFolder(root=self.valid_data_dir, transform=self.valid_transforms)
        self.test_dataset = ImageFolder(root=self.test_data_dir, transform=self.valid_transforms)

        if self.subset_size:
            print(f"Using subset of size {self.subset_size} for training, validation, and testing.")
            # Subset the dataset
            train_indices = self.subset_indices(self.train_dataset, self.subset_size)
            self.train_dataset = torch.utils.data.Subset(self.train_dataset, train_indices)
            val_indices = self.subset_indices(self.val_dataset, self.subset_size)
            self.val_dataset = torch.utils.data.Subset(self.val_dataset, val_indices)
            test_indices = self.subset_indices(self.test_dataset, self.subset_size)
            self.test_dataset = torch.utils.data.Subset(self.test_dataset, test_indices)

    def train_dataloader(self) -> DataLoader:
        return DataLoader(
            self.train_dataset,
            shuffle=True,
            **self.kwargs,
        )

    def val_dataloader(self) -> DataLoader:
        return DataLoader(
            self.val_dataset,
            **self.kwargs,
        )

    def test_dataloader(self) -> DataLoader:
        return DataLoader(
            self.test_dataset,
            **self.kwargs,
        )

In [10]:
data_module = LungColonDataModule(
    train_processed_dir=str(Path(root) / cfg.paths.train_processed_dir),
    valid_processed_dir=str(Path(root) / cfg.paths.valid_processed_dir),
    test_processed_dir=str(Path(root) / cfg.paths.test_processed_dir),
    augmentations=cfg.datamodule.augmentations,
    valid_transforms=cfg.datamodule.valid_transforms,
    num_workers=cfg.datamodule.num_workers,
    pin_memory=cfg.datamodule.pin_memory,
    persistent_workers=cfg.datamodule.persistent_workers,
    batch_size=cfg.datamodule.batch_size,
    subset_size=0.1,
)
data_module.setup()
# for batch in data_module.train_dataloader():
#     x, y = batch
#     print(x.shape, y.shape)
#     break
len(data_module.train_dataset), len(data_module.val_dataset), len(data_module.test_dataset)
# (3600, 900, 500)
# (18000, 4500, 2500)

Using subset of size 0.1 for training, validation, and testing.


(1800, 450, 250)

##  Model Define

In [None]:
class Net(nn.Module):
    def __init__(
        self,
        input_shape: tuple[int, int, int],
        conv_layers: list[int],
        num_classes: int,
        dropout_rate: float,
        num_hidden_layers: int,
    ) -> None:
        super().__init__()
        input_dim = input_shape[0]
        self.output_dims = conv_layers
        layers: list[nn.Module] = []
        # --- Convolutional layers
        for out_dim in conv_layers:
            layers.append(nn.Conv2d(input_dim, out_dim, kernel_size=3, stride=1, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(out_dim))
            layers.append(nn.ReLU())
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            layers.append(nn.Dropout(dropout_rate))
            input_dim = out_dim

        self.conv_layers = nn.Sequential(*layers)

        # --- To determine the input size for the linear layer
        self.flattener = nn.Flatten()
        with torch.no_grad():
            dummy_input = torch.randn(1, *input_shape)
            dummy_output = self.conv_layers(dummy_input)
            self.flatten_dim = self.flattener(dummy_output).shape[1]
        print(f"Net.__init__() flatten_dim: {self.flatten_dim}")
        # --- Classification head
        cls_layers = []
        current_fc_input_features = self.flatten_dim
        neuron_per_layer = 32
        for _ in range(num_hidden_layers):
            cls_layers.append(nn.Linear(current_fc_input_features, neuron_per_layer, bias=False))
            cls_layers.append(nn.BatchNorm1d(neuron_per_layer))
            cls_layers.append(nn.ReLU())
            cls_layers.append(nn.Dropout(dropout_rate))

            current_fc_input_features = neuron_per_layer
            neuron_per_layer = neuron_per_layer * 2

        cls_layers.append(nn.Linear(current_fc_input_features, num_classes))

        self.classification_head = nn.Sequential(*cls_layers)
        self.model = nn.Sequential(self.conv_layers, self.flattener, self.classification_head)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.model(x)

In [None]:
input_shape = (1, 3, 320, 320)

net = Net(
    input_shape=input_shape[1:],
    conv_layers=[32],
    dropout_rate=0.5,
    num_classes=len(CLASS_NAMES),
    num_hidden_layers=4,
)
summary(net, input_shape, device="cpu")

Net.__init__() flatten_dim: 819200


Layer (type:depth-idx)                   Output Shape              Param #
Net                                      [1, 5]                    --
├─Sequential: 1-1                        [1, 5]                    --
│    └─Sequential: 2-1                   [1, 32, 160, 160]         --
│    │    └─Conv2d: 3-1                  [1, 32, 320, 320]         864
│    │    └─BatchNorm2d: 3-2             [1, 32, 320, 320]         64
│    │    └─ReLU: 3-3                    [1, 32, 320, 320]         --
│    │    └─MaxPool2d: 3-4               [1, 32, 160, 160]         --
│    │    └─Dropout: 3-5                 [1, 32, 160, 160]         --
│    └─Flatten: 2-2                      [1, 819200]               --
│    └─Sequential: 2-3                   [1, 5]                    --
│    │    └─Linear: 3-6                  [1, 32]                   26,214,400
│    │    └─BatchNorm1d: 3-7             [1, 32]                   64
│    │    └─ReLU: 3-8                    [1, 32]                   --
│    │

In [None]:
class LungColonClassifier(pl.LightningModule):
    def __init__(
        self,
        model: Net,
        optimizer: torch.optim.Optimizer | None = None,
        # scheduler: torch.optim.lr_scheduler.LRScheduler,
    ):
        super().__init__()
        self.model = model
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
        # self.scheduler = scheduler
        self.accuracy = Accuracy(task="multiclass", num_classes=len(CLASS_NAMES))
        self.f1_score = F1Score(task="multiclass", num_classes=len(CLASS_NAMES))

    def forward(self, x) -> torch.Tensor:
        return self.model(x)

    def _common_step(self, batch, batch_idx) -> tuple[torch.Tensor, float, torch.Tensor]:
        x, y = batch
        y_hat = self.forward(x)
        loss: Tensor = F.cross_entropy(y_hat, y)
        score = self.accuracy(y_hat, y)
        return loss, score, y_hat

    def training_step(self, batch, batch_idx) -> torch.Tensor:
        loss, score, y_hat = self._common_step(batch, batch_idx)
        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", score, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx) -> torch.Tensor:
        loss, score, y_hat = self._common_step(batch, batch_idx)
        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", score, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx) -> torch.Tensor:
        loss, score, y_hat = self._common_step(batch, batch_idx)
        self.log("test_loss", loss, prog_bar=True)
        self.log("test_acc", score, prog_bar=True)
        return loss

    def configure_optimizers(self):
        return self.optimizer  # (self.model.parameters()), self.scheduler

    # return [self.optimizer], [self.scheduler(self.optimizer)]


def objective(trial: optuna.trial.Trial) -> float:
    # Define the hyperparameters to optimize
    total_conv_layers = trial.suggest_int("conv_layers", 1, 6)
    total_cls_hidden_layers = trial.suggest_int("hidden_layers", 1, 6)
    conv_channels = [x * 32 for x in range(1, total_conv_layers)]
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)

    # Create the model
    net = Net(
        input_shape=input_shape[1:],
        conv_layers=conv_channels,
        dropout_rate=dropout_rate,
        num_classes=len(CLASS_NAMES),
        num_hidden_layers=total_cls_hidden_layers,
    )
    # # Create the optimizer
    # optimizer = torch.optim.AdamW

    # Create the scheduler
    # scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    model = LungColonClassifier(model=net)
    datamododule = LungColonDataModule(
        train_processed_dir=str(Path(root) / cfg.paths.train_processed_dir),
        valid_processed_dir=str(Path(root) / cfg.paths.valid_processed_dir),
        test_processed_dir=str(Path(root) / cfg.paths.test_processed_dir),
        augmentations=cfg.datamodule.augmentations,
        valid_transforms=cfg.datamodule.valid_transforms,
        num_workers=cfg.datamodule.num_workers,
        pin_memory=cfg.datamodule.pin_memory,
        persistent_workers=cfg.datamodule.persistent_workers,
        batch_size=cfg.datamodule.batch_size,
        subset_size=0.1,
    )

    # Train the model
    # callbacks = [PyTorchLightningPruningCallback(trial, monitor="val_acc")]
    trainer = pl.Trainer(logger=True, accelerator="gpu", devices=1, max_epochs=10, enable_progress_bar=True, precision=32, log_every_n_steps=1)
    hyperparameters = {"conv_layers": conv_channels, "hidden_layers": total_cls_hidden_layers, "dropout_rate": dropout_rate}
    trainer.logger.log_hyperparams(hyperparameters)
    trainer.fit(model, datamodule=datamododule)

    # Evaluate the model on the validation set
    # trainer.validate(model, datamodule=datamododule)

    return trainer.callback_metrics["val_acc"].item()


pruner = optuna.pruners.MedianPruner()
study = optuna.create_study(direction="maximize", pruner=pruner)
study.optimize(objective, n_trials=100, timeout=600)

[32m[I 2025-05-08 14:17:29,516][0m A new study created in memory with name: no-name-f6b3739f-a1ed-4d5e-b5e0-3a7583f4c532[0m
Using default `ModelCheckpoint`. Consider installing `litmodels` package to enable `LitModelCheckpoint` for automatic upload to the Lightning model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
/home/ultron/AI/practice-projects/CV/lung-and-colon-cancer-classification-pytorch/.venv/lib/python3.12/site-packages/lightning/pytorch/trainer/connectors/logger_connector/logger_connector.py:76: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `lightning.pytorch` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by defaul

Net.__init__() flatten_dim: 307200
Using subset of size 0.1 for training, validation, and testing.



  | Name     | Type               | Params | Mode 
--------------------------------------------------------
0 | model    | Net                | 9.8 M  | train
1 | accuracy | MulticlassAccuracy | 0      | train
2 | f1_score | MulticlassF1Score  | 0      | train
--------------------------------------------------------
9.8 M     Trainable params
0         Non-trainable params
9.8 M     Total params
39.323    Total estimated model params size (MB)
12        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]


Detected KeyboardInterrupt, attempting graceful shutdown ...
[33m[W 2025-05-08 14:17:43,338][0m Trial 0 failed because of the following error: NameError("name 'exit' is not defined")[0m
Traceback (most recent call last):
  File "/home/ultron/AI/practice-projects/CV/lung-and-colon-cancer-classification-pytorch/.venv/lib/python3.12/site-packages/lightning/pytorch/trainer/call.py", line 48, in _call_and_handle_interrupt
    return trainer_fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ultron/AI/practice-projects/CV/lung-and-colon-cancer-classification-pytorch/.venv/lib/python3.12/site-packages/lightning/pytorch/trainer/trainer.py", line 599, in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
  File "/home/ultron/AI/practice-projects/CV/lung-and-colon-cancer-classification-pytorch/.venv/lib/python3.12/site-packages/lightning/pytorch/trainer/trainer.py", line 1012, in _run
    results = self._run_stage()
              ^^^^^^^^^^^^^^^^^
  File "/home/ultron/

NameError: name 'exit' is not defined

In [None]:
pruner = optuna.pruners.MedianPruner()
study = optuna.create_study(direction="maximize", pruner=pruner)
study.optimize(objective, n_trials=5, timeout=600)

In [None]:
print("Number of finished trials: {}".format(len(study.trials)))

print("Best trial:")
trial = study.best_trial

print("  Value: {}".format(trial.value))

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))