In [None]:
import json
import re
from pathlib import Path
from typing import ClassVar, Dict, List

import openpyxl
import PyPDF2
import tiktoken
from pydantic import BaseModel
from unidecode import unidecode

In [None]:
# Folder holding the guideline documents, resolved relative to the parent of
# the current working directory.
GUIDELINES_DIR = Path.cwd().parent.joinpath("guidelines")
# Spreadsheet the JSON schema is generated from.
GUIDELINES_EXCEL_PATH = GUIDELINES_DIR.joinpath(
    "eDischarge-Summary-v2.1-1st-Feb-21.xlsx"
)
# Implementation-guidance PDF whose section text is appended to the schema.
GUIDELINES_IMPLEMENTATION_PDF_PATH = GUIDELINES_DIR.joinpath(
    "eDischarge-Summary-Maintenance-Release-Implementation-Guidance-Report-v2.1-23.1.19.pdf"
)

# Destination of the generated JSON schema.
GUIDELINES_JSON_PATH = GUIDELINES_DIR.joinpath(
    "eDischarge-Summary-v2.1-1st-Feb-21_schema.json"
)

# Section titles that are removed from the generated schema further down the
# notebook (after being normalised with `to_camel_case`).
# NOTE(review): presumably these sections are filled from a database rather
# than generated - confirm.
# "Referrer details" and "Discharge details" were previously fused into one
# string, so neither section was ever filtered out.
DATABASE_SECTIONS = {
    "Patient demographics",
    "GP practice",
    "Referrer details",
    "Discharge details",
    "Medications and Medical Devices",
    "Person completing record",
    "Distribution list",
}


# tiktoken encoding used below to count the tokens of the serialised schema.
TOKENIZER = tiktoken.get_encoding("cl100k_base")

In [None]:
# Open the guidelines workbook and pick the worksheet named "Sheet1".
sheet = openpyxl.load_workbook(filename=GUIDELINES_EXCEL_PATH)["Sheet1"]
# Keep only the cell values, starting at worksheet row 4 (rows 1-3 are not read).
row_records = [record for record in sheet.iter_rows(min_row=4, values_only=True)]

In [None]:
def to_camel_case(text: str) -> str:
    """Turn ``text`` into a lowercase, underscore-separated identifier.

    Despite the name, the result is snake_case: the text is stripped and
    lowercased, then every run of non-word characters becomes a single ``_``
    (so a trailing ``)`` yields a trailing underscore).
    """
    normalised = text.strip().lower()
    return re.compile(r"\W+").sub("_", normalised)


class Row(BaseModel):
    """One spreadsheet row, normalised to plain ASCII strings.

    The fields mirror the first six columns of a record, in order: name,
    description, cardinality, data type, values and the "do not use" flag.
    """

    name: str
    description: str
    cardinality: str
    data_type: str
    values: str
    do_not_use: bool

    @classmethod
    def from_record(cls, row: List[str]) -> "Row":
        """Build a ``Row`` from the raw cell values of one spreadsheet row.

        Args:
            row: Cell values of a single row; at least six entries are read.
                Empty cells (``None`` or any other falsy value) become ``""``.

        Returns:
            A ``Row`` whose text fields are ASCII-transliterated and stripped,
            whose ``name`` is normalised with ``to_camel_case`` and whose
            ``do_not_use`` is true only when the sixth cell is exactly ``"Y"``.

        Raises:
            IndexError: If ``row`` has fewer than six entries.
        """
        # str() keeps non-text cells (numbers, dates) from crashing unidecode,
        # which only accepts strings.
        cleaned_values = [
            unidecode(str(value)).strip() if value else "" for value in row
        ]
        return cls(
            name=to_camel_case(cleaned_values[0]),
            description=cleaned_values[1],
            cardinality=cleaned_values[2],
            data_type=cleaned_values[3],
            values=cleaned_values[4],
            do_not_use=(cleaned_values[5] == "Y"),
        )


