In [1]:
!pip install numpy pandas scikit-image opencv-python scikit-learn lightgbm torch torchvision pywavelets matplotlib tqdm shap

import os
import cv2
import numpy as np
import pandas as pd
from skimage.feature import graycomatrix, graycoprops, local_binary_pattern, hog
import pywt
from scipy.stats import skew, kurtosis
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import roc_auc_score, roc_curve, accuracy_score, f1_score, precision_score, recall_score, cohen_kappa_score
import lightgbm as lgb
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm.auto import tqdm
import joblib
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')



In [2]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"prathamramesh2004","key":"2f3aa36f999366aa8e0bcd128128a106"}'}

In [3]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [4]:
!pip install -q kaggle

!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

!unzip -q chest-xray-pneumonia.zip -d ./data

DATA_DIR = "./data/chest_xray"


Dataset URL: https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia
License(s): other
Downloading chest-xray-pneumonia.zip to /content
 98% 2.26G/2.29G [00:18<00:01, 32.8MB/s]
100% 2.29G/2.29G [00:18<00:00, 135MB/s] 


In [5]:
DATA_DIR = "./data/chest_xray"
IMG_SIZE = 256
BATCH_SIZE = 32
NUM_WORKERS = 4
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

In [6]:
def read_img(path, size=IMG_SIZE):
  im = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
  if im is None:
    raise FileNotFoundError(path)

  h, w = im.shape
  if h < w:
    new_h = size
    new_w = int(w * size / h)
  else:
    new_w = size
    new_h = int(h * size / w)
  im = cv2.resize(im, (new_w, new_h), interpolation=cv2.INTER_AREA)
  startx = (new_w - size) // 2
  starty = (new_h - size) // 2
  im = im[starty:starty+size, startx:startx+size]
  return im

def clahe_eq(im):
  clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
  return clahe.apply(im)

In [7]:
def intensity_stats(im):
  vals = im.ravel().astype(np.float32)
  hist = np.bincount(vals.astype(np.int32), minlength=256)
  probs = hist / hist.sum()
  entropy = -np.sum([p * np.log2(p) for p in probs if p > 0])
  features = {
    "mean": float(vals.mean()),
    "median": float(np.median(vals)),
    "std": float(vals.std()),
    "skew": float(skew(vals)),
    "kurt": float(kurtosis(vals)),
    "p5": float(np.percentile(vals, 5)),
    "p25": float(np.percentile(vals, 25)),
    "p75": float(np.percentile(vals, 75)),
    "p95": float(np.percentile(vals, 95)),
    "entropy": float(entropy),
    "dark_ratio": float((vals < 30).sum() / vals.size)
  }
  return features

In [8]:
def glcm_features(im, distances=[1,2], angles=[0, np.pi/4], levels=256):
  imq = im.astype(np.uint8)
  props = {}
  glcm = graycomatrix(imq, distances=distances, angles=angles, levels=levels, symmetric=True, normed=True)
  for p in ['contrast','dissimilarity','homogeneity','energy','correlation','ASM']:
    arr = graycoprops(glcm, p)
    props[f'glcm_{p}_mean'] = float(arr.mean())
    props[f'glcm_{p}_std'] = float(arr.std())
  return props

In [9]:
def lbp_hist(im, P=8, R=1, n_bins=10):
  lbp = local_binary_pattern(im, P, R, method='uniform')
  max_val = lbp.max()
  hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, max_val+1), density=True)
  return {f'lbp_{i}': float(hist[i]) for i in range(len(hist))}

In [10]:
def hog_features(im, pixels_per_cell=(16,16), cells_per_block=(2,2), orientations=9):
  fd = hog(im, orientations=orientations, pixels_per_cell=pixels_per_cell,
  cells_per_block=cells_per_block, block_norm='L2-Hys', feature_vector=True)
  N = 64
  res = {}
  res['hog_mean'] = float(fd.mean())
  res['hog_std'] = float(fd.std())
  for i, v in enumerate(fd[:N]):
    res[f'hog_{i}'] = float(v)
  return res

In [11]:
def wavelet_feats(im, wavelet='db1', level=2):
  coeffs = pywt.wavedec2(im.astype(np.float32), wavelet=wavelet, level=level)
  energies = {}
  for i, c in enumerate(coeffs[1:], start=1):
    for j, band in enumerate(c):
      energies[f'wl_l{ i }_b{ j }_energy'] = float(np.sum(np.square(band)))
      energies[f'wl_l{ i }_b{ j }_mean'] = float(np.mean(band))
  return energies

