# How FEMR ETLs Work

In this notebook, we go through a toy example of writing a custom ETL for FEMR on a custom dataset. 

We will define a set of patients who have a unique set of associated clinical events, and then initialize a FEMR dataset with this data. 

Before going through this tutorial, please make sure you've installed **FEMR** as detailed in the [README.md](https://github.com/som-shahlab/femr)

#### Learning Goals.
1. Initialize a FEMR dataset from scratch with custom data.
2. Understand the three steps of the FEMR data pipeline: `EventCollection` => `PatientCollection` => `PatientDatabase`

In [1]:
import datetime
import os
import contextlib
import femr
import femr.datasets
import io
import zstandard
import csv
import functools
import shutil

# 0. Overview of **FEMR**

The **FEMR** workflow of creating patient timelines from an EHR dataset is as follows:

1. Extract clinical events from the source EHR database. This could be OMOP-CDM, MIMIC-III, eICU, etc.
    * This code is specific to the source EHR database, and is not covered in this tutorial.
    * We have already written extractors for some popular EHR databases in the `src/femr/extractors` folder.
2. Write these extracted events to disk as an `EventCollection`. 
    * In an abstract sense, an `EventCollection` is simply an unordered list of `(Patient ID, RawEvent)` tuples. 
    * Internally, **FEMR** will shard these tuples across a set of files on disk that will be stored in a single folder, but you don't need to worry about that.
3. Transform the `EventCollection` into a `PatientCollection`. 
    * In an abstract sense, a `PatientCollection` is a **sorted** list of `(Patient ID, Event)` tuples, where the sorting is done first by `Patient ID`, second by the `start` time of each `Event`. 
    * Internally, **FEMR** will shard these consecutively arranged tuples across a set of files on disk that will be stored in a single folder, but you don't need to worry about that.
