# DSC 180B CNN Prototype

### Data Loading and Getting the Labels

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the metadata CSV and the X-ray images
# read by later cells live under this mount point.
drive.mount('/content/drive')

In [None]:
import pandas as pd
import matplotlib.pylab as plt
import numpy as np
import torch
from PIL import Image
import cv2
import tensorflow as tf
from sklearn.model_selection import train_test_split
import os
from torchvision import models
import torch.nn as nn
import shutil

In [None]:
# Load the metadata CSV; later cells use its 'Image Index' (file name) and
# 'Finding Labels' columns.
df = pd.read_csv('/content/drive/MyDrive/data/X-Ray Data/Data_Entry_2017.csv')
# Shuffle the rows with a fixed seed. Without it the row order differs on every
# run, so the seeded train/test split performed later would not be reproducible.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
df.head()

In [None]:
def get_label(row):
    """Map a metadata row to a binary label.

    Args:
        row: Mapping-like record (for example a DataFrame row) whose
            'Finding Labels' entry supports the ``in`` operator.

    Returns:
        0 when 'No Finding' occurs in the row's 'Finding Labels' value,
        otherwise 1.
    """
    has_finding = 'No Finding' not in row['Finding Labels']
    return int(has_finding)

In [None]:
# Replace the raw 'Finding Labels' text with the binary label that get_label
# computes for each row (axis=1 passes whole rows to the function).
df['Finding Labels'] = df.apply(get_label, axis=1)
df.head()

In [None]:
# Create a directory to store the downloaded images
download_dir = "/content/downloaded_images"
os.makedirs(download_dir, exist_ok=True)

# Copy every image listed in the dataframe from Drive to local disk.
# The copy stays best-effort (one bad file never aborts the run), but the
# outcome of each copy is counted and reported instead of silently dropped.
image_paths = df['Image Index']
copied_count = 0
missing_count = 0
failed_copies = []
for image_name in image_paths:
    source_path = '/content/drive/MyDrive/data/X-Ray Data/' + image_name
    destination_path = os.path.join(download_dir, image_name)
    try:
        shutil.copy2(source_path, destination_path)  # copy2 preserves metadata
    except FileNotFoundError:
        # The CSV lists this image but it is not present in the Drive folder.
        missing_count += 1
    except Exception as e:  # keep going, but remember what went wrong
        failed_copies.append((image_name, e))
    else:
        copied_count += 1

print(f"Images downloaded to: {download_dir}")
print(f"Copied {copied_count} of {len(image_paths)} images "
      f"({missing_count} not found, {len(failed_copies)} failed)")
# Show only the first few failures so a systematic problem is visible without
# flooding the cell output.
for failed_name, error in failed_copies[:10]:
    print(f"  copy failed for {failed_name}: {error!r}")

In [None]:
import torchvision.transforms as transforms

def normalize_image(img):
    """Scale an image to [0, 1] and standardize it with the ImageNet statistics.

    Args:
        img: Array whose last axis has 3 channels (it is broadcast against the
            3-element mean/std) and whose values are on a 0-255 scale.

    Returns:
        float32 array of the same shape holding ``(img / 255 - mean) / std``.
    """
    # Keep the statistics in float32: with float64 constants NumPy promotes the
    # result to float64, which doubles the memory used by every image.
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    img = img.astype(np.float32) / 255.0
    img = (img - mean) / std
    return img

def random_horizontal_flip(img):
    """Randomly mirror an image.

    Draws one number from ``np.random.rand()``; when it is below 0.5 the image
    is returned after ``cv2.flip(img, 1)``, otherwise the input object is
    returned untouched.
    """
    should_flip = np.random.rand() < 0.5
    return cv2.flip(img, 1) if should_flip else img

def color_jitter(img):
    """Apply random brightness and contrast jitter to an image array.

    Only ``brightness`` and ``contrast`` are configured (0.25 each); saturation
    and hue are not passed and stay at the ColorJitter defaults.

    Args:
        img: Array that ``PIL.Image.fromarray`` can convert to an image.

    Returns:
        The jittered image converted back to a NumPy array.
    """
    jitter = transforms.ColorJitter(brightness=0.25, contrast=0.25)
    pil_image = Image.fromarray(img)
    jittered = jitter(pil_image)
    return np.array(jittered)

