In [None]:
!pip install annoy

In [None]:
import os
import re
from tqdm import tqdm
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET

import tensorflow as tf
import matplotlib.pyplot as plt
from annoy import AnnoyIndex
import cv2
from collections import defaultdict
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer,CountVectorizer

In [None]:
# Mount Google Drive at /content/drive (requesting a forced remount) so the
# dataset paths used in the following cells are reachable.
from google.colab import drive
drive.mount('/content/drive', force_remount = True)

In [None]:
# Locations of the report XML files and of the scanned images on Drive.
# NOTE(review): the two paths spell the Drive root differently ('MyDrive' vs
# 'My Drive') and point at different dataset folders — confirm both exist.
REPORTS_PATH = '/content/drive/MyDrive/Colab Notebooks/Data/IU X-Ray New/ecgen-radiology'
IMAGES_DIR = '/content/drive/My Drive/Colab Notebooks/Data/IUXRay/Scanned Images Unzipped/Scanned Images/'

# Parallel lists holding exactly one entry per report file, so position i in
# every list always refers to the same report when they are combined later.
ids = []
impressions = []
findings = []
images = []

for filename in sorted(os.listdir(REPORTS_PATH)):
    root = ET.parse(os.path.join(REPORTS_PATH, filename)).getroot()

    # Per-report values; they stay None when the corresponding tag is absent
    # so that a missing tag cannot shift the lists out of alignment.
    report_id = None
    impression = None
    finding = None
    imageId = []

    for child in root.iter():
        if child.tag == 'pmcId':
            report_id = child.attrib['id']
        elif child.tag == 'AbstractText':
            # .get() tolerates AbstractText elements without a Label attribute
            label = child.attrib.get('Label')
            if label == 'IMPRESSION':
                impression = child.text
            elif label == 'FINDINGS':
                finding = child.text
        elif child.tag == 'parentImage':
            imageId.append(IMAGES_DIR + child.attrib['id'] + '.png')

    ids.append(report_id)
    impressions.append(impression)
    findings.append(finding)
    images.append(imageId)

In [None]:
# One row per parsed report; columns appear in the order
# ids, images, impressions, findings.
df = pd.DataFrame(
    data=dict(
        ids=ids,
        images=images,
        impressions=impressions,
        findings=findings,
    )
)

In [None]:
def _first_two_images(paths):
    """Return exactly two image paths: the first two, or the only one repeated."""
    pair = paths[:2]
    if len(pair) < 2:
        pair = [pair[0], pair[0]]
    return pair


# Keep only the reports that reference at least one image. reset_index()
# (without drop=True) stores the previous row labels in an 'index' column.
has_image = df['images'].apply(len) > 0
df = df[has_image].reset_index()

# Normalise every report to exactly two image paths. This is a whole-column
# assignment: element-wise `df['images'][i] = ...` is chained indexing, which
# pandas warns about and which does not update the frame under Copy-on-Write.
df['images'] = df['images'].apply(_first_two_images)

# Discard reports with any missing field (e.g. empty findings or impression).
df = df.dropna()

In [None]:
def decontracted(phrase):
    """Expand common English contractions in ``phrase`` and return the result.

    The substitutions are applied in the listed order; the irregular forms
    ("won't", "can't") come before the generic "n't" rule so that they are
    expanded by their own rule first.
    """
    expansions = (
        (r"won't", "will not"),
        (r"can\'t", "can not"),
        (r"n\'t", " not"),
        (r"\'re", " are"),
        (r"\'s", " is"),
        (r"\'d", " would"),
        (r"\'ll", " will"),
        (r"\'t", " not"),
        (r"\'ve", " have"),
        (r"\'m", " am"),
    )
    for pattern, replacement in expansions:
        phrase = re.sub(pattern, replacement, phrase)
    return phrase

def preprocess(text):
  """Clean one report string for downstream vectorisation.

  Steps, in order:
    1. remove the 'XXXX' / 'xxxx' placeholder tokens;
    2. expand contractions via ``decontracted`` (this has to happen while the
       apostrophes are still present, otherwise no contraction can match);
    3. replace every run of characters other than ASCII letters and periods
       with a single space. Periods are kept; digits, punctuation and line
       breaks are all replaced by this step.

  Returns the cleaned string. Leading/trailing spaces are not stripped.
  """
  text = re.sub('XXXX', '', text)
  text = re.sub('xxxx', '', text)
  text = decontracted(text)
  text = re.sub(r'[^A-Za-z.]+', ' ', text)
  return text

In [None]:
# Replace each findings string with its cleaned form from preprocess().
df['findings'] = df['findings'].map(preprocess)

In [None]:
# Preview the first rows after the findings text has been cleaned.
df.head()

In [None]:
# Spread the two-element `images` lists into separate `image1` / `image2`
# columns, aligned on the frame's existing row labels.
df_images_split = pd.DataFrame(
    df['images'].tolist(), index=df.index, columns=['image1', 'image2']
)

