# CMSC 636, Project: cheXpert analysis
## Basic CNN approach

In [None]:
import sys              
import json         
import os               
import argparse         
import zipfile      
import math         

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from types import SimpleNamespace
%matplotlib inline

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy
from tensorflow.keras.utils import to_categorical

# Setup constants

In [None]:
# File paths will need to be set up on the host somewhere. These values provide overridable defaults.
default_label_file = 'label/findings_fixed.json'
default_input_file = 'CheXpert-v1.0 batch 2 (train 1).zip'
# Validation archive. The name mirrors the training archive and the
# 'CheXpert-v1.0 batch 2 (valid 1)/' folder that load_labels maps 'valid/' paths to.
# NOTE(review): confirm this matches the actual file name on the host.
default_validate_file = 'CheXpert-v1.0 batch 2 (valid 1).zip'

epochs = 10  # Number of epochs for training
batch_size = 4  # Adjust based on your system's memory
bounding_square = 2880  # Maximum image size we are prepared to consider. Larger images are scaled down
image_ext = '.jpg'  # Only zip members whose names end with this suffix are treated as images

# cheXpert labels; the order here fixes the order of the encoded label vector
labels_in_order = ["Enlarged Cardiomediastinum", "Cardiomegaly", "Lung Opacity", "Lung Lesion",
                   "Edema", "Consolidation", "Pneumonia", "Atelectasis",
                   "Pneumothorax", "Pleural Effusion", "Pleural Other", "Fracture",
                   "Support Devices", "No Finding"]

# Load Data (from zip)

In [None]:
def count_jpg_in_zip(zip_path):
    """Return the number of members in the zip whose name ends with ``image_ext``."""
    with zipfile.ZipFile(zip_path, 'r') as archive:
        image_members = [member for member in archive.namelist()
                         if member.endswith(image_ext)]
    return len(image_members)

# pre-labeled data is in jpg format
def jpg_from_zip_generator(zip_path, labels):
    """Yield ``(raw_bytes, label)`` for every labeled image stored in the zip.

    Members whose name does not end with ``image_ext`` are ignored. Image
    members with no entry in ``labels`` are reported with a warning and
    skipped. ``labels`` is looked up by the full zip member name.
    """
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.namelist():
            if not member.endswith(image_ext):
                continue
            if member not in labels:
                print(f"Warning: Found JPG '{member}' in zip but no corresponding label entry. Skipping.")
                continue
            # Read the still-encoded bytes; decoding is left to the consumer.
            with archive.open(member) as handle:
                yield handle.read(), labels[member]

# Image Conformance
All inputs to the model must be the same size, and the convolutions expect square images.
We select a large square that fits almost all images and pad the shorter dimensions with black. The few images that are larger than this square are scaled down to fit.

In [None]:
def pad_to_fixed_size(image, target_size=(bounding_square, bounding_square)):
    """Fit a single-channel image onto a fixed-size, zero-padded canvas.

    An image that exceeds ``target_size`` in either dimension is scaled down
    with bilinear interpolation, preserving its aspect ratio. The (possibly
    resized) image is then centered and padded with zeros (black) up to
    ``target_size``.

    Args:
        image: Rank-3 tensor laid out as (height, width, 1). tf.image.resize
            returns float32 for bilinear resizing, so pass a float32 image
            (as dataset_from_zip does) to keep both tf.cond branches the
            same dtype.
        target_size: ``(height, width)`` of the output canvas.

    Returns:
        A tensor with static shape ``(target_size[0], target_size[1], 1)``.
    """
    target_height, target_width = target_size

    # Get original dimensions
    current_height = tf.shape(image)[0]
    current_width = tf.shape(image)[1]

    # Largest uniform scale factor at which both dimensions fit the target
    scale = tf.minimum(
        target_height / tf.cast(current_height, tf.float32),
        target_width / tf.cast(current_width, tf.float32)
    )

    # scale down if it is too large
    def resize_needed():
        new_height = tf.cast(tf.cast(current_height, tf.float32) * scale, tf.int32)
        new_width = tf.cast(tf.cast(current_width, tf.float32) * scale, tf.int32)
        # Clamp so float rounding can never exceed the target (negative padding)
        # and an extreme aspect ratio can never truncate a side to 0 pixels.
        new_height = tf.clip_by_value(new_height, 1, target_height)
        new_width = tf.clip_by_value(new_width, 1, target_width)
        return tf.image.resize(image, [new_height, new_width], method='bilinear')

    def no_resize_needed():
        return image

    # Only resize if the image is larger
    image = tf.cond(
        tf.logical_or(current_height > target_height, current_width > target_width),
        resize_needed,
        no_resize_needed
    )

    # Get dimensions after possible resize
    current_height = tf.shape(image)[0]
    current_width = tf.shape(image)[1]

    # Compute padding; any odd leftover pixel goes to the bottom/right
    pad_height = target_height - current_height
    pad_width = target_width - current_width

    pad_top = pad_height // 2
    pad_bottom = pad_height - pad_top
    pad_left = pad_width // 2
    pad_right = pad_width - pad_left

    padded = tf.pad(
        image,
        paddings=[[pad_top, pad_bottom], [pad_left, pad_right], [0, 0]],
        mode='CONSTANT',
        constant_values=0
    )
    # The model needs to know the shape of the input tensor, so we set it explicitly here.
    # Use target_size (not the global default) so a caller-supplied size is honored.
    padded.set_shape([target_height, target_width, 1])
    return padded

