In [None]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import seaborn as sb
import matplotlib.pyplot as plt
from datetime import datetime
import math
from IPython.display import display, HTML
from datetime import date
import multiprocessing

import os
multiprocessing.cpu_count()

In [None]:
# Workspace settings come from the environment; both are None when the variable is unset.
CDR_version = os.environ.get("WORKSPACE_CDR")
my_bucket = os.environ.get('WORKSPACE_BUCKET')

### Phecode definition for MDD (296.22)
Also retrieve the phecodes used for comorbidity exclusion: 296.1, 295.1, 317, and 316.

In [None]:
# Phecode reference tables, read from CSV files in the working directory.
rollup_map = pd.read_csv("phecode_rollup_map.csv")
ICD9_exclude = pd.read_csv("ICD9PhecodeExclude.csv")

# Force ICD codes to strings. With the default low_memory=True, pandas infers column types
# chunk by chunk, so a chunk holding only numeric-looking ICD-9 codes could otherwise be parsed
# as floats (e.g. '008.0' -> 8.0). That would silently break the code joins and make the
# "','".join(...) used to build SQL IN-lists raise on non-string values.
ICD9_IC10_Phecodes = pd.read_csv("phecode_map_icd9_10.csv", dtype={"code": str})

# strip off extra column (presumably a saved row index -- TODO confirm)
ICD9_IC10_Phecodes = ICD9_IC10_Phecodes.iloc[:, 1:]
phecode_info = pd.read_csv("pheinfo.csv")

# Every distinct phecode that has at least one ICD-9/ICD-10 mapping.
phecodes_list = ICD9_IC10_Phecodes["phecode"].unique().tolist()

In [None]:
# Which ICD-9-CM / ICD-10-CM codes actually occur in the observation table?
# getPhecodeParticipants only queries observation for mapped codes found in these lists.
# A single DISTINCT over the join yields the same (source value, vocabulary) pairs as
# de-duplicating whole observation rows first, without materialising that larger DISTINCT.
query = """
SELECT DISTINCT obs.observation_source_value, concept.vocabulary_id
FROM `""" + str(CDR_version) + """.observation` AS obs
INNER JOIN `""" + str(CDR_version) + """.concept` AS concept
    ON concept.concept_id = obs.observation_source_concept_id
WHERE concept.vocabulary_id IN ('ICD9CM', 'ICD10CM')
"""
observation_codes = pd.read_gbq(query, dialect="standard")

# NOTE(review): these are raw source values, later compared with concept.concept_code;
# this assumes both use the same formatting (e.g. dotted codes) -- confirm.
# NULL source values cannot match any mapped code, so drop them to keep the lists pure strings.
observation_icd9 = observation_codes.loc[
    observation_codes.vocabulary_id == "ICD9CM", "observation_source_value"
].dropna()
observation_icd10 = observation_codes.loc[
    observation_codes.vocabulary_id == "ICD10CM", "observation_source_value"
].dropna()

