### Setup

In [1]:
import os 
import pandas as pd
import numpy as np 
import json
import pydicom
import shutil
import matplotlib.pyplot as plt

# Setup:
# root_dir should be folder containing 'fracactlas' tar file or folder and
# UNIFEST zip file or folder

root_dir = os.path.join(os.path.expanduser('~'), 'Desktop', 'acvdl_final')
frac_dir = os.path.join(root_dir, 'fracatlas-DatasetNinja')
uni_dir = os.path.join(root_dir, 'archive')
total_dir = os.path.join(root_dir, 'total_images')
os.chdir(root_dir)
os.getcwd()


'/Users/Siobhan/Desktop/acvdl_final'

In [2]:
import zipfile

def extract_zip(zip_path, extract_to='.'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
        print(f'Extracted {zip_path} to {extract_to}')

import tarfile

def extract_tar(tar_path, extract_to='.'):
    with tarfile.open(tar_path, 'r:*') as tar_ref:
        tar_ref.extractall(extract_to)
        print(f'Extracted {tar_path} to {extract_to}')


In [None]:
if os.path.exists(frac_dir):
    print('Deleting existing fracatlas dir')
    shutil.rmtree(frac_dir)
    extract_tar('fracatlas-DatasetNinja.tar', extract_to='fracatlas-DatasetNinja')
if os.path.exists(uni_dir):
    print('Deleting existing UNIFEST dir')
    shutil.rmtree(uni_dir) 
    extract_zip('archive.zip', extract_to='archive')
if os.path.exists(total_dir):
    shutil.rmtree(total_dir)

Deleting existing fracatlas dir
Extracted fracatlas-DatasetNinja.tar to fracatlas-DatasetNinja
Deleting existing UNIFEST dir


### Fracatlas

In [None]:
os.chdir(frac_dir)

In [None]:
!tree -L 2

In [None]:
!find "not fractured" -type f -name "*.json" | wc -l


In [None]:
!find "test" -type f -name "*.json" | wc -l


In [None]:
!find "train" -type f -name "*.json" | wc -l


In [None]:
!find "val" -type f -name "*.json" | wc -l


In [None]:
val_prop = 82/(574+82)

print(val_prop)

In [None]:
# Exmaple json

file = os.path.join(frac_dir, 'test/ann/IMG0003297.jpg.json')

with open(file) as json_data:
    d = json.load(json_data)
print(d)

In [None]:
import os 
import json
import pandas as pd 
import numpy as np

os.chdir(frac_dir)
os.getcwd()

fracdata = pd.DataFrame(columns=['image_file_name', 'Target', 'height', 'width', 'File_path', 'ttv_type', 'dataset'])

for root, dirs, filenames in os.walk("."):
    if "not fractured" in dirs:
        dirs.remove("not fractured")

    if "img" in dirs:
        dirs.remove("img")

    for file in filenames:      
        if file.startswith("."):
            continue

        filepath = os.path.join(root, file)
        filename, file_extension = os.path.splitext(file)
        
        if file_extension == '.json':
            try:
                with open(filepath) as json_data:
                    d = json.load(json_data)
                    
                    # Extract 'body_part'
                    target = d['tags'][0]['name'] if d['tags'] else None
                    
                    # Extract 'height' and 'width'
                    height = d['size']['height']
                    width = d['size']['width']

                    # Extract image filepath
                    img_path = filepath.replace('ann', 'img')
                    img_path = img_path.replace('.json', '')
                    img_path = img_path.replace("./", "")
                    img_path = os.path.join(root_dir, img_path)

                    # Extract file type - test, train, val
                    ttv_type = os.path.dirname(filepath)
                    ttv_type = os.path.dirname(ttv_type)
                    ttv_type = ttv_type[2:]
                    
                    # Add the extracted data to the DataFrame
                    fracdata = fracdata._append({'image_file_name': filename, 'File_path': img_path, 'ttv_type': ttv_type, 'Target': target, 'height': height, 'width': width, 'dataset': 'frac'}, ignore_index=True)
            except Exception as e:
                print(f'Cannot extract json values for {filename}: {e}')
        else:
            print(f'{filename} not a json')


In [None]:
print(fracdata)

In [None]:
print("Body parts:")
print(fracdata['Target'].unique())

### UNIFEST Dataset

#### Collate images

In [None]:
os.chdir(root_dir)
os.getcwd()

os.makedirs(total_dir, exist_ok=True)

uni_train = os.path.join(uni_dir, 'train')
uni_test = os.path.join(uni_dir, 'test')
frac_train = os.path.join(frac_dir, 'train')
frac_test = os.path.join(frac_dir, 'test')
frac_val = os.path.join(frac_dir, 'val')
total_train = os.path.join(total_dir, 'train')
total_test = os.path.join(total_dir, 'test')
total_val = os.path.join(total_dir, 'val')


In [None]:
import shutil

# function to move images
def move_images(source_root, destination_root):
    for root, dirs, files in os.walk(source_root):
        os.makedirs(destination_root, exist_ok=True)
        
        # Move each file to the destination folder
        for file_name in files:
            # Ignore hidden/.DStore files
            if file.startswith("."):
                continue
            
            source_file_path = os.path.join(root, file_name)
            destination_file_path = os.path.join(destination_root, file_name)
            
            # Move the file
            try:
                shutil.move(source_file_path, destination_file_path)
            except Exception as e:
                print(f'Could not move file {file_name}, encountered error {e}')


In [None]:
# UNI dataset
# move images from 'uni' archive folder to new total_images/train folder

move_images(uni_train, total_train)

os.chdir(total_train)


In [None]:
# number of train dicoms
!find . -type f -name "*.dcm" | wc -l


In [None]:

uni_val_no = int(round(0.125*1738, 0))
print(f'Number of UNI training files for validation: {uni_val_no}')

In [None]:
# Take some of the UNI training set and use for validation:

import random

if os.path.exists(total_val):
    print('UNIFEST validation set already created')
else:
    print('Creating UNIFEST validation set...')
    os.makedirs(total_val, exist_ok=True)
    dirContents = os.listdir(total_train)
    selected_files = random.sample(dirContents, uni_val_no)
    
    for file_name in selected_files:
        source_file_path = os.path.join(total_train, file_name)
        destination_file_path = os.path.join(total_val, file_name)
        shutil.move(source_file_path, destination_file_path)
    
    print(f"Moved {len(selected_files)} files to {total_val}")


In [None]:
# Move the rest of the files over

move_images(uni_test, total_test)
move_images(frac_train, total_train)
move_images(frac_test, total_test)
move_images(frac_val, total_val)


#### Get labels for UNI train set

In [None]:

os.chdir(total_dir)
os.getcwd()

uni_data = pd.DataFrame(columns=['image_file_name', 'Target', 'height', 'width', 'File_path', 'ttv_type', 'dataset'])

data_df = pd.read_csv(f'{uni_dir}/train.csv')
data_df = data_df.rename(columns = {'SOPInstanceUID' : 'image_file_name'})
data_df.head

for root, dirs, filenames in os.walk("."):

    for file in filenames:      
        if file.startswith("."):
            continue

        filepath = os.path.join(root, file)
        filename, file_extension = os.path.splitext(file)
        filename = filename[:-2]
        
        if file_extension == '.dcm':
            
            # Extract 'body_part'
            target_row = data_df[data_df['image_file_name'] == filename]
            if not target_row.empty:
                target = target_row['Target'].values[0]
            else:
                target = None
                
            # Get image dimensions
            try:
                dicom = pydicom.dcmread(filepath)
                height = dicom.Rows
                width = dicom.Columns
            except:
                print(f'Could not open dicom {filename}')
                height = None
                width = None

            # Extract file type - test, train, val
            if 'test' in str(filepath):
                ttv_type = 'test'
            elif 'train' in str(filepath):
                ttv_type = 'train'
            elif 'val' in str(filepath):
                ttv_type = 'val'
            else:
                ttv_type = None

            filepath = filepath.replace("./", "")
            filepath = os.path.join(root_dir, filepath)
            
            # Add the extracted data to the DataFrame
            uni_data = uni_data._append({'image_file_name': filename, 'File_path': filepath, 'ttv_type': ttv_type, 'Target': target, 'height': height, 'width': width, 'dataset': 'uni'}, ignore_index=True)

print(uni_data)

In [None]:
print('Unique values before remapping:')
print(uni_data['Target'].unique())
      
train_data = uni_data[uni_data['ttv_type'] != 'test']
train_data['Target'] = train_data['Target'].str.replace(' ', '').astype(int)
train_data['Target'] = train_data['Target'].apply(lambda x: 22 if x > 21 else x)

uni_data.update(train_data)

print(f'\nUnique values after remapping:')
print(uni_data['Target'].unique())

uni_data['Target'] = uni_data['Target'].fillna('None')
uni_data

In [None]:
body_dict = {
    0: "abdomen",
    1: "akle",
    2: "cervical spine",
    3: "chest",
    4: "clavicles",
    5: "elbow",
    6: "feet",
    7: "finger",
    8: "forearm",
    9: "hand",
    10: "hip",
    11: "knee",
    12: "leg",
    13: "lumbar spine",
    14: "others",
    15: "pelvis",
    16: "shoulder",
    17: "sinus",
    18: "skull",
    19: "thigh",
    20: "thoracic spine",
    21: "wrist"
}


In [None]:
uni_data['Target'] = uni_data['Target'].map(body_dict)
print(uni_data['Target'].unique())
uni_data

#### Data Cleaning

In [None]:
# confirm image totals
os.chdir(total_dir)
!du sh "test"
!du sh "train"
!du sh "val"

In [None]:
# Resize all jpgs and resave
# remove jsons
from PIL import Image

for root, dirs, files in os.walk(total_dir):
    for file in files:
        
        file_path = os.path.join(root, file)
        filename, file_extension = os.path.splitext(file)
        if file_extension.lower() == '.jpg' or file_extension.lower() == '.jpeg':
            try:
                img = Image.open(file_path)
                img_resized = img.resize((255, 255), Image.LANCZOS)
                img_resized.save(file_path)
            except Exception as e:
                print(f"Error converting {file_path}: {e}") 
        elif file_extension =='.json':
            os.remove(file_path)


In [None]:
# Convert DICOMs to jpgs
import matplotlib.pyplot as plt

for root, dirs, files in os.walk(total_dir):
    for file in files:
        file_path = os.path.join(root, file)
        filename, file_extension = os.path.splitext(file)
        if file_extension.lower() == '.dcm':
            try:
                dcm = pydicom.dcmread(file_path)
                pixel_array = dcm.pixel_array
                new_filename = filename + ".jpg"
                new_filename = new_filename.replace('-c', "")

                new_file_path = os.path.join(root, new_filename)
                plt.imsave(new_file_path, pixel_array, cmap='gray')
                
                os.remove(file_path)
                
            except Exception as e:
                print(f"Error converting {file_path}: {e}")


In [None]:
# Re-confirm image totals
!du sh "test"
!du sh "train"
!du sh "val"

In [None]:
import os
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np

def plot_mean_image_intensity_histogram(directory, bins=50):
    mean_intensities = []

    # Walk through each directory, subdirectory, and file
    for root, dirs, files in os.walk(directory):
        for filename in files:
            if filename.lower().endswith(('.jpg', '.jpeg')):
                filepath = os.path.join(root, filename)
                try:
                    # Open the image
                    img = Image.open(filepath).convert('L')  # Convert image to grayscale
                    # Convert image to numpy array
                    img_array = np.array(img)
                    
                    # Check if img_array is empty or has unexpected shape
                    if img_array.size == 0:
                        print(f"Image array is empty: {filepath}")
                        continue
                    
                    # Calculate the mean intensity
                    mean_intensity = img_array.mean()
                    
                    # Collect mean intensity value
                    mean_intensities.append(mean_intensity)
                except Exception as e:
                    print(f"Error processing {filepath}: {e}")
                    continue

    # Debugging print
    print(f"Number of images processed: {len(mean_intensities)}")
    print(f"Mean intensities range from {min(mean_intensities, default=0)} to {max(mean_intensities, default=0)}")

    # Plot the histogram of mean intensity values
    plt.figure(figsize=(10, 6))
    plt.hist(mean_intensities, bins=bins, color='blue', edgecolor='black')
    plt.title('Histogram of Mean Image Intensity Values')
    plt.xlabel('Mean Intensity Value')
    plt.ylabel('Frequency')
    plt.show()

plot_mean_image_intensity_histogram(total_dir)


### Image Augmentation

In [None]:
image_files = [f for f in os.listdir(total_train) if f.lower().endswith(('.jpg', '.jpeg'))]
for filename in image_files:
    img_path = os.path.join(total_train, filename)
    img = Image.open(img_path)
    img.convert('L')
    
    img.save(img_path)

In [None]:
def showimg(original_img, augmented_img):
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))
    
    # Display Original Image
    ax[0].imshow(original_img)
    ax[0].set_title('Original')
    ax[0].axis('off')
    
    # Display Augmented Image
    ax[1].imshow(augmented_img)
    ax[1].set_title('Augmented')
    ax[1].axis('off')
    
    plt.show()

