# **Computer-Aided Diagnosis System for Lung Fibrosis: From the Effect of Radiomic Features**

**Import Necessary Packages**

In [None]:

import os
import cv2
import yaml
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import SimpleITK as sitk
import matplotlib.pyplot as plt
from radiomics import featureextractor
from tensorflow.keras import layers, models
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, ConfusionMatrixDisplay


## UNET

**Set Paramaeters**

In [None]:
input_shape = (128, 128, 1)
batch_size = 16
epochs = 30
data_dir = '/content/drive/MyDrive/FYP/UNET/train'  # Path to your training data
model_path = '/content/drive/MyDrive/FYP/UNET/lung_segmentation_unet.h5'

**Utils**

In [None]:
def load_data(data_dir):
    images = []
    masks = []
    for filename in os.listdir(os.path.join(data_dir, 'images')):
        img = cv2.imread(os.path.join(data_dir, 'images', filename), cv2.IMREAD_GRAYSCALE)
        img = cv2.resize(img, (128, 128))  # Resize to desired size
        images.append(img)

        mask = cv2.imread(os.path.join(data_dir, 'masks', filename), cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (128, 128))  # Resize to desired size
        masks.append(mask)

    images = np.array(images).astype('float32') / 255.0  # Normalize
    masks = np.array(masks).astype('float32') / 255.0  # Normalize
    masks = np.expand_dims(masks, axis=-1)  # Expand dims for channel

    return images, masks

In [None]:
def get_data_splits(data_dir):
    images, masks = load_data(data_dir)
    return train_test_split(images, masks, test_size=0.2, random_state=42)

**Functions**

In [None]:
def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true_flat = y_true.flatten()
    y_pred_flat = y_pred.flatten()
    intersection = np.sum(y_true_flat * y_pred_flat)
    return (2. * intersection + smooth) / (np.sum(y_true_flat) + np.sum(y_pred_flat) + smooth)

In [None]:
def iou(y_true, y_pred, smooth=1e-6):
    y_true_flat = y_true.flatten()
    y_pred_flat = y_pred.flatten()
    intersection = np.sum(y_true_flat * y_pred_flat)
    return intersection / (np.sum(y_true_flat) + np.sum(y_pred_flat) - intersection + smooth)

In [None]:
def pixel_accuracy(y_true, y_pred):
    y_true_flat = y_true.flatten()
    y_pred_flat = y_pred.flatten()
    return np.sum(y_true_flat == y_pred_flat) / y_true_flat.shape[0]

In [None]:
def evaluate(y_true, y_pred):
    return {
        "Dice Coefficient": dice_coefficient(y_true, y_pred),
        "IoU": iou(y_true, y_pred),
        "Pixel Accuracy": pixel_accuracy(y_true, y_pred),
    }


In [None]:
def unet(input_shape):
    inputs = layers.Input(input_shape)

    # Contracting Path
    c1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    c3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(p2)
    c3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(c3)
    p3 = layers.MaxPooling2D((2, 2))(c3)

    c4 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(p3)
    c4 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(c4)
    p4 = layers.MaxPooling2D((2, 2))(c4)

    c5 = layers.Conv2D(1024, (3, 3), activation='relu', padding='same')(p4)
    c5 = layers.Conv2D(1024, (3, 3), activation='relu', padding='same')(c5)

    # Expanding Path
    u6 = layers.Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = layers.concatenate([u6, c4])
    c6 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(u6)
    c6 = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(c6)

    u7 = layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = layers.concatenate([u7, c3])
    c7 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(u7)
    c7 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(c7)

    u8 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c7)
    u8 = layers.concatenate([u8, c2])
    c8 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(u8)
    c8 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(c8)

    u9 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c8)
    u9 = layers.concatenate([u9, c1])
    c9 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(u9)
    c9 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(c9)

    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(c9)

    model = models.Model(inputs=[inputs], outputs=[outputs])
    return model

**Load Data**

In [None]:
# Assuming get_data_splits, unet, and other variables like model_path, batch_size, and epochs are defined elsewhere