4. Apply transformations to your patient timelines (e.g. move coding assignments to the end of their visits, move all events before a patient's birthdate to after their birthdate, etc.) that you want to your `PatientCollection` to generate a new `PatientCollection`.
5. Index and save your `PatientCollection` to disk as a `PatientDatabase`.
    * In an abstract sense, a `PatientDatabase` lets you quickly retrieve all the events for a given patient ID. 
    * Internally, **FEMR** will create another set of indexes and files on disk that will be stored in a single folder, but you don't need to worry about that.

Note that everything is done on disk (i.e. not in memory), so you can work with arbitrarily large datasets. 

However, this means that you must specify the location of the folders where you want to store your `EventCollection`, `PatientCollection`, and `PatientDatabase` on disk.

## 1. Define our dataset with `RawPatient` and `RawEvent` objects

#### RawPatients

FEMR represents every raw input patients with the `RawPatient` class. 

A `RawPatient` object contains the following two attributes:

* `patient_id` (str): a unique identifier for the patient
* `events` (list): a list of `RawEvent` objects associated with that patient

The definition of the `RawPatient` class can be [found here](https://github.com/som-shahlab/femr/blob/main/src/femr/datasets/types.py#L9)

#### RawEvents

FEMR represents raw input clinical events with the `RawEvent` object.

An `RawEvent` object can contain any number of arbitrary attributes, but it must have at least the following two attributes:

* `start` (datetime.datetime): the start time of the event
* `concept_id` (int): the concept id for that event

The definition of the `RawEvent` class can be [found here](https://github.com/som-shahlab/femr/blob/main/src/femr/datasets/types.py#L20)


Below, we'll create some events for some fictional patients.

In [2]:
events = [
    # This event contains the bare minimum attributes -- start and code.
    femr.datasets.RawEvent(
        start=datetime.datetime(2010, 1, 5),
        concept_id=2,
    ),
    # This event contains a couple custom attributes -- value and source_table.
    femr.datasets.RawEvent(
        start=datetime.datetime(2010, 1, 3, hour=10, minute=45),
        concept_id=2,
        value="test_value",
        source_table=None,
    ),
    # This event contains even more attributes.
    femr.datasets.RawEvent(
        start=datetime.datetime(2010, 1, 3, hour=10, minute=30),
        concept_id=0,
        value=34.0,
        source_table='visit',
        extra_attr=True,
    ),
]

Now that we have our events, let's create a couple fictional patients.

In [3]:
patients = [
    # Let's assign each patient a subset of our events for example purposes.
    femr.datasets.RawPatient(patient_id=0, events=events[:2]),
    femr.datasets.RawPatient(patient_id=1, events=events[1:]),
    femr.datasets.RawPatient(patient_id=10, events=events),
]

# Lets print out the events of one patient to see what they look like.
# Note that attributes associated with `None` (i.e. `source_table` in the 
# second event) won't be printed out, but they are still tracked internally by femr.
patients[0].events

[RawEvent(start=2010-01-05 00:00:00, concept_id=2),
 RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value)]

We can access a specific event's attributes as follows. Note that accessing an attribute that wasn't defined on an event will return `None` by default (rather than raise an exception).

In [4]:
for patient in patients:
    print(f"Patient {patient.patient_id}:")
    for idx, event in enumerate(patient.events):
        print(f"    Event #{idx}:")
        print(f"       Start = {event.start} | Concept_Id = {event.concept_id} | Value = {event.value} | Source Table = {event.source_table} | Extra Attr = {event.extra_attr}")

Patient 0:
    Event #0:
       Start = 2010-01-05 00:00:00 | Concept_Id = 2 | Value = None | Source Table = None | Extra Attr = None
    Event #1:
       Start = 2010-01-03 10:45:00 | Concept_Id = 2 | Value = test_value | Source Table = None | Extra Attr = None
Patient 1:
    Event #0:
       Start = 2010-01-03 10:45:00 | Concept_Id = 2 | Value = test_value | Source Table = None | Extra Attr = None
    Event #1:
       Start = 2010-01-03 10:30:00 | Concept_Id = 0 | Value = 34.0 | Source Table = visit | Extra Attr = True
Patient 10:
    Event #0:
       Start = 2010-01-05 00:00:00 | Concept_Id = 2 | Value = None | Source Table = None | Extra Attr = None
    Event #1:
       Start = 2010-01-03 10:45:00 | Concept_Id = 2 | Value = test_value | Source Table = None | Extra Attr = None
    Event #2:
       Start = 2010-01-03 10:30:00 | Concept_Id = 0 | Value = 34.0 | Source Table = visit | Extra Attr = True


## 2. Create an `EventCollection` from our events

The first step in generating a **FEMR** database is to first create an `EventCollection`. 

You can think of an `EventCollection` as simply an unordered list of all of our events.

An `EventCollection` is simply an unordered list of events, where each event is associated with a specific patient ID.

Because it is internally represented by **FEMR** as a folder containing multiple files across which events will be sharded, we must specify a **target directory** where **FEMR** can store our `EventCollection`.

We can create an `EventCollection` from a list of events as follows:

In [5]:
# Create directory to store EventCollection
target_directory = "../ignore/dataset_tutorial_target/"
if os.path.exists(target_directory):
    shutil.rmtree(target_directory)
os.makedirs(target_directory, exist_ok=True)

# Create EventCollection
event_collection = femr.datasets.EventCollection(
    os.path.join(target_directory, "events")
)

Now, we'll actually add `RawEvent`s into our `EventCollection`.

First, we must call `EventCollection.create_writer()` in order to create a writer object that will be used to write events to the `EventCollection` on disk (remember that everything in **femr** lives on disk). 

Then, we can call `add_event()` to add each individual event to our `EventCollection`.

In [6]:
# Add events to the EventCollection
with contextlib.closing(event_collection.create_writer()) as writer:
    # NOTE: We need to use the `create_writer()` handler to create
    # a writer object for adding events to the EventCollection
    # This will automatically create a new file for these events
    for patient in patients:
        for event in patient.events:
            # Note that these are getting written to disk as part of the EventCollection
            writer.add_event(patient_id=patient.patient_id, 
                             event=event)

In order to read events from an `EventCollection`, we use the `EventCollection.create_reader()` method to ingest events from disk.

In [7]:
# We need to create a reader object as an EventCollection will be natively stored on disk
# Note that (Patient ID, Event) tuples can be returned in any order -- femr makes no guarantees
with event_collection.reader() as reader:
    for event in reader:
        print(event)

(0, RawEvent(start=2010-01-05 00:00:00, concept_id=2))
(0, RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value))
(1, RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value))
(1, RawEvent(start=2010-01-03 10:30:00, concept_id=0, value=34.0, source_table=visit, extra_attr=True))
(10, RawEvent(start=2010-01-05 00:00:00, concept_id=2))
(10, RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value))
(10, RawEvent(start=2010-01-03 10:30:00, concept_id=0, value=34.0, source_table=visit, extra_attr=True))