In [12]:
class TinyCNN(nn.Module):
    def __init__(self, out_dim=128):
      super().__init__()
      self.features = nn.Sequential(
        nn.Conv2d(1, 16, kernel_size=3, padding=1),
        nn.BatchNorm2d(16),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(2),


        nn.Conv2d(16, 32, kernel_size=3, padding=1),
        nn.BatchNorm2d(32),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(2),


        nn.Conv2d(32, 64, kernel_size=3, padding=1),
        nn.BatchNorm2d(64),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(2),


        nn.Conv2d(64, 128, kernel_size=3, padding=1),
        nn.BatchNorm2d(128),
        nn.ReLU(inplace=True),
        nn.AdaptiveAvgPool2d(1)
        )
      self.fc = nn.Linear(128, out_dim)


    def forward(self, x):
      x = self.features(x)
      x = x.view(x.size(0), -1)
      x = self.fc(x)
      return x

In [13]:
class CXRFolderDataset(Dataset):
  def __init__(self, filepaths, labels, transform=None):
    self.paths = filepaths
    self.labels = labels
    self.transform = transform


  def __len__(self):
    return len(self.paths)


  def __getitem__(self, idx):
    p = self.paths[idx]
    im = read_img(p, size=IMG_SIZE)
    im = clahe_eq(im)
    im = im.astype(np.float32) / 255.0
    if self.transform:
      im = self.transform(im)
    else:
      im = torch.from_numpy(im).unsqueeze(0)
    y = int(self.labels[idx])
    return im, y

In [14]:
def build_file_list(data_dir):
  splits = ['train', 'val', 'test']
  records = []
  for sp in splits:
    spath = os.path.join(data_dir, sp)
    if not os.path.exists(spath):
      continue
    for cls in ['NORMAL', 'PNEUMONIA']:
      cpath = os.path.join(spath, cls)
      if not os.path.exists(cpath):
        continue
      for fn in os.listdir(cpath):
        if fn.lower().endswith(('.png', '.jpg', '.jpeg')):
          records.append({'split': sp, 'path': os.path.join(cpath, fn), 'label': 0 if cls=='NORMAL' else 1})
  df = pd.DataFrame(records)
  return df

In [15]:
def extract_features_for_paths(paths, model=None, pca=None, cnn_out_dim=64):
  rows = []
  for p in tqdm(paths, desc='extracting'):
    try:
      im = read_img(p, size=IMG_SIZE)
      imc = clahe_eq(im)
      feats = {}
      feats.update(intensity_stats(imc))
      feats.update(glcm_features(imc))
      feats.update(lbp_hist(imc))
      feats.update(hog_features(imc))
      feats.update(wavelet_feats(imc))

      if model is not None:
        inp = torch.from_numpy(imc.astype(np.float32) / 255.0).unsqueeze(0).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
          vec = model(inp).cpu().numpy().ravel()
        if pca is not None:
          vec = pca.transform(vec.reshape(1, -1)).ravel()
        for i, v in enumerate(vec):
          feats[f'cnn_{i}'] = float(v)
      else:
        for i in range(cnn_out_dim):
          feats[f'cnn_{i}'] = 0.0
      rows.append(feats)
    except FileNotFoundError:
      print('missing', p)
      continue
  df = pd.DataFrame(rows)
  return df

In [16]:
df_files = build_file_list(DATA_DIR)
print('found samples:', len(df_files))
print(df_files['split'].value_counts())

found samples: 5856
split
train    5216
test      624
val        16
Name: count, dtype: int64


In [17]:
df_files['patient_id'] = df_files['path'].apply(lambda x: os.path.basename(x).split('_')[0])

from sklearn.model_selection import GroupShuffleSplit


gss = GroupShuffleSplit(n_splits=1, train_size=0.7, random_state=RANDOM_SEED)
train_idx, temp_idx = next(gss.split(df_files, groups=df_files['patient_id']))
train_df = df_files.iloc[train_idx]
temp_df = df_files.iloc[temp_idx]

gss2 = GroupShuffleSplit(n_splits=1, train_size=0.5, random_state=RANDOM_SEED)
val_idx, test_idx = next(gss2.split(temp_df, groups=temp_df['patient_id']))
val_df = temp_df.iloc[val_idx]
test_df = temp_df.iloc[test_idx]

train_paths = train_df['path'].tolist()
train_labels = train_df['label'].tolist()
val_paths = val_df['path'].tolist()
val_labels = val_df['label'].tolist()
test_paths = test_df['path'].tolist()
test_labels = test_df['label'].tolist()


In [18]:
train_transform = transforms.Compose([
transforms.ToTensor(),
])

