In [2]:
import os
import zipfile
import shutil
import random
import numpy as np
import pandas as pd

#from tqdm import tqdm

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, models
from tensorflow.keras.models import Model

from sklearn.cluster import KMeans
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score


  if not hasattr(np, "object"):


In [4]:
# Archive that holds the dataset.
ZIP_PATH = "covid+pneumonia+normal.zip"

# Everything the notebook produces lives under WORKING_DIR.
WORKING_DIR = "CPN_working"
# NOTE(review): DATA_DIR is not referenced by the cells visible here — confirm it is still needed.
DATA_DIR = os.path.join(WORKING_DIR, "CPN")
# Flat folder that receives the renamed copies of all images.
MIXED_DIR = os.path.join(WORKING_DIR, "mixed_images")

# Create the output folders up front; exist_ok makes the cell safe to re-run.
for required_dir in (WORKING_DIR, MIXED_DIR):
    os.makedirs(required_dir, exist_ok=True)


In [5]:
# Unpack every member of the archive into WORKING_DIR; the context manager
# closes the zip file even if extraction fails.
with zipfile.ZipFile(ZIP_PATH, mode='r') as archive:
    archive.extractall(path=WORKING_DIR)
print("‚úÖ Dataset unzipped completely")


‚úÖ Dataset unzipped completely


In [7]:
# Collect every class image into one flat folder under a sequential name and
# remember which class each renamed file came from.
image_records = []
image_counter = 0

# The class folders are expected directly inside this directory.
dataset_root = WORKING_DIR
print("üìÇ Dataset root detected:", dataset_root)

IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')

for class_name in sorted(os.listdir(dataset_root)):
    class_path = os.path.join(dataset_root, class_name)

    # Only real class folders are processed; the output folder itself is skipped.
    if class_name == "mixed_images" or not os.path.isdir(class_path):
        continue

    print(f"‚û°Ô∏è Processing class: {class_name}")

    image_files = [
        entry for entry in sorted(os.listdir(class_path))
        if entry.lower().endswith(IMAGE_SUFFIXES)
    ]

    for image_name in image_files:
        image_counter += 1
        # NOTE(review): the bytes are copied unchanged, so a .jpg/.jpeg source
        # ends up under a .png name — confirm this is intended.
        new_image_name = f"img_{image_counter:06d}.png"

        shutil.copy(
            os.path.join(class_path, image_name),
            os.path.join(MIXED_DIR, new_image_name),
        )

        # One row per copied image: new file name -> originating class folder.
        image_records.append({"image_name": new_image_name, "label": class_name})

print(f"‚úÖ Mixed images created: {image_counter}")

# Persist the file-name/label mapping next to the data.
LABELS_CSV = os.path.join(WORKING_DIR, "labels.csv")
labels_df = pd.DataFrame(image_records)
labels_df.to_csv(LABELS_CSV, index=False)

print("‚úÖ labels.csv saved successfully")

üìÇ Dataset root detected: CPN_working
‚û°Ô∏è Processing class: COVID
‚û°Ô∏è Processing class: NORMAL
‚û°Ô∏è Processing class: PNEUMONIA
‚úÖ Mixed images created: 5228
‚úÖ labels.csv saved successfully


In [8]:
# Input resolution and batch size shared by every model in this notebook.
IMG_SIZE = (224, 224)
BATCH_SIZE = 16

# Pixel values are scaled by 1/255 before they reach a model.
datagen = ImageDataGenerator(rescale=1.0 / 255)

# Iterate over the mixed folder in labels_df row order (no shuffling), yielding
# one-hot ("categorical") targets built from the "label" column.
flow_options = dict(
    dataframe=labels_df,
    directory=MIXED_DIR,
    x_col="image_name",
    y_col="label",
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode="categorical",
    shuffle=False,
)
data_gen = datagen.flow_from_dataframe(**flow_options)


Found 5228 validated image filenames belonging to 3 classes.