In [None]:
import os
import random
from PIL import Image
import numpy as np
import skimage.util

def random_rotation(img):
    """
    Rotate image by a random direction of 6 degrees
    """
    direction = random.choice([1, -1])
    return img.rotate(6 * direction)

def random_noise(img):
    """
    Add some noise to the image
    """
    img_array = np.array(img)
    noisy_img_array = skimage.util.random_noise(img_array, mode='gaussian', var=0.001)
    return Image.fromarray((noisy_img_array * 255).astype(np.uint8))

def horizontal_flip(img):
    """
    Flip image horizontally
    """
    return img.transpose(Image.FLIP_LEFT_RIGHT)

def zoom(img, lim=12):
    """
    Zoom image to account for rotation
    """
    width, height = img.size
    img = img.crop((lim, lim, width - lim, height - lim))
    return img.resize((width, height), Image.LANCZOS)

def apply_image_augmentation(img, scaling_size):
    """
    Applying augmentation to image
    """
    transformations = [horizontal_flip, random_rotation, zoom]
    for transform in transformations:
        img = transform(img)
    
    return img

In [None]:
# Get list of all image files in the directory
image_files = [f for f in os.listdir(total_train) if f.lower().endswith(('.jpg', '.jpeg'))]
total_images = len(image_files)