def resize_image(img, target_size=(224, 224)):
    """Resize ``img`` with ``cv2.resize`` using its default interpolation.

    Args:
        img: Image array accepted by ``cv2.resize``.
        target_size: Output size handed to ``cv2.resize`` as its size
            argument; defaults to (224, 224).

    Returns:
        The resized image.
    """
    resized = cv2.resize(img, target_size)
    return resized

def convert_to_rgb(img):
    """Convert a grayscale image to RGB via ``cv2.COLOR_GRAY2RGB``.

    Args:
        img: Image array that ``cv2.cvtColor`` accepts for a gray-to-RGB
            conversion.

    Returns:
        The converted image.
    """
    rgb = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    return rgb

def add_random_rotation(img, max_angle=5):
    """Rotate an image about its center by a small random angle.

    Args:
        img: Image array; ``shape[0]`` is used as the height and ``shape[1]``
            as the width.
        max_angle: Bound of the uniform draw; the angle is sampled with
            ``np.random.uniform(-max_angle, max_angle)``.

    Returns:
        The rotated image, warped onto a canvas with the input's width and
        height and a scale factor of 1.
    """
    angle = np.random.uniform(-max_angle, max_angle)
    height, width = img.shape[0], img.shape[1]
    center = (width // 2, height // 2)
    rotation = cv2.getRotationMatrix2D(center, angle, 1)
    return cv2.warpAffine(img, rotation, (width, height))

In [None]:
X = []
y = []
missing_files = 0
unreadable_files = 0

# NOTE(review): the random flip / jitter / rotation below are applied once,
# before the train/test split, so held-out images are augmented as well and the
# training images never receive a fresh augmentation per epoch. Consider moving
# the random steps into a training-only pipeline.

# load png images into X and y
for image_name, label in zip(df['Image Index'], df['Finding Labels']):
    png_path = os.path.join('/content/downloaded_images', image_name)

    # Files that were never copied are counted and reported once after the
    # loop; printing one line per missing file floods the cell output.
    if not os.path.exists(png_path):
        missing_files += 1
        continue

    img = cv2.imread(png_path)

    # Check if the image was loaded successfully
    if img is None:
        print(f"Failed to load image: {png_path}")
        unreadable_files += 1
        continue

    # cv2.imread returns channels in BGR order, while the ImageNet statistics
    # used by normalize_image are listed in RGB order.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    img = resize_image(img)
    img = random_horizontal_flip(img)
    img = color_jitter(img)
    img = add_random_rotation(img)
    img = normalize_image(img)

    # Store float32 regardless of what normalize_image returns, so the list
    # does not hold double-precision copies of every image.
    X.append(img.astype(np.float32, copy=False))
    y.append(label)

print(f"Loaded {len(X)} images "
      f"({missing_files} not found, {unreadable_files} unreadable)")

X = np.array(X, dtype=np.float32)
y = np.array(y)

In [None]:
# Wrap the image array as a torch tensor. torch.from_numpy shares memory with
# the NumPy array, whereas torch.tensor(X) copies the whole dataset a second time.
# NOTE(review): the model trained later in this notebook is a Keras model, which
# accepts the NumPy array directly -- confirm the tensor wrapper is needed at all.
X = torch.from_numpy(X)

# train test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Sanity check: display the shapes of the four arrays produced by the split.
tuple(split.shape for split in (X_train, X_test, y_train, y_test))

### Healthy vs Unhealthy Lung Comparison

In [None]:
# Collect up to `num_examples` images per class from the training split.
# Iterating over the data (instead of an unbounded while loop) cannot run past
# the end of the arrays when one class has fewer than `num_examples` samples.
num_examples = 10
healthy_x_rays = []
abnormal_x_rays = []
for image, label in zip(X_train, y_train):
    if len(healthy_x_rays) >= num_examples and len(abnormal_x_rays) >= num_examples:
        break
    bucket = healthy_x_rays if label == 0 else abnormal_x_rays
    if len(bucket) < num_examples:
        bucket.append(image)

# The stored images were standardized with these ImageNet statistics; undo that
# before plotting so imshow receives RGB values in [0, 1] instead of clipping
# the standardized values.
display_mean = np.array([0.485, 0.456, 0.406])
display_std = np.array([0.229, 0.224, 0.225])


def _to_displayable(image):
    """Invert the mean/std standardization and clip the result to [0, 1]."""
    pixels = np.asarray(image) * display_std + display_mean
    return np.clip(pixels, 0.0, 1.0)


# One figure per healthy/abnormal pair; zip stops at the shorter list.
# No cmap is passed: matplotlib ignores it for 3-channel image data.
for healthy, abnormal in zip(healthy_x_rays, abnormal_x_rays):
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))
    ax[0].imshow(_to_displayable(healthy))
    ax[0].set_title('Healthy X-Ray')
    ax[1].imshow(_to_displayable(abnormal))
    ax[1].set_title('Abnormal X-Ray')
    plt.show()


