## Challenge Two - Creating and Logging Entities Accessed

Install the IRIS DB-API driver needed to get access IRIS for Health

In [1]:
! uv add intersystems_irispython-3.2.0-py3-none-any.whl

[2K[37m⠙[0m [2mResolving dependencies...                                                     [0m[2mResolved [1m37 packages[0m [2min 11ms[0m[0m
[2mAudited [1m35 packages[0m [2min 0.81ms[0m[0m


#### Add pandas, numpy, and matplotlib to Python Packages for this Lesson

In [2]:
! uv add pandas numpy

[2mResolved [1m37 packages[0m [2min 0.67ms[0m[0m
[2mAudited [1m35 packages[0m [2min 0.02ms[0m[0m


In [3]:
%pip install matplotlib

Note: you may need to restart the kernel to use updated packages.


#### Import libraries

In [4]:
import os,sys
import warnings
warnings.simplefilter(action='ignore')
import iris
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

#### Build connection to our demo namespace

In [5]:
connection_string = "127.0.0.1:1972/DEMO"
username = "_system"
password = "ISCDEMO"
connection = iris.connect(connection_string, username, password)

#### Check if a table exists before creating it

In [6]:
def table_exists(cur, table_name: str) -> bool:
    """
    Return True if a table with the given name exists in the namespace (any schema).
    IRIS stores unquoted identifiers in uppercase in the catalog.
    """
    cur.execute("""
        SELECT 1
        FROM INFORMATION_SCHEMA.TABLES
        WHERE UPPER(TABLE_NAME) = ?
        """, (table_name.upper(),))
    return cur.fetchone() is not None

#### Create Audit Table for Entities that have been touched or examined

In [None]:
## Schema: AUDIT
def create_entity_schema():
  ## use the connection we created above 
  try:
        cur = connection.cursor()
        tablename = "Audit.fhir_entities"
        if table_exists(cur, tablename):
            print(f"Table {tablename} already exists.")
            return
        else:
            print(f"Table {tablename} does not exist. Creating it now...")
            table_schema = f"""CREATE TABLE {tablename} (
            entity_id BIGINT AUTO_INCREMENT PRIMARY KEY,
            log_id BIGINT NOT NULL,
            event_ts TIMESTAMP NOT NULL, 
            patient_id VARCHAR(128) NOT NULL,
            source_resource_type VARCHAR(64)  NOT NULL,
            source_resource_id   VARCHAR(128),
            relation             VARCHAR(64)  NOT NULL, 
            ref_path             VARCHAR(256) NOT NULL,              
            is_direct_target     SMALLINT     NOT NULL DEFAULT 0,     
            operation            VARCHAR(32),
            request_id           VARCHAR(128), 
            CONSTRAINT fk_fhir_entities_logs
            FOREIGN KEY (log_id) REFERENCES AUDIT.fhir_logs(log_id)
            ON DELETE CASCADE
            )
            """
            cur.execute(table_schema)
            connection.commit()
            print(f"Table {tablename} created successfully.")
            return
  except Exception as e:
        print("ERROR: Could not connect to InterSystems IRIS. or table creation failed.")
        print(e)
        sys.exit(1)


In [None]:
create_entity_schema()

#### Indexes (practical set for common audits)

In [None]:
## Create Indexes (practical set for common audits)
def create_indices():
  ## use the connection we created above 
  try:
        cur = connection.cursor()
        tablename = "Audit.fhir_entities"
        print(f"Creating indices for Table {tablename} ..")
        indices = f"""
            CREATE INDEX ix_fhir_entities_patient_ts
            ON AUDIT.fhir_entities (patient_id, event_ts DESC)
            """
        cur.execute(indices)
        connection.commit()
        indices = f""" CREATE BITMAP INDEX ix_fhir_entities_is_direct
            ON AUDIT.fhir_entities (is_direct_target)
            """
        cur.execute(indices)
        connection.commit()
        indices = f""" CREATE INDEX ix_fhir_entities_patient_relation
            ON AUDIT.fhir_entities (patient_id, relation)
            """
        cur.execute(indices)  
        connection.commit()  
        indices = f""" CREATE INDEX ix_fhir_entities_log
            ON AUDIT.fhir_entities (log_id)
            """
        cur.execute(indices)
        connection.commit()  
        print(f"Indices for Table {tablename} created successfully.")
        return
  except Exception as e:
        print("ERROR: Could not connect to InterSystems IRIS. or index creation failed.")
        print(e)
        sys.exit(1)




In [None]:
create_indices()

#### Create Convenience view (makes “who accessed Patient X” queries easy)

In [None]:
def create_convenience_view():
  ## use the connection we created above 
  try:
    cur = connection.cursor()
    table_view = f"""CREATE VIEW AUDIT.v_patient_access AS
    SELECT
    fe.event_ts,
    fe.patient_id,
    fe.relation,
    fe.is_direct_target,
    fe.source_resource_type,
    fe.source_resource_id,
    fe.ref_path,
    fl.method,
    fl.status_code,
    fl.operation AS request_operation,
    fl.consumer_username,
    fl.credential_type,
    fl.auth_subject,
    fl.scopes,
    fl.service_name,
    fl.route_path,
    fl.request_path,
    fl.request_id
    FROM AUDIT.fhir_entities fe
    JOIN AUDIT.fhir_logs
    fl ON fl.log_id = fe.log_id
    """
    cur.execute(table_view)
    connection.commit()
    print(f"View AUDIT.v_patient_access created successfully.")
    return
  except Exception as e:
        print("ERROR: Could not connect to InterSystems IRIS. or view creation failed.")
        print(e)
        sys.exit(1)

In [None]:
create_convenience_view()

### Setup new receiver program with enhanced logging and entity functionality

#### First set up the environment

In [42]:
! uv add fastapi

2134.14s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


[2mResolved [1m37 packages[0m [2min 7ms[0m[0m
[2mAudited [1m35 packages[0m [2min 0.18ms[0m[0m


In [44]:
import os, sys

# 👇 CHANGE THIS to the folder that actually contains konglog_ingest.py
PROJECT_DIR = os.path.expanduser("/Users/pjamieso/VanderbiltLocalFHIRTraining/Lesson36")   # e.g. /Users/you/path/to/Lesson36

if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)


In [36]:
import sys, sysconfig
print("Python:", sys.executable)
print("site-packages:", sysconfig.get_paths()["purelib"])

Python: /Library/Frameworks/Python.framework/Versions/3.11/bin/python3
site-packages: /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages


In [45]:
%pip install -q fastapi "uvicorn[standard]" intersystems-irispython python-dotenv requests


2150.28s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


Note: you may need to restart the kernel to use updated packages.


### The enhanced functionality of the receiver is in konglog_injest2.py

In [46]:
import konglog_ingest2
from konglog_ingest2 import app
print("Loaded from:", konglog_ingest2.__file__)

Loaded from: /Users/pjamieso/VanderbiltLocalFHIRTraining/Lesson36/konglog_ingest2.py


In [49]:
import os

# -- set your lab env --
os.environ["IRIS_CONNECTION_STRING"] = "127.0.0.1:1972/DEMO"
os.environ["IRIS_USER"] = "_SYSTEM"
os.environ["IRIS_PASSWORD"] = "ISCDEMO"
os.environ["IRIS_LOG_TABLE"] = "AUDIT.fhir_logs"
os.environ["LOG_BEARER_TOKEN"] = "fhirdemotoken"

# optional (nice for debugging)
os.environ["INGEST_DEBUG"] = "true"
os.environ["TS_AS_TEXT"] = "true"
os.environ["FHIR_BASE_PREFIXES"] = "/fhir,/r4,/fhir/r4"

# Import AFTER env is set
from konglog_ingest2 import app

In [None]:
import threading, time, requests, uvicorn
server = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=8082, log_level="info"))
t = threading.Thread(target=server.run, daemon=True)
t.start()

# quick health probe
for _ in range(20):
    try:
        r = requests.get("http://127.0.0.1:8082/healthz", timeout=0.5)
        print("Health:", r.status_code, r.text)
        break
    except Exception:
        time.sleep(0.25)

INFO:     Started server process [75620]
2025-09-29 10:44:06,341 DEBUG Starting new HTTP connection (1): 127.0.0.1:8082
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8082 (Press CTRL+C to quit)
2025-09-29 10:44:06,600 DEBUG Starting new HTTP connection (1): 127.0.0.1:8082


INFO:     127.0.0.1:62030 - "GET /healthz HTTP/1.1" 200 OK


2025-09-29 10:44:06,626 DEBUG http://127.0.0.1:8082 "GET /healthz HTTP/1.1" 200 11


Health: 200 {"ok":true}


2025-09-29 10:44:17,998 INFO Kong POST received: entries=1


INFO:     127.0.0.1:62035 - "POST /kong-log HTTP/1.1" 200 OK


2025-09-29 10:44:23,414 INFO Kong POST received: entries=1


INFO:     127.0.0.1:62039 - "POST /kong-log HTTP/1.1" 200 OK


2025-09-29 10:44:28,311 INFO Kong POST received: entries=1


INFO:     127.0.0.1:62039 - "POST /kong-log HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [75620]


### Here is how to stop the server

In [89]:
server.should_exit = True
t.join(timeout=5)
print("Ingest server stopped.")

Ingest server stopped.


### Create function to send request and print response

In [51]:
import requests
import json


def send_request(query, apikey):
# --- Request setup ---
    baseurl = "http://127.0.0.1:8000/fhir"  # via Kong proxy
    headers = {
    "Accept": "*/*",
    "content-type": "application/fhir+json",
    "Accept-Encoding": "gzip, deflate, br",
    "apikey": apikey
    }
    url = f"{baseurl}/{query}"  # via Kong proxy
    # --- Send request ---
    try:
        resp = requests.get(url, headers=headers, timeout=15)
        print("Status:", resp.status_code)
        # show a couple of useful headers
        for k in ["Content-Type", "X-Kong-Request-Id", "X-Kong-Response-Latency", "Server"]:
            if k in resp.headers:
                print(f"{k}: {resp.headers[k]}")
    # Pretty-print JSON if possible, otherwise show text
        try:
            data = resp.json()
            print("\nJSON body:")
            print(json.dumps(data, indent=2))
        except ValueError:
            print("\nText body:")
            print(resp.text[:2000])  # avoid dumping extremely large responses
    except requests.RequestException as e:
        print("Request error:", e)

#### Let's test it with some very simply queries

In [87]:
key = "random-demo-key"
query = "Patient/4"
send_request(query, key)

2025-09-29 14:44:36,103 DEBUG Starting new HTTP connection (1): 127.0.0.1:8000
2025-09-29 14:44:36,172 DEBUG http://127.0.0.1:8000 "GET /fhir/Patient/4 HTTP/1.1" 200 1113


Status: 200
Content-Type: application/fhir+json; charset=UTF-8
X-Kong-Request-Id: 925e3675405d8a2b7cdda7200ca8433c
Server: Apache

JSON body:
{
  "resourceType": "Patient",
  "id": "4",
  "text": {
    "status": "generated",
    "div": "<div xmlns=\"http://www.w3.org/1999/xhtml\">Generated by <a href=\"https://github.com/synthetichealth/synthea\">Synthea</a>.Version identifier: synthea-java .   Person seed: 5887668216735179003  Population seed: 1597764932523</div>"
  },
  "extension": [
    {
      "url": "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName",
      "valueString": "Terrilyn Reynolds"
    },
    {
      "url": "http://hl7.org/fhir/StructureDefinition/patient-birthPlace",
      "valueAddress": {
        "city": "Lynn",
        "state": "Massachusetts",
        "country": "US"
      }
    },
    {
      "url": "http://synthetichealth.github.io/synthea/disability-adjusted-life-years",
      "valueDecimal": 2.252271652349974
    },
    {
      "url": "http://sy

### Go to SQL Explorer in the Management Portal and take a look at the entities table

#### Let's see what happens when we query for an observation

In [88]:
key = "random-demo-key"
query = "Observation/1"
send_request(query, key)

2025-09-29 14:55:16,433 DEBUG Starting new HTTP connection (1): 127.0.0.1:8000
2025-09-29 14:55:16,504 DEBUG http://127.0.0.1:8000 "GET /fhir/Observation/1 HTTP/1.1" 200 338


Status: 200
Content-Type: application/fhir+json; charset=UTF-8
X-Kong-Request-Id: 1745341d1cffb92f29fbeb3732e44261
Server: Apache

JSON body:
{
  "resourceType": "Observation",
  "id": "1",
  "status": "final",
  "code": {
    "coding": [
      {
        "system": "http://loinc.org",
        "code": "718-7"
      }
    ],
    "text": "Hemoglobin[g/dL]"
  },
  "subject": {
    "reference": "Patient/LP0001"
  },
  "effectiveDateTime": "2025-08-30T17:28:47Z",
  "valueQuantity": {
    "value": 15.4,
    "unit": "g/dL"
  },
  "meta": {
    "lastUpdated": "2025-09-01T17:49:47Z",
    "versionId": "2"
  }
}


#### Check out SQL Explorer in the Managment Portal to see how it recorded the subject of the observation

### Let's try a simple search query to see how that is handled 

In [83]:
key = "random-demo-key"
query = "Patient?_count=5"
send_request(query, key)

2025-09-29 14:30:25,765 DEBUG Starting new HTTP connection (1): 127.0.0.1:8000
2025-09-29 14:30:25,831 DEBUG http://127.0.0.1:8000 "GET /fhir/Patient?_count=5 HTTP/1.1" 200 2269


Status: 200
Content-Type: application/fhir+json; charset=UTF-8
X-Kong-Request-Id: 762fdd4cfc62f9548dab7e271e292225
Server: Apache

JSON body:
{
  "resourceType": "Bundle",
  "id": "5e061bf3-9d62-11f0-a364-b2198069b018",
  "type": "searchset",
  "timestamp": "2025-09-29T18:30:25Z",
  "total": 122,
  "link": [
    {
      "relation": "first",
      "url": "http://127.0.0.1:8000/fhir/Patient?page=1&queryId=5e05a2aa-9d62-11f0-a364-b2198069b018"
    },
    {
      "relation": "self",
      "url": "http://127.0.0.1:8000/fhir/Patient?_count=5"
    },
    {
      "relation": "next",
      "url": "http://127.0.0.1:8000/fhir/Patient?page=2&queryId=5e05a2aa-9d62-11f0-a364-b2198069b018"
    },
    {
      "relation": "last",
      "url": "http://127.0.0.1:8000/fhir/Patient?page=25&queryId=5e05a2aa-9d62-11f0-a364-b2198069b018"
    }
  ],
  "entry": [
    {
      "fullUrl": "http://127.0.0.1:8000/fhir/Patient/4",
      "resource": {
        "resourceType": "Patient",
        "id": "4",
        "text

### Check the entities table again, did it record all the patients that were accessed in the search query?

#### Let's try another query with a a different API key (demo-user-2)

In [None]:
key = "random-demo-key-2"
query = "MedicationRequest?_count=5"
send_request(query, key)

### Was it able to record the patient that was the subject of the Medication Request?

#### Let's use Pandas and our convenience function to join the log and entities tables

In [None]:
sql = """SELECT 
event_ts, patient_id, relation, is_direct_target, source_resource_type, source_resource_id, ref_path, method, status_code, request_operation, consumer_username, credential_type, auth_subject, scopes, service_name, route_path, request_path, request_id
FROM Audit.v_patient_access"""
auditLogFrame = pd.read_sql(sql, connection)

#### Examine first few rows

In [None]:
auditLogFrame.head(10)

### Lets plot Bundles vs Patient

In [None]:
import matplotlib.pyplot as plt

# Normalize and count just Bundle vs Patient
src = auditLogFrame['source_resource_type'].astype('string').str.strip().str.lower()
counts = (src[src.isin(['bundle', 'patient'])]
          .map({'bundle': 'Bundle', 'patient': 'Patient'})
          .value_counts()
          .reindex(['Bundle', 'Patient'], fill_value=0))

total = int(counts.sum())
if total == 0:
    print("No rows with source_resource_type of Bundle or Patient.")
else:
    fig, ax = plt.subplots()
    ax.pie(
        counts.values,
        labels=counts.index,
        autopct=lambda p: f"{p:.1f}% ({int(round(p/100*total))})",
        startangle=90
    )
    ax.axis('equal')  # keeps it circular
    ax.set_title("Bundle vs Patient in source_resource_type")
    plt.show()