In [1]:
def cropBorders( img, l=0.01, r=0.01, u=0.04, d=0.04):

    """
    This function crops a specified percentage of border from
    each side of the given image. Default is 1% from the top,
    left and right sides and 4% from the bottom side.

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to crop.

    Returns
    -------
    cropped_img: {numpy.ndarray}
        The cropped image.
    """

    try:
        nrows, ncols = img.shape

        # Get the start and end rows and columns
        l_crop = int(ncols * l)
        r_crop = int(ncols * (1 - r))
        u_crop = int(nrows * u)
        d_crop = int(nrows * (1 - d))

        cropped_img = img[u_crop:d_crop, l_crop:r_crop]

    except Exception as e:
        # logger.error(f'Unable to cropBorders!\n{e}')
        print((f"Unable to get cropBorders!\n{e}"))

    return cropped_img


def minMaxNormalise(img):

    """
    This function does min-max normalisation on
    the given image.

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to normalise.

    Returns
    -------
    norm_img: {numpy.ndarray}
        The min-max normalised image.
    """

    try:
        norm_img = (img - img.min()) / (img.max() - img.min())

    except Exception as e:
        # logger.error(f'Unable to minMaxNormalise!\n{e}')
        print((f"Unable to get minMaxNormalise!\n{e}"))

    return norm_img

def globalBinarise(img, thresh, maxval):

    """
    This function takes in a numpy array image and
    returns a corresponding mask that is a global
    binarisation on it based on a given threshold
    and maxval. Any elements in the array that is
    greater than or equals to the given threshold
    will be assigned maxval, else zero.

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to perform binarisation on.
    thresh : {int or float}
        The global threshold for binarisation.
    maxval : {np.uint8}
        The value assigned to an element that is greater
        than or equals to `thresh`.


    Returns
    -------
    binarised_img : {numpy.ndarray, dtype=np.uint8}
        A binarised image of {0, 1}.
    """

    try:
        binarised_img = np.zeros(img.shape, np.uint8)
        binarised_img[img >= thresh] = maxval

    except Exception as e:
        # logger.error(f'Unable to globalBinarise!\n{e}')
        print((f"Unable to globalBinarise!\n{e}"))

    return binarised_img

def editMask(mask, ksize=(23, 23), operation="open"):

    """
    This function edits a given mask (binary image) by performing
    closing then opening morphological operations.

    Parameters
    ----------
    mask : {numpy.ndarray}
        The mask to edit.
    ksize : {tuple}
        Size of the structuring element.
    operation : {str}
        Either "open" or "close", each representing open and close
        morphological operations respectively.

    Returns
    -------
    edited_mask : {numpy.ndarray}
        The mask after performing close and open morphological
        operations.
    """

    try:
        kernel = cv2.getStructuringElement(shape=cv2.MORPH_RECT, ksize=ksize)

        if operation == "open":
            edited_mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        elif operation == "close":
            edited_mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

        # Then dilate
        edited_mask = cv2.morphologyEx(edited_mask, cv2.MORPH_DILATE, kernel)

    except Exception as e:
        # logger.error(f'Unable to editMask!\n{e}')
        print((f"Unable to get editMask!\n{e}"))

    return edited_mask

def sortContoursByArea(contours, reverse=True):

    """
    This function takes in list of contours, sorts them based
    on contour area, computes the bounding rectangle for each
    contour, and outputs the sorted contours and their
    corresponding bounding rectangles.

    Parameters
    ----------
    contours : {list}
        The list of contours to sort.

    Returns
    -------
    sorted_contours : {list}
        The list of contours sorted by contour area in descending
        order.
    bounding_boxes : {list}
        The list of bounding boxes ordered corresponding to the
        contours in `sorted_contours`.
    """

    try:
        # Sort contours based on contour area.
        sorted_contours = sorted(contours, key=cv2.contourArea, reverse=reverse)

        # Construct the list of corresponding bounding boxes.
        bounding_boxes = [cv2.boundingRect(c) for c in sorted_contours]

    except Exception as e:
        # logger.error(f'Unable to sortContourByArea!\n{e}')
        print((f"Unable to get sortContourByArea!\n{e}"))

    return sorted_contours, bounding_boxes

