In [47]:
import argparse
import csv
import logging
import json
from pathlib import Path
from typing import Dict

In [48]:
def inject_note_text(notes_map: Dict[int, str], admission: Dict) -> Dict:
    """Attach note text (and annotation snippets) to an admission, in place.

    For every note in ``admission["notes"]`` whose ``note_id`` has an entry in
    ``notes_map``, the text is stored under ``note["text"]`` and each of the
    note's annotations gets ``covered_text = text[begin:end]`` (a missing
    ``begin``/``end`` is treated as 0). Notes without a ``note_id`` or without
    a matching text are left untouched and a message is printed.

    Args:
        notes_map: Mapping from note id to the full note text.
        admission: Admission dictionary; must contain a ``"notes"`` list.

    Returns:
        The same ``admission`` object that was passed in.
    """
    for note in admission["notes"]:
        note_id = note.get("note_id")
        if note_id is None:
            print("No note_id found in note")
            continue

        note_text = notes_map.get(note_id)
        if note_text is None:
            print(f"No text found for note_id: {note_id}")
            continue

        note["text"] = note_text
        for ann in note.get("annotations", []):
            start, stop = ann.get("begin", 0), ann.get("end", 0)
            ann["covered_text"] = note_text[start:stop]

    return admission

In [49]:

def _make_out_path(json_file: Path, input_dir: Path, out_dir: Path) -> Path:
    """Generate output path for injected JSON file"""
    prefix_len = len(input_dir.parts)
    return out_dir.joinpath(*json_file.parts[prefix_len:])


In [50]:
def inject_and_persist(notes_map: Dict[int, str], data_dir: Path, out_dir: Path = None):
    """Inject note text into every admission JSON under ``data_dir`` and save it.

    Args:
        notes_map: Mapping from note id to the full note text.
        data_dir: Directory searched recursively for ``*.json`` admission files.
        out_dir: Destination root. The layout below ``data_dir`` is mirrored
            underneath it. When falsy (e.g. ``None``) each input file is
            overwritten in place.
    """
    if out_dir:
        print(f"Injecting text and persisting to {out_dir.absolute()}")
    else:
        print("Injecting text in place")

    # Snapshot the file list before anything is written: the recursive glob is
    # lazy, so writing while it is still scanning (in place, or into an out_dir
    # nested under data_dir) could make it pick up files produced by this run.
    # Sorting additionally makes the processing order deterministic.
    json_files = sorted(data_dir.glob("**/*.json"))

    for json_file in json_files:
        with open(json_file, "r", encoding="utf8") as ifp:
            admission = inject_note_text(notes_map, json.load(ifp))

        # Create output path
        if out_dir:
            out_path = _make_out_path(json_file, data_dir, out_dir)
            out_path.parent.mkdir(parents=True, exist_ok=True)
        else:
            out_path = json_file

        # Write injected admission to output file
        with open(out_path, "w", encoding="utf8") as ofp:
            json.dump(admission, ofp, indent=2)


In [51]:
def build_notes_map(noteevents: Path) -> Dict[int, str]:
    """Build mapping from note_id to text from NOTEEVENTS.csv

    The first row is treated as a header and skipped. For every following
    row, column 0 is parsed as the integer note id and column 10 is taken as
    the note text. Blank lines are ignored, and an empty file yields an empty
    mapping.

    Args:
        noteevents: Path of the CSV file to read (UTF-8).

    Returns:
        Dictionary mapping note id to note text.

    Raises:
        ValueError: If column 0 of a row is not an integer.
        IndexError: If a non-blank row has fewer than 11 columns.
    """
    print(f"Loading {noteevents}")
    id_text_map = dict()
    # Note texts can be very large; raise the per-field limit (process-wide setting).
    csv.field_size_limit(1024 * 1024 * 1024)
    # NOTE(review): the csv docs recommend opening with newline="". That is left
    # unchanged here because it would alter how "\r\n" inside quoted note text is
    # read, and therefore could shift annotation offsets -- confirm before changing.
    with open(noteevents, "r", encoding="utf8") as ifp:
        reader = csv.reader(ifp)
        # Skip header; the default avoids StopIteration on an empty file
        next(reader, None)
        for row in reader:
            # csv.reader yields [] for a blank line (e.g. a trailing newline)
            if not row:
                continue
            note_id, text = int(row[0]), row[10]
            id_text_map[note_id] = text
    return id_text_map

