# Patient Data Export

A notebook exporting patient data for study analyses.

## Definitions

### Imports

In [None]:
import dataclasses
import io
import itertools
import json
import operator
import pathlib
import re
from enum import Enum
from typing import List, Optional, Union

import IPython.display
import ipywidgets
import nbformat
import pandas as pd
import pyzipper
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
from scope.documents import document_set
from scope.populate.data.archive import Archive

### Constants and Flags

In [None]:
# In development, it can be helpful to sample a subset of patients.
# If DEVELOPMENT_SAMPLE_PATIENTS <= 0, process all patients.
# If DEVELOPMENT_SAMPLE_PATIENTS > 0, randomly sample DEVELOPMENT_SAMPLE_PATIENTS patients.
# NOTE: any value > 0 means the export covers only the sampled patients.
DEVELOPMENT_SAMPLE_PATIENTS: int = 10

# If True, display the patients dataframes while they are being prepared.
DISPLAY_PREPARE_PATIENTS: bool = False

### Utilities

### Utility: excel_dataframe

Returns bytes containing an Excel export of a dataframe.

In [None]:
def excel_dataframe(df: pd.DataFrame) -> bytes:
    """Serialize a dataframe into an in-memory Excel workbook.

    The dataframe index is not written.

    Returns:
        The bytes of the workbook.
    """
    with io.BytesIO() as workbook_buffer:
        df.to_excel(workbook_buffer, index=False)
        workbook_bytes = workbook_buffer.getvalue()

    return workbook_bytes

### Utility: markdown_documentation

Returns a string containing markdown content recovered from a cell in this notebook.

Intended to allow including the content of markdown cells as documentation in an export.

In [None]:
def markdown_documentation(
    documentation_name: str,
    notebook_path: str = "patientdata.ipynb",
) -> str:
    """Return the source of a "Documentation: <name>" markdown cell.

    A cell matches when it is a markdown cell whose source starts with
    optional '#' characters followed by " Documentation: <name>" and a newline.

    Args:
        documentation_name: Name following "Documentation: " in the cell heading.
            It is matched literally, not as a regular expression.
        notebook_path: Path of the notebook to search. Defaults to this notebook.

    Returns:
        The complete source of the first matching cell, including its heading.

    Raises:
        ValueError: If no matching documentation cell is found.
    """
    # Load this same notebook.
    notebook = nbformat.read(notebook_path, nbformat.NO_CONVERT)

    # Escape the name so characters such as "(" or "." are matched literally,
    # and compile the pattern once rather than once per cell.
    pattern = re.compile(
        r"^(#*) Documentation: ({})\n(.*)".format(re.escape(documentation_name))
    )

    # Go through each cell, looking for a match.
    for cell_current in notebook["cells"]:
        if cell_current["cell_type"] != "markdown":
            continue

        if pattern.match(cell_current["source"]):
            return cell_current["source"]

    # If no match was found, raise a ValueError.
    raise ValueError(
        "No matching documentation cell found: {}".format(documentation_name)
    )

### Utility: patient_data_export_file

The path and contents of a file to be exported.

In [None]:
class ExportFileType(Enum):
    """Kinds of file that can be included in an export."""

    EXCEL = "EXCEL"
    MARKDOWN = "MARKDOWN"


@dataclasses.dataclass(frozen=True)
class ExportFile:
    """The path and contents of a single file to be exported.

    Exactly one of `bytes` and `text` is populated by the alternate
    constructors, according to `type`.
    """

    # Path of the file within the export.
    path: pathlib.Path
    # Kind of content held by this file.
    type: ExportFileType
    # Binary content, populated for EXCEL files.
    bytes: Optional[bytes]
    # Text content, populated for MARKDOWN files.
    text: Optional[str]

    @classmethod
    def from_excel(
        cls,
        path: Union[pathlib.Path, str],
        excel: bytes,
    ) -> "ExportFile":
        """Create an EXCEL file from the bytes of a workbook."""
        # Construct via cls so that subclasses obtain instances of themselves.
        return cls(
            path=pathlib.Path(path),
            type=ExportFileType.EXCEL,
            bytes=excel,
            text=None,
        )

    @classmethod
    def from_markdown(
        cls,
        path: Union[pathlib.Path, str],
        markdown: str,
    ) -> "ExportFile":
        """Create a MARKDOWN file from markdown text."""
        # Construct via cls so that subclasses obtain instances of themselves.
        return cls(
            path=pathlib.Path(path),
            type=ExportFileType.MARKDOWN,
            bytes=None,
            text=markdown,
        )


