
# Lung X‑ray Classification with VGG16 + InceptionV3 Features and Random Forest
This notebook builds a **feature-extraction pipeline** using two CNN backbones (VGG16 and InceptionV3) pretrained on ImageNet, then trains a **Random Forest** on the concatenated deep features to classify chest X‑rays.

**Dataset assumption:** images are arranged in a folder with one subfolder per class, e.g.
```
dataset_root/
  Normal/
  Viral Pneumonia/
  Bacterial Pneumonia/
  COVID-19/
  Tuberculosis/
```
> Update `DATA_DIR` below to your dataset path on Kaggle/Colab/Local.


## 1) Setup

In [None]:

import os, random, math, json, itertools, pathlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras.applications import VGG16, InceptionV3
from tensorflow.keras.applications.vgg16 import preprocess_input as vgg_preprocess
from tensorflow.keras.applications.inception_v3 import preprocess_input as inc_preprocess

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.utils.class_weight import compute_class_weight

import joblib

# Report the TensorFlow version in use and set the TF logger level to ERROR.
print(tf.__version__)
tf.get_logger().setLevel("ERROR")

# Best-effort reproducibility: seed the Python, NumPy and TensorFlow RNGs
# with one shared value that later cells also reuse.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)


## 2) Configuration

In [None]:

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# CHANGE THIS to your dataset root (folder that contains class subfolders)
DATA_DIR = "/kaggle/input/lungs-disease-dataset-4-types/Lung Disease Dataset 4 types"  # update path if needed
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Batch size used by the tf.data pipelines and the input resolution each
# backbone is built with.
BATCH_SIZE = 32
IMG_SIZE_VGG = (224, 224)
IMG_SIZE_INC = (299, 299)

# Hold-out fractions. Both are fractions of the FULL dataset: the split cell
# divides VAL_SIZE by (1 - TEST_SIZE) before carving validation out of the
# remaining train+val pool.
TEST_SIZE = 0.15
VAL_SIZE = 0.15

# Random Forest hyperparameters (tune as needed).
RF_PARAMS = {
    "n_estimators": 600,
    "max_depth": None,
    "min_samples_split": 2,
    "min_samples_leaf": 1,
    "n_jobs": -1,
    "random_state": SEED,
    # Class balancing is done with per-sample weights at fit time instead.
    "class_weight": None,
}

# Optional dimensionality reduction in front of the forest.
USE_PCA = False  # switch to True to enable PCA
PCA_VARIANCE = 0.98  # fraction of variance to keep when PCA is enabled

# Directory that receives the saved pipeline and label mapping.
OUTPUT_DIR = "./artifacts_rf_vgg16_incv3"
os.makedirs(OUTPUT_DIR, exist_ok=True)


## 3) Load file paths & labels

In [None]:

def list_images_and_labels(root_dir):
    """Collect image paths and class names from a one-folder-per-class tree.

    Args:
        root_dir: Directory whose immediate subdirectories are the classes.

    Returns:
        A ``(filepaths, labels, classes)`` tuple. ``classes`` is the sorted
        list of subdirectory names. ``filepaths`` and ``labels`` are parallel
        lists holding, for every regular file with a .png/.jpg/.jpeg/.bmp
        extension (case-insensitive), its path and the name of the class
        folder it sits in. Entries are grouped by class and sorted by file
        name within each class.
    """
    image_exts = ('.png', '.jpg', '.jpeg', '.bmp')
    classes = sorted(
        d for d in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, d))
    )
    filepaths, labels = [], []
    for cls in classes:
        cls_dir = os.path.join(root_dir, cls)
        # os.listdir returns entries in arbitrary order; sorting makes the
        # resulting lists (and any seeded split of them) reproducible.
        for fname in sorted(os.listdir(cls_dir)):
            fp = os.path.join(cls_dir, fname)
            if fname.lower().endswith(image_exts) and os.path.isfile(fp):
                filepaths.append(fp)
                labels.append(cls)  # keep label as string
    return filepaths, labels, classes

