In [None]:
!pip install pydicom
!pip install seaborn

import glob, pylab, pandas as pd
import pydicom, numpy as np
from os import listdir
from os.path import isfile, join
import matplotlib.pylab as plt
import os
import seaborn as sns

In [2]:
from keras import layers
from keras.applications import DenseNet121
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import Callback, ModelCheckpoint
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.optimizers import Adam
from tqdm import tqdm

In [3]:
PATH="../input/rsna-intracranial-hemorrhage-detection/rsna-intracranial-hemorrhage-detection"
!ls ../input/rsna-intracranial-hemorrhage-detection/rsna-intracranial-hemorrhage-detection

stage_2_sample_submission.csv  stage_2_test  stage_2_train  stage_2_train.csv


In [None]:
# Load the training label table and preview its first rows.
train_csv_path = join(PATH, 'stage_2_train.csv')
train = pd.read_csv(train_csv_path)
train.head()

In [None]:
# Split every ID once on its first two underscores: piece 1 is stored in
# 'PatientID' and piece 2 (the remainder) in 'Sub_type'.
id_parts = train['ID'].str.split("_", n=2, expand=True)
train['Sub_type'] = id_parts[2]
train['PatientID'] = id_parts[1]
train.head()

In [None]:
# Count the entries found in the train and test folders.
stage_dirs = {
    "train": "/kaggle/input/rsna-intracranial-hemorrhage-detection/rsna-intracranial-hemorrhage-detection/stage_2_train",
    "test": "/kaggle/input/rsna-intracranial-hemorrhage-detection/rsna-intracranial-hemorrhage-detection/stage_2_test",
}
num_of_training_patients = len(os.listdir(stage_dirs["train"]))
num_of_testing_patients = len(os.listdir(stage_dirs["test"]))


In [None]:
# Plot the share of training vs. testing entries as a pie chart.
sizes = [num_of_training_patients, num_of_testing_patients]
labels = ('Training', 'Testing')
explode = (0, 0.1)  # pull the 'Testing' wedge slightly out of the pie

fig, ax = plt.subplots(figsize=(6, 6))
ax.pie(
    sizes,
    explode=explode,
    labels=labels,
    autopct='%1.1f%%',
    shadow=True,
    startangle=90,
)
ax.set_title('Training and Testing Data')
ax.axis('equal')  # equal aspect ratio so the pie is drawn as a circle

plt.show()

In [None]:
# Compare how many rows carry label 0 versus label 1.
label_counts = train.Label.value_counts()
print(label_counts)
sns.countplot(data=train, x='Label')

In [None]:
# Count the rows labelled 1 for each subtype: tabulate Label values per
# Sub_type, then keep only the column for Label == 1.
label_counts_by_subtype = train.groupby("Sub_type")["Label"].value_counts().unstack()
subtype_counts = label_counts_by_subtype[1]
subtype_counts


In [None]:
# Side-by-side bars of label 0 vs. label 1 for every subtype.
fig, ax = plt.subplots(figsize=(20, 8))

sns.countplot(data=train, x="Sub_type", hue="Label", ax=ax)

ax.set_title("Total Images by Subtype")

**Note:**
* For every subtype, the samples labeled 0 far outnumber the samples labeled 1.

In [None]:
# Pie chart of the positive (Label == 1) counts for the five hemorrhage subtypes.
labels = ('epidural', 'intraparenchymal', 'intraventricular', 'subarachnoid', 'subdural')

# Look the counts up by subtype name. `subtype_counts` is indexed by Sub_type
# strings, so integer keys would rely on pandas' deprecated positional fallback
# and on the index happening to be sorted in exactly this order.
sizes = subtype_counts.loc[list(labels)].tolist()

fig, ax = plt.subplots(figsize=(6, 6))
ax.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=False, startangle=90)
ax.axis('equal')  # equal aspect ratio so the pie is drawn as a circle
ax.set_title('Subtypes')

plt.show()

**Notes:**
*  We can see here that the data is not balanced: some subtypes have few examples, which will make it hard to train the model to detect those subtypes (epidural, for example).
*  Data augmentation techniques will be required to perform the IH detection. Alternatively, random sampling can be used so that the number of positive patients equals the number of negative patients. Because the dataset is so large, I suggest using a subset of it, and the chosen subset has to be balanced.