# Calculate the number of images to replace
num_images_to_replace = int(total_images * 0.1)

# Randomly select images to replace
images_to_replace = random.sample(image_files, num_images_to_replace)

for filename in images_to_replace:
    img_path = os.path.join(total_train, filename)
    img = Image.open(img_path)
    
    # Apply augmentation
    augmented_img = apply_image_augmentation(img, 1.5)
    showimg(img, augmented_img)
    
    # Save augmented image (replacing the original one)
    augmented_img.save(img_path)


### Plots

In [None]:
total_data = pd.concat([fracdata, uni_data], ignore_index=True)
total_data = total_data.dropna()
total_data = total_data[total_data['Target'] != 'nan']

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

def plot_histogram(df, column_name, plot_type, bins=10):
    # Convert column to string type to avoid TypeError
    data = df[column_name].astype(str)
    
    # Plot the histogram
    plt.figure(figsize=(10, 6))
    plt.hist(data, bins=bins, edgecolor='black')

    ax = plt.gca()
    for tick in ax.get_xticklabels():
        tick.set_rotation(45)
        tick.set_horizontalalignment('right')
        tick.set_y(tick.get_position()[1] - 0.05)
    
    # Set the title and labels
    plt.title(f'Count of {plot_type} Types - Total')
    plt.xlabel(plot_type)
    #plt.xticks(rotation=45)
    plt.ylabel('Frequency')
    plt.show()