def xLargestBlobs(mask, top_x=None, reverse=True):

    """
    This function finds contours in the given image and
    keeps only the top X largest ones.

    Parameters
    ----------
    mask : {numpy.ndarray, dtype=np.uint8}
        The mask to get the top X largest blobs.
    top_x : {int}
        The top X contours to keep based on contour area
        ranked in decesnding order.


    Returns
    -------
    n_contours : {int}
        The number of contours found in the given `mask`.
    X_largest_blobs : {numpy.ndarray}
        The corresponding mask of the image containing only
        the top X largest contours in white.
    """
    try:
        # Find all contours from binarised image.
        # Note: parts of the image that you want to get should be white.
        contours, hierarchy = cv2.findContours(
            image=mask, mode=cv2.RETR_EXTERNAL, method=cv2.CHAIN_APPROX_NONE
        )

        n_contours = len(contours)

        # Only get largest blob if there is at least 1 contour.
        if n_contours > 0:

            # Make sure that the number of contours to keep is at most equal
            # to the number of contours present in the mask.
            if n_contours < top_x or top_x == None:
                top_x = n_contours

            # Sort contours based on contour area.
            sorted_contours, bounding_boxes = sortContoursByArea(
                contours=contours, reverse=reverse
            )

            # Get the top X largest contours.
            X_largest_contours = sorted_contours[0:top_x]

            # Create black canvas to draw contours on.
            to_draw_on = np.zeros(mask.shape, np.uint8)

            # Draw contours in X_largest_contours.
            X_largest_blobs = cv2.drawContours(
                image=to_draw_on,  # Draw the contours on `to_draw_on`.
                contours=X_largest_contours,  # List of contours to draw.
                contourIdx=-1,  # Draw all contours in `contours`.
                color=1,  # Draw the contours in white.
                thickness=-1,  # Thickness of the contour lines.
            )

    except Exception as e:
        # logger.error(f'Unable to xLargestBlobs!\n{e}')
        print((f"Unable to get xLargestBlobs!\n{e}"))

    return n_contours, X_largest_blobs

def applyMask( img, mask):

    """
    This function applies a mask to a given image. White
    areas of the mask are kept, while black areas are
    removed.

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to mask.
    mask : {numpy.ndarray, dtype=np.uint8}
        The mask to apply.

    Returns
    -------
    masked_img: {numpy.ndarray}
        The masked image.
    """

    try:
        masked_img = img.copy()
        masked_img[mask == 0] = 0

    except Exception as e:
        # logger.error(f'Unable to applyMask!\n{e}')
        print((f"Unable to get applyMask!\n{e}"))

    return masked_img

def checkLRFlip( mask):

    """
    This function checks whether or not an image needs to be
    flipped horizontally (i.e. left-right flip). The correct
    orientation is the breast being on the left (i.e. facing
    right).

    Parameters
    ----------
    mask : {numpy.ndarray, dtype=np.uint8}
        The corresponding mask of the image to flip.

    Returns
    -------
    LR_flip : {boolean}
        True means need to flip horizontally,
        False means otherwise.
    """

    try:
        # Get number of rows and columns in the image.
        nrows, ncols = mask.shape
        x_center = ncols // 2
        y_center = nrows // 2

        # Sum down each column.
        col_sum = mask.sum(axis=0)
        # Sum across each row.
        row_sum = mask.sum(axis=1)

        left_sum = sum(col_sum[0:x_center])
        right_sum = sum(col_sum[x_center:-1])

        if left_sum < right_sum:
            LR_flip = True
        else:
            LR_flip = False

    except Exception as e:
        # logger.error(f'Unable to checkLRFlip!\n{e}')
        print((f"Unable to get checkLRFlip!\n{e}"))

    return LR_flip

def makeLRFlip( img):

    """
    This function flips a given image horizontally (i.e. left-right).

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to flip.

    Returns
    -------
    flipped_img : {numpy.ndarray}
        The flipped image.
    """

    try:
        flipped_img = np.fliplr(img)
    except Exception as e:
        # logger.error(f'Unable to makeLRFlip!\n{e}')
        print((f"Unable to get makeLRFlip!\n{e}"))

    return flipped_img