In [None]:
# Reshape the long label table into one row per image with one column per
# diagnosis. The ID is split on every underscore; piece 1 is the image key and
# piece 2 the diagnosis name.
id_pieces = train['ID'].str.split('_', expand=True)
traindf = (
    pd.DataFrame({
        'Image': id_pieces[1],
        'Diagnosis': id_pieces[2],
        'Label': train['Label'],
    })
    .drop_duplicates()
    .pivot(index='Image', columns='Diagnosis', values='Label')
    .reset_index()
)
# Restore the 'ID_' prefix that the split removed from the image key.
traindf['Image'] = 'ID_' + traindf['Image']
traindf.head(100)

In [None]:
# Count how many images have exactly n subtype flags set, for n = 0..5.
subtype_columns = ['epidural', 'intraparenchymal', 'intraventricular', 'subarachnoid', 'subdural']
flags_per_image = traindf[subtype_columns].sum(1)

x = []
for n in range(6):
    many = traindf[flags_per_image == n].copy()
    matching = len(many)
    x.append(matching)
    share = 100 * matching / len(traindf)
    print('Number of hemorrhages: {}, amount of such images: {}, fraction: {:.3f}%'.format(n, matching, share))
    print(x[n])

In [None]:
# Bar chart of the counts gathered in `x`, skipping the first (zero) bucket.
y = [str(bucket) for bucket in range(6)]
fig, ax = plt.subplots()
sns.barplot(x=y[1:], y=x[1:], ax=ax)
ax.set_title("the number of images for each class")
ax.set_xlabel("class")

In [None]:
!pip install scikit-image

from skimage.io import imread_collection
import skimage.io
import skimage.color
import skimage.transform
from platform import python_version

**Note:**
In most of the cases where IH is detected, only one hemorrhage subtype is present.

In [None]:
# Collect the names of all .jpg files found anywhere under the image folder.
jpg_root = '../input/rsna-hemorrhage-jpg/train_jpg/train_jpg'
filenames = [
    name
    for _root, _dirs, names in os.walk(jpg_root)
    for name in names
    if name.endswith('.jpg')
]

# should be the same as the images imported
len(filenames)

In [None]:
# Glob pattern matching every JPEG directly inside the image folder.
col_dir = '../input/rsna-hemorrhage-jpg/train_jpg/train_jpg/*.jpg'

# Build an image collection from the pattern and report how many images it holds.
images = imread_collection(col_dir)

len(images)

In [None]:
# Show the first image with a colorbar, then print its raw pixel values.
first_image = images[0]

fig, ax = plt.subplots()
shown = ax.imshow(first_image)
fig.colorbar(shown, ax=ax)
ax.grid(False)
plt.show()

print(first_image)

In [None]:
# Print the shapes of the first three images.
for position in range(3):
    print(images[position].shape)

In [None]:
# Carve three positional subsets out of the collection:
# train = first 2000 images, validation = 20000..21999, test = 25000..29999.
images_trn, images_val, images_tst = images[:2000], images[20000:22000], images[25000:30000]

for image_subset in (images_trn, images_val, images_tst):
    print(len(image_subset))

In [None]:
# Run each subset through skimage's concatenate_images to get one array per split.
images_arr_trn, images_arr_val, images_arr_tst = (
    skimage.io.collection.concatenate_images(image_subset)
    for image_subset in (images_trn, images_val, images_tst)
)

In [None]:
!pip install pyarrow

labels = pd.read_feather('../input/rsna-hemorrhage-jpg/meta/meta/labels.fth')

#manipulate the filenames list, stripping the .jpg at the end
idstosearch = [item.rstrip(".jpg") for item in filenames]

#now search the "ID" column for ids that correspond to our filenames
#made the reduced dataframe "labels2" for now
labels2 = labels[labels['ID'].isin(idstosearch)]
labels2.shape

In [None]:
# Keep only the second column of the filtered label table as the target values.
target_column = labels2.columns[1]
labels = labels2[target_column]
print(labels)

In [None]:
# Slice the labels with the same positional ranges used for the image subsets.
labels_trn, labels_val, labels_tst = labels[:2000], labels[20000:22000], labels[25000:30000]

for label_subset in (labels_trn, labels_val, labels_tst):
    print(len(label_subset))

In [None]:
# Show the container type of the training labels and their sum.
for summary in (type(labels_trn), labels_trn.sum()):
    print(summary)

In [None]:
# Convert each label Series to a NumPy array, then report the three lengths.
labels_trn = labels_trn.to_numpy()
labels_val = labels_val.to_numpy()
labels_tst = labels_tst.to_numpy()