# Rename and drop by column name rather than by position, so a change in
# column order upstream cannot silently mislabel the data.
df = (
    pd.concat([df, df_images_split], axis=1)
    .rename(columns={'ids': 'uid'})
    .drop(columns=['index', 'images'])
)

In [None]:
# Preview the frame after the image paths were split into image1 / image2.
df.head()

In [None]:
# Height and width (in pixels) that images are resized to before the model.
IMG_HEIGHT, IMG_WIDTH = 224, 224

In [None]:
# Build a DenseNet121 initialised from the CheXNet weights file on Drive,
# configured for 14 classes and IMG_HEIGHT x IMG_WIDTH x 3 inputs.
# Commented-out alternative (ImageNet VGG16 without its top):
# vgg16 = tf.keras.applications.VGG16(weights='imagenet', include_top=False, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3))
#
# NOTE(review): include_top is not passed here, so the Keras default applies;
# if that keeps the classification head, predict() yields 14 class scores
# rather than pooled convolutional features — confirm this is intended.
pretrained_model = tf.keras.applications.DenseNet121(weights='/content/drive/My Drive/Colab Notebooks/Data/IUXRay/CheXNet_weights.h5',
                                                classes = 14,input_shape=(IMG_HEIGHT,IMG_WIDTH,3))

In [None]:
# Hold out 25% of the reports for evaluation; random_state fixes the split.
train_df, test_df = train_test_split(df, test_size=0.25, random_state=42)

In [None]:
# Preview the first rows of the training split.
train_df.head()

In [None]:
from tensorflow.keras.applications.densenet import DenseNet121, preprocess_input

def extract_image_features(filepath):
    """Run a single image through ``pretrained_model`` and return its output.

    Parameters
    ----------
    filepath : str
        Path of the image file to read.

    Returns
    -------
    numpy.ndarray
        The result of ``pretrained_model.predict`` for a batch that contains
        only this image (so the leading axis has size 1).

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read ``filepath``.
    """
    img = cv2.imread(filepath)
    if img is None:
        # cv2.imread reports an unreadable/missing file by returning None
        # rather than raising, which would otherwise surface as an obscure
        # error inside cv2.resize.
        raise FileNotFoundError(f'Could not read image: {filepath}')
    # OpenCV decodes colour images as BGR; convert to the RGB channel order
    # expected by the DenseNet preprocessing.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (IMG_WIDTH, IMG_HEIGHT))
    img = tf.keras.applications.densenet.preprocess_input(img)
    # verbose=0 keeps Keras from printing a progress bar for every image.
    features = pretrained_model.predict(np.expand_dims(img, axis=0), verbose=0)
    return features

def extract_features(filepaths):
    """Extract model outputs for every image path in ``filepaths``.

    Parameters
    ----------
    filepaths : iterable of str
        Image paths, processed in order.

    Returns
    -------
    numpy.ndarray
        The per-image results of ``extract_image_features`` stacked along a
        new leading axis, in the same order as ``filepaths``.
    """
    # tqdm (already imported by the notebook) shows progress without a
    # hand-maintained counter.
    features = [
        extract_image_features(filepath)
        for filepath in tqdm(filepaths, desc='Extracting image features')
    ]
    return np.array(features)

In [None]:
# Extracting text features using TF-IDF
def extract_text_features(text, vectorizer=None):
    """Turn an iterable of documents into a dense TF-IDF feature matrix.

    Parameters
    ----------
    text : iterable of str
        The documents to vectorise.
    vectorizer : TfidfVectorizer, optional
        An already fitted vectorizer. When given, ``text`` is only
        transformed, so the columns match the vocabulary the vectorizer was
        fitted on (needed to featurise new text consistently with the
        training data). When omitted, a new ``TfidfVectorizer`` is fitted on
        ``text`` itself.

    Returns
    -------
    numpy.ndarray
        Dense array with one row per document.
    """
    if vectorizer is None:
        # Fit a fresh vocabulary on the supplied documents.
        vectorizer = TfidfVectorizer()
        features = vectorizer.fit_transform(text)
    else:
        # Reuse the caller's fitted vocabulary.
        features = vectorizer.transform(text)
    return features.toarray()

In [None]:
# TF-IDF features fitted on the findings of every report (all rows of df).
# NOTE(review): `text_features` is not referenced again in the visible cells.
text_features = extract_text_features(df['findings'])

In [None]:
# Model outputs for the first image of every report (all rows of df).
# NOTE(review): `image1_features` is not referenced again in the visible
# cells, and the train/test images are extracted again below — this full
# pass over the images may be redundant.
image1_features = extract_features(df['image1'].values)


In [None]:
# Preview the first rows of the training split before feature extraction.
train_df.head()

In [None]:
# Model outputs for the first image of every training report.
image1_features_train = extract_features(train_df['image1'].values)
# Flatten each per-image output to 1-D and stack the rows into one array,
# reading from the array extracted on the previous line.
image_features_train = np.array([f.flatten() for f in image1_features_train])
# TF-IDF features fitted on the training findings.
text_features_train = extract_text_features(train_df['findings'])