In [52]:
def main(noteevents_path: Path, data_dir: Path, out_dir: Path = None):
    """Load note texts from ``noteevents_path`` and inject them into ``data_dir``.

    The note-id -> text mapping is built first and then handed to
    ``inject_and_persist`` together with ``data_dir`` and ``out_dir``.
    """
    inject_and_persist(build_notes_map(noteevents_path), data_dir, out_dir)

In [53]:
if __name__ == "__main__":
    # Entry point: configure the three locations, then run the injection.
    # NOTE(review): these are machine-specific absolute Windows paths, so this
    # cell only runs unchanged on the original author's machine -- consider
    # deriving them from a single configurable base directory.
    # CSV that provides the note texts.
    noteevents_path = Path(r"C:\Users\okechukwu chude\Documents\NLP\text extraction\Automating-Medical-Coding\NOTEEVENTS.csv")
    # Directory that is searched for the admission JSON files.
    data_dir = Path(r"C:\Users\okechukwu chude\Documents\NLP\text extraction\Automating-Medical-Coding\predictions")
    # Root directory for the JSON files with injected text.
    out_dir = Path(r"C:\Users\okechukwu chude\Documents\NLP\text extraction\Automating-Medical-Coding\with_text\gold")  # Set to None if you don't want to save to an output directory

    main(noteevents_path, data_dir, out_dir)

Loading C:\Users\okechukwu chude\Documents\NLP\text extraction\Automating-Medical-Coding\NOTEEVENTS.csv
Injecting text and persisting to C:\Users\okechukwu chude\Documents\NLP\text extraction\Automating-Medical-Coding\with_text\gold
No text found for note_id: 45238
No text found for note_id: 1188351
No text found for note_id: 1187104
No text found for note_id: 1188743
No text found for note_id: 1188269
No text found for note_id: 1187310
No text found for note_id: 1188081
No text found for note_id: 1188815
No text found for note_id: 37661
No text found for note_id: 709614
No text found for note_id: 709437
No text found for note_id: 709357
No text found for note_id: 709697
No text found for note_id: 445921
No text found for note_id: 445953
No text found for note_id: 35602
No text found for note_id: 38450
No text found for note_id: 1199478
No text found for note_id: 38312
No text found for note_id: 42388
No text found for note_id: 1251065
No text found for note_id: 1251080
No text found f

##########################                                   Start DATA.py                              ############################

In [54]:
import dataclasses
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator, List, Dict, Tuple, Optional

In [56]:
# Immutable character range inside a note
@dataclass(frozen=True)
class Span:
    """Character range ``[begin, end)`` with an optional copy of the covered text.

    Equality and hashing consider only ``begin`` and ``end``; ``covered_text``
    is carried along for convenience and excluded from comparisons.
    """

    begin: int
    end: int

    # Text covered by the span, when known; not part of ==/hash.
    covered_text: Optional[str] = dataclasses.field(default=None, compare=False)

    def __len__(self) -> int:
        """Return the width of the span (``end - begin``)."""
        width = self.end - self.begin
        return width

    @staticmethod
    def from_note_text(begin: int, end: int, note_text: str):
        """Create a span whose ``covered_text`` is ``note_text[begin:end]``."""
        return Span(begin=begin, end=end, covered_text=note_text[begin:end])



In [57]:
# Immutable billing code: a code within a coding system
@dataclass(frozen=True)
class BillingCode:
    """A billing code identified by ``code`` and ``code_system``.

    ``code_description`` is optional and is ignored when two codes are
    compared or hashed.
    """

    code: str
    code_system: str
    code_description: Optional[str] = dataclasses.field(default=None, compare=False)

In [58]:
# Dataclass for representing an annotation
@dataclass(frozen=True)
class Annotation:
    """A billing code attached to a span of note text.

    ``type`` is optional and is ignored when annotations are compared or hashed.
    """

    span: Span
    billing_code: BillingCode
    type: Optional[str] = dataclasses.field(compare=False, default=None)

    @staticmethod
    def from_json_dict(data: Dict) -> "Annotation":
        """Build an Annotation from its JSON dictionary.

        ``begin``, ``end`` and optional ``covered_text`` form the span;
        ``code``, ``code_system`` and optional ``description`` form the billing
        code. Any remaining keys are passed to the constructor as keyword
        arguments. The caller's dictionary is not modified.

        Raises:
            KeyError: If ``begin``, ``end``, ``code`` or ``code_system`` is missing.
            TypeError: If a leftover key is not an Annotation field.
        """
        # Work on a shallow copy: the pops below must not drain the caller's dict.
        fields = dict(data)
        return Annotation(
            span=Span(
                begin=fields.pop("begin"),
                end=fields.pop("end"),
                covered_text=fields.pop("covered_text", None),
            ),
            billing_code=BillingCode(
                fields.pop("code"),
                fields.pop("code_system"),
                fields.pop("description", None),
            ),
            **fields,
        )


