# Labels and Report Reading

In [None]:
import numpy as np

# Storage root and the dataset this notebook processes.
ROOT = "/gpfs/users/a1808469"
DATASET = "mimic-cxr"

# Directory prefixes shared by the input paths below.
_DATASET_DIR = f"{ROOT}/{DATASET}"
_TEXT_DIR = f"{_DATASET_DIR}/text"
_CHEXPERT_DIR = f"{_DATASET_DIR}/chexpert-labels"

# Line-aligned text files: findings, impressions and image paths.
RRS_FINDINGS_PATH = f"{_TEXT_DIR}/train.findings.tok"
RRS_IMPRESSION_PATH = f"{_TEXT_DIR}/train.impression.tok"
RRS_IMG_PATH = f"{_TEXT_DIR}/train.image.tok"

# Per-image labels (JSON).
IMG_LABEL_PATH = f"{_DATASET_DIR}/{DATASET}-train-img-labels.json"

# Sentence mappings for the findings section (JSON).
SENTENCE_MAPPING_PATH = f"{_CHEXPERT_DIR}/sentence_mappings_train_findings.json"

# Text-derived labels for the findings and impression sections (CSV).
CHEXPERT_FINDINGS_LABELS_PATH = f"{_CHEXPERT_DIR}/train.findings.label"
CHEXPERT_IMPRESSION_LABELS_PATH = f"{_CHEXPERT_DIR}/train.impression.label"

# Get txt labels
import csv


def _parse_label_row(row):
    """Convert one label CSV row from raw strings to usable values.

    The "Reports" column is kept exactly as read. Every other column is
    converted with ``float``; an empty or missing value becomes ``np.nan``.
    """
    parsed = {}
    for column, raw in row.items():
        if column == "Reports":
            parsed[column] = raw
        elif raw:
            parsed[column] = float(raw)
        else:
            parsed[column] = np.nan
    return parsed


def _load_chexpert_labels(path):
    """Read the label CSV at ``path`` into a list of per-row dicts."""
    # newline="" is what the csv module expects: it lets the reader handle
    # line breaks itself, including ones embedded in quoted fields.
    with open(path, "r", newline="") as label_file:
        return [_parse_label_row(row) for row in csv.DictReader(label_file)]


# Same parsing for both sections so the two label lists share one format.
findings_labels = _load_chexpert_labels(CHEXPERT_FINDINGS_LABELS_PATH)
impression_labels = _load_chexpert_labels(CHEXPERT_IMPRESSION_LABELS_PATH)


# Get sentence mappings
import json

with open(SENTENCE_MAPPING_PATH, "r") as mapping_file:
    sentence_mappings = json.load(mapping_file)


def _read_stripped_lines(path):
    """Return every line of the file at ``path`` with surrounding whitespace removed."""
    with open(path, "r") as handle:
        return [raw_line.strip() for raw_line in handle]


# Read original txt: one entry per line in each file.
input_text = _read_stripped_lines(RRS_FINDINGS_PATH)
ground_truth = _read_stripped_lines(RRS_IMPRESSION_PATH)
image_paths = _read_stripped_lines(RRS_IMG_PATH)

# Normalize the findings text: drop the "<q>" markers and lower-case it.
input_text = [report.replace("<q>", "").lower() for report in input_text]

# Get image labels
with open(IMG_LABEL_PATH, "r") as label_file:
    img_labels = json.load(label_file)


def _discretize_img_value(value):
    """Map one raw image-label value onto 1.0, -1.0 or NaN.

    Numbers above 0.5 become 1.0 and numbers below 0.1 become -1.0.
    Anything else (numbers in between, and non-numeric values) becomes NaN.
    """
    if isinstance(value, (int, float)):
        if value > 0.5:
            return 1.0
        if value < 0.1:
            return -1.0
    return np.nan


# Discretize every image-label dict in place so it can be compared with the
# text labels.
# NOTE(review): low scores are mapped to -1.0, not 0.0 -- confirm this is the
# value the text labels use for the case it is meant to match.
for record in img_labels:
    record.update(
        {key: _discretize_img_value(raw) for key, raw in record.items()}
    )


In [None]:
import re
from nltk.tokenize import sent_tokenize
from rapidfuzz import fuzz

def exclusive(original_doc, mapped_sentence, disease, threshold=90):
    """Blank out the sentences of a document that match a disease's mapped sentence.

    The document is split with ``sent_tokenize``. Every sentence whose
    ``fuzz.ratio`` against ``mapped_sentence[disease]`` is strictly greater
    than ``threshold`` is replaced by a single space in the returned text.

    Args:
        original_doc: Text to filter.
        mapped_sentence: Mapping indexed by disease name.
        disease: Key looked up in ``mapped_sentence``.
        threshold: Similarity score a sentence must exceed to be removed.

    Returns:
        The filtered text; identical to ``original_doc`` when nothing matched.
    """
    # Matching is done against the untouched document, so collect first.
    matched_sentences = [
        sentence
        for sentence in sent_tokenize(original_doc)
        if fuzz.ratio(sentence.strip(), mapped_sentence[disease]) > threshold
    ]

    filtered_doc = original_doc
    for sentence in matched_sentences:
        filtered_doc = filtered_doc.replace(sentence, " ")
    return filtered_doc


