In [27]:
import os
import pandas as pd
import tensorflow as tf 
import numpy as np
import glob
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import classification_report

In [28]:
import cv2
import numpy as np
import pydicom

RESIZE_DIM = 256

def read_dcm(img_path):
    img_dicom = pydicom.dcmread(img_path)
    # convert to numpy array
    # Check the PhotometricInterpretation metadata
    if img_dicom.PhotometricInterpretation == 'MONOCHROME1':
        image = np.invert(img_dicom.pixel_array)
    else:
        image = img_dicom.pixel_array

    # Rescale the pixel values to [0, 255]
    image = cv2.normalize(image, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)

    # Resize the image to desired size
    return cv2.resize(image, (RESIZE_DIM, RESIZE_DIM))

In [29]:
from skimage.util import random_noise
from enum import Enum

class NoiseType(Enum):
    GAUSSIAN = 1
    POISSON = 2
    SALT_AND_PEPPER = 3
    SPECKLE = 4

"""
 This class defines the noise configuration to be applied to the CT scan dataset.
"""
class NoiseConfiguration:
    def __init__(self, noise_types, noise_levels):
        """
        Initialize the noise configuration.

        Args:
            noise_types (list): A list of noise types to apply. Supported types are NoiseType enums.
            noise_levels (list): A list of noise levels to apply, corresponding to the noise types.
        """
        self.noise_types = noise_types
        self.noise_levels = noise_levels

"""
This class simulates different types of noise on CT scan images.
"""
class NoiseSimulator:
    @staticmethod
    def apply_gaussian_noise(image, sigma):
        """
        Apply Gaussian noise to the input image.

        Gaussian noise is a common type of noise that can be observed in medical imaging due to various factors, such as electronic noise in the imaging device or thermal noise. This type of noise is characterized by a normal distribution of pixel values around the original pixel value.

        Args:
            image (numpy.ndarray): The input image to be transformed.
            noise_level (float): The standard deviation of the Gaussian noise to be applied.

        Returns:
            numpy.ndarray: The image with Gaussian noise applied.
        """
        noisy_image = random_noise(image, mode="gaussian", var=sigma**2)

        return noisy_image

    @staticmethod
    def apply_poisson_noise(image):
        """
        Apply Poisson noise to the input image.

        Poisson noise is another type of noise that is common in medical imaging, especially in low-light conditions or when the signal-to-noise ratio is low.
        This type of noise is characterized by a Poisson distribution of pixel values, where the variance of the noise is proportional to the original pixel value.

        Args:
            image (numpy.ndarray): The input image to be transformed.

        Returns:
            numpy.ndarray: The image with Poisson noise applied.
        """
        
        return random_noise(image, mode="poisson", clip=True)



    @staticmethod
    def apply_salt_and_pepper_noise(image, noise_pct):
        """
        Apply salt-and-pepper noise to the input image.

        Salt-and-pepper noise is a type of noise that can be observed in medical imaging due to sensor errors or bit errors during data transmission. 
        This type of noise is characterized by randomly occurring white and black pixels, which can be interpreted as "salt" (white) and "pepper" (black) pixels.

        Args:
            image (numpy.ndarray): The input image to be transformed.
            noise_pct (float): The proportion of pixels to apply Poisson noise to (0-1).

        Returns:
            numpy.ndarray: The image with salt-and-pepper noise applied.
        """

        return random_noise(image, mode="s&p", amount=noise_pct)


    @staticmethod
    def apply_speckle_noise(image, noise_level):
        """
        Apply speckle noise to the input image.

        Speckle noise is a type of multiplicative noise that is common in ultrasound imaging. This type of noise is characterized by a granular pattern that can be observed in the image due to interference between the transmitted and reflected signals.

        Args:
            image (numpy.ndarray): The input image to be transformed.
            noise_level (float): The variance of the speckle noise to be applied.

        Returns:
            numpy.ndarray: The image with speckle noise applied.
        """

        return random_noise(image, mode="speckle", var=noise_level**2)
    

    @staticmethod
    def apply_noise(image: np.ndarray, noise_config: NoiseConfiguration):
        """
        Apply the specified noise configurations to the input image.

        Args:
            image (numpy.ndarray): The input image to be transformed.
            noise_config (NoiseConfiguration): The noise configuration object that defines the noise types and levels to be applied.

        Returns:
            numpy.ndarray: The image with the specified noise applied.
        """
        noisy_image = image.copy()
        for noise_type, noise_level in zip(noise_config.noise_types, noise_config.noise_levels):
            if noise_type == NoiseType.GAUSSIAN:
                noisy_image = NoiseSimulator.apply_gaussian_noise(noisy_image, noise_level)
            elif noise_type == NoiseType.POISSON:
                noisy_image = NoiseSimulator.apply_poisson_noise(noisy_image)
            elif noise_type == NoiseType.SALT_AND_PEPPER:
                noisy_image = NoiseSimulator.apply_salt_and_pepper_noise(noisy_image, noise_level)
            elif noise_type == NoiseType.SPECKLE:
                noisy_image = NoiseSimulator.apply_speckle_noise(noisy_image, noise_level)
            else:
                raise ValueError(f"Unsupported noise type: {noise_type}")
            
        return noisy_image

In [30]:
from skimage.transform import iradon, radon

