In [1]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import tree
import cv2 
import os 
from random import shuffle 
from tqdm import tqdm 
from PIL import Image

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.metrics import accuracy_score, classification_report, root_mean_squared_error, confusion_matrix
import seaborn as sns

import warnings
warnings.filterwarnings('ignore')
import os

In [2]:
# Per-image label table, read from a path relative to the current working
# directory. Later cells select rows by the 'id' column (compared against image
# file names) and read the columns listed in `label_columns`.
prune_filtered_data = pd.read_csv("Filtered_Data_Entry_DS541.csv")

In [3]:
# Image directories, relative to the current working directory. Both are
# enumerated with os.listdir() inside train_test_preprocess_data().
train_images = "../data/CXR8/images/train_val_images"

test_images = "../data/CXR8/images/test_images"


In [32]:
# Names of the label columns read from the label table. Their order here fixes
# the integer index assigned to each label in `label_map`.
label_columns = [
    'Atelectasis',
    'Cardiomegaly',
    'Consolidation',
    'Edema',
    'Effusion',
    'Emphysema',
    'Fibrosis',
    'Hernia',
    'Infiltration',
    'Mass',
    'Nodule',
    'Pleural Thickening',
    'Pneumonia',
    'Pneumothorax',
    'Pneumoperitoneum',
    'Pneumomediastinum',
    'Subcutaneous Emphysema',
    'Tortuous Aorta',
    'Calcification of the Aorta',
    'No Finding',
]

In [33]:
# Label name -> position of that name in `label_columns`.
label_map = dict(zip(label_columns, range(len(label_columns))))

In [34]:
def _load_image_split(image_dir, image_size, label_matrix, row_of_id):
    """Load, resize and label every usable image found in one directory.

    Parameters
    ----------
    image_dir : str
        Directory whose entries are treated as image file names.
    image_size : int
        Each image is resized to ``(image_size, image_size)``.
    label_matrix : numpy.ndarray
        2-D array holding one row of label values per known image id.
    row_of_id : dict
        Maps an image file name to its row position in ``label_matrix``.

    Returns
    -------
    (list, numpy.ndarray)
        The resized grayscale images, and their label rows stacked into an
        array of shape ``(n_images, 1, n_labels)``.
    """
    images = []
    labels = []
    skipped = []

    # Sorted so the sample order (and any seeded split drawn from it later)
    # does not depend on the order in which the file system lists entries.
    for file_name in tqdm(sorted(os.listdir(image_dir))):
        row = row_of_id.get(file_name)
        if row is None:
            # No label row for this entry: it cannot be used for training.
            skipped.append(file_name)
            continue

        pixels = cv2.imread(os.path.join(image_dir, file_name), cv2.IMREAD_GRAYSCALE)
        if pixels is None:
            # cv2.imread signals an unreadable/non-image file by returning
            # None; passing that on to cv2.resize would raise.
            skipped.append(file_name)
            continue

        images.append(cv2.resize(pixels, (image_size, image_size)))
        # Slice (not index) so each entry keeps the (1, n_labels) shape.
        labels.append(label_matrix[row:row + 1])

    if skipped:
        print(
            "Skipped " + str(len(skipped)) + " entries in " + str(image_dir)
            + " (unreadable image or no label row)"
        )

    return images, np.array(labels)


def train_test_preprocess_data(image_size):
    """Read both image directories and pair every image with its label row.

    Images are read in grayscale from the module-level ``train_images`` and
    ``test_images`` directories and resized to ``image_size`` x ``image_size``.
    Labels come from ``prune_filtered_data``: the row whose ``'id'`` equals the
    image file name, restricted to ``label_columns``.

    Entries that cannot be decoded as an image, or that have no label row, are
    skipped (a count is printed) so that images and labels stay aligned. If an
    id occurs more than once in the label table, its first row is used.

    Parameters
    ----------
    image_size : int
        Side length, in pixels, of the resized square images.

    Returns
    -------
    tuple
        ``(train_data, train_labels, test_data, test_labels)``. The ``*_data``
        items are lists of 2-D image arrays; the ``*_labels`` items are arrays
        of shape ``(n_images, 1, len(label_columns))``.
    """
    # Build the id -> row lookup once instead of filtering the whole table
    # for every image.
    label_table = prune_filtered_data.drop_duplicates(subset='id')
    label_matrix = label_table[label_columns].to_numpy()
    row_of_id = {image_id: row for row, image_id in enumerate(label_table['id'])}

    train_data, train_labels = _load_image_split(
        train_images, image_size, label_matrix, row_of_id
    )
    test_data, test_labels = _load_image_split(
        test_images, image_size, label_matrix, row_of_id
    )

    return train_data, train_labels, test_data, test_labels