In [None]:
def getPhecodeParticipants(phecodes_batch, return_dict, phecodes_list, CDR_version, num_processes):
    """
    Batching function for parallel extraction of participant Phecodes 
    ======================================================================================================
    phecodes_batch: Pandas Dataframe of Phecodes 
    return_dict: 
    phecodes_list: List of phecodes to process 
    CDR_version: String of current cdr version 
    ## need to include rollup 
    """
    size = int(np.ceil(len(phecodes_list)/num_processes))
    phecodes_code_list=phecodes_list[phecodes_batch*size:(phecodes_batch+1)*size]
    # changing for 4 cores 
    #phecodes_code_list=phecodes_list[phecodes_batch*235:(phecodes_batch+1)*235]
    # for particular codes, there are rollups, so we need to count on the fly 
    # note, there are icd9 codes that live in observation: 
    phecodes=ICD9_IC10_Phecodes[ICD9_IC10_Phecodes["phecode"].isin(phecodes_code_list)]
    
    ## ICD Codes in condition_occurrence (if a code is in observation, then it shouldn't query anything here)
    icd9_codes_cond=phecodes[phecodes['vocabulary_id']=='ICD9CM']["code"].tolist()
    #
    icd9_codes_cond_str="'"+"','".join(icd9_codes_cond)+"'"
    
    icd10_codes_cond=phecodes[phecodes['vocabulary_id']=='ICD10CM']["code"].tolist()
    icd10_codes_cond_str="'"+"','".join(icd10_codes_cond)+"'"
    
    ## ICD Codes in observation 
    
    icd9_codes_obs=np.intersect1d(observation_icd9,phecodes[phecodes['vocabulary_id']=='ICD9CM']["code"].tolist()).tolist()
    #
    icd9_codes_obs_str="'"+"','".join(icd9_codes_obs)+"'"
    
    icd10_codes_obs=np.intersect1d(observation_icd10,phecodes[phecodes['vocabulary_id']=='ICD10CM']["code"].tolist()).tolist()
    icd10_codes_obs_str="'"+"','".join(icd10_codes_obs)+"'"
    
    # there's a subtlety here that we need icd10 cm
    query="""SELECT DISTINCT icd.person_id,condition_start_date as start_date,condition_concept_id as cid,concept_code,vocabulary_id FROM `"""+CDR_version+""".concept` 
    c  INNER JOIN `"""+CDR_version+""".condition_occurrence` icd  ON icd.condition_source_concept_id=c.concept_id  
    WHERE vocabulary_id ='ICD9CM' AND  concept_code IN ("""+icd9_codes_cond_str+""") 
    ORDER BY condition_start_date"""
    icdcodes1_cond=pd.read_gbq(query, dialect="standard")
    query="""SELECT DISTINCT icd.person_id,condition_start_date as start_date,condition_concept_id as cid,concept_code, vocabulary_id FROM `"""+CDR_version+""".concept` 
    c  INNER JOIN `"""+CDR_version+""".condition_occurrence` icd  ON c.concept_id = icd.condition_source_concept_id  
    WHERE vocabulary_id ='ICD10CM' AND  concept_code IN ("""+icd10_codes_cond_str+""")
    ORDER BY condition_start_date"""
    icdcodes2_cond=pd.read_gbq(query, dialect="standard")
    
    #Now observations 
    
    query="""SELECT DISTINCT icd.person_id,observation_date as start_date, observation_concept_id as cid,concept_code,vocabulary_id FROM `"""+CDR_version+""".concept` 
    c  INNER JOIN `"""+CDR_version+""".observation` icd  ON icd.observation_source_concept_id=c.concept_id  
    WHERE vocabulary_id ='ICD9CM' AND  concept_code IN ("""+icd9_codes_obs_str+""") 
    ORDER BY start_date"""
    icdcodes1_obs=pd.read_gbq(query, dialect="standard")
    query="""SELECT DISTINCT icd.person_id,observation_date as start_date,observation_concept_id as cid, concept_code, vocabulary_id FROM `"""+CDR_version+""".concept` 
    c  INNER JOIN `"""+CDR_version+""".observation` icd  ON c.concept_id = icd.observation_source_concept_id  
    WHERE vocabulary_id ='ICD10CM' AND  concept_code IN ("""+icd10_codes_obs_str+""")
    ORDER BY start_date"""
    
    icdcodes2_obs=pd.read_gbq(query, dialect="standard")
    
    icdcodes=pd.concat([icdcodes1_cond,icdcodes2_cond,icdcodes1_obs,icdcodes2_obs]).drop_duplicates() # drop duplicates within vocab, person id and date

    patients_phcode_count=icdcodes[["person_id","start_date","concept_code","vocabulary_id"]].drop_duplicates()[["person_id","concept_code","start_date","vocabulary_id"]]

    patients_phcode_count=pd.merge(phecodes[["code","phecode","vocabulary_id"]],patients_phcode_count,left_on=["code", "vocabulary_id"],right_on=["concept_code", "vocabulary_id"])

    return_dict[phecodes_batch]=patients_phcode_count