def get_noisy_image(image):
    subsample = 8
    theta = np.linspace(0., 180., max(image.shape)//subsample, endpoint=False)
    sinogram = radon(image, theta=theta)
    simulation_config = NoiseConfiguration([NoiseType.POISSON], [])
    noisy_sinogram = NoiseSimulator.apply_noise(sinogram, simulation_config)
    return iradon(noisy_sinogram, theta=theta, filter_name='ramp')


In [31]:
def get_train_and_test_data():
  print("Getting train and test data")
  df=pd.read_csv("/kaggle/input/rsna-bme548/RSNA_BME548/data.csv")

  files=glob.glob("/kaggle/input/rsna-bme548/RSNA_BME548/images/*.dcm")

  label=[]
  img_array=[]
  for i in files:
    filename = os.path.basename(i)
    img=read_dcm(i)
    noisy_img = get_noisy_image(img)
    img_array.append(np.array(noisy_img))
    # Get value of column "any" for the row with filename as index
    label.append(df.loc[df['filename'] == filename]['any'].values[0])

  x = np.array(img_array)
  x = x/255
  y = np.array(label)

  x = np.expand_dims(x,3)
  X_train, X_test, y_train, y_test = train_test_split(x,y, test_size = 0.15, random_state = 0)
  
  return X_train, X_test, y_train, y_test

In [32]:
class Block(tf.keras.Model):
    def __init__(self, filters, kernel_size, repetitions, pool_size=2, strides=2):
        super(Block, self).__init__()
        self.filters =filters
        self.kernel_size = kernel_size
        self.repetitions = repetitions
        
        # Define a conv2D_0, conv2D_1, etc based on the number of repetitions
        for i in range(self.repetitions):
            
            # Define a Conv2D layer, specifying filters, kernel_size, activation and padding.
            vars(self)[f'conv2D_{i}'] = tf.keras.layers.Conv2D(self.filters,self.kernel_size,activation='relu',padding="same")
        
        # Define the max pool layer that will be added after the Conv2D blocks
        self.max_pool = tf.keras.layers.MaxPooling2D(pool_size, strides=strides)
  
    def call(self, inputs):
        # access the class's conv2D_0 layer
        conv2D_0 = vars(self)['conv2D_0']
        
        # Connect the conv2D_0 layer to inputs
        x = conv2D_0(inputs)

        # for the remaining conv2D_i layers from 1 to `repetitions` they will be connected to the previous layer
        for i in range(1,self.repetitions):
            # access conv2D_i by formatting the integer `i`. (hint: check how these were saved using `vars()` earlier)
            conv2D_i = vars(self)[f'conv2D_{i}']
            
            # Use the conv2D_i and connect it to the previous layer
            x = conv2D_i(x)

        # Finally, add the max_pool layer
        max_pool = self.max_pool(x)
        
        return max_pool

In [33]:
class MyVGG(tf.keras.Model):

    def __init__(self, num_classes):
        super(MyVGG, self).__init__()

        # Creating blocks of VGG with the following 
        # (filters, kernel_size, repetitions) configurations
        self.block_a = Block(filters=64, kernel_size=3, repetitions=2)
        self.block_b = Block(filters=128, kernel_size=3, repetitions=2)
        self.block_c = Block(filters=256, kernel_size=3, repetitions=3)
        self.block_d = Block(filters=512, kernel_size=3, repetitions=3)
        self.block_e = Block(filters=512, kernel_size=3, repetitions=3)

        # Classification head
        # Define a Flatten layer
        self.flatten = tf.keras.layers.Flatten()
        # Create a Dense layer with 256 units and ReLU as the activation function
        self.fc = tf.keras.layers.Dense(256,activation='relu')
        # Finally add the softmax classifier using a Dense layer
        self.classifier = tf.keras.layers.Dense(num_classes, activation='softmax')

    def call(self, inputs):
        # Chain all the layers one after the other
        x = self.block_a(inputs)
        x = self.block_b(x)
        x = self.block_c(x)
        x = self.block_d(x)
        x = self.block_e(x)
        x = self.flatten(x)
        x = self.fc(x)
        x = self.classifier(x)
        return x

In [34]:
def get_model_accuracy(X_train, X_test, y_train, y_test):
  vgg = MyVGG(num_classes=2)
  vgg.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
  vgg.fit(X_train,y_train,epochs=5)

  # vgg.save_weights("models/vgg.weights.h5")

  y_pred=vgg.predict(X_test)

  y_pred_binary=[]
  for a in y_pred:
    y_pred_binary.append(np.argmax(a))

  y_pred_binary=np.array(y_pred_binary)

  accuracy = metrics.accuracy_score(y_test,y_pred_binary)

  print("Accuracy of model=",accuracy)

  print(classification_report(y_test, y_pred_binary, target_names=['0','1']))

  return accuracy

In [35]:
X_train, X_test, y_train, y_test = get_train_and_test_data()

Getting train and test data


  warn('Radon transform: image must be zero outside the '


In [36]:
get_model_accuracy(X_train, X_test, y_train, y_test)

Epoch 1/5
[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 83ms/step - accuracy: 0.4922 - loss: 0.6935
Epoch 2/5
[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 77ms/step - accuracy: 0.4804 - loss: 0.6933
Epoch 3/5
[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 77ms/step - accuracy: 0.4883 - loss: 0.6932
Epoch 4/5
[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 77ms/step - accuracy: 0.5049 - loss: 0.6931
Epoch 5/5
[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 77ms/step - accuracy: 0.4843 - loss: 0.6932
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 95ms/step
Accuracy of model= 0.5016666666666667
              precision    recall  f1-score   support

           0       0.50      1.00      0.67       301
           1       0.00      0.00      0.00       299

    accuracy                           0.50       600
   macro avg       0.25      0.50      0.33       600
weighted avg   

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


0.5016666666666667