def patient_data_export_file(file: ExportFile) -> None:
    """Queue a file for inclusion in the export.

    Appends to the module-level `patient_data_export_file_list`, which must
    already be defined when this function is called.
    """
    patient_data_export_file_list.append(file)

### Utility: dataframe_sanitize

Sanitize contents of a dataframe that otherwise cannot be written to Excel.

In [None]:
def dataframe_sanitize(df: pd.DataFrame) -> pd.DataFrame:
    """Replace characters that cannot be written to Excel.

    Every string cell has each match of openpyxl's ILLEGAL_CHARACTERS_RE
    replaced by "?". Cells that are not strings are returned unchanged.

    Returns:
        A new dataframe; the input is not modified.
    """

    def sanitize_cell(value):
        # isinstance also covers str subclasses (e.g., numpy.str_),
        # which an exact type comparison would leave unsanitized.
        if isinstance(value, str):
            return ILLEGAL_CHARACTERS_RE.sub("?", value)

        return value

    return df.map(sanitize_cell)

## Input

### Input Archive Suffix: archive_suffix 

In [None]:
# Obtain suffix indicating desired version of encrypted archives.
# Do not include the '.zip' suffix.
# The same suffix is used to name the export archive.
archive_suffix = input("Encrypted archive suffix: ")

### Input Archive Password: archive_password

In [None]:
# Obtain password to encrypted archives.
# Use getpass rather than input so the password is not echoed as it is typed.
import getpass

archive_password = getpass.getpass("Encrypted archive password: ")

### Utility: archive_dir_path

In [None]:
# Directory containing the encrypted archives, relative to the location of the notebook.
# The export archive is also written to this directory.
archive_dir_path = "../../../secrets/data"

## Load Archives

### Decrypt Archives

In [None]:
def decrypt_archives():
    """Decrypt the MultiCare and SCCA archives.

    Reads the module-level `archive_dir_path`, `archive_suffix`, and
    `archive_password`. Prints the resolved path and the number of
    entries of each archive.

    Returns:
        A tuple of (MultiCare archive, SCCA archive).
    """

    def _announce(archive_path):
        print("Decrypting archive:")
        print("{}".format(archive_path.resolve()))

    def _report(archive):
        print("{} documents.".format(len(archive.entries.values())))

    # Full path to each encrypted archive, relative to the location of the notebook.
    # Expects the encrypted archives to be in the "secrets/data" directory.
    path_multicare = pathlib.Path(
        archive_dir_path,
        "archive_multicare_{}.zip".format(archive_suffix),
    )
    path_scca = pathlib.Path(
        archive_dir_path,
        "archive_scca_{}.zip".format(archive_suffix),
    )

    _announce(path_multicare)
    multicare = Archive.read_archive(
        archive_path=path_multicare,
        password=archive_password,
    )
    _report(multicare)

    print("")

    _announce(path_scca)
    scca = Archive.read_archive(
        archive_path=path_scca,
        password=archive_password,
    )
    _report(scca)

    return multicare, scca


archive_multicare, archive_scca = decrypt_archives()

### Process Archives

#### Combine Patients DataFrames

In [None]:
def combine_patients_dataframes():
    """Combine the current patient documents of both archives.

    Each row is labeled with a `database` column: "multicare" for the
    MultiCare archive and "fhcc" for the SCCA archive.

    Returns:
        A sanitized dataframe with a fresh index.
    """

    def _current_patients(archive, database):
        # Current patient documents, without the sentinel or prior revisions.
        documents = archive.collection_documents(
            collection="patients",
        )
        documents = documents.remove_sentinel()
        documents = documents.remove_revisions()

        df_current = pd.DataFrame.from_records(documents.documents)
        df_current["database"] = database

        return df_current

    frames = [
        _current_patients(archive_multicare, "multicare"),
        _current_patients(archive_scca, "fhcc"),
    ]

    # Unify all current patient documents.
    df_unified = pd.concat(frames).reset_index(drop=True)

    # Sanitize once so contents dataframe can be exported.
    return dataframe_sanitize(df_unified)


