# Import packages

In [1]:
import psutil
import math
import duckdb
import boto3

# Set up connection to S3
# httpfs is loaded for s3:// access, and the S3 secret takes its credentials from the
# AWS credential chain so no keys are written into the notebook.
# CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE SECRET raises on the
# second run in the same kernel because the default S3 secret already exists.
duckdb.execute("""
INSTALL httpfs;
LOAD httpfs;

CREATE OR REPLACE SECRET(
    TYPE S3,
    PROVIDER CREDENTIAL_CHAIN
);
""")

# Set DuckDB usage limits based on available memory and cpu (in ec2 instance)
# psutil.cpu_count() can return None when the count is undetermined, and the floor
# below is 0 on a machine with less than 1 GiB of RAM; either would produce an
# invalid SET statement, so both values are clamped to at least 1.
num_threads = psutil.cpu_count(logical=True) or 1
memory_gb = max(1, math.floor(psutil.virtual_memory().total / (1024**3)))

# NOTE(review): memory_gb is computed in GiB (1024**3 bytes) but passed with the 'GB'
# suffix — confirm which unit DuckDB applies and whether enough headroom is left
# for the pandas/pyarrow work done later in this kernel.
# preserve_insertion_order = false means later queries must not rely on row order.
duckdb.execute(f"""
    SET temp_directory = 'temp_dir.tmp';
    SET preserve_insertion_order = false;
    SET memory_limit = '{memory_gb}GB';
    SET threads TO {num_threads};
""")


<duckdb.duckdb.DuckDBPyConnection at 0x7675b9377db0>

# Register Tables

In [2]:
def register_tables(data_asset = 'deid_cdw', bucket = 'ic-data-41-4-r-us-west-2.sec.ucsf.edu'):
    """Register every Parquet table of a data asset with DuckDB.

    Lists the first-level "folders" under ``data/parquet/<data_asset>/`` in the
    S3 bucket, treats each folder name as a table name, and registers a relation
    over ``<folder>/*.parquet`` on the default DuckDB connection under that name.

    Args:
        data_asset: Name of the data asset directory under ``data/parquet/``.
        bucket: S3 bucket that holds the data assets.

    Returns:
        list[str]: The table names that were registered, in S3 listing order.
        Empty if nothing is found under the prefix.

    Note:
        NOTE(review): calling this for several assets that share table names
        (e.g. ``person``) presumably replaces the earlier registration — confirm.
    """
    prefix = f'data/parquet/{data_asset}/'

    # A single list_objects_v2 response holds at most 1000 entries (CommonPrefixes
    # included) regardless of MaxKeys, so walk every page instead of trusting one call.
    paginator = boto3.client('s3').get_paginator('list_objects_v2')
    table_list = []
    for page in paginator.paginate(Bucket = bucket, Prefix = prefix, Delimiter = '/'):
        for common_prefix in page.get('CommonPrefixes', []):
            # 'data/parquet/<asset>/<table>/' -> '<table>'
            table_list.append(common_prefix['Prefix'].split('/')[-2])

    data_asset_path = f's3://{bucket}/data/parquet/{data_asset}'
    for table in table_list:
        table_df = duckdb.read_parquet(f'{data_asset_path}/{table}/*.parquet')
        duckdb.register(table, table_df)

    return table_list

In [3]:
# Register the tables of the deid_omop_ucsf_sfdph asset; the returned table names are the cell output.
register_tables(data_asset='deid_omop_ucsf_sfdph')

['care_site',
 'care_site_extension',
 'cdm_source',
 'concept',
 'concept_ancestor',
 'concept_class',
 'concept_relationship',
 'concept_synonym',
 'condition_era',
 'condition_occurrence',
 'condition_occurrence_extension',
 'death',
 'death_extension',
 'device_exposure',
 'domain',
 'drug_era',
 'drug_exposure',
 'drug_exposure_extension',
 'drug_strength',
 'location',
 'location_extension',
 'measurement',
 'measurement_extension',
 'observation',
 'observation_extension',
 'observation_period',
 'person',
 'person_extension',
 'procedure_occurrence',
 'procedure_occurrence_extension',
 'provider',
 'provider_extension',
 'relationship',
 'version',
 'visit_detail',
 'visit_occurrence',
 'visit_occurrence_extension',
 'vocabulary']