In [59]:
# Dataclass for representing a note
@dataclass(frozen=True)
class Note:
    """A clinical note with its annotations and, optionally, its text.

    ``annotations`` and ``text`` are left out of the generated ``repr``.
    """

    note_id: int
    category: str
    description: str
    annotations: List[Annotation] = dataclasses.field(repr=False)

    text: Optional[str] = dataclasses.field(repr=False, default=None)

    @staticmethod
    def from_json_dict(data: Dict) -> "Note":
        """Build a Note from its JSON dictionary.

        ``text`` is optional and ``annotations`` is required; every other key
        is passed to the constructor as a keyword argument. Neither ``data``
        nor the annotation dictionaries inside it are modified.

        Raises:
            KeyError: If ``annotations`` is missing.
            TypeError: If a leftover key is not a Note field.
        """
        # Work on a shallow copy: the pops below must not drain the caller's dict.
        fields = dict(data)
        return Note(
            text=fields.pop("text", None),
            # Hand each annotation over as a copy so parsing cannot alter the input.
            annotations=[
                Annotation.from_json_dict(dict(a)) for a in fields.pop("annotations")
            ],
            **fields,
        )


In [60]:
# Dataclass for representing an admission
@dataclass(frozen=True)
class Admission:
    """A hospital admission: an id, its notes and an optional comment.

    Iterating over an admission yields one ``(note, annotation)`` pair per
    annotation of every note.
    """

    hadm_id: int
    notes: List[Note]

    comment: Optional[str] = None

    def __iter__(self) -> Iterator[Tuple[Note, Annotation]]:
        """Yield ``(note, annotation)`` pairs, filling in missing covered text.

        When the note has text and the annotation's span has no covered text,
        the yielded annotation is a copy whose span carries
        ``note.text[begin:end]``; the stored annotation is left unchanged.
        """
        for note in self.notes:
            for annotation in note.annotations:
                # Set annotation.span.covered_text if we have note.text
                if note.text and not annotation.span.covered_text:
                    annotation = dataclasses.replace(
                        annotation,
                        span=dataclasses.replace(
                            annotation.span,
                            covered_text=note.text[
                                annotation.span.begin : annotation.span.end
                            ],
                        ),
                    )
                yield note, annotation

    def _has_text(self) -> bool:
        """Check that all notes have text"""
        return all(note.text is not None for note in self.notes)

    @staticmethod
    def from_json_dict(data: Dict) -> "Admission":
        """Build an Admission from its JSON dictionary.

        ``notes`` is required; every other key is passed to the constructor as
        a keyword argument. The dictionary is deep-copied first, so neither
        ``data`` nor anything nested inside it is modified.

        Raises:
            KeyError: If ``notes`` is missing.
            TypeError: If a leftover key is not an Admission field.
        """
        import copy

        # Parsing pops keys at every nesting level; operate on a private copy.
        fields = copy.deepcopy(data)
        return Admission(
            notes=[Note.from_json_dict(note) for note in fields.pop("notes")],
            **fields,
        )

    @staticmethod
    def from_json_file(file_path: Path) -> "Admission":
        """Read a UTF-8 JSON file and build an Admission from its contents."""
        with open(file_path, "r", encoding="utf8") as ifp:
            return Admission.from_json_dict(json.load(ifp))


In [61]:
# Dataclass for representing MDACE data
@dataclass(frozen=True)
class MDACEData:
    """A collection of admissions that iterates at annotation granularity."""

    admissions: List[Admission]

    # Must be a plain method: ``len()`` looks ``__len__`` up on the type and
    # calls it, which fails with TypeError if it is wrapped in a property.
    def __len__(self) -> int:
        """Return the number of annotations across all admissions."""
        return sum(1 for _ in self)

    @staticmethod
    def from_dir(dataset_dir: Path, require_text: bool = True) -> "MDACEData":
        """Load every ``*.json`` admission file directly inside ``dataset_dir``.

        Files are loaded in sorted path order.

        Args:
            dataset_dir: Directory containing the admission JSON files
                (not searched recursively).
            require_text: When true, reject admissions with a note lacking text.

        Raises:
            ValueError: If the directory contains no JSON file, or if
                ``require_text`` is set and an admission has a note without text.
        """
        # Glob once; sorting makes the admission order deterministic.
        json_files = sorted(dataset_dir.glob("*.json"))
        if not json_files:
            raise ValueError(f"No JSON files found in path {dataset_dir.absolute()}")

        admissions = list()
        for json_file in json_files:
            adm = Admission.from_json_file(json_file)
            if require_text and not adm._has_text():
                raise ValueError(
                    f"Admission {adm.hadm_id} is missing note text. "
                    f"Please run: python inject-note-text.py --noteevents NOTEEVENTS.csv --data-dir {dataset_dir}"
                )
            admissions.append(adm)

        return MDACEData(admissions)

    def __iter__(self) -> Iterator[Tuple[Admission, Note, Annotation]]:
        """Iterate over Annotations; include information from Admission and Note"""
        for admission in self.admissions:
            for note, annotation in admission:
                yield admission, note, annotation