# Create the Dataset
Use the defined functions to:
- read the zip file
- decode the JPG files into images
- pad (and occasionally scale down) the images
- attach the labels to each image
- convert to a "generated" dataset (so it does not reside in memory)

Note that the `labels` argument is a dictionary of file path to the associated label vector.

In [None]:
def dataset_from_zip(zip_path, labels):
    """Build a streaming dataset of (padded image, label vector) pairs from a zip.

    Args:
        zip_path: Path to a zip archive containing the images.
        labels: Dictionary mapping zip member path to its label vector.

    Returns:
        ``(dataset, length)``. ``dataset`` is generator-backed, so images are
        read from the zip as they are consumed. ``length`` is the number of
        elements the dataset yields per pass.
    """
    # Count with the same filter jpg_from_zip_generator applies (image suffix
    # AND a label entry). Counting every JPG would overstate the length when
    # some images are unlabeled, and step counts derived from it would then
    # ask for more batches than the dataset can supply.
    with zipfile.ZipFile(zip_path, 'r') as zf:
        length = sum(
            1 for name in zf.namelist()
            if name.endswith(image_ext) and name in labels
        )

    dataset = tf.data.Dataset.from_generator(
        lambda: jpg_from_zip_generator(zip_path, labels),
        output_types=(tf.string, tf.float32),  # (encoded JPEG bytes, label vector)
        output_shapes=((), (len(labels_in_order),))
    )

    def pair_images_and_labels(x, y):
        # Decode as single-channel and convert to float32
        image = tf.image.convert_image_dtype(tf.io.decode_jpeg(x, channels=1), dtype=tf.float32)
        return pad_to_fixed_size(image), y

    dataset = dataset.map(pair_images_and_labels)
    return dataset, length

# Create the label dictionary
The `labels` dictionary is built from an uncompressed text file of JSON lines. Each line is a single JSON object containing the path of the image and each of the image labels as either `null` or a floating point value between 0.0 and 1.0.

In [None]:
def vector_encoding(labels):
    """Encode one label entry as a float32 vector ordered by ``labels_in_order``.

    A label that is present but ``None`` becomes 0.0. A label that is absent
    from ``labels`` also becomes 0.0, and a warning is printed for it.
    """
    encoded = []
    for name in labels_in_order:
        if name not in labels:
            print(f"Warning: Label '{name}' not found in the input data. Defaulting to 0.")
            encoded.append(0.0)
            continue
        raw = labels[name]
        encoded.append(0.0 if raw is None else raw)
    # Return as a TensorFlow constant for compatibility with the dataset
    return tf.constant(encoded, dtype=tf.float32, shape=(len(labels_in_order),))

def load_labels(label_file):
    """Read a JSON-lines label file into a dict of image path -> label vector.

    Each non-blank line is parsed as one JSON object. Objects must carry a
    'path_to_image' key whose value starts with 'train/' or 'valid/'; that
    prefix is rewritten (see ``prefix_map``) and the object is encoded with
    ``vector_encoding``. Lines that fail to parse, lack 'path_to_image', or
    have an unexpected path prefix are reported with their line number and
    skipped.

    Args:
        label_file: Path to the JSON-lines text file.

    Returns:
        Dictionary mapping the rewritten image path to its encoded label vector.
    """
    # Label paths are rooted at 'train/' or 'valid/'; they are rewritten to the
    # folder names below (presumably those inside the zip archives, since
    # jpg_from_zip_generator looks labels up by zip member path).
    prefix_map = (
        ('train/', 'CheXpert-v1.0 batch 2 (train 1)/'),
        ('valid/', 'CheXpert-v1.0 batch 2 (valid 1)/'),
    )

    label_data = {}
    with open(label_file, 'r', encoding='utf-8') as f:
        # enumerate supplies the line number; f.tell() cannot be used here
        # because it raises OSError while a text file is being iterated.
        for line_number, line in enumerate(f, start=1):
            if not line.strip():
                continue  # blank lines are not errors
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON on line {line_number}: {e}")
                continue
            if not isinstance(entry, dict) or 'path_to_image' not in entry:
                print(f"Skipping entry without 'path_to_image' on line {line_number}: {entry}")
                continue

            path = entry['path_to_image']
            for old_prefix, new_prefix in prefix_map:
                if path.startswith(old_prefix):
                    path = new_prefix + path[len(old_prefix):]
                    break
            else:
                print(f"Unexpected path format: {path}. Expected to start with 'train/' or 'valid/'.")
                continue

            # convert to an encoded vector for findings
            label_data[path] = vector_encoding(entry)
    return label_data