In [None]:
def build_resnet152():
    """Build an uncompiled ResNet152 classifier with randomly initialised weights.

    The backbone (no top, ``weights=None``) takes ``(*IMG_SIZE, 3)`` inputs and
    is followed by global average pooling, a 256-unit ReLU layer and a
    3-way softmax output.

    Returns:
        A Keras ``Model`` from the backbone input to the softmax output.
    """
    backbone = tf.keras.applications.ResNet152(
        weights=None,
        include_top=False,
        input_shape=tuple(IMG_SIZE) + (3,),
    )
    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    hidden = layers.Dense(256, activation='relu')(pooled)
    probabilities = layers.Dense(3, activation='softmax')(hidden)
    return Model(inputs=backbone.input, outputs=probabilities)

In [10]:
def build_vgg19():
    """Build an uncompiled VGG19 classifier with randomly initialised weights.

    The backbone (no top, ``weights=None``) takes ``(*IMG_SIZE, 3)`` inputs.
    Its output feature map is flattened, passed through a 256-unit ReLU layer
    and classified by a 3-way softmax.

    Returns:
        A Keras ``Model`` from the backbone input to the softmax output.
    """
    backbone = tf.keras.applications.VGG19(
        weights=None,
        include_top=False,
        input_shape=tuple(IMG_SIZE) + (3,),
    )
    flattened = layers.Flatten()(backbone.output)
    hidden = layers.Dense(256, activation='relu')(flattened)
    probabilities = layers.Dense(3, activation='softmax')(hidden)
    return Model(inputs=backbone.input, outputs=probabilities)

In [11]:
def build_xception():
    """Build an uncompiled Xception classifier with randomly initialised weights.

    The backbone (no top, ``weights=None``) takes ``(*IMG_SIZE, 3)`` inputs and
    is followed by global average pooling, a 256-unit ReLU layer and a
    3-way softmax output.

    Returns:
        A Keras ``Model`` from the backbone input to the softmax output.
    """
    backbone = tf.keras.applications.Xception(
        weights=None,
        include_top=False,
        input_shape=tuple(IMG_SIZE) + (3,),
    )
    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    hidden = layers.Dense(256, activation='relu')(pooled)
    probabilities = layers.Dense(3, activation='softmax')(hidden)
    return Model(inputs=backbone.input, outputs=probabilities)

In [12]:

def build_mobilenet_v3():
    """Build an uncompiled MobileNetV3-Large classifier with random weights.

    The backbone (no top, ``weights=None``) takes ``(*IMG_SIZE, 3)`` inputs and
    is followed by global average pooling, a 256-unit ReLU layer and a
    3-way softmax output.

    The backbone's built-in preprocessing is switched off: it is designed for
    raw 0-255 pixels, while the notebook's generator already rescales images
    by 1/255. Leaving it on would scale the pixels twice and squeeze them into
    a tiny value range.

    Returns:
        A Keras ``Model`` from the backbone input to the softmax output.
    """
    base = tf.keras.applications.MobileNetV3Large(
        include_top=False,
        weights=None,
        input_shape=(*IMG_SIZE, 3),
        # Inputs arrive already rescaled by the generator; do not rescale again.
        include_preprocessing=False,
    )
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dense(256, activation='relu')(x)
    out = layers.Dense(3, activation='softmax')(x)
    return Model(base.input, out)

In [None]:
from tensorflow.keras import layers, Model
import tensorflow as tf

def build_efficientnetb0():
    """Build an uncompiled EfficientNetB0 classifier with random weights.

    The backbone (no top, ``weights=None``) is followed by global average
    pooling, a 256-unit ReLU layer and a 3-way softmax output.

    EfficientNet rescales its input internally and expects raw 0-255 pixels,
    whereas the notebook's generator already divides pixels by 255. A
    ``Rescaling(255.0)`` layer in front of the backbone undoes the generator's
    scaling so the pixels are not scaled twice.

    Returns:
        A Keras ``Model`` taking ``(*IMG_SIZE, 3)`` images (as produced by the
        generator) and returning 3 class probabilities. The second-to-last
        layer is the 256-unit dense layer.
    """
    input_shape = (*IMG_SIZE, 3)

    base = tf.keras.applications.EfficientNetB0(
        include_top=False,
        weights=None,          # keep None to match ResNet152 setup
        input_shape=input_shape
    )

    inputs = layers.Input(shape=input_shape)
    # Bring the generator's [0, 1] pixels back to the 0-255 range the backbone
    # expects before it applies its own internal rescaling.
    x = layers.Rescaling(255.0)(inputs)
    x = base(x)

    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(256, activation='relu')(x)
    out = layers.Dense(3, activation='softmax')(x)

    return Model(inputs=inputs, outputs=out)