# Index the dataset on disk; later cells split `filepaths`/`labels` and use
# `classes` to integer-encode the labels.
filepaths, labels, classes = list_images_and_labels(DATA_DIR)
print(f"Found {len(filepaths)} images across {len(classes)} classes: {classes}")

# Fail fast with an actionable message instead of letting the split cell fail
# on an empty list when DATA_DIR points at the wrong folder.
if not filepaths:
    raise ValueError(
        f"No images found under {DATA_DIR!r}. DATA_DIR must contain one "
        "subfolder per class holding .png/.jpg/.jpeg/.bmp files."
    )


## 4) Stratified Train/Val/Test split

In [None]:

# Stage 1: hold out the test set from the full file list, stratified by class.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    filepaths,
    labels,
    test_size=TEST_SIZE,
    stratify=labels,
    random_state=SEED,
)

# Stage 2: carve validation out of what is left. VAL_SIZE is divided by the
# remaining share so it is expressed relative to the train+val pool.
val_ratio = VAL_SIZE / (1.0 - TEST_SIZE)
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=val_ratio,
    stratify=y_trainval,
    random_state=SEED,
)

def show_split_counts(y_tr, y_va, y_te):
    """Print, for each split, how many samples carry each label.

    Args:
        y_tr: Labels of the training split.
        y_va: Labels of the validation split.
        y_te: Labels of the test split.

    One line is printed per split: the split name ('train', 'val', 'test')
    followed by a dict mapping each label to its count.
    """
    from collections import Counter

    splits = {'train': y_tr, 'val': y_va, 'test': y_te}
    for split_name, split_labels in splits.items():
        print(split_name, dict(Counter(split_labels)))
# Print the per-class sample counts of the train/val/test label lists built above.
show_split_counts(y_train, y_val, y_test)


## 5) Create tf.data pipelines (each image path stays paired with its label)

In [None]:

# Let tf.data pick parallelism and prefetch depth at runtime.
AUTO = tf.data.AUTOTUNE

# Integer-encode the string labels: a class's index is its position in `classes`.
class_to_index = {name: position for position, name in enumerate(classes)}
y_train_idx, y_val_idx, y_test_idx = (
    np.array([class_to_index[name] for name in split_labels])
    for split_labels in (y_train, y_val, y_test)
)

def decode_and_resize(path, img_size, preprocess_fn):
    """Load one image file and return it resized and preprocessed.

    Args:
        path: Location of the encoded image file.
        img_size: Target size passed to ``tf.image.resize``.
        preprocess_fn: Callable applied to the resized float32 image (the
            callers in this notebook pass a Keras ``preprocess_input``).

    Returns:
        The result of ``preprocess_fn`` on the 3-channel, bilinearly resized
        image.
    """
    raw_bytes = tf.io.read_file(path)
    # channels=3 forces three channels whatever the file stores.
    pixels = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)
    resized = tf.image.resize(pixels, img_size, method=tf.image.ResizeMethod.BILINEAR)
    return preprocess_fn(tf.cast(resized, tf.float32))