In [None]:
# Load both image directories, resizing every image to 256 x 256 pixels.
(
    train_data,
    train_labels,
    test_data,
    test_labels,
) = train_test_preprocess_data(image_size=256)

In [8]:
# Pool the images from both directories into one array (samples along axis 0)
# so a fresh train/test split can be drawn from all of them.
# Pixel values are left as loaded: min-max scaling,
# (x - min) / (max - min), is not applied.
x_data = np.concatenate([train_data, test_data])

In [9]:
# Pool the label arrays in the same order as the images in x_data.
y_data = np.concatenate([train_labels, test_labels])

In [11]:
# Collapse the pooled labels to a 2-D (n_samples, n_labels) matrix. Using -1
# lets NumPy infer the sample count instead of hard-coding it, so this cell
# keeps working when the number of images changes.
y_data = y_data.reshape(-1, len(label_columns))

# The hard-coded count used to double as a sanity check; keep that guarantee
# by verifying there is exactly one label row per image.
assert y_data.shape[0] == x_data.shape[0], (
    "label rows (" + str(y_data.shape[0]) + ") do not match images ("
    + str(x_data.shape[0]) + ")"
)

In [12]:
# Draw a seeded split from the pooled samples, holding out 15 % for testing.
x_train, x_test, y_train, y_test = train_test_split(
    x_data,
    y_data,
    test_size=0.15,
    random_state=42,
)

# Sample counts, used below when flattening the image arrays.
number_of_train, number_of_test = x_train.shape[0], x_test.shape[0]

In [None]:
# Flatten every 2-D image into one feature row (height * width values).
train_pixels = x_train.shape[1] * x_train.shape[2]
test_pixels = x_test.shape[1] * x_test.shape[2]

x_train_flatten = x_train.reshape((number_of_train, train_pixels))
x_test_flatten = x_test.reshape((number_of_test, test_pixels))

print("X train flatten", x_train_flatten.shape)
print("X test flatten", x_test_flatten.shape)

In [14]:
def decision_tree(max_depth, x_train_flatten, y_train, x_test_flatten, random_state=42):
    """Fit a class-balanced decision tree and predict on the test features.

    Parameters
    ----------
    max_depth : int or None
        Maximum tree depth, forwarded to ``DecisionTreeClassifier``.
    x_train_flatten : array-like
        Training features passed to ``fit``.
    y_train : array-like
        Training targets passed to ``fit``.
    x_test_flatten : array-like
        Features passed to ``predict``.
    random_state : int or None, default 42
        Seed forwarded to the classifier so that repeated runs are
        reproducible; 42 matches the seed used for the train/test split in
        this notebook. Pass ``None`` to get the previous unseeded behaviour.

    Returns
    -------
    The value returned by ``model.predict(x_test_flatten)``.
    """
    print("\nCreating decision tree max_depth=" + str(max_depth))
    model = tree.DecisionTreeClassifier(
        max_depth=max_depth,
        class_weight='balanced',
        random_state=random_state,
    )
    model.fit(x_train_flatten, y_train)
    print("Done ")

    return model.predict(x_test_flatten)

In [15]:
def random_forest(estimators, x_train_flatten, y_train, x_test_flatten,
                  random_state=42, n_jobs=None):
    """Fit a class-balanced random forest and predict on the test features.

    Parameters
    ----------
    estimators : int
        Number of trees, forwarded as ``n_estimators``.
    x_train_flatten : array-like
        Training features passed to ``fit``.
    y_train : array-like
        Training targets passed to ``fit``.
    x_test_flatten : array-like
        Features passed to ``predict``.
    random_state : int or None, default 42
        Seed forwarded to the classifier so that repeated runs are
        reproducible; 42 matches the seed used for the train/test split in
        this notebook. Pass ``None`` to get the previous unseeded behaviour.
    n_jobs : int or None, default None
        Forwarded to ``RandomForestClassifier``; ``None`` keeps the library
        default, ``-1`` asks for all available cores.

    Returns
    -------
    The value returned by ``model.predict(x_test_flatten)``.
    """
    model = RandomForestClassifier(
        n_estimators=estimators,
        class_weight='balanced',
        random_state=random_state,
        n_jobs=n_jobs,
    )
    model.fit(x_train_flatten, y_train)
    print("Done ")

    return model.predict(x_test_flatten)