class SectionRows(BaseModel):
    """The parsed rows of one spreadsheet section.

    Within a section's list of raw records, the record at ``SECTION_ROW_IDX``
    describes the section itself and every record after
    ``ELEMENT_HEADER_ROW_IDX`` describes one element. The records at index 0
    and at ``ELEMENT_HEADER_ROW_IDX`` are not parsed.
    """

    # Position of the record that describes the section itself.
    SECTION_ROW_IDX: ClassVar[int] = 1
    # Position of the record directly before the first element record.
    ELEMENT_HEADER_ROW_IDX: ClassVar[int] = 2

    section_row: Row
    element_rows: List[Row]

    @classmethod
    def from_record(cls, section_row_records: List[List[str]]) -> "SectionRows":
        """Parse the raw records of one section into a ``SectionRows``.

        Raises:
            IndexError: If there is no record at ``SECTION_ROW_IDX``.
        """
        section_record = section_row_records[SectionRows.SECTION_ROW_IDX]
        first_element_idx = SectionRows.ELEMENT_HEADER_ROW_IDX + 1

        parsed_section_row = Row.from_record(section_record)
        parsed_element_rows = []
        for row_record in section_row_records[first_element_idx:]:
            parsed_element_rows.append(Row.from_record(row_record))

        return SectionRows(
            section_row=parsed_section_row,
            element_rows=parsed_element_rows,
        )


def get_cluster_rows(rows: List[Row]) -> List[Row]:
    """Return the member rows of the cluster that starts at ``rows[0]``.

    ``rows[0]`` is taken as the cluster header. The rows after it are collected
    until a row named ``end_of_<header name>`` is reached (that row is not
    included). If no such row exists, all remaining rows are returned.

    Raises:
        ValueError: If ``rows`` holds nothing beyond the header.
    """
    if len(rows) <= 1:
        raise ValueError(f"Only the header of a cluster was found. {rows}")

    header, *candidates = rows
    end_marker = f"end_of_{header.name}"

    members = []
    for candidate in candidates:
        if candidate.name == end_marker:
            return members
        members.append(candidate)
    return members


def row_to_string_schema(row: Row) -> Dict:
    """Describe ``row`` as a JSON-schema string carrying the row's description."""
    string_schema = dict(description=row.description)
    string_schema["type"] = "string"
    return string_schema


def row_to_array_schema(row: Row) -> Dict:
    """Describe ``row`` as a JSON-schema array of strings with its description."""
    item_schema = {"type": "string"}
    array_schema = {"description": row.description, "type": "array"}
    array_schema["items"] = item_schema
    return array_schema


def rows_to_object_schema(rows: List[Row]) -> Dict:
    """Wrap the schema of ``rows`` in a JSON-schema object (no description)."""
    properties = element_rows_to_json_schema(rows)
    return {"type": "object", "properties": properties}