for label_array in (labels_trn, labels_val, labels_tst):
    print(len(label_array))

In [None]:
from keras.applications import resnet50

# Instantiate ResNet50 with ImageNet weights and its default top layers.
# NOTE: `model` is reassigned later in this notebook when the custom
# classification head is attached, so this instance is not the one trained.
model = resnet50.ResNet50(weights="imagenet")

In [None]:
# Resize every training image to 224x224x3, the input shape the model is built with.
target_shape = (224, 224, 3)

images_final = [
    skimage.transform.resize(raw_image, target_shape)
    for raw_image in images_arr_trn
]

In [None]:
from skimage.transform import resize


def _resize_batch(batch):
    """Return a list holding every image of `batch` resized to (224, 224, 3)."""
    return [resize(image, (224, 224, 3)) for image in batch]


# Validation and test subsets are replaced by lists of resized images.
images_val = _resize_batch(images_val)
images_tst = _resize_batch(images_tst)

# The training images are passed through the same resize once more.
images_final_resized = _resize_batch(images_final)


**MODEL**

In [None]:
import keras
from keras.layers import Dense, GlobalAveragePooling2D
from keras.models import Model
from keras.optimizers import Adam
from skimage.transform import resize
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import tensorflow as tf

In [None]:


# Backbone: ResNet50 with ImageNet weights and without its top classification layers.
base_model = resnet50.ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
)

# Head: global average pooling -> 256-unit ReLU layer -> single sigmoid unit
# (one probability per image, i.e. binary classification).
pooled = GlobalAveragePooling2D()(base_model.output)
hidden = Dense(256, activation='relu')(pooled)
predictions = Dense(1, activation='sigmoid')(hidden)

# Full network from the backbone input to the new output.
model = Model(inputs=base_model.input, outputs=predictions)

# Freeze every backbone layer so only the new head is trainable.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False


In [None]:

# Compile for binary classification: Adam optimizer, binary cross-entropy
# loss, accuracy as the reported metric.
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Alternative setup kept for reference (not executed):
#loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
#optimizer = tf.keras.optimizers.Adam()
#model.compile(optimizer=Adam(lr=0.001), loss=loss_object, metrics=['accuracy'])

# Optional inspection helpers (not executed):
#model.summary()
#tf.keras.utils.plot_model(model, show_shapes=True)

Training setup

In [None]:
#THE BELOW TRAINING FUNCTION IS BETTER
# Train the model
# NOTE: the triple-quoted block below is a plain string literal, so none of it
# runs. It preserves an earlier single-call training setup (2 epochs, batch
# size 64, on `images_final_resized`) for reference only.
"""history = model.fit(
    np.array(images_final_resized),  # Input images
    labels_trn,  # Target labels
    epochs=2,
    batch_size=64,
    validation_data=(np.array(images_val), labels_val)
)
"""
# Evaluate the model on test data
# (disabled: the two lines below are commented out)
#test_loss, test_accuracy = model.evaluate(np.array(images_tst), labels_tst)
#print(f"Test Accuracy: {test_accuracy * 100:.2f}%")



**TRAINING**

In [None]:
# NOTE: this whole cell is a single string literal, so nothing in it runs.
# It preserves an earlier 2-epoch version of the per-epoch training loop
# (batch size 32) that printed validation accuracy, precision, recall and F1.
"""epochs=2
# Train the model
for epoch in range(epochs):
    history = model.fit(
        np.array(images_final),  # Input images
        labels_trn,  # Target labels
        epochs=1,
        batch_size=32,
        validation_data=(np.array(images_val), labels_val)
    )

    # Evaluate on validation data after each epoch
    y_val_pred = model.predict(np.array(images_val))
    y_val_pred_binary = (y_val_pred > 0.5).astype(int)

    val_accuracy = accuracy_score(labels_val, y_val_pred_binary)
    val_precision = precision_score(labels_val, y_val_pred_binary)
    val_recall = recall_score(labels_val, y_val_pred_binary)
    val_f1 = f1_score(labels_val, y_val_pred_binary)

    print(f"Epoch {epoch + 1} - Validation Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1 Score: {val_f1:.4f}")
"""

In [None]:
import matplotlib.pyplot as plt