def clahe( img, clip=2.0, tile=(8, 8)):

    """
    This function applies the Contrast-Limited Adaptive
    Histogram Equalisation filter to a given image.

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to edit.
    clip : {int or floa}
        Threshold for contrast limiting.
    tile : {tuple (int, int)}
        Size of grid for histogram equalization. Input
        image will be divided into equally sized
        rectangular tiles. `tile` defines the number of
        tiles in row and column.

    Returns
    -------
    clahe_img : {numpy.ndarray, np.uint8}
        The CLAHE edited image, with values ranging from [0, 255]
    """

    try:
        # Convert to uint8.
        # img = skimage.img_as_ubyte(img)
        img = cv2.normalize(
            img,
            None,
            alpha=0,
            beta=255,
            norm_type=cv2.NORM_MINMAX,
            dtype=cv2.CV_32F,
        )
        img_uint8 = img.astype("uint8")
        #  img = cv2.normalize(
        #     img, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U
        # )

        clahe_create = cv2.createCLAHE(clipLimit=clip, tileGridSize=tile)
        clahe_img = clahe_create.apply(img_uint8)

    except Exception as e:
        # logger.error(f'Unable to clahe!\n{e}')
        print((f"Unable to get clahe!\n{e}"))

    return clahe_img

def pad( img):

    """
    This function pads a given image with black pixels,
    along its shorter side, into a square and returns
    the square image.

    If the image is portrait, black pixels will be
    padded on the right to form a square.

    If the image is landscape, black pixels will be
    padded on the bottom to form a square.

    Parameters
    ----------
    img : {numpy.ndarray}
        The image to pad.

    Returns
    -------
    padded_img : {numpy.ndarray}
        The padded square image, if padding was required
        and done.
    img : {numpy.ndarray}
        The original image, if no padding was required.
    """

    try:
        nrows, ncols = img.shape

        # If padding is required...
        if nrows != ncols:

            # Take the longer side as the target shape.
            if ncols < nrows:
                target_shape = (nrows, nrows)
            elif nrows < ncols:
                target_shape = (ncols, ncols)

            # pad.
            padded_img = np.zeros(shape=target_shape)
            padded_img[:nrows, :ncols] = img

        # If padding is not required...
        elif nrows == ncols:

            # Return original image.
            padded_img = img

    except Exception as e:
        # logger.error(f'Unable to pad!\n{e}')
        print((f"Unable to pad!\n{e}"))

    return padded_img

def rescale_image(image, new_size=(112, 112)):
    """
    Rescale an image to the specified size.

    Parameters:
    - image: Input image of data type CV_32F.
    - new_size: Tuple representing the target size (height, width).

    Returns:
    - Rescaled image.
    """
    # Ensure the image is in the range [0, 1]
    image = cv2.normalize(image, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)

    # Resize the image to the new size
    rescaled_image = cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)

    return rescaled_image

import cv2  # Assuming you have cv2 imported

def main_preprocess(img):
    new_image = cropBorders(img)
    norm_image = minMaxNormalise(new_image)
    bin_image = globalBinarise(norm_image, 0.1, 1)

    # Initialize edited_mask with the output of globalBinarise
    edited_mask = editMask(bin_image, ksize=(23, 23), operation="open")

    # Process edited_mask further if needed
    _, largest_mask = xLargestBlobs(edited_mask, top_x=1, reverse=True)
    
    masked_img = applyMask(norm_image, largest_mask)
    lr_flip = checkLRFlip(mask=largest_mask)

    if lr_flip:
        flipped_img = makeLRFlip(img=masked_img)
    elif not lr_flip:
        flipped_img = masked_img

    clahe_img = clahe(img=flipped_img, clip=2.0, tile=(8, 8))
    padded_img = pad(img=clahe_img)
    padded_img = cv2.normalize(
        padded_img,
        None,
        alpha=0,
        beta=255,
        norm_type=cv2.NORM_MINMAX,
        dtype=cv2.CV_32F,
    )
    rescaled_img = rescale_image(padded_img, new_size=(112, 112 ))
    processed_image = cv2.merge((rescaled_img, rescaled_img, rescaled_img))

    return processed_image

In [2]:
import os
import numpy as np
import pydicom
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd

In [None]:
dicom_image = load_dicom_image("C:/Users/jbale/OneDrive - Queensland University of Technology/manifest-1696323463913/CBIS-DDSM/Calc-Test_P_00038_LEFT_CC/08-29-2017-DDSM-NA-96009/1.000000-full mammogram images-63992/P_00038_LEFT_CC_FULL.dcm")
test_img = main_preprocess(dicom_image)

In [3]:
train_calc = pd.read_csv("C:/Users/jbale/OneDrive - Queensland University of Technology/manifest-1696323463913/calc_case_description_train_set.csv")
test_calc = pd.read_csv("C:/Users/jbale/OneDrive - Queensland University of Technology/manifest-1696323463913/calc_case_description_test_set.csv")

