In [1]:
import pandas as pd

from bodyport.config import (
    EXAMPLE_ECG_DIR_LATEST,
    EXAMPLE_ECG_DIR_NEW,
    DB_CONN_STRING
)
from bodyport.load import DataWarehouseManager
from bodyport.orm import Run, Subject

In [2]:
demo_db = DB_CONN_STRING + '.demo'

data_warehouse = DataWarehouseManager(db_conn_string=demo_db)

In [3]:
data_warehouse.empty()

In [5]:
# load the new data_dir into the warehouse
data_warehouse.load(data_dir=EXAMPLE_ECG_DIR_LATEST)

In [6]:
runs = data_warehouse.pandas_query('select * from run;')

runs

Unnamed: 0,created_at,updated_at,id,subject_id,number,clinic_id,measurement,date,units,fs,raw_path,meta_path,age_at_run,sex,run_hash,avg_bpm
0,2020-03-31 18:34:11.468641,,1,12,1,sf_state,cg,2005-04-26,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,22,female,f437867a83196c88be70add2de57f9f0,
1,2020-03-31 18:34:11.468641,,2,12,2,sf_state,cg,2005-04-26,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,22,female,d75626a4078271f7cff45c60d7df1f2a,
2,2020-03-31 18:34:11.468641,,3,15,1,sf_state,cg,2005-04-26,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,21,female,c4ce6ec2fa612c92539cf75b64d0a913,
3,2020-03-31 18:34:11.468641,,4,15,2,sf_state,cg,2005-04-26,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,21,female,d3ce0a659423edef6fa63df4e632574f,
4,2020-03-31 18:34:11.468641,,5,23,1,sf_state,cg,2005-04-26,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,25,male,51111c0b4cf36a8202b0764714a589e8,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
283,2020-03-31 18:34:11.468641,,284,51,4,sf_state,cg,2005-05-20,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,31,male,42358e20139c242f0cc6cafea4ed4d31,
284,2020-03-31 18:34:11.468641,,285,69,1,sf_state,cg,2005-05-12,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,16,male,24735dea9d0feaffd113eb5fbe410d81,
285,2020-03-31 18:34:11.468641,,286,69,2,sf_state,cg,2005-05-12,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,16,male,ed51ff329c3b73dd41d5cce76f6447f6,
286,2020-03-31 18:34:11.468641,,287,56,1,sf_state,cg,2005-05-12,mV,500,/Users/sep/workspace/takehomes/bodyport/data/i...,/Users/sep/workspace/takehomes/bodyport/data/i...,21,female,340fcd5aeb681bac2219e9944f76da77,


In [7]:
initial_run_count = len(runs)

print(f"We found {initial_run_count} runs in upload {EXAMPLE_ECG_DIR_LATEST.stem}")

We found 288 runs in upload 2020-01-01


In [8]:
subjects = data_warehouse.pandas_query('select * from subject;')

subjects

Unnamed: 0,created_at,updated_at,id,sex,birth_year
0,2020-03-31 18:34:11.468641,,1,male,1979
1,2020-03-31 18:34:11.468641,,2,female,1981
2,2020-03-31 18:34:11.468641,,3,female,1982
3,2020-03-31 18:34:11.468641,,4,female,1984
4,2020-03-31 18:34:11.468641,,5,male,1984
...,...,...,...,...,...
75,2020-03-31 18:34:11.468641,,76,female,1985
76,2020-03-31 18:34:11.468641,,77,male,1986
77,2020-03-31 18:34:11.468641,,78,male,1971
78,2020-03-31 18:34:11.468641,,79,female,1980


In [9]:
# ensure all subjects have the same age. 
# this is as i expected for this toy example, but 
# in reality I would assume the age is calculated at `date`
# on the run, and we would want to back out approximate birth year from it
# to put into the subject table, so we can maintain a "patient timeline"
assert runs.groupby('subject_id')['age_at_run'].nunique().nunique() == 1

In [10]:
# ensure all subjects have same sex in each run
assert runs.groupby('subject_id')['sex'].nunique().nunique() == 1

# Merge new data into warehouse


In [11]:
# new directory, EXAMPLE_ECG_DIR_NEW, contains 2 subjects with 2 runs each.
# Subject 80 we've seen before, but Subject 81 is new in this upload

new_runs = list(EXAMPLE_ECG_DIR_NEW.glob('*/*.csv'))

new_runs

[PosixPath('/Users/sep/workspace/takehomes/bodyport/data/incoming/clinic=sf_state/measurement=ecg/2020-12-01/subject_80/run_99.csv'),
 PosixPath('/Users/sep/workspace/takehomes/bodyport/data/incoming/clinic=sf_state/measurement=ecg/2020-12-01/subject_80/run_1.csv'),
 PosixPath('/Users/sep/workspace/takehomes/bodyport/data/incoming/clinic=sf_state/measurement=ecg/2020-12-01/subject_81/run_1.csv'),
 PosixPath('/Users/sep/workspace/takehomes/bodyport/data/incoming/clinic=sf_state/measurement=ecg/2020-12-01/subject_81/run_2.csv')]

1 of subject 80's new runs is actually a replica of a previous runs. We'll see that our "checksum" check catches this and does not add that particular one, but run_99, which is off by 1 character, gets inserted.

**We expect 3 new records in the `run` table, and one new record in `subject`.**

In [12]:
data_warehouse.load(data_dir=EXAMPLE_ECG_DIR_NEW)

refreshed_runs = data_warehouse.pandas_query('select * from run;')

assert len(refreshed_runs) ==  initial_run_count + 3

### we also expect idempotency: i.e. re-running the fill command should not change the state at all

In [13]:
data_warehouse.load(data_dir=EXAMPLE_ECG_DIR_NEW)

refreshed_runs = data_warehouse.pandas_query('select * from run;')

assert len(refreshed_runs) ==  initial_run_count + 3

### The subject table did gain one record, (#81), as expected:

In [14]:
data_warehouse.pandas_query('select * from subject;')

Unnamed: 0,created_at,updated_at,id,sex,birth_year
0,2020-03-31 18:34:11.468641,,1,male,1979
1,2020-03-31 18:34:11.468641,,2,female,1981
2,2020-03-31 18:34:11.468641,,3,female,1982
3,2020-03-31 18:34:11.468641,,4,female,1984
4,2020-03-31 18:34:11.468641,,5,male,1984
...,...,...,...,...,...
76,2020-03-31 18:34:11.468641,,77,male,1986
77,2020-03-31 18:34:11.468641,,78,male,1971
78,2020-03-31 18:34:11.468641,,79,female,1980
79,2020-03-31 18:34:11.468641,,80,female,1985


In [16]:
data_warehouse.down()