plot_histogram(total_data, 'Target', 'Body Part', bins=10)


In [None]:
palette = {'uni': '#1f77b4', 'frac': '#ff7f0e'}

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

def plot_split(df, body_part_column, split_by_column, palette):
    # Convert relevant columns to string type to avoid TypeError
    df[body_part_column] = df[body_part_column].astype(str)
    df[split_by_column] = df[split_by_column].astype(str)
    
    # Plot the count plot
    plt.figure(figsize=(10, 6))
    sns.countplot(data=df, x=body_part_column, hue=split_by_column, palette=palette)
    
    # Rotate x-axis labels and adjust the offset
    ax = plt.gca()
    for tick in ax.get_xticklabels():
        tick.set_rotation(45)
        tick.set_horizontalalignment('right')
        tick.set_y(tick.get_position()[1] - 0.05)
    
    # Set the title and labels
    plt.title(f'Count of Body Part Types by Dataset')
    plt.xlabel('Body Part')
    plt.ylabel('Frequency')
    plt.show()

plot_split(total_data, 'Target', 'dataset', palette)


### CNN

In [None]:
train_df = total_data[total_data['ttv_type'] == 'train']
train_df = train_df.dropna(subset=['Target'])

In [None]:
print(train_df['Target'].unique())

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import SGD
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, precision_recall_fscore_support