# 3. Create a `PatientCollection` from our `EventCollection`

To recap, an `EventCollection` is simply an unordered list of **(Patient ID, Event)** tuples. 

Our next step is to sort this unordered list such that all of a patient's events are grouped together, and within each patient, all of their events are sorted chronologically.

In other words, we go from this `EventCollection`:

1. (Patient 1, Event @ 2020)
1. (Patient 2, Event @ 2021)
1. (Patient 1, Event @ 2020)
1. (Patient 3, Event @ 2020)
1. (Patient 1, Event @ 2019)
1. (Patient 1, Event @ 2021)
1. (Patient 2, Event @ 2019)

to this `PatientCollection`:

1. (Patient 1, Event @ 2019)
1. (Patient 1, Event @ 2020)
1. (Patient 1, Event @ 2020)
1. (Patient 1, Event @ 2021)
1. (Patient 2, Event @ 2019)
1. (Patient 2, Event @ 2021)
1. (Patient 3, Event @ 2020)

To accomplish this, we simply call the `EventCollection.to_patient_collection(path_to_dir)` method. We must provide it with `path_to_dir`, which is the directory where we want to store our `PatientCollection`.

In [8]:
patients = event_collection.to_patient_collection(
    os.path.join(target_directory, "patients"),
    num_threads=1,
)

In order to read events from a `PatientCollection`, we use the `PatientCollection.create_reader()` method to ingest events from disk.

Wheras the `EventCollection` returns a single event with each iteration, the `PatientCollection` returns a single **patient** with each iteration. 

Each patient can contain an arbitrary number of events in its `.events` property. These events are guaranteed to be sorted in chronological order, from least recent to most recent events.

In [9]:
# We need to create a reader object to read our PatientCollection from disk
# Note that a Patient object is returned, and we can access its events in chronological 
# order by looping through the .events property of the patient
with patients.reader() as reader:
    for patient in reader:
        print('Patient:', patient.patient_id)
        for event in patient.events:
            print('\t', event)

Patient: 0
	 RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value)
	 RawEvent(start=2010-01-05 00:00:00, concept_id=2)
Patient: 1
	 RawEvent(start=2010-01-03 10:30:00, concept_id=0, value=34.0, source_table=visit, extra_attr=True)
	 RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value)
Patient: 10
	 RawEvent(start=2010-01-03 10:30:00, concept_id=0, value=34.0, source_table=visit, extra_attr=True)
	 RawEvent(start=2010-01-03 10:45:00, concept_id=2, value=test_value)
	 RawEvent(start=2010-01-05 00:00:00, concept_id=2)


# 4. Apply transformations to our `PatientCollection`

Now that we've ingested our raw EHR data, we will often want to apply transformations to this data in order to make it more useful for our downstream analyses.

For example, moving ICD codes to the end of a patient's visit, moving all events before a patient's birthdate to after their birthdate, dropping events after a patient's death is recorded, deduplicating overlapping visits, etc.

FEMR follows the "configuration as code" mindset by forcing you to explicitly define the transformations you apply to your dataset as a set of composable Python functions.

Each transformation should take as input a single `Patient` object and return either a new `Patient` object or `None` (the latter is used to indicate that the patient should be dropped from the dataset).

In [10]:
# This transformation function will remove all events from 
# the input Patientwith that have a value of "test_value"
def transform_event_value(input: femr.Patient) -> femr.Patient:
    return femr.Patient(
        patient_id=input.patient_id,
        events=[
            a
            for a in input.events
            if a.value != "test_value" # Remove test_value for some reason
        ],
    )

# Apply transformation to each Patient in our PatientCollection
transformed_patients: femr.datasets.PatientCollection = patients.transform(
    os.path.join(target_directory, "transformed_patients"),
    transform_event_value,
    num_threads=1,
)

Now that we've written our transformed patients to disk, we can read them back in as usual using our `PatientCollection.reader()` function:

In [11]:
# Check that our transformation worked -- we shouldn't
# see any events with a value of "test_value" in our output
with transformed_patients.reader() as reader:
    for patient in reader:
        print('Patient:', patient.patient_id)
        for event in patient.events:
            print('\t', event)