df_archive_patients_raw = combine_patients_dataframes()

#### Reset Filtering and Sampling

In [None]:
# Start again from the unfiltered patients, so the filtering and sampling cells
# below can be re-run without reloading the archives.
df_archive_patients = df_archive_patients_raw.copy()

#### Filter Pilot Patients

Remove the 6 pilot patients.

In [None]:
# Identifiers of the 6 pilot patients.
pilot_patient_ids = [
    "ymzwx6e6w6kqi",
    "mmmb54v52l7re",
    "ouoa4ucldbhie",
    "zazst4yu23a5q",
    "wf4btxqjtd2oa",
    "s3bcmgmp7gdss",
]

# Keep every patient that is not a pilot patient, then renumber the rows.
is_pilot_patient = df_archive_patients["patientId"].isin(pilot_patient_ids)
df_archive_patients = df_archive_patients.loc[~is_pilot_patient].reset_index(
    drop=True
)

#### Sample Patients

In development, it can be helpful to sample a subset of patients.

In [None]:
if DEVELOPMENT_SAMPLE_PATIENTS > 0:
    # Sampling without replacement raises a ValueError when asked for more rows
    # than exist, so never request more patients than are available.
    df_archive_patients = df_archive_patients.sample(
        n=min(DEVELOPMENT_SAMPLE_PATIENTS, len(df_archive_patients))
    ).reset_index(drop=True)

#### Utility: patient_documents

In [None]:
# Create a helper for accessing the document collection of a unified patient.
def patient_documents(row_patient) -> document_set.DocumentSet:
    """Return the documents in the collection of a patient.

    Args:
        row_patient: A mapping with a "database" entry ("multicare" or "fhcc")
            and a "collection" entry naming the collection of the patient.

    Raises:
        ValueError: If the "database" entry is not a known database.
    """
    database = row_patient["database"]

    if database == "multicare":
        archive = archive_multicare
    elif database == "fhcc":
        archive = archive_scca
    else:
        # Include the offending value so a bad row can be identified.
        raise ValueError("Unknown patient database: {!r}".format(database))

    return archive.collection_documents(collection=row_patient["collection"])

## Prepare Data

### Prepare Patients

#### Documentation: Patients

- An export of all patientIdentity documents.
  These are stored separately from the documents associated with each patient.
  They provide an index of all patients, from which we can then access the documents for an individual patient.

- A patientIdentity may have been modified throughout the study (e.g., to change a patient name or email address).
  If it was modified, this export includes only the final version of that document.
  There is therefore exactly one row per patient.

- Several fields are removed to reduce clutter:
  - `_id`
  - `_rev`
  - `_set_id`
  - `cognitoAccount`
- Several identifiers are removed:
  - `email`, as part of `cognitoAccount`
  - `name`

- Although it appears to be clutter, `collection` is used by later processing and should not be removed.

#### Data: df_patients_raw

A dataframe containing the identity of each patient.

In [None]:
# Copy of the filtered (and possibly sampled) patients, before any columns are removed.
df_patients_raw = df_archive_patients.copy()

#### Data: df_patients

A dataframe containing the identity of each patient.

In [None]:
# Start from a copy of the raw patients, leaving df_patients_raw unchanged.
df_patients = df_patients_raw.copy()

# Optionally display the raw patients.
if DISPLAY_PREPARE_PATIENTS:
    IPython.display.display(df_patients_raw)

In [None]:
patients_columns_removed = [
    # Remove clutter.
    "_id",
    "_rev",
    "_set_id",
    # Remove identifiers.
    "cognitoAccount",  # Includes email.
    "name",
]

# Order columns for readability.
patients_columns_ordered = [
    "_type",
    "database",
    "patientId",
    "collection",
    "MRN",
]

# Sort rows for readability.
patients_rows_sorted_by = [
    "_type",
    "database",
    "patientId",
]