# Atlas SQL (Grab patients in cohort)

In [5]:
# Atlas-style cohort definition, executed as one multi-statement script.
# Steps, in order:
#   1. Codesets: five concept sets (codeset_id 0-4) built from concept / concept_ancestor.
#      Codeset 1 is populated but not referenced by any later statement in this cell.
#   2. qualified_events: each person's first condition_era with a codeset-0 concept, where
#      (year of era start - year_of_birth) >= 18 and the enclosing observation period
#      extends at least 3650 days past the era start.
#   3. Inclusion_0 / Inclusion_2: no condition_occurrence (codeset 2) / procedure_occurrence
#      (codeset 4) between the observation-period start and the index date.
#      Inclusion_1: no drug_exposure (codeset 3) anywhere in the observation period.
#   4. included_events: events that satisfy all three rules (bit mask == 2^3 - 1).
#   5. cohort_rows / final_cohort: cohort exit at the observation-period end date, then
#      overlapping periods per person are collapsed.
# Every table is created with CREATE OR REPLACE so the cell can be re-run in the same
# kernel (or after a partial failure) without "table already exists" errors.
duckdb.sql("""
CREATE OR REPLACE TEMP TABLE Codesets  (codeset_id int NOT NULL,
  concept_id bigint NOT NULL
)
;
INSERT INTO Codesets (codeset_id, concept_id)
SELECT 0 as codeset_id, c.concept_id FROM (select distinct I.concept_id FROM
( 
  select concept_id from concept where concept_id in (134603)
UNION  select c.concept_id
  from concept c
  join concept_ANCESTOR ca on c.concept_id = ca.descendant_concept_id
  and ca.ancestor_concept_id in (134603)
  and c.invalid_reason is null
) I
) C UNION ALL 
SELECT 1 as codeset_id, c.concept_id FROM (select distinct I.concept_id FROM
( 
  select concept_id from concept where concept_id in (1304107)
UNION  select c.concept_id
  from concept c
  join concept_ANCESTOR ca on c.concept_id = ca.descendant_concept_id
  and ca.ancestor_concept_id in (1304107)
  and c.invalid_reason is null
) I
) C UNION ALL 
SELECT 2 as codeset_id, c.concept_id FROM (select distinct I.concept_id FROM
( 
  select concept_id from concept where concept_id in (4187635,4211481)
) I
) C UNION ALL 
SELECT 3 as codeset_id, c.concept_id FROM (select distinct I.concept_id FROM
( 
  select concept_id from concept where concept_id in (42900401,1358436,1525007,1394023,43013182,702133,43009098)
) I
) C UNION ALL 
SELECT 4 as codeset_id, c.concept_id FROM (select distinct I.concept_id FROM
( 
  select concept_id from concept where concept_id in (4028623)
UNION  select c.concept_id
  from concept c
  join concept_ANCESTOR ca on c.concept_id = ca.descendant_concept_id
  and ca.ancestor_concept_id in (4028623)
  and c.invalid_reason is null
) I
) C;
ANALYZE Codesets;
CREATE OR REPLACE TEMP TABLE qualified_events
AS
SELECT
event_id, person_id, start_date, end_date, op_start_date, op_end_date, visit_occurrence_id
FROM
(
  select pe.event_id, pe.person_id, pe.start_date, pe.end_date, pe.op_start_date, pe.op_end_date, row_number() over (partition by pe.person_id order by pe.start_date ASC) as ordinal, cast(pe.visit_occurrence_id as bigint) as visit_occurrence_id
  FROM (-- Begin Primary Events
select P.ordinal as event_id, P.person_id, P.start_date, P.end_date, op_start_date, op_end_date, cast(P.visit_occurrence_id as bigint) as visit_occurrence_id
FROM
(
  select E.person_id, E.start_date, E.end_date,
         row_number() OVER (PARTITION BY E.person_id ORDER BY E.sort_date ASC, E.event_id) ordinal,
         OP.observation_period_start_date as op_start_date, OP.observation_period_end_date as op_end_date, cast(E.visit_occurrence_id as bigint) as visit_occurrence_id
  FROM 
  (
  -- Begin Condition Era Criteria
select C.person_id, C.condition_era_id as event_id, C.start_date, C.end_date,
  CAST(NULL as bigint) as visit_occurrence_id, C.start_date as sort_date
from 
(
  select ce.person_id,ce.condition_era_id,ce.condition_concept_id,ce.condition_occurrence_count,ce.condition_era_start_date as start_date, ce.condition_era_end_date as end_date 
  FROM condition_era ce
where ce.condition_concept_id in (SELECT concept_id from  Codesets where codeset_id = 0)
) C
JOIN PERSON P on C.person_id = P.person_id
WHERE EXTRACT(YEAR FROM C.start_date) - P.year_of_birth >= 18
-- End Condition Era Criteria
  ) E
	JOIN observation_period OP on E.person_id = OP.person_id and E.start_date >=  OP.observation_period_start_date and E.start_date <= op.observation_period_end_date
  WHERE (OP.OBSERVATION_PERIOD_START_DATE + 0*INTERVAL'1 day') <= E.START_DATE AND (E.START_DATE + 3650*INTERVAL'1 day') <= OP.OBSERVATION_PERIOD_END_DATE
) P
WHERE P.ordinal = 1
-- End Primary Events
) pe
) QE
;
ANALYZE qualified_events
;
--- Inclusion Rule Inserts
CREATE OR REPLACE TEMP TABLE Inclusion_0
AS
SELECT
0 as inclusion_rule_id, person_id, event_id
FROM
(
  select pe.person_id, pe.event_id
  FROM qualified_events pe
JOIN (
-- Begin Criteria Group
select 0 as index_id, person_id, event_id
FROM
(
  select E.person_id, E.event_id 
  FROM qualified_events E
  INNER JOIN
  (
    -- Begin Correlated Criteria
select 0 as index_id, p.person_id, p.event_id
from qualified_events p
LEFT JOIN (
SELECT p.person_id, p.event_id 
FROM qualified_events P
JOIN (
  -- Begin Condition Occurrence Criteria
SELECT C.person_id, C.condition_occurrence_id as event_id, C.start_date, C.end_date,
  C.visit_occurrence_id, C.start_date as sort_date
FROM 
(
  SELECT co.person_id,co.condition_occurrence_id,co.condition_concept_id,co.visit_occurrence_id,co.condition_start_date as start_date, COALESCE(co.condition_end_date, (co.condition_start_date + 1*INTERVAL'1 day')) as end_date 
  FROM CONDITION_OCCURRENCE co
  JOIN Codesets cs on (co.condition_concept_id = cs.concept_id and cs.codeset_id = 2)
) C
-- End Condition Occurrence Criteria
) A on A.person_id = P.person_id  AND A.START_DATE >= P.OP_START_DATE AND A.START_DATE <= P.OP_END_DATE AND A.START_DATE >= P.OP_START_DATE AND A.START_DATE <= (P.START_DATE + 0*INTERVAL'1 day') ) cc on p.person_id = cc.person_id and p.event_id = cc.event_id
GROUP BY p.person_id, p.event_id
HAVING COUNT(cc.event_id) = 0
-- End Correlated Criteria
  ) CQ on E.person_id = CQ.person_id and E.event_id = CQ.event_id
  GROUP BY E.person_id, E.event_id
  HAVING COUNT(index_id) = 1
) G
-- End Criteria Group
) AC on AC.person_id = pe.person_id AND AC.event_id = pe.event_id
) Results
;
ANALYZE Inclusion_0
;
CREATE OR REPLACE TEMP TABLE Inclusion_1
AS
SELECT
1 as inclusion_rule_id, person_id, event_id
FROM
(
  select pe.person_id, pe.event_id
  FROM qualified_events pe
JOIN (
-- Begin Criteria Group
select 0 as index_id, person_id, event_id
FROM
(
  select E.person_id, E.event_id 
  FROM qualified_events E
  INNER JOIN
  (
    -- Begin Correlated Criteria
select 0 as index_id, p.person_id, p.event_id
from qualified_events p
LEFT JOIN (
SELECT p.person_id, p.event_id 
FROM qualified_events P
JOIN (
  -- Begin Drug Exposure Criteria
select C.person_id, C.drug_exposure_id as event_id, C.start_date, C.end_date,
  C.visit_occurrence_id,C.start_date as sort_date
from 
(
  select de.person_id,de.drug_exposure_id,de.drug_concept_id,de.visit_occurrence_id,days_supply,quantity,refills,de.drug_exposure_start_date as start_date, COALESCE(de.drug_exposure_end_date, (de.drug_exposure_start_date + de.days_supply*INTERVAL'1 day'), (de.drug_exposure_start_date + 1*INTERVAL'1 day')) as end_date 
  FROM DRUG_EXPOSURE de
JOIN Codesets cs on (de.drug_concept_id = cs.concept_id and cs.codeset_id = 3)
) C
-- End Drug Exposure Criteria
) A on A.person_id = P.person_id  AND A.START_DATE >= P.OP_START_DATE AND A.START_DATE <= P.OP_END_DATE AND A.START_DATE >= P.OP_START_DATE AND A.START_DATE <= P.OP_END_DATE ) cc on p.person_id = cc.person_id and p.event_id = cc.event_id
GROUP BY p.person_id, p.event_id
HAVING COUNT(cc.event_id) = 0
-- End Correlated Criteria
  ) CQ on E.person_id = CQ.person_id and E.event_id = CQ.event_id
  GROUP BY E.person_id, E.event_id
  HAVING COUNT(index_id) = 1
) G
-- End Criteria Group
) AC on AC.person_id = pe.person_id AND AC.event_id = pe.event_id
) Results
;
ANALYZE Inclusion_1
;
CREATE OR REPLACE TEMP TABLE Inclusion_2
AS
SELECT
2 as inclusion_rule_id, person_id, event_id
FROM
(
  select pe.person_id, pe.event_id
  FROM qualified_events pe
JOIN (
-- Begin Criteria Group
select 0 as index_id, person_id, event_id
FROM
(
  select E.person_id, E.event_id 
  FROM qualified_events E
  INNER JOIN
  (
    -- Begin Correlated Criteria
select 0 as index_id, p.person_id, p.event_id
from qualified_events p
LEFT JOIN (
SELECT p.person_id, p.event_id 
FROM qualified_events P
JOIN (
  -- Begin Procedure Occurrence Criteria
select C.person_id, C.procedure_occurrence_id as event_id, C.start_date, C.end_date,
       C.visit_occurrence_id, C.start_date as sort_date
from 
(
  select po.person_id,po.procedure_occurrence_id,po.procedure_concept_id,po.visit_occurrence_id,po.quantity,po.procedure_date as start_date, (po.procedure_date + 1*INTERVAL'1 day') as end_date 
  FROM PROCEDURE_OCCURRENCE po
JOIN Codesets cs on (po.procedure_concept_id = cs.concept_id and cs.codeset_id = 4)
) C
-- End Procedure Occurrence Criteria
) A on A.person_id = P.person_id  AND A.START_DATE >= P.OP_START_DATE AND A.START_DATE <= P.OP_END_DATE AND A.START_DATE >= P.OP_START_DATE AND A.START_DATE <= (P.START_DATE + 0*INTERVAL'1 day') ) cc on p.person_id = cc.person_id and p.event_id = cc.event_id
GROUP BY p.person_id, p.event_id
HAVING COUNT(cc.event_id) = 0
-- End Correlated Criteria
  ) CQ on E.person_id = CQ.person_id and E.event_id = CQ.event_id
  GROUP BY E.person_id, E.event_id
  HAVING COUNT(index_id) = 1
) G
-- End Criteria Group
) AC on AC.person_id = pe.person_id AND AC.event_id = pe.event_id
) Results
;
ANALYZE Inclusion_2
;
CREATE OR REPLACE TEMP TABLE inclusion_events
AS
SELECT
inclusion_rule_id, person_id, event_id
FROM
(select inclusion_rule_id, person_id, event_id from Inclusion_0
UNION ALL
select inclusion_rule_id, person_id, event_id from Inclusion_1
UNION ALL
select inclusion_rule_id, person_id, event_id from Inclusion_2) I;
ANALYZE inclusion_events
;
TRUNCATE TABLE Inclusion_0;
DROP TABLE Inclusion_0;
TRUNCATE TABLE Inclusion_1;
DROP TABLE Inclusion_1;
TRUNCATE TABLE Inclusion_2;
DROP TABLE Inclusion_2;
CREATE OR REPLACE TEMP TABLE included_events
AS
SELECT
event_id, person_id, start_date, end_date, op_start_date, op_end_date
FROM
(
  SELECT event_id, person_id, start_date, end_date, op_start_date, op_end_date, row_number() over (partition by person_id order by start_date ASC) as ordinal
  from
  (
    select Q.event_id, Q.person_id, Q.start_date, Q.end_date, Q.op_start_date, Q.op_end_date, SUM(coalesce(POWER(cast(2 as bigint), I.inclusion_rule_id), 0)) as inclusion_rule_mask
    from qualified_events Q
    LEFT JOIN inclusion_events I on I.person_id = Q.person_id and I.event_id = Q.event_id
    GROUP BY Q.event_id, Q.person_id, Q.start_date, Q.end_date, Q.op_start_date, Q.op_end_date
  ) MG -- matching groups
  -- the matching group with all bits set ( POWER(2,# of inclusion rules) - 1 = inclusion_rule_mask
  WHERE (MG.inclusion_rule_mask = POWER(cast(2 as bigint),3)-1)
) Results
WHERE Results.ordinal = 1
;
ANALYZE included_events
;
-- generate cohort periods into #final_cohort
CREATE OR REPLACE TEMP TABLE cohort_rows
AS
SELECT
person_id, start_date, end_date
FROM
( -- first_ends
	select F.person_id, F.start_date, F.end_date
	FROM (
	  select I.event_id, I.person_id, I.start_date, CE.end_date, row_number() over (partition by I.person_id, I.event_id order by CE.end_date) as ordinal
	  from included_events I
	  join ( -- cohort_ends
-- cohort exit dates
-- By default, cohort exit at the event's op end date
select event_id, person_id, op_end_date as end_date from included_events
    ) CE on I.event_id = CE.event_id and I.person_id = CE.person_id and CE.end_date >= I.start_date
	) F
	WHERE F.ordinal = 1
) FE;
ANALYZE cohort_rows
;
CREATE OR REPLACE TEMP TABLE final_cohort
AS
SELECT
person_id, min(start_date) as start_date, end_date
FROM
( --cteEnds
	SELECT
		 c.person_id
		, c.start_date
		, MIN(ed.end_date) AS end_date
	FROM cohort_rows c
	JOIN ( -- cteEndDates
    SELECT
      person_id
      , (event_date + -1 * 0*INTERVAL'1 day')  as end_date
    FROM
    (
      SELECT
        person_id
        , event_date
        , event_type
        , SUM(event_type) OVER (PARTITION BY person_id ORDER BY event_date, event_type ROWS UNBOUNDED PRECEDING) AS interval_status
      FROM
      (
        SELECT
          person_id
          , start_date AS event_date
          , -1 AS event_type
        FROM cohort_rows
        UNION ALL
        SELECT
          person_id
          , (end_date + 0*INTERVAL'1 day') as end_date
          , 1 AS event_type
        FROM cohort_rows
      ) RAWDATA
    ) e
    WHERE interval_status = 0
  ) ed ON c.person_id = ed.person_id AND ed.end_date >= c.start_date
	GROUP BY c.person_id, c.start_date
) e
group by person_id, end_date
;
ANALYZE final_cohort
;
""")

