In [1]:
# Import packages
import pyspark
import dxpy
import dxdata
from pyspark.sql.functions import concat_ws
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Spark initialization (Done only once; do not rerun this cell unless you select Kernel -> Restart kernel).
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

In [3]:
# Automatically discover dispensed database name and dataset id
dispensed_database = dxpy.find_one_data_object(
    classname='database', 
    name='app*', 
    folder='/', 
    name_mode='glob', 
    describe=True)
dispensed_database_name = dispensed_database['describe']['name']

dispensed_dataset = dxpy.find_one_data_object(
    typename='Dataset', 
    name='app*.dataset', 
    folder='/', 
    name_mode='glob')
dispensed_dataset_id = dispensed_dataset['id']

In [4]:
# Load dataset
dataset = dxdata.load_dataset(id=dispensed_dataset_id)
engine = dxdata.connect()
dataset.entities
participant = dataset["participant"]

In [5]:
# Create function to look up fields based on the field ID
def fields_for_id(field_id):
    from distutils.version import LooseVersion
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

def field_names_for_id(field_id):
    return [f.name for f in fields_for_id(field_id)]

In [6]:
field_ids = [
    str(x) for x in range(31040, 31050)
] + [
    str(x) for x in range(29156, 29162)
] + [
    '29207', '29207'
] + [
    str(x) for x in range(28010, 28034)
] + [
    str(x) for x in range(27990, 27994)
]
print(field_ids)

['31040', '31041', '31042', '31043', '31044', '31045', '31046', '31047', '31048', '31049', '29156', '29157', '29158', '29159', '29160', '29161', '29207', '29207', '28010', '28011', '28012', '28013', '28014', '28015', '28016', '28017', '28018', '28019', '28020', '28021', '28022', '28023', '28024', '28025', '28026', '28027', '28028', '28029', '28030', '28031', '28032', '28033', '27990', '27991', '27992', '27993']


In [7]:
# Add all relevant fields for outcome and covariates
field_names = [field_names_for_id(x) for x in field_ids]
field_names_flat = ['eid'] + [x for l in field_names for x in l]
covid = participant.retrieve_fields(names=field_names_flat, engine=dxdata.connect(), coding_values="replace")
covid.head

<bound method DataFrame.head of DataFrame[eid: string, p31040_i2: double, p31040_i3: double, p31041_i2: double, p31041_i3: double, p31042_i2: double, p31042_i3: double, p31043_i2: double, p31043_i3: double, p31044_i2: double, p31044_i3: double, p31045_i2: bigint, p31045_i3: bigint, p31046_i2: bigint, p31046_i3: bigint, p31047_i2_a0: string, p31047_i2_a1: string, p31047_i3_a0: string, p31047_i3_a1: string, p31048_i2: string, p31048_i3: string, p31049_i2: string, p31049_i3: string, p29156: string, p29157: string, p29158: string, p29159: string, p29160: string, p29161: string, p29207: date, p28010_i0: string, p28010_i1: string, p28010_i2: string, p28010_i3: string, p28010_i4: string, p28010_i5: string, p28010_i6: string, p28010_i7: string, p28010_i8: string, p28011_i0: string, p28011_i1: string, p28011_i2: string, p28011_i3: string, p28011_i4: string, p28011_i5: string, p28011_i6: string, p28011_i7: string, p28011_i8: string, p28012_i0: string, p28012_i1: string, p28012_i2: string, p28012

In [8]:
covid_pandas = covid.toPandas()
covid_pandas.head()
covid_pandas.to_csv('covid.csv', index=False)

In [None]:
dxpy.upload_local_file("covid_variables.ipynb")
dxpy.upload_local_file("covid.csv")