def make_ds(paths, labels_idx, img_size, preprocess_fn, batch_size=BATCH_SIZE, shuffle=False):
    """Build a batched, prefetched dataset of (image, label) pairs.

    Args:
        paths: Image file paths.
        labels_idx: Integer labels, parallel to ``paths``.
        img_size: Target size forwarded to ``decode_and_resize``.
        preprocess_fn: Preprocessing callable forwarded to ``decode_and_resize``.
        batch_size: Number of samples per batch.
        shuffle: When true, shuffle the (path, label) pairs with a buffer
            covering the whole list, seeded with ``SEED`` and reshuffled on
            every iteration.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    def _load(path, label):
        return decode_and_resize(path, img_size, preprocess_fn), label

    # Paths and labels are zipped before any shuffling, so each image keeps
    # its own label whatever the order.
    pairs = tf.data.Dataset.zip((
        tf.data.Dataset.from_tensor_slices(paths),
        tf.data.Dataset.from_tensor_slices(labels_idx),
    ))
    if shuffle:
        pairs = pairs.shuffle(buffer_size=len(paths), seed=SEED, reshuffle_each_iteration=True)
    return pairs.map(_load, num_parallel_calls=AUTO).batch(batch_size).prefetch(AUTO)

# Feature-extraction datasets: one train/val/test triple per backbone.
#
# None of them is shuffled, including the training ones. The VGG16 and
# InceptionV3 features are later concatenated row by row, and the labels are
# taken from the VGG16 pass only, so both pipelines must yield the samples in
# exactly the same order. Two independently shuffled datasets that reshuffle
# on every iteration only line up if both have been iterated the same number
# of times; an interrupted or re-run cell breaks that silently and pairs
# features (and labels) from different images. Sample order is irrelevant for
# extracting features, so there is nothing to gain from shuffling here.
train_ds_vgg = make_ds(X_train, y_train_idx, IMG_SIZE_VGG, vgg_preprocess)
val_ds_vgg   = make_ds(X_val,   y_val_idx,   IMG_SIZE_VGG, vgg_preprocess)
test_ds_vgg  = make_ds(X_test,  y_test_idx,  IMG_SIZE_VGG, vgg_preprocess)

train_ds_inc = make_ds(X_train, y_train_idx, IMG_SIZE_INC, inc_preprocess)
val_ds_inc   = make_ds(X_val,   y_val_idx,   IMG_SIZE_INC, inc_preprocess)
test_ds_inc  = make_ds(X_test,  y_test_idx,  IMG_SIZE_INC, inc_preprocess)


## 6) Load VGG16 and InceptionV3 (feature extractors)

In [None]:

# VGG16 backbone: ImageNet weights, no classifier head, frozen because it is
# only used for inference. Global average pooling turns its output feature
# maps into one vector per image.
vgg = VGG16(weights="imagenet", include_top=False, input_shape=IMG_SIZE_VGG + (3,))
vgg.trainable = False
vgg_out = tf.keras.Sequential(
    [vgg, tf.keras.layers.GlobalAveragePooling2D()],
    name="VGG16_GAP",
)

# InceptionV3 backbone, set up the same way at its own input resolution.
inc = InceptionV3(weights="imagenet", include_top=False, input_shape=IMG_SIZE_INC + (3,))
inc.trainable = False
inc_out = tf.keras.Sequential(
    [inc, tf.keras.layers.GlobalAveragePooling2D()],
    name="InceptionV3_GAP",
)

# Push one training batch through each extractor and report the width of the
# feature vector it produces.
tmp_v = next(iter(train_ds_vgg.take(1)))[0]
tmp_i = next(iter(train_ds_inc.take(1)))[0]
print("VGG16 feature dim:", vgg_out(tmp_v).shape[-1])
print("InceptionV3 feature dim:", inc_out(tmp_i).shape[-1])


## 7) Extract deep features

In [None]:

def extract_features(model, ds):
    """Run ``model`` over every batch of ``ds`` and gather the outputs.

    Args:
        model: Callable invoked as ``model(batch, training=False)``; its
            result must expose ``.numpy()``.
        ds: Iterable of ``(inputs, labels)`` batches; ``labels`` must expose
            ``.numpy()``.

    Returns:
        A ``(features, labels)`` tuple: the per-batch model outputs stacked
        vertically and the per-batch labels concatenated, both in iteration
        order.
    """
    feature_chunks, label_chunks = [], []
    for inputs, targets in ds:
        feature_chunks.append(model(inputs, training=False).numpy())
        label_chunks.append(targets.numpy())
    return np.vstack(feature_chunks), np.concatenate(label_chunks)

def _check_same_order(split, labels_vgg, labels_inc):
    """Raise if the two backbone passes did not yield the same label sequence."""
    if not np.array_equal(labels_vgg, labels_inc):
        raise RuntimeError(
            f"{split}: the VGG16 and InceptionV3 passes returned samples in "
            "different orders, so their features cannot be concatenated row "
            "by row. Build both datasets without shuffling."
        )


# Run every split through both frozen extractors. The labels of each pass are
# kept so the two passes can be cross-checked before the features are joined.
Xv_train, y_train_np = extract_features(vgg_out, train_ds_vgg)
Xv_val,   y_val_np   = extract_features(vgg_out, val_ds_vgg)
Xv_test,  y_test_np  = extract_features(vgg_out, test_ds_vgg)

Xi_train, yi_train_np = extract_features(inc_out, train_ds_inc)
Xi_val,   yi_val_np   = extract_features(inc_out, val_ds_inc)
Xi_test,  yi_test_np  = extract_features(inc_out, test_ds_inc)

# hstack joins row k of the VGG16 matrix with row k of the InceptionV3 matrix,
# so both passes must have seen the samples in the same order. Matching label
# sequences are a necessary (not sufficient) condition for that; a mismatch
# proves the rows are misaligned and training would use corrupted data.
_check_same_order("train", y_train_np, yi_train_np)
_check_same_order("val", y_val_np, yi_val_np)
_check_same_order("test", y_test_np, yi_test_np)

# Concatenate features
X_train_feats = np.hstack([Xv_train, Xi_train])
X_val_feats   = np.hstack([Xv_val,   Xi_val])
X_test_feats  = np.hstack([Xv_test,  Xi_test])

print("Train features shape:", X_train_feats.shape)
print("Val   features shape:", X_val_feats.shape)
print("Test  features shape:", X_test_feats.shape)


## 8) Train Random Forest on concatenated features

In [None]:

# Compute class weights for imbalanced data (used as sample_weight)
unique_classes = np.unique(y_train_np)
cls_weights = compute_class_weight(class_weight='balanced', classes=unique_classes, y=y_train_np)
cls_w_dict = {cls: w for cls, w in zip(unique_classes, cls_weights)}
sample_weight_train = np.array([cls_w_dict[y] for y in y_train_np])

if USE_PCA:
    # Standardize -> PCA -> RF
    steps = [
        ("scaler", StandardScaler(with_mean=True, with_std=True)),
        ("pca", PCA(n_components=PCA_VARIANCE, svd_solver="full", random_state=SEED)),
        ("rf", RandomForestClassifier(**RF_PARAMS))
    ]
else:
    # Optional standardization before RF (helps some datasets; RF doesn't always need it)
    steps = [
        ("scaler", StandardScaler(with_mean=True, with_std=True)),
        ("rf", RandomForestClassifier(**RF_PARAMS))
    ]

pipeline = Pipeline(steps)
pipeline.fit(X_train_feats, y_train_np, rf__sample_weight=sample_weight_train)

# Evaluate on val
y_val_pred = pipeline.predict(X_val_feats)
val_acc = accuracy_score(y_val_np, y_val_pred)
print(f"Validation accuracy: {val_acc:.4f}")
print(classification_report(y_val_np, y_val_pred, target_names=classes))


## 9) Final evaluation on test set

In [None]:

# Score the held-out test split with the fitted pipeline.
y_test_pred = pipeline.predict(X_test_feats)
test_acc = accuracy_score(y_test_np, y_test_pred)
print(f"Test accuracy: {test_acc:.4f}\n")

# Pin both the report and the confusion matrix to the full class list, so
# their rows/columns always line up with `classes` even when a class is
# missing from the test labels and predictions.
all_label_ids = np.arange(len(classes))
print(classification_report(
    y_test_np,
    y_test_pred,
    labels=all_label_ids,
    target_names=classes,
    zero_division=0,
))

cm = confusion_matrix(y_test_np, y_test_pred, labels=all_label_ids)
fig, ax = plt.subplots(figsize=(6,6))
im = ax.imshow(cm, interpolation='nearest')
ax.figure.colorbar(im, ax=ax)
ax.set(xticks=np.arange(cm.shape[1]), yticks=np.arange(cm.shape[0]),
       xticklabels=classes, yticklabels=classes,
       ylabel='True label', xlabel='Predicted label', title='Confusion Matrix')
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
# Write each count into its cell; rows are true labels (y axis) and columns
# are predicted labels (x axis).
for (row, col), count in np.ndenumerate(cm):
    ax.text(col, row, format(count, 'd'), ha="center", va="center")
plt.tight_layout()
plt.show()


## 10) Save artifacts (model + label mapping)

In [None]:

# Destination files inside OUTPUT_DIR: the fitted scikit-learn pipeline and the
# class list that maps predicted indices back to class names.
pipeline_path = os.path.join(OUTPUT_DIR, "rf_vgg16_incv3_pipeline.joblib")
labels_json   = os.path.join(OUTPUT_DIR, "classes.json")

joblib.dump(pipeline, pipeline_path)
with open(labels_json, "w") as f:
    f.write(json.dumps({"classes": classes}, ensure_ascii=False, indent=2))

for saved_path in (pipeline_path, labels_json):
    print("Saved:", saved_path)


## 11) Inference helper (single image)

In [None]:

def load_pipeline_and_predict(img_path):
    """Classify a single image with the saved pipeline.

    The image is preprocessed once per backbone, passed through the in-memory
    ``vgg_out`` and ``inc_out`` extractors, and the concatenated features are
    fed to the pipeline loaded from ``OUTPUT_DIR``.

    Args:
        img_path: Path of the image file to classify.

    Returns:
        A ``(class_name, proba)`` tuple. ``class_name`` is looked up in the
        saved class list; ``proba`` is the pipeline's class-probability vector
        for the image, or ``None`` if the pipeline cannot produce one.
    """
    # Load artifacts.
    # NOTE: joblib.load unpickles the file; only load artifacts you produced
    # yourself or otherwise trust.
    pipe = joblib.load(os.path.join(OUTPUT_DIR, "rf_vgg16_incv3_pipeline.joblib"))
    with open(os.path.join(OUTPUT_DIR, "classes.json"), "r") as f:
        classes_local = json.load(f)["classes"]

    # Reuse the training-time loader so inference preprocessing cannot drift
    # from what the features were extracted with; add a batch axis of size 1.
    def _prep(size, pre_fn):
        return decode_and_resize(img_path, size, pre_fn)[None, ...]

    v = _prep(IMG_SIZE_VGG, vgg_preprocess)
    i = _prep(IMG_SIZE_INC, inc_preprocess)

    # Feature extract
    fv = vgg_out(v, training=False).numpy()
    fi = inc_out(i, training=False).numpy()
    feats = np.hstack([fv, fi])

    # Predict
    pred_idx = int(pipe.predict(feats)[0])
    proba = None
    if hasattr(pipe, "predict_proba"):
        # Go through the whole pipeline: the forest was fitted on scaled (and
        # optionally PCA-reduced) features, so calling predict_proba on the
        # "rf" step alone would score raw features.
        proba = pipe.predict_proba(feats)[0]
    return classes_local[pred_idx], proba

# Example (uncomment and set your image path)
# pred, proba = load_pipeline_and_predict("/kaggle/input/.../example.png")
# print(pred, proba)



## 12) Tips to improve accuracy
- **Tune RF hyperparameters**: `n_estimators` (e.g., 800–1200), `max_depth`, `min_samples_split`, `min_samples_leaf`.
- **Enable PCA** if feature dimensionality is large and noisy. Try keeping 95–99% variance.
- **Class imbalance**: The notebook already compensates for it by passing balanced class weights to the Random Forest as per-sample `sample_weight`; no resampling is performed.
- **Data cleaning**: Ensure corrupted or mislabeled images are removed.
- **Backbone variations**: Try EfficientNetB0/B3 or DenseNet201 as additional feature sources and concatenate.
- **More data**: Use augmented variants or external datasets if allowed by your project rules.
