# pQTL relevant data extraction using PySpark on UKB RAP
This notebook is built upon two notebooks from DNAnexus:
https://github.com/dnanexus/OpenBio/blob/master/UKB_notebooks/ukb-rap-pheno-basic.ipynb
https://github.com/dnanexus/UKB_RAP/blob/main/proteomics/0_extract_phenotype_protein_data.ipynb

### This notebook consists of three sections:
### Step 1: Extract clinical phenotype data from dataset['participant'] 
### Step 2: Extract Olink proteomics from dataset['olink_instance_0']
### Step 3: Extract operation records from dataset['hesin_oper']

In [None]:
# Import packages
# dxpy allows python to interact with the platform storage
# Note: This notebook is using spark since the size of the dataset we're extracting
# (i.e. the number of fields) is too large for a single node instance.
import dxpy
import dxdata

import pandas as pd
import subprocess
import glob
import os
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


# Spark initialization (Done only once; do not rerun this cell unless you select Kernel -> Restart kernel).
# Need to adjust this buffer otherwise will get an error in toPandas() call
conf = pyspark.SparkConf().set("spark.kryoserializer.buffer.max", "1024")

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)
sqlContext = SQLContext(sc)

In [None]:
# Normal spark initialization (Done only once; do not rerun this cell unless you select Kernel -> Restart kernel).
# sc = pyspark.SparkContext()
# spark = pyspark.sql.SparkSession(sc)

In [None]:
dxdata.__version__

In [None]:
# silence warning
import warnings
warnings.filterwarnings('ignore')

# Re-enable warnings after your code if you want to see warnings again in subsequent cells
# warnings.filterwarnings('default')

In [None]:
# Set the option to None to display all rows, or set a specific number
# pd.set_option('display.max_rows', None)

# Reset display options to default
# pd.reset_option('display.max_rows')

In [None]:
# Automatically discover dispensed database name and dataset id
dispensed_database = dxpy.find_one_data_object(
    classname='database', 
    name='app*', 
    folder='/', 
    name_mode='glob', 
    describe=True)
dispensed_database_name = dispensed_database['describe']['name']

dispensed_dataset = dxpy.find_one_data_object(
    typename='Dataset', 
    name='app*.dataset', 
    folder='/', 
    name_mode='glob')
dispensed_dataset_id = dispensed_dataset['id']

## Access dataset

In [None]:
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

### Dataset 'entities' are virtual tables linked to one another.

The main entity is 'participant' and corresponds to most pheno fields. Additional entities correspond to linked health care data.
Entities starting with 'hesin' are for hospital records; entities starting with 'gp' are for GP records, etc.

In [None]:
dataset.entities

### Step 1: Accessing the main 'participant' entity
The extraction code follows some examples

In [None]:
participant = dataset['participant']

#### Constructing field names, given UKB showcase field id, instance id, and array id

For the main participant data, the Platform uses field names with the following convention:

|Type of field|Syntax for field name|Example|
|:------------|---------------------|-------|
|Neither instanced nor arrayed|`p<FIELD-ID>`|`p31`|
|Instanced but not arrayed|`p<FIELD-ID>_i<INSTANCE-ID>`|`p40005_i0`|
|Arrayed but not instanced|`p<FIELD-ID>_a<ARRAY-ID>`|`p41262_a0`|
|Instanced and arrayed|`p<FIELD-ID>_i<INSTANCE-ID>_a<ARRAY-ID>`|`p93_i0_a0`|

Lastly, the participant id field itself (EID) is named `eid`

If you know exactly the field names you want to work with, put them in a string array (we will see later how to use that):

In [None]:
# example
# field_names = ['eid', 'p31', 'p21022', 'p40005_i0', 'p93_i0_a0']

#### Looking up fields, given UKB showcase field id

If you know the field id but you are not sure if it is instanced or arrayed, and want to grab all instances/arrays (if any), use these:

In [None]:
# Returns all field objects for a given UKB showcase field id

def fields_for_id(field_id):
    from distutils.version import LooseVersion
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

# Returns all field names for a given UKB showcase field id

def field_names_for_id(field_id):
    return [f.name for f in fields_for_id(field_id)]

##### Examples:

In [None]:
# Participant sex
field_names_for_id('31')

In [None]:
# Age when attending assessment centre has multiple instances (visits) 
field_names_for_id('21003')

In [None]:
field_names_for_id('21001') # BMI

In [None]:
field_names_for_id('22009') # PC

#### Looking up fields by title keyword

If you remember part of the field title, use these:

In [None]:
# Returns all field objects for a given title keyword

def fields_by_title_keyword(keyword):
    from distutils.version import LooseVersion
    fields = list(participant.find_fields(lambda f: keyword.lower() in f.title.lower()))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