df_patients = df_patients.drop(columns=patients_columns_removed)
df_patients = df_patients[patients_columns_ordered]
df_patients = df_patients.sort_values(patients_rows_sorted_by)

if DISPLAY_PREPARE_PATIENTS:
    IPython.display.display(df_patients)

### Prepare Per-Patient DocumentSets and DataFrames

#### Data: patient_id_to_documentset

In [None]:
def prepare_patient_id_to_documentset():
    """Map each patientId in `df_patients` to the documents of that patient.

    Displays a progress bar while the documents are obtained.

    Returns:
        A dict from patientId to the result of `patient_documents`.
    """
    patient_total = len(df_patients)
    progress_bar = ipywidgets.IntProgress(min=0, max=patient_total)

    print("Preparing DocumentSet for {} Patients".format(patient_total))
    IPython.display.display(progress_bar)
    progress_bar.description = "{}/{}".format(0, patient_total)

    documentsets = {}
    for patients_done, (_, row_patient) in enumerate(
        df_patients.iterrows(), start=1
    ):
        documentsets[row_patient["patientId"]] = patient_documents(
            row_patient.to_dict()
        )

        progress_bar.description = "{}/{}".format(patients_done, patient_total)
        progress_bar.value = patients_done

    return documentsets


patient_id_to_documentset = prepare_patient_id_to_documentset()

#### Transform: transform_add_patient_id

Add a `patientId` column to each document.

In [None]:
def transform_add_patient_id(
    df_documents: pd.DataFrame,
    *,
    patient_id,
) -> pd.DataFrame:
    """Add a `patientId` column, as the first column, to a copy of the documents.

    Args:
        df_documents: Documents of a single patient. Not modified.
        patient_id: Value stored in the `patientId` column of every row.

    Returns:
        A new dataframe with `patientId` first, followed by the other columns
        in their existing order.
    """
    df_documents = df_documents.copy()
    df_documents["patientId"] = patient_id

    # Order the columns here rather than via dataframe_format_export, which is
    # defined later in the notebook than the first cell that calls this transform.
    column_order = ["patientId"] + [
        column_current
        for column_current in df_documents.columns
        if column_current != "patientId"
    ]

    return df_documents[column_order]

#### Data: patient_id_to_df_documents_raw

In [None]:
def prepare_patient_id_to_df_documents_raw():
    """Map each patientId to a dataframe of the "raw" documents of that patient.

    Each dataframe is sanitized for export and given a `patientId` column.
    Displays a progress bar while the dataframes are created.

    Returns:
        A dict from patientId to dataframe.
    """
    patient_total = len(patient_id_to_documentset)
    progress_bar = ipywidgets.IntProgress(min=0, max=patient_total)

    print("Preparing DataFrame for {} Patients".format(patient_total))
    IPython.display.display(progress_bar)
    progress_bar.description = "{}/{}".format(0, patient_total)

    dataframes_raw = {}
    for patients_done, (patient_id, documentset) in enumerate(
        patient_id_to_documentset.items(), start=1
    ):
        df_raw = dataframe_sanitize(pd.DataFrame.from_records(documentset.documents))

        # Apply a minimal transform to the "raw" documents.
        dataframes_raw[patient_id] = transform_add_patient_id(
            df_raw,
            patient_id=patient_id,
        )

        progress_bar.description = "{}/{}".format(patients_done, patient_total)
        progress_bar.value = patients_done

    return dataframes_raw


patient_id_to_df_documents_raw = prepare_patient_id_to_df_documents_raw()

In [None]:
# Combine the "raw" documents of all patients into a single dataframe.
# NOTE(review): the same assignment is repeated under "Prepare Combined Documents DataFrame".
df_documents_raw = pd.concat(patient_id_to_df_documents_raw.values(), ignore_index=True)

## Transform Documents

### Transform: transform_assessment_log