train_mass = pd.read_csv("C:/Users/jbale/OneDrive - Queensland University of Technology/manifest-1696323463913/mass_case_description_train_set.csv")
test_mass = pd.read_csv("C:/Users/jbale/OneDrive - Queensland University of Technology/manifest-1696323463913/mass_case_description_test_set.csv")

In [4]:


# Extract everything up to the first '/' for file paths
train_calc['image file path'] = train_calc['image file path'].str.split('/').str[0]
train_calc['cropped image file path'] = train_calc['cropped image file path'].str.split('/').str[0]
train_calc['ROI mask file path'] = train_calc['cropped image file path'].str.split('/').str[0]

test_calc['image file path'] = test_calc['image file path'].str.split('/').str[0]
test_calc['cropped image file path'] = test_calc['cropped image file path'].str.split('/').str[0]
test_calc['ROI mask file path'] = test_calc['cropped image file path'].str.split('/').str[0]

train_mass['image file path'] = train_mass['image file path'].str.split('/').str[0]
train_mass['cropped image file path'] = train_mass['cropped image file path'].str.split('/').str[0]
train_mass['ROI mask file path'] = train_mass['cropped image file path'].str.split('/').str[0]                                                                                          


In [67]:
train_mass.head(5)

Unnamed: 0,patient_id,breast_density,left or right breast,image view,abnormality id,abnormality type,mass shape,mass margins,assessment,pathology,subtlety,image file path,cropped image file path,ROI mask file path
0,P_00001,3,LEFT,CC,1,mass,IRREGULAR-ARCHITECTURAL_DISTORTION,SPICULATED,4,MALIGNANT,4,Mass-Training_P_00001_LEFT_CC,Mass-Training_P_00001_LEFT_CC_1,Mass-Training_P_00001_LEFT_CC_1
1,P_00001,3,LEFT,MLO,1,mass,IRREGULAR-ARCHITECTURAL_DISTORTION,SPICULATED,4,MALIGNANT,4,Mass-Training_P_00001_LEFT_MLO,Mass-Training_P_00001_LEFT_MLO_1,Mass-Training_P_00001_LEFT_MLO_1
2,P_00004,3,LEFT,CC,1,mass,ARCHITECTURAL_DISTORTION,ILL_DEFINED,4,BENIGN,3,Mass-Training_P_00004_LEFT_CC,Mass-Training_P_00004_LEFT_CC_1,Mass-Training_P_00004_LEFT_CC_1
3,P_00004,3,LEFT,MLO,1,mass,ARCHITECTURAL_DISTORTION,ILL_DEFINED,4,BENIGN,3,Mass-Training_P_00004_LEFT_MLO,Mass-Training_P_00004_LEFT_MLO_1,Mass-Training_P_00004_LEFT_MLO_1
4,P_00004,3,RIGHT,MLO,1,mass,OVAL,CIRCUMSCRIBED,4,BENIGN,5,Mass-Training_P_00004_RIGHT_MLO,Mass-Training_P_00004_RIGHT_MLO_1,Mass-Training_P_00004_RIGHT_MLO_1


In [5]:
# Replace values in the 'pathology' column
train_calc['pathology'] = train_calc['pathology'].apply(lambda x: 'BENIGN' if x.startswith('BENIGN') else x)
test_calc['pathology'] = test_calc['pathology'].apply(lambda x: 'BENIGN' if x.startswith('BENIGN') else x)

train_mass['pathology'] = train_mass['pathology'].apply(lambda x: 'BENIGN' if x.startswith('BENIGN') else x)
test_mass['pathology'] = test_mass['pathology'].apply(lambda x: 'BENIGN' if x.startswith('BENIGN') else x)



In [34]:
print(train_mass.head(1))

  patient_id  breast_density left or right breast image view  abnormality id  \
0    P_00001               3                 LEFT         CC               1   

  abnormality type                          mass shape mass margins  \
0             mass  IRREGULAR-ARCHITECTURAL_DISTORTION   SPICULATED   

   assessment  pathology  subtlety                image file path  \
0           4  MALIGNANT         4  Mass-Training_P_00001_LEFT_CC   

           cropped image file path               ROI mask file path  