In [None]:

le = LabelEncoder()
y_train = le.fit_transform(train_df['Target'])

x_train = []
for file in train_df['image_file_name']:
    if not file.endswith('.jpg'):
        file = file + '.jpg'
    img_path = os.path.join(total_train, file)
    img = load_img(img_path, target_size=(255, 255))
    img = img_to_array(img)
    x_train.append(img)
x_train = np.array(x_train)

val_df = total_data[total_data['ttv_type'] == 'val']
val_df = val_df.dropna(subset=['Target'])

x_val = []
for file in val_df['image_file_name']:
    if not file.endswith('.jpg'):
        file = file + '.jpg'
    img_path = os.path.join(total_val, file)
    img = load_img(img_path, target_size=(255, 255))
    img = img_to_array(img)
    x_val.append(img)
x_val = np.array(x_val)

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)

In [None]:

model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(255, 255, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    Flatten(),
    Dense(128, activation='relu'),  
    Dense(64, activation='relu'),
    Dense(64, activation='relu'),  
    Dense(32, activation='relu'),  
    Dense(len(le.classes_), activation='softmax')
])

optimizer = SGD(learning_rate=1e-4, momentum=0.9)

model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
history = model.fit(x_train, y_train, epochs=10, validation_data=(x_val, y_val))

# Plot training and validation accuracy
plt.figure(figsize=(12, 6))

plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')

plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='best')
plt.grid(True)

plt.show()


In [None]:
test_df = total_data[total_data['ttv_type'] == 'test']
test_df = test_df.dropna(subset=['Target'])

test_images = []
test_labels = []
test_filenames = []

for _, row in test_df.iterrows():
    img_path = os.path.join(total_test, row['image_file_name'])
    img = load_img(img_path, target_size=(255, 255))
    img = img_to_array(img)
    test_images.append(img)
    test_labels.append(row['Target'])
    test_filenames.append(row['image_file_name'])

test_labels = le.transform(test_labels)
test_images = np.array(test_images)
test_loss, test_accuracy = model.evaluate(test_images, test_labels)
print(f'Test accuracy: {test_accuracy}\n')

# Make predictions
predictions = model.predict(test_images)
predicted_labels = np.argmax(predictions, axis=1)
predicted_labels = le.inverse_transform(predicted_labels)
pred_df = pd.DataFrame({'filename': test_filenames, 'predicted_labels': predicted_labels})

empty_count = pred_df['predicted_labels'].isna().sum()
print(f'Number of empty predictions: {empty_count}')

# Calculate precision, recall, and F1 score
target_names = le.classes_
report = classification_report(test_labels, predicted_labels, target_names=target_names)
print(report)

precision, recall, f1, _ = precision_recall_fscore_support(test_labels, predicted_labels, average='weighted')
print(f'Precision: {precision}\nRecall: {recall}\nF1 Score: {f1}\n')

# Save the model
model.save("cnn.keras")
model.save("cnn.h5")

In [None]:
#!pip install pydot
#!pip install graphviz

In [None]:
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import load_img

def show_dcm_predict(index, pred_df, test_df):
    filename = pred_df['filename'].iloc[index]
    
    img = load_img(filename)
    plt.imshow(img)
    plt.axis('off')
    plt.show()
    
    print(f'Filename: {filename}')
    row = pred_df.iloc[index]
    
    if row is not None:
        predicted_label = row['predicted_labels']
    else:
        print('No predicted label found')

    actual_label_row = test_df[test_df['image_file_name'] == os.path.basename(filename)]
    
    if not actual_label_row.empty:
        actual_label = actual_label_row['Target'].values[0]
    else:
        actual_label = 'No actual label found'
    
    print(f'Predicted label: {predicted_label}')
    print(f'Actual label: {actual_label}')
    

In [None]:
os.chdir(total_test)
    
predict_label_1 = show_dcm_predict(44, pred_df, test_df)