In [None]:
def transform_assessment_log(
    df_documents: pd.DataFrame,
) -> pd.DataFrame:
    """Add gad-7 and phq-9 item and score columns to a copy of the documents.

    For rows whose `_type` is "assessmentLog" and whose `assessmentId` is
    "gad-7" or "phq-9":
    - Each item of `pointValues` is pulled out to its own column.
    - A score column holds `totalScore` when it is provided,
      otherwise the sum of `pointValues`.

    All other rows contain None in the added columns.

    Args:
        df_documents: Documents to transform. Not modified.

    Returns:
        A new dataframe with the added columns.

    Raises:
        ValueError: If a gad-7 or phq-9 assessmentLog provides
            neither `totalScore` nor `pointValues`.
        KeyError: If a non-empty `pointValues` lacks an expected item.
    """
    # Map from each key within pointValues to the name of its exported column.
    gad7_columns = {
        "Anxious": "gad7Anxious",
        "Constant worrying": "gad7ConstantWorrying",
        "Worrying too much": "gad7WorryingTooMuch",
        "Trouble relaxing": "gad7TroubleRelaxing",
        "Restless": "gad7Restless",
        "Irritable": "gad7Irritable",
        "Afraid": "gad7Afraid",
    }
    phq9_columns = {
        "Interest": "phq9Interest",
        "Mood": "phq9Mood",
        "Sleep": "phq9Sleep",
        "Energy": "phq9Energy",
        "Appetite": "phq9Appetite",
        "Guilt": "phq9Guilt",
        "Concentrating": "phq9Concentrating",
        "Motor": "phq9Motor",
        "Suicide": "phq9Suicide",
    }

    # Whether a row is an assessmentLog of the given assessment.
    def _is_assessment_log(row, assessment_id) -> bool:
        return (
            row.get("_type") == "assessmentLog"
            and row.get("assessmentId") == assessment_id
        )

    # The pointValues of a row, or None if the row has no point values.
    def _point_values(row):
        point_values = row.get("pointValues")

        # A document without pointValues yields NaN (a float) once it shares a
        # dataframe with documents that have them, and NaN is truthy.
        if point_values is None or isinstance(point_values, float):
            return None

        return point_values or None

    # Create a row function that pulls one item out of pointValues.
    def _factory_points(assessment_id, key_json):
        def _points(row):
            if not _is_assessment_log(row, assessment_id):
                return None

            point_values = _point_values(row)
            if point_values is None:
                return None

            return point_values[key_json]

        return _points

    # Create a row function that obtains or calculates a score.
    def _factory_score(assessment_id):
        def _score(row):
            if not _is_assessment_log(row, assessment_id):
                return None

            # Some rows already provide a totalScore.
            total_score = row.get("totalScore")
            if not pd.isna(total_score):
                return total_score

            # Otherwise we need to sum the pointValues.
            point_values = _point_values(row)
            if point_values is not None:
                return sum(point_values.values())

            # We should always have one or the other.
            raise ValueError(
                "assessmentLog for {} has neither totalScore nor pointValues".format(
                    assessment_id
                )
            )

        return _score

    df_documents = df_documents.copy()

    # Columns are added in order: items then score, gad-7 then phq-9.
    for assessment_id, columns, score_column in (
        ("gad-7", gad7_columns, "gad7Score"),
        ("phq-9", phq9_columns, "phq9Score"),
    ):
        for key_json, key_export in columns.items():
            df_documents[key_export] = df_documents.apply(
                _factory_points(assessment_id, key_json), axis=1
            )

        df_documents[score_column] = df_documents.apply(
            _factory_score(assessment_id), axis=1
        )

    return df_documents

### Utility: apply_transforms

In [None]:
def apply_transforms(
    df_documents: pd.DataFrame,
) -> pd.DataFrame:
    """Apply every document transform, in order, and return the result."""
    transforms = (transform_assessment_log,)

    for transform_current in transforms:
        df_documents = transform_current(df_documents)

    return df_documents

### Data: patient_id_to_df_documents