In [6]:
# Export the final_cohort table to a local Parquet file.
# NOTE(review): this writes 'final_cohort_test.parquet', while the later cells in this
# notebook read 'final_cohort.parquet' — confirm which file downstream steps should use.
duckdb.sql("COPY final_cohort TO 'final_cohort_test.parquet' (FORMAT 'parquet')")

# Grabbing Clinical notes

In [4]:
import pandas as pd
import pyarrow.parquet as pq

# Read the saved cohort file with pyarrow and convert it to a pandas DataFrame for display.
# timestamp_as_object=True asks pyarrow to hand timestamps back as Python objects.
# NOTE(review): `df` is rebound to DuckDB query results further down — the name does not
# keep referring to this DataFrame.
table = pq.read_table("final_cohort.parquet", use_threads=True)
df = table.to_pandas(timestamp_as_object=True) 
df

Unnamed: 0,person_id,start_date,end_date,person_source_value,person_source_value_1
0,659,2001-11-26 00:00:00,2025-03-03 00:00:00,D28E749B18B1AD,D28E749B18B1AD
1,5283,1997-02-12 00:00:00,2025-03-03 00:00:00,D211D904C22F9D,D211D904C22F9D
2,5287,1998-03-08 00:00:00,2025-03-03 00:00:00,D54D8903D5644E,D54D8903D5644E
3,5924,1997-11-03 00:00:00,2025-03-03 00:00:00,D541693502EA22,D541693502EA22
4,11313,2011-03-06 00:00:00,2025-03-03 00:00:00,D4DA16CA35EA0B,D4DA16CA35EA0B
...,...,...,...,...,...
1929,9644412,2013-07-11 00:00:00,2025-03-03 00:00:00,D884389A104E1A,D884389A104E1A
1930,9925332,2003-10-09 00:00:00,2025-03-03 00:00:00,D1176F87173CFF,D1176F87173CFF
1931,10179762,2012-12-04 00:00:00,2025-03-03 00:00:00,D9646D1413897D,D9646D1413897D
1932,10416394,2013-02-17 00:00:00,2025-03-03 00:00:00,D6BE43CD0DC2E1,D6BE43CD0DC2E1