In [None]:
# Depth-3 decision tree; the result holds its predictions for x_test_flatten.
dt_y_pred = decision_tree(
    max_depth=3,
    x_train_flatten=x_train_flatten,
    y_train=y_train,
    x_test_flatten=x_test_flatten,
)


In [None]:
# 50-tree random forest; the result holds its predictions for x_test_flatten.
rf_y_pred = random_forest(
    estimators=50,
    x_train_flatten=x_train_flatten,
    y_train=y_train,
    x_test_flatten=x_test_flatten,
)


In [None]:
# Reduce every label row to a single class index. argmax returns the first
# position holding the row maximum, so a row with several positive labels is
# counted under the earliest one in `label_columns`, and an all-zero row is
# counted as index 0.
dt_true_classes = y_test.argmax(axis=1)
dt_pred_classes = dt_y_pred.argmax(axis=1)

print("Unique classes in true labels:", np.unique(dt_true_classes))
print("Unique classes in predictions:", np.unique(dt_pred_classes))

# Pin the label set so the matrix always has one row/column per entry of
# `label_columns`. Without it, classes that appear in neither vector are
# dropped and the matrix no longer lines up with the tick labels used by the
# heat-map cell.
dt_cm = confusion_matrix(
    dt_true_classes,
    dt_pred_classes,
    labels=np.arange(len(label_columns)),
)

In [None]:
# Tick labels: the label names in index order (dicts keep insertion order).
class_names = list(label_map.keys())
true_labeled_classes = list(class_names)

# Draw on an explicit Axes rather than through the implicit current figure.
fig, ax = plt.subplots(figsize=(12, 12))
sns.heatmap(
    dt_cm,
    annot=True,
    fmt="d",
    xticklabels=true_labeled_classes,
    yticklabels=true_labeled_classes,
    cmap="Blues",
    ax=ax,
)
ax.set_xlabel("Predicted Labels")
ax.set_ylabel("True Labels")
ax.set_title("Decision Tree Confusion Matrix")
plt.show()

In [None]:
# Reduce every label row to a single class index. argmax returns the first
# position holding the row maximum, so a row with several positive labels is
# counted under the earliest one in `label_columns`, and an all-zero row is
# counted as index 0.
rf_true_classes = y_test.argmax(axis=1)
rf_pred_classes = rf_y_pred.argmax(axis=1)

print("Unique classes in true labels:", np.unique(rf_true_classes))
print("Unique classes in predictions:", np.unique(rf_pred_classes))

# Pin the label set so the matrix always has one row/column per entry of
# `label_columns`. Without it, classes that appear in neither vector are
# dropped and the matrix no longer lines up with the tick labels used by the
# heat-map cell.
rf_cm = confusion_matrix(
    rf_true_classes,
    rf_pred_classes,
    labels=np.arange(len(label_columns)),
)

In [None]:
# Tick labels: the label names in index order (dicts keep insertion order).
class_names = list(label_map.keys())
true_labeled_classes = list(class_names)

# Draw on an explicit Axes rather than through the implicit current figure.
fig, ax = plt.subplots(figsize=(12, 12))
sns.heatmap(
    rf_cm,
    annot=True,
    fmt="d",
    xticklabels=true_labeled_classes,
    yticklabels=true_labeled_classes,
    cmap="Blues",
    ax=ax,
)
ax.set_xlabel("Predicted Labels")
ax.set_ylabel("True Labels")
ax.set_title("Random Forest Confusion Matrix")
plt.show()

In [None]:
# Random forest scores on the held-out split: RMSE between the label matrices,
# then accuracy_score on the same pair.
rf_rmse = root_mean_squared_error(y_test, rf_y_pred)
print(rf_rmse)

rf_accuracy = accuracy_score(y_test, rf_y_pred)
print(rf_accuracy)


In [None]:
# Per-label precision / recall / F1 for the decision tree.
# target_names labels each row with its name from `label_columns` instead of a
# bare column index. zero_division=0 scores labels with no predicted (or no
# true) samples as 0 explicitly rather than relying on a warning, since
# warnings are silenced in the import cell of this notebook.
print(
    classification_report(
        y_test,
        dt_y_pred,
        target_names=label_columns,
        zero_division=0,
    )
)

In [None]:
# Decision tree scores on the held-out split: RMSE between the label matrices,
# then accuracy_score on the same pair (normalize defaults to True, i.e. a
# fraction rather than a count).
dt_rmse = root_mean_squared_error(y_test, dt_y_pred)
print(dt_rmse)

dt_accuracy = accuracy_score(y_test, dt_y_pred)
print(dt_accuracy)
