# Extracting data using the UKB RAP

## Introduction
In this notebook, we will dispense data for analysis using the RAP. 

## About this notebook

This is a Python notebook.

Data on the RAP is dispensed using Spark (a data processing framework well-suited to big data which works by distributing data and data processing over several machines). We will access the data using [pyspark](https://pypi.org/project/pyspark/), a Python interface to Spark.

Here, we will also do some manipulation of the data using pyspark. There are alternatives: you could convert earlier to a [pandas](https://pandas.pydata.org/) or [koalas](https://koalas.readthedocs.io/en/latest/) data frame and manipulate those. Be aware that working in pandas no longer takes advantage of the distributed aspect of Spark, so you may run into issues with the size of the data. koalas looks and feels like pandas but makes use of the distributed aspect of Spark.

## How to run this notebook

This notebook should be run in a *Spark in JupyterLab* session. Read how to set it up [here](https://dnanexus.gitbook.io/uk-biobank-rap/working-on-the-research-analysis-platform/using-spark-to-analyze-tabular-data).

## Set up the session

We load modules we'll use, and set up the Spark session:

In [1]:
import pyspark
import dxpy # tools starting with 'dx' are from the DNANexus ecosystem
import dxdata
from pyspark.sql.functions import when, concat_ws
from re import sub

In [2]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/cluster/dnax/jars/dnanexus-api-0.1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/cluster/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2023-09-05 14:09:03.567 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-09-05 14:09:04.640 WARN  Utils:69 - Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 43000. Attempting port 43001.
2023-09-05 14:09:04.942 WARN  MetricsReporter:84 - No metrics configured for reporting
2023-09-05 14:09:04.944 WARN  LineProtoUsageReporter:48 - Telegraf configurations: url [metrics.push.telegraf.hostport], user [metrics.push.telegraf.user] or password [metrics.push.telegraf.password] missing.
2023-09-05 14:09:04.944 WARN  MetricsReporter:117 - metrics.scraping.httpserver.port


## Dispense a dataset

In [3]:
dispensed_database = dxpy.find_one_data_object(
    classname="database", 
    name="app*", 
    folder="/", 
    name_mode="glob", 
    describe=True)
dispensed_database_name = dispensed_database["describe"]["name"]

dispensed_dataset = dxpy.find_one_data_object(
    typename="Dataset", 
    name="app*.dataset", 
    folder="/", 
    name_mode="glob")
dispensed_dataset_id = dispensed_dataset["id"]

## Load dataset

In [4]:
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

## Tabular participant data

We first load the "straightforward" tabular participant data:

In [5]:
participant = dataset["participant"]

Next, we'll load a list of fields we might want for our analysis. We use this utility function:

In [6]:
def load_column_list(file_name):
    """Load list of UK Biobank column IDs from file
    Column IDs can be obtained from the UK Biobank showcase: https://biobank.ndph.ox.ac.uk/showcase/index.cgi
    e.g. 90012 refers to the recommended variable for accelerometer measured physical activity
        https://biobank.ndph.ox.ac.uk/showcase/field.cgi?id=90012
        
    This function is due to Aiden Doherty.

    :param str file_name: Name of file listing UK Biobank column IDs
    :return: list of IDs
    :rtype: list
    :Example:
    >>> load_column_list("field_list.txt")
    ["31", "34", "52" ...]
    """
    
    column_IDs = []
    
    with open(file_name) as f:
            for line in f:
                li = line.strip()
                if "#" in li:
                    li = li.split("#")[0].strip()
                if not li.startswith("#") and not li=="":
                    column_IDs += [li]
    return column_IDs

We read in the list:

In [7]:
column_list = load_column_list("candidate_field_list.txt") # This is a text file listing the field IDs we'll use
# You should change this file location as appropriate

Next, we'll use these field numbers to work out the [field *names*](https://dnanexus.gitbook.io/uk-biobank-rap/frequently-asked-questions#how-are-column-names-determined-for-the-dispensed-database) we need.

In particular, UK Biobank fields sometimes have *instance* and *array* indices, which are [indicated in the field names](https://dnanexus.gitbook.io/uk-biobank-rap/frequently-asked-questions#how-are-column-names-determined-for-the-dispensed-database). Instances refer to different occasions when participants were measured (e.g. different visits to the assessment centre). For most instanced variables, [instance 0 is baseline, instance 1 is a resurvey among a limited number of participants, instance 2 is the imaging visit, and instance 3 is the first repeat imaging visit](https://biobank.ndph.ox.ac.uk/ukb/instance.cgi?id=2). At each instance after baseline, only a subset of participants were measured. Array indices are used where variables have multiple items e.g. participants gave multiple answers (e.g. employment history) or had multiple measurements on a single occasion (e.g. blood pressure).

We write utility functions to get names for a particular field and instance. These functions are based on [functions from DNANexus available on GitHub here](https://github.com/dnanexus/OpenBio/blob/master/UKB_notebooks/ukb-rap-pheno-basic.ipynb). 

In [8]:
def fields_for_id(field_id):
    from distutils.version import LooseVersion
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

def field_names_for_id_instanced(field_id, instance="i0", include_non_instanced=True):
    candidate_fields = [f.name for f in fields_for_id(field_id)]
    return_fields = [f for f in candidate_fields if instance in f]
    if (include_non_instanced): # This means fields without instancing (e.g. sex) will be included, and defaults to true
        return_fields += [f for f in candidate_fields if "_i" not in f]
    return return_fields

In this analysis, we will take non-accelerometer participant data from baseline. This means we need variables from instance 0 or with no instancing (some variables, like sex, have one value across all visits; the accelerometer variables are also not instanced):

In [9]:
field_list = ["eid"]
for col in column_list:
    field_list += field_names_for_id_instanced(col, instance="i0")

  return sorted(fields, key=lambda f: LooseVersion(f.name))


An aside: as accelerometer wear occurred several years after baseline assessment, and some participants have undergone repeat assessments between baseline assessment and accelerometer wear, so we might alternatively use data from the most recent assessment before accelerometer wear for covariate adjustment. Given only a small proportion of participants have a second visit before accelerometer wear, for simplicity we ignore that here. However, we will add a couple of fields on self-reported conditions from repeat assessments, so we can more rigorously account for prevalent disease.

In [10]:
field_list += ["p6150_i1", "p6150_i2", "p53_i1", "p53_i2"] # we'll take a couple of extra fields to allow us to
# more rigorously exclude people with prevalent disease

If we just input the field list like this, we'll retrieve fields with their UK Biobank field names (format: [pX_iX_aX](https://dnanexus.gitbook.io/uk-biobank-rap/working-on-the-research-analysis-platform/using-spark-to-analyze-tabular-data#database-columns)). We can set up some aliases with more human readable names:

[We use two different approaches here to illustrate them- you can use either, neither (i.e. use the fields with their original field names), or a mix.]

In [11]:
# We'll handcraft some of the names to slot neatly into later code
field_list_aliases = {
    "eid" : "eid", 
    "p31" :  "sex",
    "p52" : "month_birth", 
    "p34" : "year_birth", 
    "p54_i0" :"ukb_assess_cent", 
    "p21000_i0" : "ethnicity_raw",
    "p189" : "tdi_raw",
    "p6138_i0" : "qualif_raw",
    "p845_i0" : "age_education_raw",
    "p20116_i0" :"smoking_raw",
    "p1558_i0" :"alcohol_raw", 
    "p21001_i0" : "BMI_raw", 
    "p191": "date_lost_followup",
    "p6150_i0": "self_report_cvd_baseline", 
    "p6150_i1": "self_report_cvd_inst_1", 
    "p6150_i2": "self_report_cvd_inst_2", 
    "p53_i0": "date_baseline", 
    "p53_i1": "date_inst_1", 
    "p53_i2": "date_inst_2", 
    "p90016" : "quality_good_calibration",
    "p90183" : "clips_before_cal",
    "p90185" : "clips_after_cal", 
    "p90187" : "total_reads",
    "p90015" :"quality_good_wear_time",
    "p90012": "overall_activity", 
    "p90011": "date_end_accel",
    "p20002": "conditions_at_baseline_2006"
    }

# We'll then auto-add names for other fields 
dataset_fields = [participant.find_field(name=x) for x in field_list] # get the full field info, not just the name
for field in dataset_fields:
    if field.name not in field_list_aliases.keys():
        field_list_aliases[field.name] = field.title

We retrieve relevant data:

In [12]:
participant_data = participant.retrieve_fields(names=field_list, engine=dxdata.connect(), coding_values="replace", column_aliases=field_list_aliases)

2023-09-05 14:09:18.520 WARN  ShellBasedUnixGroupsMapping:210 - unable to return groups for user VZ33590Qf7vk0vzPk8ZfJjJYXpV5820y5k7KkKk7__project-GXJBY38JZ32Vb0588YVYx3Gy
PartialGroupNameException The user name 'VZ33590Qf7vk0vzPk8ZfJjJYXpV5820y5k7KkKk7__project-GXJBY38JZ32Vb0588YVYx3Gy' is not found. id: ‘VZ33590Qf7vk0vzPk8ZfJjJYXpV5820y5k7KkKk7__project-GXJBY38JZ32Vb0588YVYx3Gy’: no such user
id: ‘VZ33590Qf7vk0vzPk8ZfJjJYXpV5820y5k7KkKk7__project-GXJBY38JZ32Vb0588YVYx3Gy’: no such user

	at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.resolvePartialGroupNames(ShellBasedUnixGroupsMapping.java:294)
	at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getUnixGroups(ShellBasedUnixGroupsMapping.java:207)
	at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getGroups(ShellBasedUnixGroupsMapping.java:97)
	at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.getGroups(JniBasedUnixGroupsMappingWithFallback.java:51)
	at org.apache.hadoop.security.Groups$Gro

Setting coding values to "replace" in this function means we get variables' values (rather than the coded format they are stored in).

As we will analyse only those participants with accelerometer data, we drop participants without accelerometer data:

In [13]:
participant_wacc_data = participant_data.filter(participant_data.date_end_accel.isNotNull()) # the end time of accelerometer wear should be present for all
# participants with accelerometer data 

We now write the data out as a csv file. 

Variables for which the value is itself an array (the usual format where the participant could tick several boxes) cannot be written to csv. We convert the affected variables to string variables: 

In [14]:
arrayed_cols = ["qualif_raw", "self_report_cvd_baseline", "self_report_cvd_inst_1", "self_report_cvd_inst_2"]
for col in arrayed_cols:
    participant_wacc_data = participant_wacc_data.withColumn(col, concat_ws("|", col))

Spark stores and works with the data in several parts, so we coalesce them into one part:

In [15]:
participant_wacc_data = participant_wacc_data.coalesce(1)

We then set options and write to a csv file:

In [16]:
participant_wacc_data.write.mode("overwrite").option("header", "true").csv("participant_wacc_data")

2023-09-05 14:09:50.040 WARN  package:69 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


AnalysisException: CSV data source does not support array<string> data type.

Currently it is saved only in this session, so we need to upload to storage for reuse. As it was written by Spark, it is also stored in the "hdfs" (Hadoop Distributed File System), in a structured folder, so we first get it out of that and into "standard" storage:

In [None]:
%%bash
hdfs dfs -copyToLocal /user/root/participant_wacc_data participant_wacc_data # this gets it out of the hdfs
dx upload participant_wacc_data/*.csv --dest /users/mcgaghd/Data/participant_wacc_data.csv # this uploads to the permanent storage on the RAP

## Hospital data

There are several sources of data on health and disease outcomes in the UK Biobank dataset. These include data from inpatient hospital admissions (sometimes referred to as "Hospital Episode Statistics"), death registry data, cancer registry data, primary care data on some participants/for some purposes, and some outcomes added to the tabular data (including the "algorithmically defined outcomes" identifying particular diseases across different data sources). 

For this analysis (following [1](https://journals.plos.org/plosmedicine/article?id=10.1371/journal.pmed.1003487)), we will use hospital inpatient data on diagnoses and death data.

We will use two tables from the hospital inpatient data: hesin and hesin_diag. An entry in hesin is associated with a single hospital *episode* (generally: ["a continuous period
of admitted patient care administered under one consultant within that one hospital provider"](https://biobank.ndph.ox.ac.uk/ukb/ukb/docs/HospitalEpisodeStatistics.pdf)), and provides information such as dates associated with the episode. Entries in hesin_diag are associated with a particular diagnosis within a particular episode.

In this section, we will access, process, and save it.

We load the data: 

In [None]:
hesin = dataset["hesin"]
hesin_diag = dataset["hesin_diag"]

hesin_data = hesin.retrieve_fields(engine=dxdata.connect())
hesin_diag_data = hesin_diag.retrieve_fields(engine=dxdata.connect())

A small proportion of episodes are missing an episode start date but have different dates associated with the episode. In an optional additional step, we make a new "dateepiimp" column which takes the value of "epistart", but when that is missing imputes them with "disdate" [NOTE - 2022-10-24, UK Biobank's own protocol when producing derived fields is to impute with admidate, epiend and disdate (see https://biobank.ndph.ox.ac.uk/showcase/ukb/docs/first_occurrences_outcomes.pdf, page 8). This notebook will be updated to match.] :

In [None]:
hesin_data = hesin_data.withColumn("dateepiimp",
                                   when(hesin_data["epistart"].isNotNull(), hesin_data["epistart"]).otherwise(hesin_data["disdate"]))

 To associate dates with diagnoses, we need to join hesin and hesin_diag. We join on "eid" (participant ID), "ins_index" (the hospital episode in the data) and "dnx_hesin_id" (which is a combination of the participant ID and the instance index and so can be used to join uniquely - we join on all three just to avoid duplicating columns). 

In [None]:
hes_data = hesin_data.join(
    hesin_diag_data, 
    ["eid", "ins_index", "dnx_hesin_id"],
    "left_outer"
)

We keep only those participants with accelerometry data:

In [None]:
wacc_data =  participant_wacc_data.select("eid").rdd.flatMap(lambda x: x).collect()
hes_wacc_data = hes_data.filter(hes_data.eid.isin(wacc_data))

As for the tabular participant data, we coalesce to a single part and write out to a csv file:

In [None]:
hes_wacc_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("hes_wacc_data")

And again upload it to storage so we can reuse it: 

In [None]:
%%bash
hdfs dfs -copyToLocal /user/root/hes_wacc_data hes_wacc_data
dx upload hes_wacc_data/*.csv --dest /users/mcgaghd/Data/hes_wacc_data.csv

# Death data

In [None]:
death = dataset["death"]
death_cause = dataset["death_cause"]

death_data = death.retrieve_fields(engine=dxdata.connect())
death_cause_data = death_cause.retrieve_fields(engine=dxdata.connect())

To reduce the size of the dataset, we keep only those participants with accelerometry data:

In [None]:
death_wacc_data = death_data.filter(death_data.eid.isin(wacc_data))
death_cause_wacc_data = death_cause_data.filter(death_cause_data.eid.isin(wacc_data))

We now write it out to a csv file:

In [None]:
death_wacc_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("death_wacc_data")
death_cause_wacc_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("death_cause_wacc_data")

And again upload it to storage so we can reuse it: 

In [None]:
%%bash
hdfs dfs -copyToLocal /user/root/death_wacc_data death_wacc_data
hdfs dfs -copyToLocal /user/root/death_cause_wacc_data death_cause_wacc_data
dx upload death_wacc_data/*.csv --dest /users/mcgaghd/Data/death_wacc_data.csv
dx upload death_cause_wacc_data/*.csv --dest /users/mcgaghd/Data/death_cause_wacc_data.csv