X_train, X_val, y_train, y_val = get_data_splits(data_dir)

# Check if the saved model exists
if os.path.exists(model_path):
    # Load the saved model in TensorFlow's SavedModel format
    model = tf.keras.models.load_model(model_path)
    print(f"Loaded saved model from {model_path}")
else:
    # Create a U-Net model and train it
    input_shape = (128, 128, 1)  # Define input shape without batch dimension
    model = unet(input_shape)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Train the model
    history = model.fit(X_train, y_train, validation_data=(X_val, y_val), batch_size=batch_size, epochs=epochs)

    # Save the trained model in SavedModel format
    model.save(model_path)  # No .h5 extension to use SavedModel format
    print(f"Model saved to {model_path}")

**Prediction and Validation**

In [None]:
# Make predictions on validation set
predictions = model.predict(X_val)

# Save the predictions
np.save('/content/drive/MyDrive/FYP/UNET/predictions.npy', predictions)

def visualize_predictions(X_val, y_val, predictions):
    thresholded_preds = (predictions > 0.5).astype(np.uint8)  # Threshold at 0.5
    for i in range(5):  # Display 5 samples
        plt.figure(figsize=(15, 5))

        plt.subplot(1, 3, 1)
        plt.imshow(X_val[i].squeeze(), cmap='gray')
        plt.title('Input Image')

        plt.subplot(1, 3, 2)
        plt.imshow(y_val[i].squeeze(), cmap='gray')
        plt.title('Ground Truth')

        plt.subplot(1, 3, 3)
        plt.imshow(thresholded_preds[i].squeeze(), cmap='gray')
        plt.title('Prediction')

        plt.show()

# Visualize the predictions
visualize_predictions(X_val, y_val, predictions)

**Metrics**

In [None]:
# After training, evaluate metrics
def evaluate_metrics(y_true, y_pred):
    dice = dice_coefficient(y_true, y_pred)
    iou_score = iou(y_true, y_pred)
    accuracy = pixel_accuracy(y_true, y_pred)
    print(f"Dice Coefficient: {dice}")
    print(f"IoU: {iou_score}")
    print(f"Pixel Accuracy: {accuracy}")

# Evaluate on validation set
evaluate_metrics(y_val, (predictions > 0.5).astype(np.uint8))

**Training History**

In [None]:
# Plot training & validation accuracy and loss values over epochs
def plot_training_history(history):
    plt.figure(figsize=(12, 4))

    # Plot accuracy
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()

    # Plot loss
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    plt.show()

# Plot the training history
if 'history' in locals():  # Make sure history exists from training
    plot_training_history(history)

# Visualize the predictions
visualize_predictions(X_val, y_val, predictions)

## Segmentation

**Set model parameters and paths**

In [None]:
input_shape = (128, 128, 1)
model_path = '/content/drive/MyDrive/FYP/DataSet/lung_segmentation_unet.h5'
covid_data_dir = '/content/drive/MyDrive/FYP/segmentation/Covid'  # Path to COVID images folder
output_dir = '/content/drive/MyDrive/FYP/Masks'  # Directory to save the predicted masks

**Load the pre-trained UNet model**

In [None]:
from tensorflow.keras.models import load_model

# Load the pre-trained model in .h5 format
model_path_h5 = '/content/drive/MyDrive/FYP/DataSet/lung_segmentation_unet.h5'  # Path to your .h5 model
saved_model_dir = '/content/drive/MyDrive/FYP/DataSet/saved_model'  # Directory to save the new SavedModel

# Load the .h5 model
model = load_model(model_path_h5, compile=False)

# Save the model in SavedModel format
model.save(saved_model_dir)
print(f"Model saved in SavedModel format at {saved_model_dir}")

**Ensure output directory exists**

In [None]:
os.makedirs(output_dir, exist_ok=True)

**Loop through each image in the COVID folder, segment, and save the masks**