0  Mass-Training_P_00001_LEFT_CC_1  Mass-Training_P_00001_LEFT_CC_1  


In [6]:
import matplotlib.pyplot as plt

In [8]:
def mask_preprocess(ROI):
    mask = cropBorders(ROI)
    mask = minMaxNormalise(mask)
    mask = globalBinarise(mask, 0.1, 1)
    mask = pad(mask)
    mask = rescale_image(mask, new_size=(112, 112))
    processed_mask = cv2.merge((mask, mask, mask))

    
    return processed_mask

### Trying with data augmentation

In [10]:
# Example usage
import os
dataset_directory = r'C:\Users\jbale\OneDrive - Queensland University of Technology\manifest-1696323463913\CBIS-DDSM'
batch_size = 16

# Get all file paths
file_paths = []
for root, dirs, files in os.walk(dataset_directory):
    for file in files:
        if file.endswith(".dcm"):
            file_paths.append(os.path.join(root, file))

# Filter file paths to keep only those containing "FULL" to filter for mammograms
mammogram_paths = [path for path in file_paths if "FULL" in os.path.basename(path)]

mask_paths = [path for path in file_paths if "MASK" in os.path.basename(path)]

crop_paths = [path for path in file_paths if "CROP" in os.path.basename(path)]


# Training loop
train_file_paths_mam = [path for path in mammogram_paths if "Training" in path]
train_file_paths_mask = [path for path in mask_paths if "Training" in path]
train_file_paths_crop = [path for path in crop_paths if "Training" in path]


# Testing loop
test_file_paths_mam = [path for path in mammogram_paths if "Test" in path]
test_file_paths_mask = [path for path in mask_paths if "Test" in path]
test_file_paths_crop = [path for path in crop_paths if "Test" in path]

In [11]:
calc= [path for path in train_file_paths_mam if "Calc" in path]
calc_crop = calc= [path for path in train_file_paths_crop if "Calc" in path]

In [12]:
mass= [path for path in train_file_paths_mam if "Mass" in path]
mass_crop= [path for path in train_file_paths_crop if "Mass" in path]

In [13]:
mass_test_crop = mass_test= [path for path in test_file_paths_crop if "Mass" in path]

In [14]:
mass_test= [path for path in test_file_paths_mam if "Mass" in path]

In [15]:
len(mass_test)

361

In [17]:
transformed_paths_mass = [os.path.basename(path).split('_FULL')[0] for path in mass]


In [18]:
transformed_paths_mass_test = [os.path.basename(path).split('_FULL')[0] for path in mass_test]

In [21]:
len(calc)

1546

In [22]:
len(mass)

1231

### Mass

In [23]:
# Function to load DICOM images
def load_dicom_image(path):
    dcm = pydicom.dcmread(path)
    return dcm.pixel_array

In [24]:
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.05,
    height_shift_range=0.2,
    shear_range=0.1,
    zoom_range=0.1,
    fill_mode='nearest',
    rescale=1./255  # rescale the image values to be in the range [0, 1]
)

## example load function

In [45]:
def load_and_preprocess_batch_train(mass, labels_df, batch_size=20):
    # Create a dictionary to map 'mass' values to 'pathology' labels
    path_to_label = dict(zip(labels_df['image file path'], labels_df['pathology']))

    data_dict = {"File Path": [], "Label": []}  # Initialize a dictionary to store data

    np.random.shuffle(mass)
    batch_paths = mass[:batch_size]  # Take a sample of 25 mass values

    for file_path in batch_paths:
        # Get the corresponding label based on the matched value from 'mass'
        pathology = path_to_label.get(file_path, "unknown")

        # Map 'BENIGN' to 0 and 'MALIGNANT' to 1
        label = 1 if pathology == "MALIGNANT" else 0

        data_dict["File Path"].append(file_path)
        data_dict["Label"].append(label)

    return data_dict  # Return the data as a dictionary

# Usage example:
sample_data = load_and_preprocess_batch_train(transformed_paths_mass,train_mass , batch_size=20)
data_df = pd.DataFrame(sample_data)
print(data_df)



                          File Path  Label