In [None]:
def prepare_transform_patient_documents():
    """Map each patientId to a transformed dataframe of its documents.

    Transforms are applied to a copy of each "raw" dataframe.
    Displays a progress bar while the dataframes are transformed.

    Returns:
        A dict from patientId to transformed dataframe.
    """
    patient_total = len(patient_id_to_df_documents_raw)
    progress_bar = ipywidgets.IntProgress(min=0, max=patient_total)

    print("Transforming DataFrame for {} Patients".format(patient_total))
    IPython.display.display(progress_bar)
    progress_bar.description = "{}/{}".format(0, patient_total)

    dataframes_transformed = {}
    for patients_done, (patient_id, df_raw) in enumerate(
        patient_id_to_df_documents_raw.items(), start=1
    ):
        dataframes_transformed[patient_id] = apply_transforms(
            df_raw.copy(),
        )

        progress_bar.description = "{}/{}".format(patients_done, patient_total)
        progress_bar.value = patients_done

    return dataframes_transformed


patient_id_to_df_documents = prepare_transform_patient_documents()

### Prepare Combined Documents DataFrame

#### Data: df_documents_raw

In [None]:
# Combine the "raw" documents of all patients into a single dataframe.
df_documents_raw = pd.concat(patient_id_to_df_documents_raw.values(), ignore_index=True)

#### Data: df_documents

In [None]:
# Combine the transformed documents of all patients into a single dataframe.
df_documents = pd.concat(patient_id_to_df_documents.values(), ignore_index=True)

## Export

### Utility: dataframe_format_export

Formats a dataframe for export.

In [None]:
def dataframe_format_export(
    df: pd.DataFrame,
    *,
    drop_empty_columns: bool,
    drop_columns: List[str],
    sort_columns: List[str],
    sort_rows_by_columns: List[str],
) -> pd.DataFrame:
    """Format a dataframe for export.

    Steps are applied in order, each only if requested:

    Args:
        df: The dataframe to format. Not modified.
        drop_empty_columns: Drop columns in which every value is null, or in
            which every value is "", "nan", or "None" once converted to a
            string and stripped.
        drop_columns: Columns to drop. Columns that are not present are ignored.
        sort_columns: Columns to move to the front, in the given order.
            Columns that are not present are ignored.
            Other columns follow in their existing order.
        sort_rows_by_columns: Columns by which to sort rows.
            Columns that are not present are ignored.

    Returns:
        The formatted dataframe.
    """

    def _is_empty(series) -> bool:
        if series.isnull().all():
            return True

        series_text = series.astype(str).str.strip()
        return series_text.isin(["", "nan", "None"]).all()

    if drop_empty_columns:
        df = df.drop(
            columns=[name for name in df.columns if _is_empty(df[name])]
        )

    if drop_columns:
        df = df.drop(columns=drop_columns, errors="ignore")

    if sort_columns:
        columns_leading = [name for name in sort_columns if name in df.columns]
        columns_trailing = [
            name for name in df.columns if name not in columns_leading
        ]
        df = df[columns_leading + columns_trailing]

    if sort_rows_by_columns:
        df = df.sort_values(
            [name for name in sort_rows_by_columns if name in df.columns]
        )

    return df

### Reset Export File List

In [None]:
# Files queued by patient_data_export_file, to be written to the export archive.
# Running this cell again discards any files already queued.
patient_data_export_file_list: List[ExportFile] = []

### Documentation: Export

- Data is originally taken from two database exports: one from FHCC and one from MultiCare.
  - These raw data are merged, then a `database` column is added to indicate the origin of each patient.
  - There were 6 pilot patients. These have been completely removed.

- `patients.xlsx` is a list of all patients included in an export.
  - Documentation is in `patients.md`.
  - If included, `data/patients.raw.xlsx` is an unprocessed version of the same document.

- `data/patients` is a folder that contains a folder for each patient.
  - Patient folders are named `patient_{patientId}`.

In [None]:
# Export the content of the "Documentation: Export" markdown cell.
patient_data_export_file(
    ExportFile.from_markdown(
        "documentation.md",
        markdown_documentation("Export"),
    )
)

### Patients

- Documented above in "Documentation: Patients".

In [None]:
# Export the content of the "Documentation: Patients" markdown cell.
patient_data_export_file(
    ExportFile.from_markdown(
        "patients.md",
        markdown_documentation("Patients"),
    )
)

# Export the prepared patients.
patient_data_export_file(
    ExportFile.from_excel(
        "patients.xlsx",
        excel_dataframe(df_patients),
    )
)