In [None]:

modified_index = []
exclusive_set = []
exclusive_set_impression = []
exclusive_set_images = []

conflicting_set = []

reduced_orginal_findings = []
reduced_orginal_impression = []
reduced_orginal_images = []

block_count = 0
exact_count = 0
exact = []

# Observations compared one-to-one between text labels and image labels.
disease_list = [
    "Enlarged Cardiomediastinum", "Cardiomegaly", "Lung Opacity", "Edema", "Consolidation",
    "Pneumonia", "Atelectasis", "Pneumothorax", "Pleural Effusion", "Pleural Other"
]

# Text-label names whose image-label key is spelled differently.
_IMG_KEY_OVERRIDES = {"Pleural Effusion": "Effusion", "Pleural Other": "Fibrosis"}

# Image-label keys that together stand for the text label "Lung Lesion".
_LESION_IMG_KEYS = ("Lung Lesion", "Mass", "Nodule")

# Generate exclusive set
for i, (input_txt, findings, img_label, impressions, sentences) in enumerate(
    zip(input_text, findings_labels, img_labels, impression_labels, sentence_mappings)
):
    # Skip samples that either section labels as "No Finding".
    if findings["No Finding"] == 1.0 or impressions["No Finding"] == 1.0:
        continue

    # Working copy; sentences are removed from it cumulatively below.
    exclusive_doc = input_txt

    for disease in disease_list:
        img_key = _IMG_KEY_OVERRIDES.get(disease, disease)

        # All three sources must carry the same value. NaN never compares
        # equal, and neither does the None returned for a missing image key,
        # so unlabelled observations are skipped.
        if findings[disease] == img_label.get(img_key) == impressions[disease]:
            # Continue from exclusive_doc (not input_txt) so removals made
            # for earlier diseases are kept.
            exclusive_doc = exclusive(exclusive_doc, sentences, disease)

    # Special logic for Lung Lesion: several image keys map to one text label.
    if findings["Lung Lesion"] == impressions["Lung Lesion"]:
        # .get() keeps a missing image key from raising, as in the loop above.
        lesion_scores = [img_label.get(key) for key in _LESION_IMG_KEYS]

        # NOTE(review): the loading cell maps image labels to 1.0 / -1.0 / NaN,
        # so the "== 0.0" branch cannot match as things stand -- confirm which
        # value is meant to represent a negative image label.
        lesion_flag = any(score == 1.0 for score in lesion_scores) or \
                      all(score == 0.0 for score in lesion_scores)

        if lesion_flag:
            exclusive_doc = exclusive(exclusive_doc, sentences, "Lung Lesion")

    # Keep the sample only if at least one sentence was actually removed.
    # Conflicting-document generation is not implemented here, so it is not
    # part of this condition and conflicting_set stays empty.
    if exclusive_doc != input_txt:
        exclusive_set.append(exclusive_doc)
        reduced_orginal_findings.append(input_txt)
        modified_index.append(i)

# Get image and impression for every kept sample, in the same order.
for index in modified_index:
    reduced_orginal_impression.append(ground_truth[index])
    reduced_orginal_images.append(image_paths[index])




In [None]:
# Sanity check: the kept-sample lists must line up index-for-index.
# conflicting_set is left out because the generation loop never appends to it.
len(exclusive_set) == len(reduced_orginal_findings) == len(reduced_orginal_images) == len(reduced_orginal_impression)

In [None]:
# Number of samples that ended up in the exclusive set.
len(exclusive_set)

In [None]:
# Write to txt file
def list_to_txt(docs, path):
    """Write the strings in ``docs`` to ``path``, one per line.

    The file is opened in write mode, so existing content is replaced.
    A confirmation line naming ``path`` is printed afterwards.
    """
    with open(path, 'w') as out:
        out.writelines(entry + '\n' for entry in docs)
    print(f"===Saved to {path} ===")

# Save the exclusive findings, then the matching original findings,
# impressions and image paths.
# NOTE(review): this directory has a "dataset/" segment that the input paths
# do not -- confirm that is the intended output location.
_OUTPUT_TEXT_DIR = f"{ROOT}/dataset/{DATASET}/text"

for docs, filename in (
    (exclusive_set, "train.findings.tok.exclusive"),
    (reduced_orginal_findings, "train.findings.tok.reduced"),
    (reduced_orginal_impression, "train.impression.tok.reduced"),
    (reduced_orginal_images, "train.image.tok.reduced"),
):
    list_to_txt(docs, f"{_OUTPUT_TEXT_DIR}/{filename}")