In [None]:
for filename in os.listdir(covid_data_dir):
    # Load and preprocess the image
    img_path = os.path.join(covid_data_dir, filename)
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    img_resized = cv2.resize(img, (128, 128))  # Resize to match model input shape
    img_normalized = img_resized.astype('float32') / 255.0  # Normalize to [0, 1]
    img_input = np.expand_dims(img_normalized, axis=(0, -1))  # Add batch and channel dimensions

    # Predict the segmentation mask
    predicted_mask = model.predict(img_input)[0, :, :, 0]  # Remove batch dimension

    # Threshold the mask to binary (0, 1)
    thresholded_mask = (predicted_mask > 0.5).astype(np.uint8) * 255  # Convert to binary mask for visualization/storage

    # Resize mask back to original image size if necessary
    mask_resized = cv2.resize(thresholded_mask, (img.shape[1], img.shape[0]))

    # Save the mask
    output_path = os.path.join(output_dir, filename)
    cv2.imwrite(output_path, mask_resized)
    print(f"Saved mask to {output_path}")

print("Mask generation for the COVID dataset is complete.")

**Segmentation**

In [None]:
# Paths to your images and masks
covid_data_dir = '/content/drive/MyDrive/FYP/segmentation/Covid'  # Original images
output_dir = '/content/drive/MyDrive/FYP/Masks'  # Generated masks

def visualize_predictions(image_dir, mask_dir, num_samples=5):
    images = sorted(os.listdir(image_dir))[:num_samples]

    for filename in images:
        img_path = os.path.join(image_dir, filename)
        mask_path = os.path.join(mask_dir, filename)

        # Load images and masks for visualization
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        pred_mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        plt.figure(figsize=(10, 5))

        # Display original image
        plt.subplot(1, 2, 1)
        plt.imshow(img, cmap='gray')
        plt.title('Original Image')

        # Display generated mask
        plt.subplot(1, 2, 2)
        plt.imshow(pred_mask, cmap='gray')
        plt.title('Generated Mask')

        plt.show()

**Visualization**

In [None]:
visualize_predictions(covid_data_dir, output_dir, num_samples=5)

## PyRadiomics

**Initialize the extractor with the YAML configuration file**

In [None]:
extractor = featureextractor.RadiomicsFeatureExtractor('/content/drive/MyDrive/FYP/FeatureExtraction/params.yaml')

**Paths to image and mask folders**

In [None]:
image_folder = "/content/drive/MyDrive/FYP/segmentation/Covid"
mask_folder = "/content/drive/MyDrive/FYP/Masks"

**Prepare the output CSV file**

In [None]:
output_csv = "/content/drive/MyDrive/FYP/extracted_features.csv"
feature_data = []

**Get all image and mask files**

In [None]:
image_files = sorted(os.listdir(image_folder))
mask_files = sorted(os.listdir(mask_folder))

**Loop through each image and corresponding mask**

In [None]:
for img_file, mask_file in zip(image_files, mask_files):
    image_path = os.path.join(image_folder, img_file)
    mask_path = os.path.join(mask_folder, mask_file)

    # Read the image and mask
    image = sitk.ReadImage(image_path)
    mask = sitk.ReadImage(mask_path)

    # Convert images to grayscale if necessary
    if image.GetNumberOfComponentsPerPixel() > 1:
        image = sitk.VectorIndexSelectionCast(image, 0)  # Select one channel, e.g., Red channel

    if mask.GetNumberOfComponentsPerPixel() > 1:
        mask = sitk.VectorIndexSelectionCast(mask, 0)  # Ensure mask is single-channel as well

    # Ensure images are of type UInt8 or UInt16, compatible with radiomics processing
    image = sitk.Cast(image, sitk.sitkUInt8)
    mask = sitk.Cast(mask, sitk.sitkUInt8)

    # Check if the image and mask sizes match
    if image.GetSize() != mask.GetSize():
        print(f"Resizing mask for {img_file} to match image dimensions.")
        mask = sitk.Resample(mask, referenceImage=image, transform=sitk.Transform(),
                             interpolator=sitk.sitkNearestNeighbor, defaultPixelValue=0, outputPixelType=sitk.sitkUInt8)

    # Check if the label 255 exists in the mask
    mask_stats = sitk.LabelStatisticsImageFilter()
    mask_stats.Execute(image, mask)

    if 255 in mask_stats.GetLabels():  # Proceed only if label 255 is present
        # Extract only the specified features
        features = extractor.execute(image, mask)

        # Store features in a dictionary, with image filename as identifier
        feature_row = {"Image": img_file}
        feature_row.update({feature_name: feature_value for feature_name, feature_value in features.items()})

    else:
        # If label 255 is missing, add an empty row for this image
        print(f"No segmentation found in {img_file}. Adding empty row.")
        feature_row = {"Image": img_file}

    # Append the row to the data list
    feature_data.append(feature_row)