# Export the raw patients, from which no columns have been removed.
patient_data_export_file(
    ExportFile.from_excel(
        pathlib.Path(
            "data",
            "patients.raw.xlsx",
        ),
        excel_dataframe(df_patients_raw),
    )
)

### Per-Patient Documents

In [None]:
def export_per_patient_documents():
    """Queue the raw and transformed documents of each patient for export.

    For each patient in `df_patients`, two Excel files are queued in the
    folder "data/patients/patient_{patientId}": first the raw documents,
    then the transformed documents. Displays a progress bar.
    """
    patient_total = len(df_patients)
    progress_bar = ipywidgets.IntProgress(min=0, max=patient_total)

    print("Per-Patient Export for {} Patients".format(patient_total))
    IPython.display.display(progress_bar)
    progress_bar.description = "{}/{}".format(0, patient_total)

    # Format of each file name, paired with the dataframes it is exported from.
    exports = (
        ("patient_{}.raw.xlsx", patient_id_to_df_documents_raw),
        ("patient_{}.transformed.xlsx", patient_id_to_df_documents),
    )

    for patients_done, (_, row_patient) in enumerate(
        df_patients.iterrows(), start=1
    ):
        patient_id = row_patient["patientId"]
        patient_dir = pathlib.Path(
            "data",
            "patients",
            "patient_{}".format(patient_id),
        )

        for file_name_format, patient_id_to_dataframe in exports:
            patient_data_export_file(
                ExportFile.from_excel(
                    patient_dir / file_name_format.format(patient_id),
                    excel_dataframe(patient_id_to_dataframe[patient_id]),
                )
            )

        progress_bar.description = "{}/{}".format(patients_done, patient_total)
        progress_bar.value = patients_done


export_per_patient_documents()

### Combined Documents

In [None]:
# Export the combined "raw" documents of all patients.
patient_data_export_file(
    ExportFile.from_excel(
        pathlib.Path(
            "data",
            "documents.raw.xlsx",
        ),
        excel_dataframe(df_documents_raw),
    )
)

# Export the combined transformed documents of all patients.
patient_data_export_file(
    ExportFile.from_excel(
        pathlib.Path(
            "data",
            "documents.transformed.xlsx",
        ),
        excel_dataframe(df_documents),
    )
)

### Analysis: Assessments

In [None]:
def export_analysis_assessments():
    """Queue "assessments.xlsx", containing every assessmentLog document.

    Empty columns and the `_id`, `_type`, and `_set_id` columns are dropped.
    The identifying columns are moved to the front and used to sort the rows.
    """
    key_columns = ["patientId", "assessmentId", "assessmentLogId", "_rev"]

    is_assessment_log = df_documents["_type"] == "assessmentLog"
    df_assessments = dataframe_format_export(
        df_documents[is_assessment_log],
        drop_empty_columns=True,
        drop_columns=["_id", "_type", "_set_id"],
        sort_columns=list(key_columns),
        sort_rows_by_columns=list(key_columns),
    )

    assessments_file = ExportFile.from_excel(
        "assessments.xlsx",
        excel_dataframe(df_assessments),
    )
    patient_data_export_file(assessments_file)


export_analysis_assessments()

### Write Archive

In [None]:
# The export is stored in a single zip file.
with open(
    pathlib.Path(
        archive_dir_path,
        "export_{}.zip".format(archive_suffix),
    ),
    mode="xb",
) as archive_file:
    with pyzipper.AESZipFile(
        archive_file,
        "w",
        compression=pyzipper.ZIP_LZMA,
        encryption=pyzipper.WZ_AES,
    ) as archive_zipfile:
        # Set the password
        archive_zipfile.setpassword(archive_password.encode("utf-8"))

        for file_current in patient_data_export_file_list:
            if file_current.type == ExportFileType.EXCEL:
                archive_zipfile.writestr(str(file_current.path), file_current.bytes)
            elif file_current.type == ExportFileType.MARKDOWN:
                archive_zipfile.writestr(
                    str(file_current.path), file_current.text.encode("utf-8")
                )
            else:
                raise ValueError("Unknown ExportFileType")