Initial setup, which covers the next four code blocks:

1. Import required modules:
    * re – regular expressions (used later to shorten column names)
    * pyspark – runs queries across multiple nodes
    * dxpy – allows for querying DNAnexus-specific files
    * dxdata – allows for querying DNAnexus-specific databases
    * pandas – dataframe manipulation
2. Connect to the Spark cluster so we can pull UK Biobank data, which is stored in a MySQL-like database.
3. Use dxpy to find the database 'file' in our current project.
4. Load the database into this instance with the dxdata package.

In [36]:
import re
import pyspark
import dxpy
import dxdata
import pandas as pd
import numpy as np

In [2]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

In [3]:
# Find the dataset object in the project root; name_mode "glob" lets the name contain a wildcard
dispensed_dataset_id = dxpy.find_one_data_object(typename = "Dataset",
                                                name = "app*.dataset", folder = "/",
                                                name_mode = "glob")["id"]

In [4]:
dataset = dxdata.load_dataset(id = dispensed_dataset_id)
dataset.entities

[<Entity "participant">,
 <Entity "death">,
 <Entity "death_cause">,
 <Entity "hesin">,
 <Entity "hesin_critical">,
 <Entity "hesin_delivery">,
 <Entity "hesin_diag">,
 <Entity "hesin_maternity">,
 <Entity "hesin_oper">,
 <Entity "hesin_psych">,
 <Entity "covid19_result_england">,
 <Entity "covid19_result_scotland">,
 <Entity "covid19_result_wales">,
 <Entity "gp_clinical">,
 <Entity "gp_registrations">,
 <Entity "gp_scripts">]

Next, get the `participant` entity (effectively a SQL table), which the dxdata package uses
to look up and extract per-participant fields.

In [5]:
participant = dataset["participant"]

Now use the participant table to pull specific data fields from the UK Biobank database. The `find_field`
method takes a field name built from the UK Biobank field ID (e.g. `p22001` for field 22001, genetic sex).
Here we need to extract several fields:
* Fields 130000 - 132605 are First Incidence records (which include GP data).
    * Even fields (e.g. 130000) hold the date on which a record was entered into a participant's medical records
    * Odd fields (e.g. 130001) hold the source of the record itself (e.g. Death Record, HES, etc.)

Only the even (date) fields are collected in the next cell.

In [6]:
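# Start with the participant ID, then collect every even-numbered field that exists
field_names = ['eid']
titles = ['eid']
for i in range(130000,132605, 2):
    field_name = 'p%i' % i
    try:
        curr_field = participant.find_field(field_name)
        field_names.append(field_name)
        titles.append(curr_field.title)
    except LookupError:
        # Not every ID in the range exists in the dataset, so skip missing fields
        continue

print('%s - %s' % (field_names[0], titles[0]))
len(field_names)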

eid - eid


1131
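
The even/odd layout described above can be sketched without touching the database. This is a minimal illustration rather than part of the original workflow: `first_incidence_pairs` is a hypothetical helper name, and it only builds candidate field names. Some of those names may not exist in the dataset, which is why the cell above wraps `find_field` in a try/except.

```python
def first_incidence_pairs(start=130000, stop=132605):
    """Yield (date_field, source_field) names for each even/odd field ID pair.

    Hypothetical helper: it only builds candidate names and does not
    check whether a field actually exists in the dataset.
    """
    for field_id in range(start, stop, 2):
        yield 'p%i' % field_id, 'p%i' % (field_id + 1)


pairs = list(first_incidence_pairs())
print(pairs[0])    # first candidate (date, source) pair
print(len(pairs))  # number of candidate pairs in the range
```

If the source fields were also needed, the second element of each pair could be passed to `find_field` in the same way as the first.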

Here we extract the actual per-individual values. Because of some 'weirdness' with pyspark,
I extract the fields in fixed-size batches (stepping through the field list 100 at a time) to avoid
running out of memory. The memory issue occurs regardless of instance size.

In [None]:
file_counter = 1

for i in range(1, len(field_names), 100):
    # Slice ends are exclusive, so `end` is one past the last field in this batch
    end = min(i + 100, len(field_names))
    print('%i - %i' % (i, end - 1))

    # Always keep 'eid' as the first column of every batch
    curr_names = [field_names[0]] + field_names[i:end]
    curr_titles = ['eid'] + titles[i:end]

    df = participant.retrieve_fields(names = curr_names,
                                     coding_values = "replace",
                                     engine = dxdata.connect())

    df_pd = df.toPandas()

    # Make a name remapper from each field name to the letter-plus-two-digit code in its title:
    name_map = {}
    for j in range(1, len(curr_names)):
        short_name = re.search(r"[A-Z]\d{2}", curr_titles[j]).group(0)
        name_map[curr_names[j]] = short_name

    df_pd = df_pd.rename(columns = name_map)

    # Cannot combine all of the files together due to memory problems, so write to individual files to be processed later.
    df_pd.to_csv(f'first_incidence_table{file_counter}.tsv', sep = "\t", na_rep='NA', index = False)

    file_counter += 1

1 - 100
101 - 200
201 - 300
301 - 400
401 - 500
501 - 600
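
The batching and renaming logic above can be checked on its own, without Spark. This is a minimal sketch under stated assumptions: `batch_fields` and `short_code` are hypothetical helper names, the sample field names and title are made up for illustration, and unlike the cell above `short_code` returns `None` rather than raising when a title contains no matching code.

```python
import re

CODE_PATTERN = re.compile(r"[A-Z]\d{2}")


def batch_fields(field_names, batch_size=100):
    """Split field_names[1:] into batches, each prefixed with the ID column.

    Assumes field_names[0] is the participant ID column ('eid').
    """
    id_column = field_names[0]
    batches = []
    for start in range(1, len(field_names), batch_size):
        # Slice ends are exclusive, so this takes exactly batch_size fields
        # (fewer in the final batch).
        batches.append([id_column] + field_names[start:start + batch_size])
    return batches


def short_code(title):
    """Return the first letter-plus-two-digit code in a title, or None."""
    match = CODE_PATTERN.search(title)
    return match.group(0) if match else None


# Illustrative input: 'eid' plus 250 made-up field names.
names = ['eid'] + ['p%i' % n for n in range(130000, 130500, 2)]
batches = batch_fields(names)
print([len(b) for b in batches])

# Made-up title, for illustration only.
print(short_code('Date A01 first reported'))
```

Checking that the batches together contain every field exactly once is a quick way to catch off-by-one errors in the slice bounds.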


We also need to download participant birth and death information so that we can run time-to-event analyses.

In [72]:
# p52 = month of birth, p34 = year of birth, p40000_i0 = date of death (first instance)
df = participant.retrieve_fields(names = ['eid','p52','p34','p40000_i0'],
                                 coding_values = "replace",
                                 engine = dxdata.connect())

In [75]:
df_pd = df.toPandas()

# Convert to actual dob/dod. Only year (p34) and month (p52) of birth are retrieved, so the day is set to the 15th.
df_pd = df_pd[df_pd['p52'].notnull()].copy()
df_pd['dob'] = df_pd.apply(lambda x: pd.to_datetime(f'{x["p34"]:0.0f}-{x["p52"]}-15', format='%Y-%B-%d'), axis=1)
df_pd = df_pd.drop(columns=['p52','p34'])
df_pd = df_pd.rename(columns={'p40000_i0':'dod'})

# Write out...
df_pd.to_csv('vital_stats.tsv', sep="\t", index=False, na_rep="NA")
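
The date-of-birth construction above can be illustrated with the standard library alone. This sketch assumes, as the cell above does, that the month arrives as a full English month name (the `%B` format) and that the day of birth is unknown, so the 15th is used as a mid-month placeholder. `approximate_dob` is a hypothetical helper name.

```python
from datetime import datetime


def approximate_dob(year, month_name):
    """Build an approximate date of birth from a year and an English month name.

    The day is fixed to the 15th because only year and month are given.
    Note that %B parsing depends on the current locale.
    """
    return datetime.strptime(f'{int(year)}-{month_name}-15', '%Y-%B-%d').date()


dob = approximate_dob(1950.0, 'March')
print(dob.isoformat())
```

The `int(year)` conversion plays the same role as the `:0.0f` format in the cell above, which suggests the year column comes back as a float.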