0   Mass-Training_P_00475_RIGHT_MLO      0
1    Mass-Training_P_00836_LEFT_MLO      1
2   Mass-Training_P_00080_RIGHT_MLO      1
3    Mass-Training_P_00782_RIGHT_CC      1
4   Mass-Training_P_01654_RIGHT_MLO      1
5    Mass-Training_P_01250_RIGHT_CC      0
6    Mass-Training_P_00430_LEFT_MLO      0
7   Mass-Training_P_00313_RIGHT_MLO      1
8    Mass-Training_P_01754_RIGHT_CC      1
9    Mass-Training_P_01652_RIGHT_CC      1
10  Mass-Training_P_00539_RIGHT_MLO      1
11   Mass-Training_P_00134_LEFT_MLO      1
12   Mass-Training_P_00330_LEFT_MLO      0
13  Mass-Training_P_01755_RIGHT_MLO      0
14   Mass-Training_P_01641_LEFT_MLO      1
15    Mass-Training_P_00298_LEFT_CC      0
16  Mass-Training_P_01103_RIGHT_MLO      1
17  Mass-Training_P_01280_RIGHT_MLO      0
18   Mass-Training_P_00797_RIGHT_CC      0
19   Mass-Training_P_00092_LEFT_MLO      1


## subset (subtlety) load function

In [53]:
def load_and_preprocess_batch_train_subtelty(original_mass_train, labels_df, datagen, batch_size=batch_size):
    # Create a dictionary to map 'mass' values to 'pathology' labels
    path_to_label = dict(zip(labels_df['image file path'], labels_df['pathology']))

    while True:
        np.random.shuffle(original_mass_train)
        train_file_paths = []  # Initialize the list to store training file paths

        for i in range(0, len(original_mass_train), batch_size):
            batch_paths = original_mass_train[i:i + batch_size]
            image_list = []
            labels = []

            for file_path in batch_paths:
                # Use the original file path here for loading the DICOM image
                original_file_path = file_path
                
                dicom_image = load_dicom_image(original_file_path)

                # Apply data augmentation using the provided datagen object
                processed_image = main_preprocess(dicom_image)
                processed_image = datagen.random_transform(processed_image)

                # Transform the file path for label mapping
                transformed_path = transform_file_path(original_file_path)

                subtlety = labels_df.loc[labels_df['image file path'] == transformed_path]['subtlety'].values
                if len(subtlety) > 0:
                    subtlety = subtlety[0]
                else:
                    continue  # Skip this iteration since 'subtelty' is not available for the current file

                # Get the 'abnormality id' value
                abnormality_id = labels_df.loc[labels_df['image file path'] == transformed_path]['abnormality id'].values
                if len(abnormality_id) > 0:
                    abnormality_id = abnormality_id[0]
                else:
                    continue  # Skip this iteration since 'abnormality id' is not available for the current file

                pathology = path_to_label.get(transformed_path, "unknown")

                # Map 'BENIGN' to 0 and 'MALIGNANT' to 1
                label = 1 if pathology == "MALIGNANT" else 0

                # Include only data with 'subtlety' 4 or 5 and 'abnormality id' 1 for training
                if subtlety in [4, 5] and abnormality_id == 1:
                    image_list.append(processed_image)
                    labels.append(label)
                    train_file_paths.append(original_file_path)  # Store the file path

            if image_list:
                yield np.array(image_list), np.array(labels)



## subset (subtlety) load function TEST DATA

In [None]:
def load_and_preprocess_batch_test_subtelty(original_mass_test, labels_df, batch_size=batch_size):
    while True:
        # Shuffle the testing data at the beginning of each epoch
        np.random.shuffle(original_mass_test)

        for i in range(0, len(original_mass_test), batch_size):
            batch_paths = original_mass_test[i:i + batch_size]
            image_list = []
            labels = []

            for file_path in batch_paths:
                # Use the original file path here for loading the DICOM image
                original_file_path = file_path

                # Transform the file path for label mapping
                transformed_path = transform_file_path(original_file_path)

                subtlety = labels_df.loc[labels_df['image file path'] == transformed_path]['subtlety'].values
                if len(subtlety) > 0:
                    subtlety = subtlety[0]
                else:
                    continue  # Skip this iteration since 'subtelty' is not available for the current file

                # Get the 'abnormality id' value
                abnormality_id = labels_df.loc[labels_df['image file path'] == transformed_path]['abnormality id'].values
                if len(abnormality_id) > 0:
                    abnormality_id = abnormality_id[0]
                else:
                    continue  # Skip this iteration since 'abnormality id' is not available for the current file

                pathology = path_to_label.get(transformed_path, "unknown")

                # Map 'BENIGN' to 0 and 'MALIGNANT' to 1
                label = 1 if pathology == "MALIGNANT" else 0

                # Include only data with 'subtlety' 4 or 5 and 'abnormality id' 1 for testing
                if subtlety in [4, 5] and abnormality_id == 1:
                    dicom_image = load_dicom_image(original_file_path)
                    processed_image = main_preprocess(dicom_image)
                    image_list.append(processed_image)
                    labels.append(label)

            if image_list:
                yield np.array(image_list), np.array(labels)