# Returns all field names for a given title keyword

def field_names_by_title_keyword(keyword):
    return [f.name for f in fields_by_title_keyword(keyword)]

# Returns all field titles for a given title keyword

def field_titles_by_title_keyword(keyword):
    return [f.title for f in fields_by_title_keyword(keyword)]

# Furhter information: https://github.com/dnanexus/OpenBio/blob/master/UKB_notebooks/ukb-rap-pheno-basic.ipynb

### Grabbing fields into a Spark DataFrame

The `participant.retrieve_fields` function can be used to construct a Spark DataFrame of the given fields.

By default, this retrieves data as encoded by UK Biobank. For example, field p31 (participant sex) will be returned as an integer column with values of 0 and 1. To receive decoded values, supply the `coding_values='replace'` argument.

For more information, see [Tips for Retrieving Fields](https://dnanexus.gitbook.io/uk-biobank-rap/getting-started/working-with-ukb-data#tips-for-retrieving-fields) in the documentation.

## Extract clinical data 

In [None]:
field_names = ['eid', 
               'p31',  # sex
               'p21022',  # age at recruitment
               'p21001_i0',  # BMI
               'p54_i0',  # UK Biobank assessment centre
               'p53_i0',  # Date of attending assessment centre p53_i0
               'p3166_i0_a0', # time of blood collection
               'p22019', # Sex_chromosome_aneuploidy
               'p22000', # genotype batch
               'p22006',# genetic grouping, 
               'p30901_i0', # olink plateID
              ] \
                + field_names_for_id('22009') 

# 41270 = ICD10

• Further informatiaon on Date of first in-patient diagnosis can be found at https://biobank.ndph.ox.ac.uk/crystal/field.cgi?id=41280:
The corresponding ICD-10 diagnosis codes can be found in data-field Field 41270 and the two fields can be linked using the array structure.

• CAD definition and risk factors:
J Am Coll Cardiol. 2018 Oct 16;72(16):1883-1893. 
https://pubmed.ncbi.nlm.nih.gov/30309464/


## Grabbing fields into a Spark DataFrame
The participant.retrieve_fields function can be used to construct a Spark DataFrame of the given fields.

By default, this retrieves data as encoded by UK Biobank. For example, field p31 (participant sex) will be returned as an integer column with values of 0 and 1. To receive decoded values, supply the coding_values='replace' argument.

For more information, see Tips for Retrieving Fields in the documentation.

In [None]:
# Grabbing fields into a Spark DataFrame
df = participant.retrieve_fields(names=field_names, engine=dxdata.connect())

In [None]:
# See the first five entries as a Spark DataFrame:
# df.show(5, truncate=False)

In [None]:
# See the first five entries as a Pandas DataFrame:
df.limit(5).toPandas()

In [None]:
# if the above looks good, go ahead and convert the entire spark data frame to pandas data frame 
pdf = df.toPandas()

In [None]:
print(pdf.columns)

In [None]:
# Set the option to None to display all rows, or set a specific number
# pd.set_option('display.max_rows', None)

# Your code to display the Series or DataFrame
# For example, if you have a Series named non_na_counts:
# pdf.count()

In [None]:
## Given the information above, re-select the column if needed
#field_names = ['p31', 'p21022', 'p54_i0', 'p41202', 'p53_i0', 'p40000_i0']  \
#    + field_names_for_id('41280') 
#df = participant.retrieve_fields(names=field_names, engine=dxdata.connect())

In [None]:
# Reset display options to default
# pd.reset_option('display.max_rows')

In [None]:
# Saving as TSV file
pdf.to_csv('clinical_data.tsv', sep='\t', index=False)

### Step 2: Extract Olink proteomics from dataset['olink_instance_0']

In [None]:
olink = dataset['olink_instance_0']
# olink.fields # to list all

In [None]:
temp_list = olink.fields
type(temp_list)

In [None]:
#def field_names_for_id(field_id):
#    return [f.name for f in fields_for_id(field_id)]
olink_all_field_names = [f.name for f in olink.fields]

In [None]:
#field_names = ['eid', 'col6a3'] # select like this if you're interested in only a few protein
olink_all_field_names[:5] # check

In [None]:
dfo = olink.retrieve_fields(names=olink_all_field_names, engine=dxdata.connect())

In [None]:
# to check
# dfo.head(5)

### In case of extracting particular proteins only:
olink_field_names = ['eid', 'col6a3']
dfo = olink.retrieve_fields(names=olink_field_names, engine=dxdata.connect())

In [None]:
# See the first five entries as a Pandas DataFrame:
# dfo.limit(5).toPandas()

In [None]:
dfo.count() # check rows (53016 individual's data)

In [None]:
pdfo = dfo.toPandas()

In [None]:
pdfo.shape # (53016, 2924)

In [None]:
# Alternative approach
# pdfo = dfo.toPandas() is very memory intensive. So instead, we can do this sequentially (if needed).
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Total number of rows in the DataFrame
total_rows = dfo.count()

# Number of chunks
num_chunks = 10

# Calculate the number of rows per chunk. Adding 1 to ensure the last chunk includes all remaining rows
rows_per_chunk = (total_rows // num_chunks) + (total_rows % num_chunks > 0)

# Initialize an empty list to store each chunk's pandas DataFrame
chunks_list = []

# Create a column 'row_id' to help in filtering rows for each chunk
dfo_with_id = dfo.withColumn("row_id", row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

for i in range(num_chunks):
    # Calculate start index for the current chunk
    start_index = i * rows_per_chunk
    
    # End index is not needed as we limit the number of rows fetched
    chunk_df = dfo_with_id.filter(dfo_with_id.row_id >= start_index).limit(rows_per_chunk)
    
    # Convert the chunk to a pandas DataFrame and append to the list
    chunk_pd_df = chunk_df.drop("row_id").toPandas()
    chunks_list.append(chunk_pd_df)

# Concatenate all chunks to form the final pandas DataFrame
pdf = pd.concat(chunks_list, ignore_index=True)

# Checking the shape of the final DataFrame
print(pdf.shape)

In [None]:
pdfo.iloc[:5, :5] # check

In [None]:
pdfo.to_csv('olink_data.tsv', sep='\t', index=False)

In [7]:
%%bash
#dx upload clinical_data.tsv --dest UKB:/data/04.COL6A3/
#dx upload olink_data.tsv --dest UKB:/data/04.COL6A3/
dx upload ukbrap_extract_clinical_olink_data.ipynb --dest UKB:/data/04.COL6A3/

ID                          file-GjFF0zjJBPpv7b56626G5vG4
Class                       file
Project                     project-Gf7Jk4QJBPpxxxG2x187pyJg
Folder                      /data/04.COL6A3
Name                        ukbrap_extract_clinical_olink_data.ipynb
State                       closing
Visibility                  visible
Types                       -
Properties                  -
Tags                        -
Outgoing links              -
Created                     Wed Apr 10 15:30:07 2024
Created by                  satoshi.yoshiji
 via the job                job-GjF9jx0JBPpbPqBx1Fy7zKqY
Last modified               Wed Apr 10 15:30:08 2024
Media type                  
archivalState               "live"
cloudAccount                "cloudaccount-dnanexus"


## Below are miscellaneous codes

### if you want to combine data frame, use pandas's merge 
combined = pd.merge(pdf, df_final, on = 'eid', how= 'left')

### Filtering spark dataframes

Spark dataframes can be filtered using the syntax: `df.filter(expression)`

The expression can be either :

* a string expression, built using SQL field names (e.g. `p31`) and SQL operators (e.g. `=`, `NOT`, `OR`, `AND`)
  * example: `'(p21022 >= 50) AND (p31 = 0)'`
  

* a Python expression, built using Python object fields (e.g. `df.p31`) and Python operators (e.g. `==`, `!`, `|`, `&`)
  * example: `(df.p21022 >= 50) & (df.p31 == 0)`

#### Example: Participants above 50 years old and female

df.count()

### Using SQL syntax
df.filter('(p21022 >= 50) AND (p31 = 0)').count()

### Using Python syntax
df.filter((df.p21022 >= 50) & (df.p31 == 0)).count()

### Getting more information about fields

Field objects (such as those returned by `fields_for_id` and `fields_by_title_keyword` above, or via the `participant[<field_name>]` syntax) provide additional information such as codings and units.

####  Working with codings

participant['p31'].coding

participant['p31'].coding.codes

def field_codes_by_keyword(field_name, keyword):
    return dict([(k,v) for (k,v) in participant[field_name].coding.codes.items() if keyword.lower() in v.lower()])

field_codes_by_keyword('p31', 'female')

field_codes_by_keyword('p41202', 'obesity')

### Additional field information

### Get link to UKB documentation page
participant['p31'].linkout

### Get field units
participant['p21022'].units

### Saving results

### Saving as CSV file
df.toPandas().to_csv('results.csv', index=False)

### Saving as TSV file
df.toPandas().to_csv('results.tsv', sep='\t', index=False)

### Saving as DTA file (Stata)
df.toPandas().astype(str).replace('None|NaN|nan', '.', regex=True).to_stata('results.dta')

#### Writing results back to the project

%%bash
dx upload results.tsv --dest / 