def element_rows_to_json_schema(element_rows: List[Row]) -> Dict:
    """Convert a run of element rows into a JSON-schema ``properties`` mapping.

    Rows are consumed front to back; each row is classified by the first rule
    that matches:

    1. ``do_not_use`` rows are skipped.
    2. A row with no ``values`` whose description contains ``"record entry"``
       becomes an array built from *all* remaining rows (a string schema when
       exactly one row remains, otherwise an object schema). Nothing after it
       is processed separately.
    3. A row whose name ends with ``item_entry`` becomes an array of objects;
       the next row is treated as the cluster header and its members form the
       object.
    4. A row whose cardinality starts with ``"0 to many"`` becomes an array of
       strings.
    5. A row whose name ends with ``cluster`` becomes an object made of its
       member rows, carrying the cluster row's description.
    6. Anything else becomes a string.

    Args:
        element_rows: Rows to convert, in spreadsheet order.

    Returns:
        Mapping of row name to its JSON schema. A later row with the same name
        replaces an earlier one.

    Raises:
        ValueError: Propagated from ``get_cluster_rows`` when a cluster header
            is the last row available.
    """
    element_schema = {}

    row_idx = 0
    while row_idx < len(element_rows):
        element_row = element_rows[row_idx]
        if element_row.do_not_use:
            row_idx += 1
            continue
        elif not element_row.values and "record entry" in element_row.description:
            # Everything after a record-entry row belongs to that entry.
            record_entry_rows = element_rows[row_idx + 1 :]
            if len(record_entry_rows) == 1:
                items = row_to_string_schema(record_entry_rows[0])
            else:
                items = rows_to_object_schema(record_entry_rows)
            row_schema = {
                "description": element_row.description,
                "type": "array",
                "items": items,
            }
            # Jump to the end: the remaining rows were consumed above.
            row_idx = len(element_rows)
        elif element_row.name.endswith("item_entry"):
            # The cluster header is the row *after* the item-entry row.
            cluster_rows = get_cluster_rows(element_rows[row_idx + 1 :])
            cluster_schema = rows_to_object_schema(cluster_rows)
            row_schema = {
                "description": element_row.description,
                "type": "array",
                "items": cluster_schema,
            }
            # Skip the cluster header, its members and the end marker
            # (the item-entry row itself is skipped by the shared += 1 below).
            row_idx += len(cluster_rows) + 2
        elif element_row.cardinality.startswith("0 to many"):
            row_schema = row_to_array_schema(element_row)
        elif element_row.name.endswith("cluster"):
            # Here the current row is the cluster header itself.
            cluster_rows = get_cluster_rows(element_rows[row_idx:])
            # Carry the cluster's description like every other branch does.
            row_schema = {
                "description": element_row.description,
                **rows_to_object_schema(cluster_rows),
            }
            # Skip the members and the end marker.
            row_idx += len(cluster_rows) + 1
        else:
            row_schema = row_to_string_schema(element_row)

        element_schema[element_row.name] = row_schema
        row_idx += 1
    return element_schema


def create_section_json_schema_from_rows(section_rows: SectionRows) -> Dict:
    """Build the JSON schema for one section.

    When the section has exactly one element and that element shares the
    section's name, the two are merged: the element's schema is returned with
    the section and element descriptions joined by a space. Otherwise the
    section becomes an object whose properties are its elements.

    Args:
        section_rows: Parsed section row plus its element rows.

    Returns:
        The section's JSON schema; it always has a ``"description"`` key.
    """
    elements_schema = element_rows_to_json_schema(section_rows.element_rows)
    section_description = section_rows.section_row.description

    if list(elements_schema) == [section_rows.section_row.name]:
        # Only one element, named like the section: squash both together.
        (single_element_schema,) = elements_schema.values()
        element_description = single_element_schema.pop("description", "")
        # Skip empty parts so the result has no stray leading/trailing space.
        joint_description = " ".join(
            text for text in (section_description, element_description) if text
        )
        return {
            "description": joint_description,
            **single_element_schema,
        }

    return {
        "description": section_description,
        "type": "object",
        "properties": elements_schema,
    }


def create_schema_from_row_records(row_records: List[List[str]]) -> Dict:
    """Build the full JSON schema from the raw spreadsheet rows.

    Sections are separated by blank rows (rows whose cells are all ``None``).
    Runs of consecutive blank rows, leading/trailing blank rows and empty
    input produce no section instead of failing on an empty group.

    Args:
        row_records: Raw cell values, one sequence per spreadsheet row.

    Returns:
        An object schema whose properties map each section name to that
        section's schema. A later section with the same name replaces an
        earlier one.
    """
    sections = []
    section_row_records: List[List[str]] = []
    for row in row_records:
        if all(element is None for element in row):
            # A blank row closes the current section, if there is one.
            if section_row_records:
                sections.append(SectionRows.from_record(section_row_records))
                section_row_records = []
        else:
            section_row_records.append(row)
    # Flush the last section when the sheet does not end with a blank row.
    if section_row_records:
        sections.append(SectionRows.from_record(section_row_records))

    sections_schema = {
        section.section_row.name: create_section_json_schema_from_rows(section)
        for section in sections
    }

    return {"type": "object", "properties": sections_schema}