## generators for subtelty 

In [55]:
# Create a data generator for training data
# split the test set into validation and test sets
val_file_paths, test_file_paths = train_test_split(mass_test, test_size=0.5, random_state=42)

train_generator_subtelty = load_and_preprocess_batch_train_subtelty(mass, train_mass, datagen, batch_size=batch_size)

# Create a data generator for testing data
val_generator_subtelty = load_and_preprocess_batch_test_subtelty(val_file_paths, test_mass, batch_size=batch_size)

# Create a data generator for testing data
test_generator_subtelty = load_and_preprocess_batch_test_subtelty(test_file_paths, test_mass, batch_size=batch_size)

# Calculate steps per epoch based on the length of your training and test data and batch size
steps_per_epoch_train = len(mass) // batch_size
steps_per_epoch_val = len(val_file_paths) // batch_size
steps_per_epoch_test = len(test_file_paths) // batch_size





In [56]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, MaxPooling2D
from tensorflow.keras.optimizers import Adam

# Create ResNet50base model
base_model_res = ResNet50(weights='imagenet', include_top=False, input_shape=(112, 112, 3))



# Build the model
model_res = Sequential([
    base_model_res,
    GlobalAveragePooling2D(),
    Dense(1, activation='sigmoid')  # Binary classification, so the output layer has one neuron with sigmoid activation
])


custom_learning_rate = 0.001  # adjust this

# Create a custom optimizer with the desired learning rate
custom_optimizer = Adam(learning_rate=custom_learning_rate)
# Compile the model
model_res.compile(optimizer=custom_optimizer, loss='binary_crossentropy', metrics=['accuracy'])

# Display model summary
model_res.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50 (Functional)       (None, 4, 4, 2048)        23587712  
                                                                 
 global_average_pooling2d_4   (None, 2048)             0         
 (GlobalAveragePooling2D)                                        
                                                                 
 dense_5 (Dense)             (None, 1)                 2049      
                                                                 
Total params: 23,589,761
Trainable params: 23,536,641
Non-trainable params: 53,120
_________________________________________________________________


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard

# Define a callback to display training progress
class TrainingProgressCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        print(f"Epoch {epoch + 1}/{self.params['epochs']}, Loss: {logs['loss']:.4f}, Accuracy: {logs['accuracy']:.4f}, Val Loss: {logs['val_loss']:.4f}, Val Accuracy: {logs['val_accuracy']:.4f}")

# Create the callback
progress_callback = TrainingProgressCallback()

# Train the model with the callback
history = model_res.fit(
    train_generator_subtelty,
    steps_per_epoch=steps_per_epoch_train,
    validation_data=val_generator_subtelty,
    validation_steps=steps_per_epoch_val,
    epochs=10,
    callbacks=[progress_callback]  # Add the progress callback
)


import matplotlib.pyplot as plt

# Plot training and validation accuracy
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Plot training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()


from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

# Create empty lists to store true labels and predicted labels
true_labels = []
predicted_labels = []

# Loop through the test generator and make predictions
for batch in test_generator_subtelty:
    images, labels = batch
    predictions = model_res.predict(images)
    true_labels.extend(labels)
    predicted_labels.extend(np.round(predictions).flatten())

# Compute the confusion matrix
confusion = confusion_matrix(true_labels, predicted_labels)
print("Confusion Matrix:")
print(confusion)

# Print a classification report
report = classification_report(true_labels, predicted_labels, target_names=["BENIGN", "MALIGNANT"])
print("Classification Report:")
print(report)


Epoch 1/10

## Appendix

### Triplet loss network

In [28]:
from itertools import combinations
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet

def generate_triplet_batches(original_mass_train, labels_df, datagen, batch_size=batch_size):
    path_to_label = dict(zip(labels_df['image file path'], labels_df['pathology']))

    while True:
        np.random.shuffle(original_mass_train)
        
        for i in range(0, len(original_mass_train), batch_size):
            batch_paths = original_mass_train[i:i + batch_size]
            anchor_images = []
            positive_images = []
            negative_images = []
            
            skip_batch = False  # Flag to skip the current batch
            for file_path in batch_paths:
                dicom_image = load_dicom_image(file_path)
                processed_image = main_preprocess(dicom_image)
                processed_image = datagen.random_transform(processed_image)
                transformed_path = transform_file_path(file_path)
                pathology = path_to_label.get(transformed_path, "unknown")
                label = 1 if pathology == "MALIGNANT" else 0
                
                if label == 1:  # MALIGNANT
                    anchor_images.append(processed_image)
                    while True:
                        random_index = np.random.randint(len(batch_paths))
                        positive_image = load_dicom_image(batch_paths[random_index])
                        if random_index != i:
                            break
                    positive_image = main_preprocess(positive_image)
                    positive_image = datagen.random_transform(positive_image)
                    positive_images.append(positive_image)
                    while True:
                        random_index = np.random.randint(len(batch_paths))
                        negative_image = load_dicom_image(batch_paths[random_index])
                        if random_index != i:
                            break
                    negative_image = main_preprocess(negative_image)
                    negative_image = datagen.random_transform(negative_image)
                    negative_images.append(negative_image)
                
                if not anchor_images or not positive_images or not negative_images:
                    # If any of the lists are empty, skip this batch
                    skip_batch = True
                    break
            
            if not skip_batch:
                yield [np.array(anchor_images), np.array(positive_images), np.array(negative_images)], None


In [25]:
target_shape = (112,112)

base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=target_shape + (3,), include_top=False
)

flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

In [26]:
class DistanceLayer(layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)


anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

distances = DistanceLayer()(
    embedding(resnet.preprocess_input(anchor_input)),
    embedding(resnet.preprocess_input(positive_input)),
    embedding(resnet.preprocess_input(negative_input)),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [21]:
class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]

In [29]:
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(generate_triplet_batches(mass, train_mass, datagen, batch_size=8), epochs=10, validation_data=load_and_preprocess_batch_test(mass_test, test_mass, batch_size=8))

Epoch 1/10
   6549/Unknown - 68067s 10s/step - loss: 0.5041

KeyboardInterrupt: 

In [None]:
# Create a Binary Classification Head
binary_classification_head = layers.Dense(1, activation="sigmoid")(embedding.output)

#Create the Binary Classification Model
binary_classification_model = Model(inputs=embedding.input, outputs=binary_classification_head)

# Compile the Binary Classification Model
binary_classification_model.compile(
    optimizer=optimizers.Adam(learning_rate=0.0001),
    loss="binary_crossentropy",  # Use binary cross-entropy for binary classification
    metrics=["accuracy"]

In [None]:
def generate_binary_batches(original_mass_train, labels_df, datagen, batch_size=batch_size):
    # Create a dictionary to map 'mass' values to 'pathology' labels
    path_to_label = dict(zip(labels_df['image file path'], labels_df['pathology']))

    while True:
        np.random.shuffle(original_mass_train)
        
        for i in range(0, len(original_mass_train), batch_size):
            batch_paths = original_mass_train[i:i + batch_size]
            images = []
            labels = []

            for file_path in batch_paths:
                dicom_image = load_dicom_image(file_path)
                processed_image = main_preprocess(dicom_image)
                processed_image = datagen.random_transform(processed_image)
                transformed_path = transform_file_path(file_path)
                pathology = path_to_label.get(transformed_path, "unknown")
                label = 1 if pathology == "MALIGNANT" else 0
                
                images.append(processed_image)
                labels.append(label)

            yield np.array(images), np.array(labels)


In [None]:
your_test_data, your_test_labels = load_and_preprocess_batch_test(original_mass_test, labels_df, batch_size=batch_size)

# Split the test data into validation and test sets
validation_data, test_data, validation_labels, test_labels = train_test_split(
    your_test_data, your_test_labels, test_size=0.5, random_state=42
)

binary_classification_model.fit(
    x=your_binary_data,  # Input your binary classification data here
    y=your_binary_labels,  # Binary labels (0 for benign, 1 for malignant)
    epochs=10,  # Adjust as needed
    validation_data=(your_validation_data, your_validation_labels),  # Validation data
)

# Evaluate the model for binary classification
binary_classification_model.evaluate(your_test_data, your_test_labels)

## on calcification