**Convert the list of dictionaries to a DataFrame**

In [None]:
df = pd.DataFrame(feature_data)

**Save the DataFrame to CSV**

In [None]:
df.to_csv(output_csv, index=False)
print(f"Feature extraction complete. Results saved to {output_csv}")

## Random-Classifier

**Load the YAML file and construct full feature names**

In [None]:
with open('/content/drive/MyDrive/FYP/Old codes/Feature Extraction/params.yaml', 'r') as file:
    params_data = yaml.safe_load(file)

**Construct feature column names based on the hierarchical structure in params.yaml**

In [None]:
feature_columns = [f"original_{feature_class}_{feature}" for feature_class, features in params_data['featureClass'].items() for feature in features]

**Load the data**

In [None]:
normal_df = pd.read_csv('/content/drive/MyDrive/FYP/extracted_features_normal.csv')
covid_df = pd.read_csv('/content/drive/MyDrive/FYP/extracted_features_Covid.csv')

**Assign labels and combine datasets**

In [None]:
normal_df['Target'] = 0  # Label for 'Normal' cases
covid_df['Target'] = 1   # Label for 'COVID' cases
df = pd.concat([normal_df, covid_df], ignore_index=True)

**Check for missing feature columns**

In [None]:
missing_features = [col for col in feature_columns if col not in df.columns]
if missing_features:
    print("The following features are missing from the dataset:", missing_features)

**Select only the feature columns and the target**

In [None]:
X = df[feature_columns]
y = df['Target']

**Data Preprocessing**

In [None]:
X.fillna(X.mean(), inplace=True)  # Fill missing values with column mean
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

**Train-test split**

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

**Initialize and train Random Forest classifier**

In [None]:
rf_clf = RandomForestClassifier(random_state=42)
rf_clf.fit(X_train, y_train)

**Cross-validation**

In [None]:
cv_scores = cross_val_score(rf_clf, X_scaled, y, cv=5)
print("Cross-validation accuracy scores:", cv_scores)
print("Mean cross-validation accuracy:", cv_scores.mean())

**Predictions and evaluation**

In [None]:
y_pred = rf_clf.predict(X_test)
print("Test Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))

**Confusion Matrix**

In [None]:
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=rf_clf.classes_)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

**Feature Importance**

In [None]:
feature_importances = rf_clf.feature_importances_
feature_importance_df = pd.DataFrame({'Feature': feature_columns, 'Importance': feature_importances})
feature_importance_df = feature_importance_df.sort_values(by='Importance', ascending=False)

plt.figure(figsize=(10, 6))
sns.barplot(x='Importance', y='Feature', data=feature_importance_df.head(10), palette='viridis')
plt.title("Top 10 Feature Importances")
plt.show()

**Heatmap of Feature Correlations**

In [None]:
corr_matrix = pd.DataFrame(X_scaled, columns=feature_columns).corr()
plt.figure(figsize=(12, 10))
sns.heatmap(corr_matrix, cmap="coolwarm", annot=False, fmt=".2f")
plt.title("Feature Correlation Heatmap")
plt.show()

**Cross-Validation Scores Plot**

In [None]:
plt.figure(figsize=(8, 5))
plt.plot(range(1, 6), cv_scores, marker='o', linestyle='-', color='b')
plt.title("Cross-Validation Scores across Folds")
plt.xlabel("Fold")
plt.ylabel("Accuracy Score")
plt.ylim(0, 1)
plt.grid()
plt.show()