In [None]:
import numpy as np
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.optimizers.schedules import CosineDecayRestarts

# Compute class weights
from sklearn.utils.class_weight import compute_class_weight

# Key every weight by its actual class label. dict(enumerate(...)) keys by
# position, which only matches the labels when they are exactly 0..k-1.
classes = np.unique(y_train)
class_weights = compute_class_weight('balanced', classes=classes, y=y_train)
class_weights_dict = {int(label): float(weight)
                      for label, weight in zip(classes, class_weights)}

# Load ResNet50V2
# NOTE(review): Keras documents resnet_v2.preprocess_input (scaling to [-1, 1])
# as the input preprocessing for ResNet50V2; the images here are standardized
# with ImageNet mean/std instead -- confirm this is intended.
resnet50 = ResNet50V2(include_top=False, weights='imagenet', input_shape=(224, 224, 3))

# Unfreeze deeper layers for fine-tuning
# NOTE(review): nothing above freezes the backbone and Keras layers are
# trainable by default, so this loop changes nothing -- the whole network is
# fine-tuned. To train only the last 10 layers, set trainable = False on
# resnet50.layers[:-10] instead.
for layer in resnet50.layers[-10:]:
    layer.trainable = True

# Define the model: backbone features -> two regularized dense layers ->
# single sigmoid unit for the binary finding / no-finding decision.
model = Sequential([
    resnet50,
    Flatten(),
    Dense(256, activation="relu", kernel_regularizer=l2(1e-4)),
    Dropout(0.3),
    Dense(128, activation="relu"),
    Dropout(0.2),
    Dense(1, activation="sigmoid")
])

# Implement Cosine Annealing
initial_lr = 1e-4
cosine_decay_restarts = CosineDecayRestarts(initial_learning_rate=initial_lr,
                                            first_decay_steps=5000,
                                            t_mul=2.0,  # Increases period after each restart
                                            m_mul=0.8,  # Reduce max learning rate after each restart
                                            alpha=1e-6)

# Compile model with Cosine Annealing scheduler
model.compile(optimizer=Adam(learning_rate=cosine_decay_restarts),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Callbacks: stop after 10 epochs without val_loss improvement (restoring the
# best weights) and keep the best model on disk.
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
model_checkpoint = ModelCheckpoint('best_model.keras', monitor='val_loss', save_best_only=True, verbose=1)

# Train the model
# NOTE(review): the test split doubles as the validation set that drives early
# stopping and checkpointing, so metrics reported on it afterwards are
# optimistic. A separate validation split would keep the test set untouched.
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    epochs=100,
                    batch_size=16,
                    class_weight=class_weights_dict,
                    callbacks=[early_stopping, model_checkpoint])

In [None]:
# create a confusion matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns

# The model ends in a single sigmoid unit, so predict returns one probability
# per sample in an (N, 1) array. Flatten and threshold it into 1-D 0/1 labels
# so predictions have the same form as y_test.
y_prob = model.predict(X_test)
y_pred = (np.asarray(y_prob).ravel() > 0.5).astype(int)

# Pin the label order so the matrix is always 2x2 with 'No Finding' first,
# even if one class is absent from the predictions.
class_ids = [0, 1]
class_names = ['No Finding', 'Finding']
cm = confusion_matrix(y_test, y_pred, labels=class_ids)

# Rows of the confusion matrix are true labels, columns are predicted labels.
ax = sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                 xticklabels=class_names, yticklabels=class_names)
ax.set_xlabel('Predicted label')
ax.set_ylabel('True label')
plt.show()

# output precision, recall, f1-score
from sklearn.metrics import classification_report
print(classification_report(y_test, y_pred, labels=class_ids, target_names=class_names))



In [None]:
# Persist the trained model to Google Drive twice: once with an .h5 file name
# and once with a .keras file name, saved in that order.
for save_path in ('/content/drive/MyDrive/best_model.h5',
                  '/content/drive/MyDrive/best_model.keras'):
    model.save(save_path)