In [6]:
# Register the tables of the deid_omop asset.
# NOTE(review): this asset shares table names (e.g. person, concept) with the asset
# registered earlier in the notebook; presumably the later registration wins — confirm
# that is intended for the joins that follow.
register_tables(data_asset='deid_omop')

['care_site',
 'cdm_source',
 'concept',
 'concept_ancestor',
 'concept_class',
 'concept_recommended',
 'concept_relationship',
 'concept_synonym',
 'condition_era',
 'condition_occurrence',
 'condition_occurrence_extension',
 'death',
 'device_exposure',
 'domain',
 'drug_era',
 'drug_exposure',
 'drug_strength',
 'location',
 'measurement',
 'observation',
 'observation_period',
 'person',
 'person_extension',
 'procedure_occurrence',
 'procedure_occurrence_extension',
 'provider',
 'relationship',
 'version',
 'visit_detail',
 'visit_occurrence',
 'vocabulary']

In [7]:
# Register the tables of the deid_cdw asset; the returned table names are the cell output.
register_tables(data_asset='deid_cdw')

['addressdim',
 'allergendim',
 'allergyfact',
 'anesthesiarecordattributevaluedim',
 'anesthesiarecordfact',
 'anesthesiaregistrymetricfact',
 'attendingproviderfact',
 'attributedim',
 'billareadim',
 'billingaccountencountermappingfact',
 'billingaccountfact',
 'billingproceduredim',
 'billingproceduresetdim',
 'billingtransactionfact',
 'birthanesthesiabridge',
 'birthattributevaluedim',
 'birthaugmentationbridge',
 'birthaugmentationindicationbridge',
 'birthcervicalripeningbridge',
 'birthcesareanindicationbridge',
 'birthepisiotomybridge',
 'birthfact',
 'birthinductionbridge',
 'birthinductionindicationbridge',
 'birthrupturetypebridge',
 'cancerstagingattributevaluedim',
 'cancerstagingfact',
 'careareadim',
 'careteamfact',
 'changedpatdurablekey_crosswalk_withoffsets',
 'chiefcomplaintbridge',
 'chiefcomplaintdim',
 'codedhospitaladmissioncmsvariabledatamart',
 'codedhospitaladmissionfact',
 'codedprocedurefact',
 'costcenterdim',
 'coveragedim',
 'datedim',
 'dentalfindinge

In [19]:
# Open the saved cohort file as a DuckDB relation bound to the Python name final_cohort.
# NOTE(review): the Atlas SQL cell also creates a TEMP TABLE called final_cohort; if that
# table exists on this connection, `FROM final_cohort` in later SQL presumably resolves to
# the table rather than to this variable — confirm which one is intended.
final_cohort = duckdb.read_parquet("final_cohort.parquet")

In [26]:
# Sanity check: number of distinct person_id values in the saved cohort.
final_cohort['person_id'].fetchdf().nunique()

person_id    1934
dtype: int64

In [40]:
# Attach person_source_value to each cohort row by joining to the registered person table.
# The INNER JOIN drops any cohort row whose person_id has no match in person.
# NOTE(review): `person` is whichever asset registered that name last — confirm it is the
# same population the cohort's person_id values were drawn from.
df = duckdb.sql("""
SELECT fc.*, p.person_source_value
FROM final_cohort fc
INNER JOIN person p ON p.person_id = fc.person_id
""")
# NOTE(review): the disabled COPY below would overwrite 'final_cohort.parquet', the same
# file this notebook reads final_cohort from — presumably this is how the
# person_source_value columns ended up in that file; confirm before re-enabling.
#duckdb.sql("""
#COPY
#    (SELECT * FROM df)
#    TO 'final_cohort.parquet'
#    (FORMAT parquet);
#""")

In [41]:
# Sanity check: number of distinct person_id values left after the join to person.
df['person_id'].fetchdf().nunique()

person_id    1934
dtype: int64

In [42]:
# Build a randomly ordered view of the cohort and register it under the name
# final_cohort_batch so later SQL can reference it.
# NOTE(review): there is no LIMIT, so despite the name this selects every cohort row; and
# RANDOM() is unseeded, so the ordering differs between runs — confirm whether a sample
# (or a fixed seed) was intended.
final_cohort_batch = duckdb.sql("""
SELECT fc.*, 
FROM final_cohort fc
ORDER BY RANDOM()
""")
duckdb.register("final_cohort_batch", final_cohort_batch)


<duckdb.duckdb.DuckDBPyConnection at 0x7675b9377db0>

In [43]:
# Sanity check: number of distinct person_id values in the shuffled cohort.
final_cohort_batch['person_id'].fetchdf().nunique()

person_id    1934
dtype: int64

In [48]:
# Collect note metadata (note key, note type, service date) for every note whose
# deid_service_date lies between the patient's cohort start_date and end_date (inclusive).
# Patients are linked to notes through person.person_source_value = note_metadata.patientepicid.
# Both joins are INNER, so cohort members with no matching note in the window are dropped.
# NOTE(review): `person` and `note_metadata` are whatever was registered under those names
# most recently — confirm they come from the intended data assets.
df = duckdb.sql("""
SELECT 
    fc.*, 
    nm.deid_note_key, 
    nm.note_type, 
    nm.deid_service_date
FROM final_cohort_batch fc
INNER JOIN person p ON p.person_id = fc.person_id
INNER JOIN note_metadata nm ON p.person_source_value = nm.patientepicid
        AND nm.deid_service_date >= fc.start_date
        AND nm.deid_service_date <= fc.end_date


""")


In [45]:
# Sanity check: number of distinct person_id values that have at least one note in the window.
df['person_id'].fetchdf().nunique()

person_id    1878
dtype: int64

In [49]:
# Write the cohort-to-note metadata held in `df` to a local Parquet file.
# `df` inside the SQL refers to the Python variable of that name.
duckdb.sql("""
COPY
    (SELECT * FROM df)
    TO 'cohort_metadata1.parquet'
    (FORMAT PARQUET);
""")

In [50]:
# Re-open the exported metadata file as a DuckDB relation and show its (rows, columns) shape.
cohort_metadata = duckdb.read_parquet('cohort_metadata1.parquet')
cohort_metadata.shape

(464136, 10)

In [51]:
# Sanity check: number of distinct person_id values in the exported metadata.
cohort_metadata['person_id'].fetchdf().nunique()

person_id    1878
dtype: int64

# Grabbing only Progress notes

In [52]:
# Keep only the metadata rows whose note_type is exactly 'Progress Notes'.
just_pn = duckdb.sql("""
SELECT *
FROM cohort_metadata
WHERE note_type = 'Progress Notes'
""")


In [53]:
# Sanity check: number of distinct person_id values with at least one progress note.
just_pn['person_id'].fetchdf().nunique()

person_id    1762
dtype: int64

In [56]:
# Attach note text to each progress-note metadata row, keeping only the first 8000
# characters of each note.
# The join is written as INNER JOIN because the WHERE clause discards every row without
# note text anyway; the previous LEFT JOIN returned the same rows but suggested that
# unmatched notes were kept.
cohort_notes = duckdb.sql("""
SELECT 
    cm.*,
    SUBSTR(note_text.note_text, 1, 8000) AS note_text
FROM just_pn cm
INNER JOIN note_text
ON cm.deid_note_key = note_text.deid_note_key
WHERE note_text.note_text IS NOT NULL

""")



In [57]:
# Write the progress notes (metadata plus truncated text) held in cohort_notes to a local Parquet file.
duckdb.sql("""
COPY
    (SELECT * FROM cohort_notes)
    TO 'cohort_notes_test.parquet'
    (FORMAT PARQUET);
""")

In [58]:
# Re-open the exported progress notes file as a DuckDB relation.
test = duckdb.read_parquet('cohort_notes_test.parquet')


In [67]:
# Materialise the progress notes relation as a pandas DataFrame for the pandas steps that follow.
test_df = test.fetchdf()

### Sort progress notes and keep only the latest note per patient

In [69]:
# Make sure the three date columns are datetimes so the sort below is chronological.
# pd.to_datetime is idempotent, so re-running this cell leaves test_df unchanged.
for date_col in ('start_date', 'end_date', 'deid_service_date'):
    test_df[date_col] = pd.to_datetime(test_df[date_col])

# Keep each patient's most recent progress note: sort newest-first within person_id and
# take the first row per patient.
# deid_note_key is a tie-breaker for notes that share the latest service date; without it
# the chosen note depends on incoming row order, which is not stable across runs here
# (preserve_insertion_order is off and the cohort is shuffled with an unseeded RANDOM()).
latest_notes = (
    test_df
    .sort_values(
        ['person_id', 'deid_service_date', 'deid_note_key'],
        ascending=[True, False, True],
    )
    .drop_duplicates(subset=['person_id'], keep='first')
)


In [76]:
# Save one latest progress note per patient to a local Parquet file, without the DataFrame index.
latest_notes.to_parquet('latest_notes.parquet', engine='pyarrow', index=False)