In [4]:
import os
import pandas as pd
import numpy as np
from neo4j import Query, GraphDatabase, RoutingControl, Result # Python database driver 5.13 +

## Database connection

In [2]:
# Connection settings.
# Every value can be overridden through an environment variable so that real
# credentials never need to be written into (or committed with) the notebook.
# The fallbacks are the local development defaults.
DB_ULR = os.environ.get("NEO4J_URI", "neo4j://localhost:7687")  # name kept as-is: later cells reference DB_ULR
DB_USER = os.environ.get("NEO4J_USERNAME", "neo4j")
DB_PASS = os.environ.get("NEO4J_PASSWORD", "test1234")  # local-dev default only
DB_NAME = os.environ.get("NEO4J_DATABASE", "demo")  # Has to be "neo4j" on Neo4j Aura; a separate name is handy for testing on a local dev env

In [3]:
# Build the shared driver object and check right away that the server can be
# reached, so connection problems surface here rather than in a later cell.
neo4j_auth = (DB_USER, DB_PASS)
driver = GraphDatabase.driver(DB_ULR, auth=neo4j_auth)
driver.verify_connectivity()

In [None]:
# Make sure the target database exists (skip this cell on Neo4j Aura).
# The statement is issued against the "system" database.
ensure_db_statement = f"create database {DB_NAME} if not exists"
records, summary, keys = driver.execute_query(
    ensure_db_statement,
    database_="system",
    routing_=RoutingControl.WRITE,
)

## Utility functions

