In [0]:
import requests 
import tempfile
import shutil
import json
from pyspark.sql.types import StructType
from pyspark.sql.functions import lit


# Steps to complete
1. Download all necessary files
- Complete
2. Analyze and parse content into respective domains (Medications and Problems)
- Complete
3. Combine Clinical Data with other supplied data 
- Complete
4. Store the output (parquet, delta, csv) 
- Complete
5. Describe Pipeline
- In Email
6. Any additional Details
- In Email
7. Describe how this feed maps into a common data model format for consumption into a data warehouse / lakehouse, with a focus on HL7/FHIR
- In Email


# **Drop the reference files into DBFS or a volume**
Path: /FileStore/tables/MDE/{fileName}

Note: an unrestricted cluster is needed to interact with DBFS.

In [0]:
# Reference CSV files that the ingest loop below iterates over, in load order.
fileNames = [
    "ccda_pre_signed_urls2.csv",
    "data_engineer_exam_claims_final.csv",
    "data_engineer_exam_rx_final.csv",
    "icd.csv",
]

In [0]:
%run ./MDE_INIT_

In [0]:
# Ingest every reference CSV listed in fileNames, one call per file.
# ingestRefData is not defined in this notebook; presumably it comes from the
# MDE_INIT_ notebook pulled in by %run above -- TODO confirm.
for file in fileNames:
    ingestRefData(file)  # original author's note: lands in the Hive metastore, effectively as temp tables

In [0]:
%sql
--Review the output in Delta format as a temp view. Static tables can be created from the Delta temp views if needed.
--select * from ccda_pre_signed_urls2
--select * from data_engineer_exam_claims_final;
--select * from data_engineer_exam_rx_final

## For pre-signed urls, you can ingest directly from S3 by creating a unity catalog location, but I'll do it with requests here.

In [0]:
# Pull the pre-signed URL column to the driver as a plain Python list.
# The SQL already projects the single column, so no further .select() is
# needed, and reading the field off the collected Rows avoids the RDD API
# (which is unavailable on Spark Connect / shared access mode clusters).
urlList = [
    row.pre_signed_urls
    for row in spark.sql("SELECT pre_signed_urls FROM ccda_pre_signed_urls2").collect()
]

# Parse the resultant XML into a dataframe and pull out the Medications and Problems, and iterate into One Big Table (OBT) or dataframe

In [0]:
# Hand the collected pre-signed URLs to the ingestion helper from the INIT notebook.
# NOTE(review): the SQL cells below read from mde_combined_xmls, so this call
# presumably populates that table -- confirm against MDE_INIT_.
IngestClinicalFiles(urlList)

## Create Medications and Problems tables, parsed out as far as needed
### Using Databricks SQL as an example of unpacking nested data structures with SQL

In [0]:
%sql
-- Build one row per "Problems" section found in each ingested C-CDA document.
--
-- component.structuredBody.component.section.title is an array holding one
-- title per section. The section's position varies from document to document,
-- so posexplode walks every position instead of probing a fixed set of
-- hard-coded indices; a Problems section is picked up wherever it sits.
-- The matching text and entry values are read at the same position.
-- NOTE(review): this relies on the title/text/entry arrays being
-- position-aligned, which the previous index-based version also relied on.
--
-- SELECT DISTINCT keeps the de-duplication the earlier UNION chain provided.
-- MEMBERID is a hash of the patient's given and family name.
CREATE OR REPLACE TABLE mde_combined_problems AS
SELECT DISTINCT
  author.assignedAuthor.assignedAuthoringDevice.softwareName AS softwareName,
  recordTarget.patientRole.patient.name.given AS fname,
  recordTarget.patientRole.patient.name.family AS lname,
  HASH(recordTarget.patientRole.patient.name.given, recordTarget.patientRole.patient.name.family) AS MEMBERID,
  sec.section_title AS Problems,
  component.structuredBody.component.section.text[sec.section_pos] AS table_of_Problems,
  component.structuredBody.component.section.entry[sec.section_pos] AS ProblemArray
FROM mde_combined_xmls
LATERAL VIEW posexplode(component.structuredBody.component.section.title) sec AS section_pos, section_title
WHERE UPPER(sec.section_title) = 'PROBLEMS'

In [0]:
%sql
-- Inspect the parsed Problems table.
SELECT *
FROM mde_combined_problems

In [0]:
%sql
-- Build one row per "Medications" section found in each ingested C-CDA document.
--
-- component.structuredBody.component.section.title is an array holding one
-- title per section. The section's position varies from document to document,
-- so posexplode walks every position instead of probing a fixed set of
-- hard-coded indices; a Medications section is picked up wherever it sits.
-- The matching text and entry values are read at the same position.
-- NOTE(review): this relies on the title/text/entry arrays being
-- position-aligned, which the previous index-based version also relied on.
--
-- SELECT DISTINCT keeps the de-duplication the earlier UNION chain provided.
-- MEMBERID is a hash of the patient's given and family name.
CREATE OR REPLACE TABLE mde_combined_medications AS
SELECT DISTINCT
  author.assignedAuthor.assignedAuthoringDevice.softwareName AS softwareName,
  recordTarget.patientRole.patient.name.given AS fname,
  recordTarget.patientRole.patient.name.family AS lname,
  HASH(recordTarget.patientRole.patient.name.given, recordTarget.patientRole.patient.name.family) AS MEMBERID,
  sec.section_title AS Medications,
  component.structuredBody.component.section.text[sec.section_pos] AS table_of_Medications,
  component.structuredBody.component.section.entry[sec.section_pos] AS Medications_Array
FROM mde_combined_xmls
LATERAL VIEW posexplode(component.structuredBody.component.section.title) sec AS section_pos, section_title
WHERE UPPER(sec.section_title) = 'MEDICATIONS'

In [0]:
%sql
-- Inspect the parsed Medications table.
SELECT *
FROM mde_combined_medications

# Combined OBT and other ref tables for analysis

In [0]:
%sql

-- Exploratory join of claims to rx rows on MemberID.
-- LEFT JOIN keeps every claims row; the rx columns are NULL where a member
-- has no rx rows. Both MemberID columns are listed first for easy comparison.
SELECT
  cl.MemberID,
  rx.MemberID,
  cl.*,
  rx.*
FROM data_engineer_exam_claims_final AS cl
LEFT JOIN data_engineer_exam_rx_final AS rx
  ON cl.MemberID = rx.MemberID


In [0]:
%sql
-- Per the original author's note, each rx row is one filled prescription, so
-- this table already serves as the per-member medication list (the counterpart
-- of the problems / ICD-10 diagnosis list built from the claims file).
SELECT MemberID, NDC, FromDate, PaidDate
FROM data_engineer_exam_rx_final

In [0]:
%sql
-- Per-member problem list derived from claims.
-- UNPIVOT folds the ten ICDDiag1..ICDDiag10 columns into rows: ICDDiag holds
-- the source column name and DiagValue holds the value from that column.
-- Each value is then matched (after trimming whitespace on both sides) to
-- icd.Column2, and icd.Column5 is surfaced as the description.
-- NOTE(review): the icd table has positional column names; Column2 is
-- presumably the code and Column5 its description -- confirm against icd.csv.
WITH Claims_Problems AS (
  SELECT MemberID, DiagValue, ICDDiag, FromDate, ToDate
  FROM data_engineer_exam_claims_final
  UNPIVOT (
    DiagValue FOR ICDDiag IN (
      ICDDiag1, ICDDiag2, ICDDiag3, ICDDiag4, ICDDiag5,
      ICDDiag6, ICDDiag7, ICDDiag8, ICDDiag9, ICDDiag10
    )
  )
)
SELECT
  cp.*,
  icd.Column5 AS ICDDescription
FROM Claims_Problems AS cp
LEFT JOIN icd
  ON TRIM(cp.DiagValue) = TRIM(icd.Column2)

# Store the result of the ingestion

In [0]:
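# Last step: persist the combined output (placeholder -- this cell contains no code yet; TODO confirm whether the write lives in MDE_INIT_)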


# Write _INIT_ using the above
- Complete

# Write tests for functions used in the processing
- Complete
# Write validation and QA steps into pipeline
- Unknown what validation and QA is necessary, need to know more about the scope and intention of the pipeline

# Document and describe pipeline requirements for orchestration
- Not for this demonstration

# Set up orchestration
- Not for this demonstration

# Set notifications and logging
- Not for this demonstration

# Notes
## I'd prefer to pull the FHIR data as JSON; it's much easier for me to parse than XML
**Patient	https://fhir.athena.io/StructureDefinition/ah-patient**
Practitioner	https://fhir.athena.io/StructureDefinition/ah-practitioner
Procedure	https://fhir.athena.io/StructureDefinition/ah-procedure
AllergyIntolerance	https://fhir.athena.io/StructureDefinition/ah-allergyintolerance
Immunization	https://fhir.athena.io/StructureDefinition/ah-immunization
Condition	https://fhir.athena.io/StructureDefinition/ah-condition
Provenance	https://fhir.athena.io/StructureDefinition/ah-provenance
**Medication	https://fhir.athena.io/StructureDefinition/ah-medication**
Observation	https://fhir.athena.io/StructureDefinition/ah-observation
MedicationRequest	https://fhir.athena.io/StructureDefinition/ah-medicationrequest
Encounter	https://fhir.athena.io/StructureDefinition/ah-encounter
CarePlan	https://fhir.athena.io/StructureDefinition/ah-careplan
AllergyIntolerance	https://fhir.athena.io/StructureDefinition/ah-allergyintolerance
DiagnosticReport	https://fhir.athena.io/StructureDefinition/ah-diagnosticreport
Goal	https://fhir.athena.io/StructureDefinition/ah-goal