In [19]:
cnn_train_dataset = CXRFolderDataset(train_paths, train_labels, transform=None)
cnn_train_loader = DataLoader(cnn_train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

In [20]:
cnn_out_dim = 64
model = TinyCNN(out_dim=cnn_out_dim).to(DEVICE)

In [21]:
def train_tiny_cnn(model, dataloader, epochs=5, lr=1e-3):
  opt = torch.optim.Adam(model.parameters(), lr=lr)
  criterion = nn.CrossEntropyLoss()
  model.train()
  for epoch in range(epochs):
    pbar = tqdm(dataloader, desc=f'epoch {epoch+1}/{epochs}')
    running_loss = 0.0
    correct = 0
    total = 0
    for xb, yb in pbar:
      xb = xb.to(DEVICE)
      yb = yb.to(DEVICE)
      logits = model(xb)
      loss = criterion(logits, yb)
      opt.zero_grad()
      loss.backward()
      opt.step()
      running_loss += loss.item() * xb.size(0)
      preds = logits.argmax(dim=1)
      correct += (preds==yb).sum().item()
      total += xb.size(0)
      pbar.set_postfix({'loss': running_loss/total, 'acc': correct/total})
  return model

In [22]:
joblib.dump({'cnn_out_dim': cnn_out_dim}, 'model_meta.joblib')

['model_meta.joblib']

In [23]:
all_paths = df_files['path'].tolist()

print('extracting CNN bottleneck vectors for PCA fit (sample of training images)')
sample_paths = train_paths[:min(500, len(train_paths))]
cnn_vecs = []

for p in tqdm(sample_paths):
  im = read_img(p, size=IMG_SIZE)
  imc = clahe_eq(im)
  inp = torch.from_numpy(imc.astype(np.float32) / 255.0).unsqueeze(0).unsqueeze(0).to(DEVICE)
  with torch.no_grad():
    vec = model(inp).cpu().numpy().ravel()
  cnn_vecs.append(vec)
cnn_vecs = np.stack(cnn_vecs, axis=0)
print('cnn vecs shape:', cnn_vecs.shape)

pca = PCA(n_components=min(16, cnn_out_dim), random_state=RANDOM_SEED)
pca.fit(cnn_vecs)
joblib.dump(pca, 'cnn_pca.joblib')


print('extracting full feature table (this may take a while)')
feature_df = extract_features_for_paths(all_paths, model=model, pca=pca, cnn_out_dim=cnn_out_dim)
feature_df['path'] = all_paths
merged = feature_df.merge(df_files, on='path')
print('feature table shape:', merged.shape)
merged.to_pickle('feature_table_raw.pkl')

extracting CNN bottleneck vectors for PCA fit (sample of training images)


  0%|          | 0/500 [00:00<?, ?it/s]

cnn vecs shape: (500, 64)
extracting full feature table (this may take a while)


extracting:   0%|          | 0/5856 [00:00<?, ?it/s]

feature table shape: (5856, 131)


In [None]:
feature_cols = train_df.select_dtypes(include=['float64','int64']).columns.tolist()
feature_cols = [c for c in feature_cols if c != 'label']


X_train = train_df[feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0).values
y_train = train_df['label'].values

X_val = val_df[feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0).values
y_val = val_df['label'].values

X_test = test_df[feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0).values
y_test = test_df['label'].values



In [None]:
feature_cols = [c for c in merged.columns if c not in ['split','path','label', 'patient_id']]


X_train = merged[merged['split'] == 'train'][feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0).values
y_train = merged[merged['split'] == 'train']['label'].values

X_val = merged[merged['split'] == 'val'][feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0).values
y_val = merged[merged['split'] == 'val']['label'].values

X_test = merged[merged['split'] == 'test'][feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0).values
y_test = merged[merged['split'] == 'test']['label'].values


scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)
joblib.dump(scaler, 'feature_scaler.joblib')


from sklearn.feature_selection import VarianceThreshold
sel = VarianceThreshold(threshold=1e-5)
X_train_sel = sel.fit_transform(X_train_scaled)
X_val_sel = sel.transform(X_val_scaled)
X_test_sel = sel.transform(X_test_scaled)
joblib.dump(sel, 'variance_selector.joblib')
print('selected feature dim:', X_train_sel.shape[1])

selected feature dim: 127


In [None]:
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)
params = {
  'objective': 'binary',
  'metric': 'auc',
  'verbosity': -1,
  'boosting_type': 'gbdt',
  'learning_rate': 0.05,
  'num_leaves': 31,
  'seed': RANDOM_SEED
}