from keras.callbacks import ModelCheckpoint
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Best validation accuracy seen so far and the file the matching weights go to.
# The weights are saved manually inside the loop, so no ModelCheckpoint
# callback is created here.
best_val_accuracy = 0.0
checkpoint_path = 'best_model_weights.h5'

# Number of passes over the training data
epochs = 15

# Convert the image lists to arrays once, instead of rebuilding them on every
# epoch (and twice per epoch for the validation set).
train_images = np.array(images_final)
val_images = np.array(images_val)

# model.fit is called with epochs=1 inside the loop, so each History object it
# returns covers a single epoch. Collect the metrics here so that the curves
# plotted afterwards span the whole run rather than only the final epoch.
training_curves = {'accuracy': [], 'val_accuracy': [], 'loss': [], 'val_loss': []}

# Train the model one epoch at a time so extra metrics can be reported per epoch
for epoch in range(epochs):
    history = model.fit(
        train_images,  # Input images
        labels_trn,  # Target labels
        epochs=1,
        batch_size=64,
        validation_data=(val_images, labels_val)
    )
    for metric_name, metric_values in training_curves.items():
        metric_values.extend(history.history[metric_name])

    # Threshold the predicted probabilities at 0.5 to obtain class predictions
    y_val_pred = model.predict(val_images)
    y_val_pred_binary = (y_val_pred > 0.5).astype(int)

    val_accuracy = accuracy_score(labels_val, y_val_pred_binary)

    # Keep the weights of the best epoch (by validation accuracy) on disk
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        print(f"Epoch {epoch + 1} - Validation Accuracy Improved: {best_val_accuracy:.4f}. Saving model weights.")
        model.save_weights(checkpoint_path)  # Save the model weights

    val_precision = precision_score(labels_val, y_val_pred_binary)
    val_recall = recall_score(labels_val, y_val_pred_binary)
    val_f1 = f1_score(labels_val, y_val_pred_binary)

    print(f"Epoch {epoch + 1} - Validation Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1 Score: {val_f1:.4f}")


# Plotting accuracy and loss for each epoch (x axis is the 1-based epoch number)
epoch_axis = range(1, len(training_curves['loss']) + 1)
fig, (ax_accuracy, ax_loss) = plt.subplots(1, 2, figsize=(12, 5))

# Accuracy
ax_accuracy.plot(epoch_axis, training_curves['accuracy'], label='Training Accuracy')
ax_accuracy.plot(epoch_axis, training_curves['val_accuracy'], label='Validation Accuracy')
ax_accuracy.set_title('Accuracy')
ax_accuracy.set_xlabel('Epoch')
ax_accuracy.set_ylabel('Accuracy')
ax_accuracy.legend()

# Loss
ax_loss.plot(epoch_axis, training_curves['loss'], label='Training Loss')
ax_loss.plot(epoch_axis, training_curves['val_loss'], label='Validation Loss')
ax_loss.set_title('Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend()

fig.tight_layout()
plt.show()

# Evaluate the model on test data after training
# ... (rest of your code for evaluation)


Testing

In [None]:

# Score the trained model on the test split: predict probabilities, threshold
# them at 0.5, then compute accuracy, precision, recall and F1.
test_images = np.array(images_tst)
y_test_pred = model.predict(test_images)
y_test_pred_binary = (y_test_pred > 0.5).astype(int)

test_accuracy, test_precision, test_recall, test_f1 = (
    metric(labels_tst, y_test_pred_binary)
    for metric in (accuracy_score, precision_score, recall_score, f1_score)
)

print(f"Test Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}")



In [None]:
#HAVE TO SAVE THE WEIGHTS

In [None]:
#HAVE TO GIVE THE WEIGHTS TO THE MODEL AND GIVE THE IMAGE TO THE MODEL TO SEE RESULTS

In [None]:
# Rebuild the validation list from the concatenated validation array, resizing
# each image to 224x224x3.
images_val = [
    skimage.transform.resize(raw_image, (224, 224, 3))
    for raw_image in images_arr_val
]

In [None]:
# Combine the resized validation images with skimage's concatenate_images and
# store the result back in `images_val` (replacing the list built above).
images_val = skimage.io.collection.concatenate_images(images_val)

In [None]:
# Validate model
# This scores the validation split (images_val / labels_val), so the result is
# reported as validation accuracy. The variable names are kept unchanged for
# compatibility with any code that reads them.
test_loss, test_acc = model.evaluate(images_val, labels_val, verbose=2)

print('\nValidation accuracy:', test_acc)