In [None]:
# Build the complete schema from the spreadsheet rows and save it as JSON.
# The same file is written again at the end of the notebook, after the schema
# has been filtered and enriched.
schema = create_schema_from_row_records(row_records)
GUIDELINES_JSON_PATH.write_text(json.dumps(schema, indent=4))

In [None]:
# Show where the schema file was written.
GUIDELINES_JSON_PATH

In [None]:
# Normalise the excluded section titles the same way the schema keys were
# normalised, then drop every top-level section whose key matches.
camelcase_database_sections = set(map(to_camel_case, DATABASE_SECTIONS))
schema["properties"] = dict(
    (section_key, section_schema)
    for section_key, section_schema in schema["properties"].items()
    if section_key not in camelcase_database_sections
)

In [None]:
# Token count of the serialised schema after the database sections were removed.
len(TOKENIZER.encode(json.dumps(schema)))

## PDF Parsing

In [None]:
# Open the implementation-guidance PDF; its pages are read in the next cell.
reader = PyPDF2.PdfReader(GUIDELINES_IMPLEMENTATION_PDF_PATH)

In [None]:
# Flatten every page of the PDF into one list of raw lines.
text = [
    raw_line
    for pdf_page in reader.pages
    for raw_line in pdf_page.extract_text().split("\n")
]
# Per line, in this order: remove the repeated page header/footer and strip,
# collapse runs of spaces, then repair the split word "reco rd".
text = [
    re.sub(
        "reco rd",
        "record",
        re.sub(
            " {2,}",
            " ",
            re.sub(
                (
                    "(PRSB eDischarge Summary  – Implementation Guidance  V2.1)|(January 2019 "
                    r" Page \d+  )|(January 2019  Page \d+  )"
                ),
                "",
                raw_line,
            ).strip(),
        ),
    )
    for raw_line in text
]
# Drop the lines that ended up empty.
text = list(filter(None, text))

In [None]:
# Map each "4.<n> <Title>" heading of the PDF to the text that follows it.
# The lines are walked backwards so every heading can claim the lines between
# itself and the heading found just before it (i.e. the next one in the PDF).
last_heading_idx = len(text)
heading_to_text = {}
for heading_idx, line in enumerate(reversed(text)):
    if re.match(r"^4\.\d+ [A-Za-z ]+$", line):
        # heading_idx counts from the end; the body starts right after the heading.
        section_text = "\n".join(text[len(text) - heading_idx : last_heading_idx])
        # Remove sub-section numbers such as "4.1.2 ". The dots are escaped so
        # that ordinary numbers in the text (e.g. "12345 ") are left alone.
        section_text = re.sub(r"\d+\.\d+\.\d+ ", "", section_text)
        # Re-join lines that were wrapped mid-sentence (next line starts lowercase).
        section_text = re.sub("\n(?=[a-z])", " ", section_text)
        section_text = unidecode(section_text)
        # Strip the "4.<n> " prefix and normalise the title like the schema keys.
        heading = to_camel_case(re.sub(r"\d+\.\d+ ", "", line))

        heading_to_text[heading] = section_text
        last_heading_idx = len(text) - heading_idx - 1

In [None]:
# Append the matching PDF guidance to each section's description.
for property_heading, property_body in schema["properties"].items():
    if property_heading in heading_to_text:
        guidance_suffix = f"\n{heading_to_text[property_heading]}"
        # Guard against re-running this cell: without it the same guidance
        # would be appended to the description again on every execution.
        if not property_body["description"].endswith(guidance_suffix):
            property_body["description"] += guidance_suffix

In [None]:
# Token count of the serialised schema after the PDF guidance was appended.
len(TOKENIZER.encode(json.dumps(schema)))

In [None]:
# Overwrite the schema file with the filtered, guidance-enriched schema.
GUIDELINES_JSON_PATH.write_text(json.dumps(schema, indent=4))

In [None]:
# Show where the final schema file was written.
GUIDELINES_JSON_PATH