In [None]:
# Show the shapes of both training feature sets. np.shape is used for the
# image features because it works for a list of vectors as well as an array.
np.shape(image_features_train), text_features_train.shape

In [None]:
# Length of the flattened image feature vector of the first training report.
len(image_features_train[0])

In [None]:
# Fuse every training report's image vector with its text vector
# (image part first, text part second).
features_train = list(
    map(np.concatenate, zip(image_features_train, text_features_train))
)
# Length of one fused vector; this is the dimensionality of the index.
num_dimensions = features_train[0].shape[0]

In [None]:
# Angular-distance Annoy index over the fused training vectors. Item ids are
# the positions of the vectors in features_train.
index = AnnoyIndex(num_dimensions, metric="angular")
for item_id, vector in enumerate(features_train):
    index.add_item(item_id, vector)
# Build the index with 10 trees.
index.build(10)

In [None]:
# Model outputs for the first image of every held-out report.
image_features_test = extract_features(test_df['image1'].values)
# Text features are not extracted for the test split (call left disabled):
# text_features_test = extract_text_features(test_df['findings'])

In [None]:
k = 10  # number of nearest training reports retrieved per test image
true_findings = []
predicted_findings = []

# Findings of the training reports by row position; Annoy item ids are
# positions in features_train, which was built in train_df row order.
train_findings = train_df['findings'].tolist()

# Reuse the test-image features that were already extracted
# (image_features_test follows test_df row order) instead of running the
# model a second time for every row.
for query_image_features, true_finding in zip(image_features_test, test_df['findings']):
    # Flatten the image features, then zero-pad the slots that hold text
    # features in the index, since no text is available for the query.
    query_features = np.ravel(query_image_features)
    query_features = np.pad(query_features, (0, num_dimensions - len(query_features)), mode='constant')
    nn_indices = index.get_nns_by_vector(query_features, k)
    nn_findings = [train_findings[j] for j in nn_indices]
    # Most frequent report text among the neighbours; on a tie, max() keeps
    # the earliest entry of nn_findings.
    predicted_finding = max(nn_findings, key=nn_findings.count)
    predicted_findings.append(predicted_finding)
    true_findings.append(true_finding)

In [None]:
# Sanity check: both lists should contain one entry per test report.
len(predicted_findings), len(true_findings)

In [None]:
# Reference vs. retrieved report for the first test sample; each label is
# paired with the list it describes.
print(f'Ground Truth Report for a sample: {true_findings[0]}')
print(f'Predicted Report for a sample: {predicted_findings[0]}')

In [None]:
# Reference vs. retrieved report for test sample 40; each label is paired
# with the list it describes.
print(f'Ground Truth Report for a sample: {true_findings[40]}')
print(f'Predicted Report for a sample: {predicted_findings[40]}')

In [None]:
from nltk.translate.bleu_score import corpus_bleu

def compute_bleu_scores(predicted_findings, true_findings):
    """Compute corpus-level BLEU-1 to BLEU-4 for the predicted reports.

    Both arguments are sequences of strings of equal length; each string is
    tokenised on whitespace, and every prediction is scored against its single
    reference.

    Returns a tuple ``(bleu_1, bleu_2, bleu_3, bleu_4)``.
    """
    references = [[reference.split()] for reference in true_findings]
    hypotheses = [hypothesis.split() for hypothesis in predicted_findings]

    # Explicit n-gram weights for BLEU-1..3; BLEU-4 uses corpus_bleu's default.
    partial_weights = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (1/3, 1/3, 1/3, 0),
    )
    bleu_1, bleu_2, bleu_3 = (
        corpus_bleu(references, hypotheses, weights=weights)
        for weights in partial_weights
    )
    bleu_4 = corpus_bleu(references, hypotheses)

    return bleu_1, bleu_2, bleu_3, bleu_4


# Score the retrieved reports against the references and print BLEU-1..4.
bleu_1, bleu_2, bleu_3, bleu_4 = compute_bleu_scores(predicted_findings, true_findings)

print(f"BLEU-1: {bleu_1}")
print(f"BLEU-2: {bleu_2}")
print(f"BLEU-3: {bleu_3}")
print(f"BLEU-4: {bleu_4}")

In [None]:
from nltk.translate.bleu_score import sentence_bleu
# Sentence-level BLEU for every (prediction, reference) pair, tokenised on
# whitespace; `scores` keeps the per-sample values for later plotting.
scores = [
    sentence_bleu([true.split()], pred.split())
    for pred, true in zip(predicted_findings, true_findings)
]
total_score = sum(scores, 0.0)
num_scores = len(scores)
avg_score = total_score / num_scores

print(f"Average BLEU score: {avg_score}")

In [None]:
# Distribution of the per-sample sentence BLEU scores (normalised to a
# density), labelled so the figure can be read on its own.
fig, ax = plt.subplots()
ax.hist(scores, density=True, bins=30)
ax.set(
    title='Distribution of sentence-level BLEU scores',
    xlabel='Sentence BLEU',
    ylabel='Density',
)
plt.show()