Patient: 0
	 RawEvent(start=2010-01-05 00:00:00, concept_id=2)
Patient: 1
	 RawEvent(start=2010-01-03 10:30:00, concept_id=0, value=34.0, source_table=visit, extra_attr=True)
Patient: 10
	 RawEvent(start=2010-01-03 10:30:00, concept_id=0, value=34.0, source_table=visit, extra_attr=True)
	 RawEvent(start=2010-01-05 00:00:00, concept_id=2)


# 5. Create a `PatientDatabase` from our `PatientCollection`

Now that we've loaded, cleaned, and transformed our EHR data, our final step is to save a fast, indexed version of it to disk so that we can quickly iterate over and retrieve patients when we train our machine learning model.

FEMR refers to this fast, indexed version of our dataset as a `PatientDatabase`.

We can create a `PatientDatabase` as follows:
1. Create a `PatientCollection` object (already done)
2. Create an `Ontology` folder containing the unique Event codes that appear in our dataset (need to do)
3. Create a `PatientDatabase` object from our `PatientCollection` object and `Ontology` (need to do)

We will create our `Ontology` by creating two files on disk -- `concept.csv` and `concept_relationship.csv`. We will pass the paths to these files to the `PatientDatabase` constructor.

All unique event codes in our dataset must be present in the `concept.csv` file.

In [12]:
def create_ontology(path_to_ontology_dir: str, concepts = []):
    path_to_concept_file: str = os.path.join(
        path_to_ontology_dir, "concept", "concept.csv.zst"
    )
    os.makedirs(os.path.dirname(path_to_concept_file), exist_ok=True)
    os.makedirs(
        os.path.join(path_to_ontology_dir + "/concept_relationship/"),
        exist_ok=True,
    )

    concept_map = {}

    with io.TextIOWrapper(
        zstandard.ZstdCompressor(1).stream_writer(
            open(path_to_concept_file, "wb")
        )
    ) as o:
        writer = csv.DictWriter(
            o,
            fieldnames=[ 
                        "concept_id", "concept_name", "domain_id", "vocabulary_id", 
                        "concept_class_id", "standard_concept", "concept_code", 
                        "valid_start_DATE", "valid_end_DATE", "invalid_reason", 
                        "load_table_id", "load_row_id",
            ],
        )
        writer.writeheader()

        next_code: int = 0
        for i, c in enumerate(concepts):
            code: int = i + next_code
            concept_map[c] = code
            writer.writerow(
                {
                    "concept_id": str(code), "concept_name": c, "domain_id": "Observation", 
                    "vocabulary_id": "dummy", "concept_class_id": "Observation", "standard_concept": "", 
                    "concept_code": c, "valid_start_DATE": "1970-01-01", "valid_end_DATE": "2099-12-31", 
                    "invalid_reason": "", "load_table_id": "custom_mapping", "load_row_id": "",
                }
            )
    return concept_map

# Create fake ontology
path_to_ontology = os.path.join(target_directory, "ontology")
concepts = set()
with transformed_patients.reader() as reader:
    for patient in reader:
        for e in patient.events:
            concepts.add(e.code)
concepts = sorted(list(concepts))
concept_map = create_ontology(path_to_ontology, concepts)

# Remap codes per our fake ontology
def transform_event_codes_using_ontology(input: femr.Patient, concept_map: dict) -> femr.Patient:
    for e in input.events:
        e.code = concept_map[e.code]
    return input

transformed_patients2 = transformed_patients.transform(
    os.path.join(target_directory, "transformed_patients2"),
    functools.partial(transform_event_codes_using_ontology, concept_map = concept_map),
)

Now that we've created our `Ontology`, we can finally convert our `PatientCollection` to a `PatientDatabase` using the `PatientCollection.to_patient_database()` method.

In [13]:
# Create patient database
transformed_patients2.to_patient_database(
    os.path.join(target_directory, 'patient_database'),
    path_to_ontology, # Pass the path to the ontology directory we previously created
    num_threads=1,
).close()

Could not find the following concept_id in any of the concept tables2Done with main 2023-05-10T16:11:53.452366748+00:00
Done with meta 2023-05-10T16:11:53.452472156+00:00