##########################                                   END DATA.py                              ############################

##########################                                   start text.py                            ############################

In [62]:
import dataclasses
import re
from typing import List, Callable

In [63]:
# Regular expression pattern to match word tokens: maximal runs of \w characters.
# NOTE: MULTILINE only changes how ^/$ match and DOTALL only changes how "."
# matches; neither construct appears in this pattern. UNICODE is already the
# default for str patterns in Python 3.
TOKEN_PATTERN = re.compile(r"\w+", flags=re.UNICODE | re.MULTILINE | re.DOTALL)

In [64]:
def tokenize(text: str) -> List[Span]:
    """Tokenizes the given text into lowercased word tokens.

    Each token is a Span whose ``begin``/``end`` are offsets into ``text`` and
    whose ``covered_text`` is the lowercased token. Tokens that are decimal
    numbers greater than 10 are dropped.

    Args:
        text: Text to tokenize.

    Returns:
        Token spans in order of appearance.
    """
    spans = []
    # Match on the original text, not text.lower(): lowercasing can change the
    # length of a string for some characters, which would shift every later
    # offset away from the original text.
    for match in TOKEN_PATTERN.finditer(text):
        token = match.group().lower()

        # Exclude numbers greater than 10 as per Mullenbach's recommendation.
        # isdecimal() rather than isdigit(): isdigit() is also true for
        # characters such as superscript digits, which int() cannot parse.
        if token.isdecimal() and int(token) > 10:
            continue

        spans.append(Span(*match.span(), covered_text=token))

    return spans


In [65]:
def tokenize_annotation(
    annotation: Annotation, tokenize_fn: Callable[[str], List[Span]]
) -> List[Annotation]:
    """Split one annotation into one annotation per token of its covered text.

    ``tokenize_fn`` receives the annotation's covered text and returns spans
    relative to that text; each span is shifted by the annotation's ``begin``
    and placed into a copy of the annotation.

    Raises:
        ValueError: If the annotation's span has no covered text.
    """
    covered = annotation.span.covered_text
    if covered is None:
        raise ValueError(
            "Cannot tokenize annotations without text -- run inject-note-text.py"
        )

    base = annotation.span.begin

    token_annotations = []
    for token in tokenize_fn(covered):
        # Token offsets are relative to the covered text; shift them by `base`.
        shifted = dataclasses.replace(
            token, begin=base + token.begin, end=base + token.end
        )
        token_annotations.append(dataclasses.replace(annotation, span=shifted))
    return token_annotations


In [66]:
def tokenize_annotations(
    annotations: List[Annotation], tokenize_fn: Callable[[str], List[Span]]
) -> List[Annotation]:
    """Tokenize each annotation and return all token annotations as one flat list.

    The result keeps the order of ``annotations`` and, within each
    annotation, the order returned by ``tokenize_annotation``.
    """
    return [
        token_annotation
        for annotation in annotations
        for token_annotation in tokenize_annotation(annotation, tokenize_fn)
    ]


In [67]:
def tokenize_admission(
    admission: Admission, tokenize_fn: Callable[[str], List[Span]]
) -> Admission:
    """Return a copy of ``admission`` whose notes carry token-level annotations.

    Each note is copied with its annotations replaced by the output of
    ``tokenize_annotations``; the input admission is not modified.
    """
    tokenized_notes = []
    for note in admission.notes:
        token_annotations = tokenize_annotations(note.annotations, tokenize_fn)
        tokenized_notes.append(
            dataclasses.replace(note, annotations=token_annotations)
        )
    return dataclasses.replace(admission, notes=tokenized_notes)


#################################                         text.py end                               #########################