In [92]:
# Import packages
import pyspark
import dxpy
import dxdata
from pyspark.sql.functions import concat_ws

In [2]:
# Spark initialization (Done only once; do not rerun this cell unless you select Kernel -> Restart kernel).
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

In [3]:
# Automatically discover dispensed database name and dataset id
dispensed_database = dxpy.find_one_data_object(
    classname='database', 
    name='app*', 
    folder='/', 
    name_mode='glob', 
    describe=True)
dispensed_database_name = dispensed_database['describe']['name']

dispensed_dataset = dxpy.find_one_data_object(
    typename='Dataset', 
    name='app*.dataset', 
    folder='/', 
    name_mode='glob')
dispensed_dataset_id = dispensed_dataset['id']

In [4]:
# Load dataset
dataset = dxdata.load_dataset(id=dispensed_dataset_id)
engine = dxdata.connect()
dataset.entities
participant = dataset["participant"]

In [5]:
# Create function to look up fields based on the field ID
def fields_for_id(field_id):
    from distutils.version import LooseVersion
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

def field_names_for_id(field_id):
    return [f.name for f in fields_for_id(field_id)]

In [19]:
# Add all relevant fields for outcome and covariates
field_names = ["eid"] + field_names_for_id(34) + field_names_for_id(40000) + field_names_for_id(22189) + field_names_for_id(31) + field_names_for_id(21001) + field_names_for_id(1558) + field_names_for_id(20160) + field_names_for_id(20161) + field_names_for_id(50)
df = participant.retrieve_fields(names=field_names, engine=dxdata.connect(), coding_values="replace")
df.head

<bound method DataFrame.head of DataFrame[eid: string, p34: bigint, p40000_i0: date, p40000_i1: date, p22189: double, p31: string, p21001_i0: double, p21001_i1: double, p21001_i2: double, p21001_i3: double, p1558_i0: string, p1558_i1: string, p1558_i2: string, p1558_i3: string, p20160_i0: string, p20160_i1: string, p20160_i2: string, p20160_i3: string, p20161_i0: double, p20161_i1: double, p20161_i2: double, p20161_i3: double, p50_i0: double, p50_i1: double, p50_i2: double, p50_i3: double]>

In [20]:
df_pandas = df.toPandas()
df_pandas.head()
df_pandas.to_csv('GBACovar.csv', index=False)

In [109]:
icd10_tbi_field_names = ["eid"] + field_names_for_id(41270)
icd10 = participant.retrieve_fields(names=icd10_tbi_field_names, engine=dxdata.connect(), coding_values="replace")
icd9_tbi_field_names = ["eid"] + field_names_for_id(41271)
icd9 = participant.retrieve_fields(names=icd9_tbi_field_names, engine=dxdata.connect(), coding_values="replace")
icd10 = icd10.withColumn('p41270', concat_ws(',', 'p41270'))
icd9 = icd9.withColumn('p41271', concat_ws(',', 'p41271'))

In [110]:
#icd10codes = ["S02%", "S04%", "S06%", "S07%", "T74%"]
icd10.createOrReplaceTempView("icd10")
icd10_new = spark.sql("SELECT eid FROM icd10 WHERE p41270 LIKE '%S02%' OR p41270 LIKE '%S04%' OR p41270 LIKE '%S06%' OR p41270 LIKE '%S07%' OR p41270 LIKE '%T74%'")

In [111]:
#icd9codes = ["800.", "801.", "803.", "804.", "850.", "950.1", "950.2", "950.3", "959.01", "995.55"]
icd9.createOrReplaceTempView("icd9")
icd9_new = spark.sql("SELECT eid FROM icd9 WHERE p41271 LIKE '%800.%' OR p41271 LIKE '%801.%' OR p41271 LIKE '%803.%' OR p41271 LIKE '%804.%' OR p41271 LIKE '%850.%' OR p41271 LIKE '%950.1%' OR p41271 LIKE '%950.2%' OR p41271 LIKE '%950.3%' OR p41271 LIKE '%959.01%' OR p41271 LIKE '%995.55%'")

In [112]:
icd10_pandas = icd10_new.toPandas()
icd9_pandas = icd9_new.toPandas()
icd10_pandas.to_csv('TBI_ICD10.csv', index = False)
icd9_pandas.to_csv('TBI_ICD9.csv', index = False)

In [None]:
dxpy.upload_local_file('GBACovar.csv')
dxpy.upload_local_file('TBI_ICD10.csv')
dxpy.upload_local_file('StrokeData.csv')