In [None]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Class weights from scikit-learn's "balanced" scheme, one per distinct label.
labels = labels_df["label"].values
classes = np.unique(labels)

class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=labels)

# Keys are positions in the sorted unique-label array, not the label strings.
class_weight_dict = {
    class_index: weight for class_index, weight in enumerate(class_weights)
}

print("Class Weights:", class_weight_dict)


In [None]:
# Output folders for the extracted feature arrays and the encoded labels.
# NOTE(review): these are absolute /content paths while earlier cells use
# relative paths — confirm they exist/are writable where the notebook runs.
FEATURE_DIR = "/content/saved_features"
LABEL_DIR = "/content/saved_labels"

# exist_ok keeps the cell safe to re-run.
for output_dir in (FEATURE_DIR, LABEL_DIR):
    os.makedirs(output_dir, exist_ok=True)


In [None]:
from sklearn.preprocessing import LabelEncoder

# Turn the class-name strings into integer ids, in labels_df row order.
label_column = labels_df["label"]
le = LabelEncoder().fit(label_column)
true_labels = le.transform(label_column)

# Keep a copy on disk so later analysis can reload the ground truth.
true_labels_file = f"{LABEL_DIR}/true_labels.npy"
np.save(true_labels_file, true_labels)
print("‚úÖ True labels saved:", true_labels.shape)


In [None]:
from tensorflow.keras.models import Model
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# VERY IMPORTANT: data_gen must stay unshuffled so extracted feature rows line
# up with the rows of labels_df (and therefore with true_labels).
data_gen.shuffle = False

# One summary record per model, appended by cluster_with_scaling_pca.
results = []

def train_and_extract(model, name, epochs=10):
    """Train `model`, then save and return its penultimate-layer features.

    Training uses a separate, shuffled iterator over the same images.
    `labels_df` lists the images class by class and `data_gen` is unshuffled,
    so fitting directly on `data_gen` would feed the network batches that each
    contain a single class. Feature extraction still uses the ordered
    `data_gen`, so row i of the result belongs to row i of `labels_df`.

    Args:
        model: Uncompiled Keras model whose second-to-last layer provides the
            features.
        name: Identifier used in the saved file names and log message.
        epochs: Number of training epochs (default 10).

    Returns:
        The array returned by ``predict`` on the feature sub-model, one row
        per image in `data_gen` order.

    Side effects:
        Writes ``{FEATURE_DIR}/{name}_features.npy`` and ``/content/{name}.h5``.
    """
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Same data and preprocessing as data_gen, but shuffled (seeded for
    # repeatability) so every batch mixes the classes.
    train_gen = datagen.flow_from_dataframe(
        dataframe=labels_df,
        directory=MIXED_DIR,
        x_col="image_name",
        y_col="label",
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode="categorical",
        shuffle=True,
        seed=42
    )

    model.fit(
        train_gen,
        class_weight=class_weight_dict,
        epochs=epochs,
        verbose=1
    )

    # Sub-model that stops at the layer just before the classification output.
    feature_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output
    )

    # Ordered pass over the data: keeps features aligned with the labels.
    features = feature_model.predict(data_gen)

    np.save(f"{FEATURE_DIR}/{name}_features.npy", features)
    model.save(f"/content/{name}.h5")

    print(f"‚úÖ {name} features saved:", features.shape)

    return features