In [None]:
## Batch get the phecodes
def phecode_counts(num_processes, phecodes_list, CDR_version):
    """Count distinct event dates per participant and rolled-up phecode.

    Fans getPhecodeParticipants out over worker processes, one batch of phecodes each.
    It then maps every extracted phecode through rollup_map (rollup_map.code -> phecode_unrolled)
    and counts distinct non-null start_date values per (person_id, phecode_unrolled).

    Parameters
    ----------
    num_processes : int
        Maximum number of worker processes. It is capped at len(phecodes_list), with a
        minimum of 1, so no worker is started for an empty batch.
    phecodes_list : list
        Phecodes to extract.
    CDR_version : str
        Current CDR version (BigQuery dataset prefix).

    Returns
    -------
    pandas.DataFrame with columns ["person_id", "phecode", "count"], where "phecode" holds
    the rolled-up phecode.

    Raises
    ------
    RuntimeError
        If any worker process exits with a non-zero exit code. Without this check, its batch
        would silently be missing from the counts.

    NOTE(review): workers read notebook globals (ICD9_IC10_Phecodes, observation_icd9/10,
    rollup_map) and a notebook-defined target. This presumably requires the 'fork' start
    method; confirm for this kernel's platform and Python version.
    """
    num_processes = max(1, min(num_processes, len(phecodes_list)))
    size = int(np.ceil(len(phecodes_list) / num_processes))
    print("Processing phecodes in blocks of size: " + str(size))

    # The context manager shuts the Manager's server process down when done.
    with multiprocessing.Manager() as manager:
        return_dict = manager.dict()
        jobs = []
        for i in range(num_processes):
            p = multiprocessing.Process(
                target=getPhecodeParticipants,
                args=(i, return_dict, phecodes_list, CDR_version, num_processes),
            )
            jobs.append(p)
            p.start()
        for proc in jobs:
            proc.join()

        failed = [i for i, proc in enumerate(jobs) if proc.exitcode != 0]
        if failed:
            raise RuntimeError(
                "Phecode worker batch(es) " + str(failed) + " exited with a non-zero exit code; "
                "counts would be incomplete."
            )
        # Copy results out (in batch order) before the manager process shuts down.
        batches = [return_dict[i] for i in range(num_processes)]

    phecodes_patients_list = pd.concat(batches)

    # Map each extracted phecode to its roll-up targets, then keep one row per person/phecode/date.
    phecodes_patients_list_merge = pd.merge(
        rollup_map, phecodes_patients_list, left_on='code', right_on='phecode'
    )[["person_id", "phecode_unrolled", "start_date"]].drop_duplicates()

    # count() over start_date = number of distinct (non-null) event dates.
    phecodes_patients_counts_tmp = phecodes_patients_list_merge.groupby(
        ["person_id", "phecode_unrolled"], as_index=False
    ).count()
    # Rename so downstream code can keep using "phecode".
    phecodes_patients_counts_tmp.columns = ["person_id", "phecode", 'count']

    return phecodes_patients_counts_tmp

In [None]:
# Phecodes for this cohort: MDD (296.22) plus the comorbidity-exclusion phecodes.
TARGET_PHECODES = [296.1, 295.1, 317, 316, 296.22]
# Upper bound on worker processes; each worker issues its own BigQuery queries.
MAX_PROCESSES = 36

com_phecode_list = [phecode for phecode in phecodes_list if phecode in TARGET_PHECODES]

# A target phecode absent from the ICD->phecode map would otherwise be dropped silently.
missing_phecodes = sorted(set(TARGET_PHECODES) - set(com_phecode_list))
if missing_phecodes:
    print("Warning: phecodes not found in the ICD-phecode map: " + str(missing_phecodes))

# Never start more workers than there are phecodes (extra workers would have nothing to do).
phecodes_patients_counts = phecode_counts(num_processes=max(1, min(MAX_PROCESSES, len(com_phecode_list))),
                                          phecodes_list=com_phecode_list,
                                          CDR_version=CDR_version)

In [None]:
# Persist the per-participant phecode counts; the DataFrame index is written as the first column.
phecodes_patients_counts.to_csv(path_or_buf="./phecode_counts_allofUs.csv")