# Model Definition

In [None]:
def create_model():
    """Build and compile the basic CNN for multi-label finding prediction.

    The network takes a (bounding_square, bounding_square, 1) image, applies
    three Conv2D + MaxPooling2D stages, flattens, and ends with one sigmoid
    output per entry in ``labels_in_order``. It is compiled with Adam, binary
    cross-entropy loss and binary accuracy, and its summary is printed.

    Returns:
        The compiled Keras model.
    """
    model = Sequential([
        Input(shape=(bounding_square, bounding_square, 1)),
        Conv2D(64, (5, 5), activation='relu', padding='valid'),
        MaxPooling2D(pool_size=(2, 2), strides=2),

        Conv2D(128, (5, 5), activation='relu', padding='valid'),
        MaxPooling2D(pool_size=(2, 2), strides=2),

        Conv2D(256, (5, 5), activation='relu', padding='valid'),
        MaxPooling2D(pool_size=(2, 2), strides=2),

        Flatten(),

        # NOTE(review): with bounding_square = 2880 the flattened feature map
        # is 356 * 356 * 256 (about 32.4M values), so this layer alone holds
        # roughly 4.2 billion weights. Consider more downsampling or global
        # pooling before the dense layers.
        Dense(128, activation='relu'),
        # One independent sigmoid per label; sized from labels_in_order so the
        # output always matches the encoded label vector.
        Dense(len(labels_in_order), activation='sigmoid')
    ])
    # The Binary* classes are referenced through tf.keras because only the
    # Categorical* variants are imported by name at the top of the file.
    model.compile(optimizer=Adam(learning_rate=1e-3),
                  loss=tf.keras.losses.BinaryCrossentropy(),
                  metrics=[tf.keras.metrics.BinaryAccuracy()])
    model.summary()
    return model

# Model Training

In [None]:
def process_dataset(model, dataset, length, validate_dataset, vlength):
    """Train ``model`` on ``dataset``, validate each epoch, and save it.

    Args:
        model: Compiled Keras model.
        dataset: Unbatched training dataset of (image, label) pairs.
        length: Number of elements in ``dataset``; sets the steps per epoch.
        validate_dataset: Unbatched validation dataset.
        vlength: Number of elements in ``validate_dataset``.

    Returns:
        The trained model (also written to 'model.h5').
    """
    steps_per_epoch = math.ceil(length / batch_size)
    dataset = dataset.batch(batch_size)
    dataset = dataset.repeat()  # Repeat the dataset for multiple epochs
    validation_steps = math.ceil(vlength / batch_size)
    validate_dataset = validate_dataset.batch(batch_size)

    # time the epochs to see how long it takes to train
    start_time = tf.timestamp()  # Start time for timing the training
    # batch_size is deliberately not passed to fit(): the datasets are already
    # batched, and Keras rejects batch_size when the input is a tf.data.Dataset.
    model.fit(dataset,
              epochs=epochs,
              steps_per_epoch=steps_per_epoch,
              validation_data=validate_dataset,
              validation_steps=validation_steps)
    end_time = tf.timestamp()  # End time for timing the training
    elapsed_time = end_time - start_time
    print(f"Training completed in {elapsed_time.numpy()} seconds.")
    # Save the model
    model.save('model.h5')
    print("Model saved as 'model.h5'.")
    return model

# Load Data

In [None]:
# Resolve the files to use from the overridable defaults.
# NOTE: default_validate_file must be defined alongside the other defaults
# in the constants cell.
input_file = default_input_file
label_file = default_label_file
validate_file = default_validate_file

# Load the labels. These are the labels for all data and validation files
label_data = load_labels(label_file)
print(f"Total entries processed: {len(label_data)}")

# Load the images. Training and validation archives share the one label dictionary.
dataset, length = dataset_from_zip(input_file, label_data)
validate_dataset, vlength = dataset_from_zip(validate_file, label_data)

# Create the Model and Train

In [None]:
# Build a fresh network, then train it on the datasets prepared above.
untrained_model = create_model()
model = process_dataset(
    untrained_model,
    dataset,
    length,
    validate_dataset,
    vlength,
)

# 4. Test the Model on the testing dataset

This is still a TODO item