def _match_clusters_to_labels(cluster_ids, reference_labels, n_classes):
    """Relabel clusters with the one-to-one mapping that best fits the labels.

    Every assignment of cluster ids to class ids is tried (n_classes! options,
    i.e. 6 for three classes) and the one with the most agreeing samples wins.

    Args:
        cluster_ids: Integer cluster id per sample, in ``range(n_classes)``.
        reference_labels: Integer class id per sample, in ``range(n_classes)``.
        n_classes: Number of clusters/classes.

    Returns:
        Array with the mapped class id of every sample's cluster.
    """
    from itertools import permutations

    cluster_ids = np.asarray(cluster_ids).astype(np.intp)
    reference_labels = np.asarray(reference_labels).astype(np.intp)

    # counts[c, k] = number of samples in cluster c whose reference label is k.
    counts = np.zeros((n_classes, n_classes), dtype=np.int64)
    np.add.at(counts, (cluster_ids, reference_labels), 1)

    cluster_axis = np.arange(n_classes)
    best_assignment = max(
        permutations(range(n_classes)),
        key=lambda assignment: counts[cluster_axis, list(assignment)].sum(),
    )
    return np.asarray(best_assignment)[cluster_ids]


def cluster_with_scaling_pca(features, model_name):
    """Cluster features (scale -> PCA -> KMeans) and record label disagreement.

    The features are standardised, reduced with PCA keeping 95% of the
    variance, and split into 3 clusters with KMeans. Because KMeans numbers
    its clusters arbitrarily, the cluster ids are first mapped onto class ids
    with the best one-to-one assignment; only then are they compared with the
    global `true_labels`. Without that step the percentage would depend on the
    arbitrary cluster numbering rather than on the clustering itself.

    Args:
        features: 2-D array, one row per sample, in the same order as
            `true_labels`.
        model_name: Name stored in the "Model" field of the result record.

    Returns:
        None. A dict with "Model", "Label Change (%)" and "PCA Components" is
        appended to the global `results` list.

    Raises:
        ValueError: If `features` and `true_labels` differ in length.
    """
    n_clusters = 3
    reference = np.asarray(true_labels)

    if len(features) != len(reference):
        raise ValueError(
            f"features has {len(features)} rows but true_labels has "
            f"{len(reference)} entries"
        )

    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)

    pca = PCA(n_components=0.95, random_state=42)
    features_pca = pca.fit_transform(features_scaled)

    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(features_pca)

    # Translate arbitrary cluster ids into class ids before comparing.
    aligned_labels = _match_clusters_to_labels(cluster_labels, reference, n_clusters)
    label_change_percent = np.mean(aligned_labels != reference) * 100

    results.append({
        "Model": model_name,
        "Label Change (%)": round(label_change_percent, 2),
        "PCA Components": features_pca.shape[1]
    })

# Write the encoded ground-truth labels to the label folder.
true_labels_path = f"{LABEL_DIR}/true_labels.npy"
np.save(true_labels_path, true_labels)


In [None]:
# ResNet152: build, train and extract features, then cluster the features.
resnet_model = build_resnet152()
resnet_features = train_and_extract(model=resnet_model, name="ResNet152")
cluster_with_scaling_pca(features=resnet_features, model_name="ResNet152")


In [None]:
# VGG19: build, train and extract features, then cluster the features.
vgg_model = build_vgg19()
vgg_features = train_and_extract(model=vgg_model, name="VGG19")
cluster_with_scaling_pca(features=vgg_features, model_name="VGG19")


In [None]:
# Xception: build, train and extract features, then cluster the features.
xception_model = build_xception()
xception_features = train_and_extract(model=xception_model, name="Xception")
cluster_with_scaling_pca(features=xception_features, model_name="Xception")


In [None]:
# MobileNetV3: build, train and extract features, then cluster the features.
mobilenet_model = build_mobilenet_v3()
mobilenet_features = train_and_extract(model=mobilenet_model, name="MobileNetV3")
cluster_with_scaling_pca(features=mobilenet_features, model_name="MobileNetV3")


In [None]:
# EfficientNetB0: build, train and extract features, then cluster the features.
efficientnet_model = build_efficientnetb0()
efficientnet_features = train_and_extract(model=efficientnet_model, name="EfficientNetB0")
cluster_with_scaling_pca(features=efficientnet_features, model_name="EfficientNetB0")


In [None]:
import pandas as pd

# Gather the per-model clustering records into one table and persist it.
results_df = pd.DataFrame(results)
results_df.to_csv("clustering_results.csv", index=False)

# Keep the table as the cell's last expression so the notebook displays it;
# a bare expression in the middle of a cell is silently discarded.
results_df

Epoch 1/5