In [None]:
## Utility
def split_dataframe(df, chunk_size = 5000):
    """Split a DataFrame into consecutive row chunks.

    Args:
        df: The DataFrame to split. Rows are taken by position, in order.
        chunk_size: Maximum number of rows per chunk. Must be positive.

    Returns:
        A list of DataFrames that together contain every row of ``df``
        exactly once. Every chunk holds ``chunk_size`` rows except possibly
        the last one. An empty ``df`` yields an empty list, so callers never
        issue a database round trip for zero rows.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # Stepping through the start offsets avoids the empty trailing chunk that
    # `len(df) // chunk_size + 1` yields when the length is an exact multiple.
    return [
        df.iloc[start:start + chunk_size]
        for start in range(0, len(df), chunk_size)
    ]

## Define indexes and constraints

In [None]:
# Constraint definitions for the graph model. Each statement carries
# "if not exists", so re-running this cell should not fail on constraints
# that are already in place.
schema_statements = [
    'create constraint if not exists for (n:UseCase) require (n.name) is node key',
    'create constraint if not exists for (n:Industry) require (n.name) is ::String',
    'create constraint if not exists for (n:BusinessConcept) require (n.id) is node key',
]
# Apply the statements one at a time, in list order.
for ddl in schema_statements:
    driver.execute_query(ddl, database_=DB_NAME, routing_=RoutingControl.WRITE)

# Read back every constraint in the database as a DataFrame and display
# up to the first 100 rows as the cell output.
schema_result_df = driver.execute_query(
    'show constraints',
    database_=DB_NAME,
    routing_=RoutingControl.READ,
    result_transformer_=Result.to_df,
)
schema_result_df.head(100)

## Data sources

In [None]:
# Load the "dm" source table from parquet; the path is relative to the
# notebook's working directory.
# NOTE(review): presumably synthetic SDTM Demographics data, judging by the
# file name - confirm.
dm_df = pd.read_parquet('./syth_sdtm/dm_synth.parquet')

In [None]:
# Preview the first rows of dm_df to check the columns that were loaded.
dm_df.head()

In [None]:
# Load the "sv" source table from parquet; the path is relative to the
# notebook's working directory.
# NOTE(review): presumably synthetic SDTM Subject Visits data, judging by the
# file name and the visit columns used further down - confirm.
sv_df = pd.read_parquet('./syth_sdtm/sv_synth.parquet')

In [None]:
# Preview the first rows of sv_df to check the columns that were loaded.
sv_df.head()

## Graph creation

In [None]:
# Not wrapped in split_dataframe (looks cleaner if you only have a couple of thousand rows of data to process)
# driver.execute_query(
#     ''' 
#         unwind $rows as row
#         merge (n:UseCase{name: row['COL1']})
#             set n.description = row['COL2']
#         return count(*) as rows_processed
#     ''',
#     database_=DB_NAME,
#     routing_=RoutingControl.WRITE,
#     rows = data[['COL1', 'COL2']].drop_duplicates().to_dict('records')
# )

In [None]:
# Create BusinessConcept nodes, one per distinct (id, name) pair, sent to the
# database chunk by chunk.
# The label has to match the `BusinessConcept` node-key constraint declared in
# the schema cell; a misspelled label would create nodes that the constraint
# does not cover.
for chunk in split_dataframe(dm_df[['id', 'name']].drop_duplicates()):
    records, summary, keys = driver.execute_query(
        ''' 
            unwind $rows as row
            merge (bc:BusinessConcept{id: row.id})
                set bc.term = row.name
            return count(*) as rows_processed
        ''',
        database_=DB_NAME,
        routing_=RoutingControl.WRITE,
        rows = chunk.to_dict('records')
    )

In [None]:
# ClinicalStudy nodes: one node per distinct STUDYID found in dm_df,
# written in batches produced by split_dataframe.
study_ids_df = dm_df[['STUDYID']].drop_duplicates()
for batch in split_dataframe(study_ids_df):
    batch_rows = batch.to_dict('records')
    records, summary, keys = driver.execute_query(
        ''' 
            unwind $rows as row
            merge (cs:ClinicalStudy{id: row.STUDYID})
            return count(*) as rows_processed
        ''',
        database_=DB_NAME,
        routing_=RoutingControl.WRITE,
        rows = batch_rows
    )

In [2]:
# PatientVisit nodes
# + example of how to keep the amount of data sent per query reasonably sized:
#   the rows are split into chunks and each chunk gets its own execute_query call.
# A visit is identified by (study_id, visit_num, patient_id); date and type are
# (re)set on every run.
for chunk in split_dataframe(sv_df[['STUDYID', 'USUBJID','VISITNUM','SVSTDTC','VISIT']].drop_duplicates()):
    records, summary, keys = driver.execute_query(
        ''' 
            unwind $rows as row
            merge (pv:PatientVisit{study_id: row.STUDYID, visit_num: row.VISITNUM, patient_id: row.USUBJID})
            set pv.date = datetime(row.SVSTDTC),
                pv.type = row.VISIT
            return count(*) as rows_processed
        ''',
        database_=DB_NAME,
        routing_=RoutingControl.WRITE,
        rows = chunk.to_dict('records')
    )

NameError: name 'split_dataframe' is not defined

In [None]:
# Person nodes: one node per distinct USUBJID in sv_df, merged in a single
# query (no chunking here).
person_rows = sv_df[['USUBJID']].drop_duplicates().to_dict('records')
records, summary, keys = driver.execute_query(
    ''' 
        unwind $rows as row
        merge (pv:Person{id: row.USUBJID})
        return count(*) as rows_processed
    ''',
    database_=DB_NAME,
    routing_=RoutingControl.WRITE,
    rows = person_rows
)

In [None]:
# Temp
def get_person(person_id):
    """Count the Person nodes whose ``id`` property equals ``person_id``.

    Args:
        person_id: Value matched against ``Person.id``.

    Returns:
        The first record of the query result. The query returns a single
        aggregate row whose ``rows_processed`` field holds the number of
        matching Person nodes.
    """
    records, _summary, _keys = driver.execute_query(
        ''' 
            match (pv:Person{id: $person_id})
            return count(*) as rows_processed
        ''',
        database_=DB_NAME,
        routing_=RoutingControl.READ,
        # The keyword name must equal the $-parameter used in the query text.
        person_id = person_id
    )
    return records[0]

In [None]:
# (:Person)<-[:PATIENT]-(:PatientVisit) relationships
# Each distinct visit row is matched to its PatientVisit node and to the
# Person with the same USUBJID; merge links the two with PATIENT.
patient_link_rows = (
    sv_df[['STUDYID', 'USUBJID','VISITNUM','SVSTDTC','VISIT']]
    .drop_duplicates()
    .to_dict('records')
)
records, summary, keys = driver.execute_query(
    ''' 
        unwind $rows as row
        match (pv:PatientVisit{study_id: row.STUDYID, visit_num: row.VISITNUM, patient_id: row.USUBJID}),
              (p:Person{id: row.USUBJID})
        merge (p)<-[r:PATIENT]-(pv)
        return count(*) as rows_processed
    ''',
    database_=DB_NAME,
    routing_=RoutingControl.WRITE,
    rows = patient_link_rows
)

In [None]:
# (:ClinicalStudy)<-[:STUDY]-(:PatientVisit) relationships
# Each distinct visit row is matched to its PatientVisit node and to the
# ClinicalStudy with the same STUDYID; merge links the two with STUDY.
study_link_rows = (
    sv_df[['STUDYID', 'USUBJID','VISITNUM','SVSTDTC','VISIT']]
    .drop_duplicates()
    .to_dict('records')
)
records, summary, keys = driver.execute_query(
    ''' 
        unwind $rows as row
        match (pv:PatientVisit{study_id: row.STUDYID, visit_num: row.VISITNUM, patient_id: row.USUBJID}),
              (s:ClinicalStudy{id: row.STUDYID})
        merge (s)<-[:STUDY]-(pv)
        return count(*) as rows_processed
    ''',
    database_=DB_NAME,
    routing_=RoutingControl.WRITE,
    rows = study_link_rows
)