aucs = []
fold = 0
models = []
for train_idx, val_idx in skf.split(X_train_sel, y_train):
  fold += 1
  X_tr, X_val = X_train_sel[train_idx], X_train_sel[val_idx]
  y_tr, y_val = y_train[train_idx], y_train[val_idx]
  dtrain = lgb.Dataset(X_tr, label=y_tr)
  dval = lgb.Dataset(X_val, label=y_val)
  model_lgb = lgb.train(params, dtrain, num_boost_round=1000, valid_sets=[dval],
                        callbacks=[lgb.early_stopping(50, verbose=50)])
  preds = model_lgb.predict(X_val)
  auc = roc_auc_score(y_val, preds)
  print(f'Fold {fold} AUC: {auc:.4f}')
  aucs.append(auc)
  models.append(model_lgb)

print('Mean AUC:', np.mean(aucs), 'Std:', np.std(aucs))
joblib.dump(models, 'lgb_models.pkl')

Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[393]	valid_0's auc: 0.983499
Fold 1 AUC: 0.9835
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[365]	valid_0's auc: 0.986986
Fold 2 AUC: 0.9870
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[499]	valid_0's auc: 0.992768
Fold 3 AUC: 0.9928
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[322]	valid_0's auc: 0.983722
Fold 4 AUC: 0.9837
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[409]	valid_0's auc: 0.986663
Fold 5 AUC: 0.9867
Mean AUC: 0.9867277689976784 Std: 0.003347242044006433


['lgb_models.pkl']

In [None]:
if 'test' in merged['split'].unique():
    test_df = merged[merged['split']=='test'].copy()
    X_test = test_df.drop(columns=['split','path','label', 'patient_id'])
    X_test = X_test.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    X_test = scaler.transform(X_test)
    X_test = sel.transform(X_test)
    y_test = test_df['label'].values

    preds = np.mean([m.predict(X_test) for m in models], axis=0)
    thresholds = np.linspace(0.1, 0.9, 81)
    best_kappa = -1
    best_thresh = 0.5

    for t in thresholds:
        y_temp = (preds >= t).astype(int)
        kappa = cohen_kappa_score(y_test, y_temp)
        if kappa > best_kappa:
            best_kappa = kappa
            best_thresh = t

    print(f"Best threshold for Cohen's Kappa: {best_thresh:.3f}, Score: {best_kappa:.4f}")
    y_pred = (preds >= best_thresh).astype(int)

    auc = roc_auc_score(y_test, preds)
    f1 = f1_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred)
    rec = recall_score(y_test, y_pred)
    cks = cohen_kappa_score(y_test, y_pred)

    print('Test AUC:', auc)
    print('Test F1:', f1, 'Prec:', prec, 'Rec:', rec, 'Cohen Kappa:', cks)


Best threshold for Cohen's Kappa: 0.900, Score: 0.5375
Test AUC: 0.8990247644093798
Test F1: 0.856156501726122 Prec: 0.7766179540709812 Rec: 0.9538461538461539 Cohen Kappa: 0.5374653098982424


In [None]:
merged.to_csv('feature_table_with_meta.csv', index=False)

In [None]:
try:
    import shap
    explainer = shap.TreeExplainer(models[0])
    shap_vals = explainer.shap_values(X_test_sel[:100])
    shap.summary_plot(shap_vals, features=merged[feature_cols].columns, show=False)
    plt.savefig('shap_summary.png', bbox_inches='tight')
    print('Saved shap_summary.png')
except Exception as e:
    print('SHAP unavailable or failed:', e)

In [None]:
def infer_image(path, models=models, scaler=scaler, selector=sel, cnn_model=model, pca=pca):
    feats = extract_features_for_paths([path], model=cnn_model, pca=pca, cnn_out_dim=cnn_out_dim)
    Xn = feats.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    Xn = scaler.transform(Xn)
    Xn = selector.transform(Xn)
    preds = np.mean([m.predict(Xn) for m in models], axis=0)
    return float(preds[0])

In [None]:
from sklearn.metrics import f1_score
import numpy as np


thresholds = np.arange(0.1, 0.9, 0.01)
f1_scores = [f1_score(y_test, preds >= t) for t in thresholds]

best_threshold = thresholds[np.argmax(f1_scores)]
print("Best threshold for max F1:", best_threshold)

In [None]:
score = infer_image("/content/p.jpeg")
print("Pneumonia probability:", score)

if score >= best_threshold:
    print("Pneumonia detected")
else:
    print("Normal")

In [None]:
score = infer_image("/content/n.jpeg")
print("Pneumonia probability:", score)

if score >= best_threshold:
    print("Pneumonia detected")
else:
    print("Normal")

In [None]:
score = infer_image("/content/normal.jpg")
print("Pneumonia probability:", score)

if score >= best_threshold:
    print("Pneumonia detected")
else:
    print("Normal")