In [1]:
import dxdata
import dxpy
import pandas as pd
import pyspark
from pyspark.sql.functions import broadcast

# Spark initialization. getOrCreate() reuses an already-running SparkContext/SparkSession,
# so re-running this cell does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Automatically discover the dispensed database name and dataset id in the project root.
# NOTE(review): per the dxpy docs, find_one_data_object raises when nothing matches
# (zero_ok=False by default) but returns an arbitrary match when several objects match
# (more_ok=True by default) — confirm the project holds a single dispensed "app*" database/dataset.
dispensed_database = dxpy.find_one_data_object(
    classname="database",
    name="app*",
    folder="/",
    name_mode="glob",
    describe=True)
dispensed_database_name = dispensed_database["describe"]["name"]

dispensed_dataset = dxpy.find_one_data_object(
    typename="Dataset",
    name="app*.dataset",
    folder="/",
    name_mode="glob")
dispensed_dataset_id = dispensed_dataset["id"]

# Load the dataset and select its participant-level entity, used for field retrieval below.
dataset = dxdata.load_dataset(id=dispensed_dataset_id)
participant = dataset["participant"]

In [2]:
# Fields to retrieve: eid, sex (p31), genotyping batch (p22000), assessment centre at
# instance 0 (p54_i0) and the 10 genetic PCs p22009_a1 .. p22009_a10.
pc_fields = [f'p22009_a{pc}' for pc in range(1, 11)]
fields = ['eid', 'p31', 'p22000', 'p54_i0'] + pc_fields
df = participant.retrieve_fields(
    names=fields,
    engine=dxdata.connect(),
    coding_values="replace",
)
# Preview 5 rows of the Spark dataframe.
# NOTE: this prints participant-level rows into the notebook output; clear outputs before sharing.
df.show(5, truncate=False)

+-------+------+-------------+-----------------+---------+---------+---------+---------+---------+---------+----------+---------+---------+----------+
|eid    |p31   |p22000       |p54_i0           |p22009_a1|p22009_a2|p22009_a3|p22009_a4|p22009_a5|p22009_a6|p22009_a7 |p22009_a8|p22009_a9|p22009_a10|
+-------+------+-------------+-----------------+---------+---------+---------+---------+---------+---------+----------+---------+---------+----------+
|5970576|Female|UKBiLEVEAX_b2|Leeds            |-12.6693 |4.30928  |-2.23438 |-0.109226|-7.364   |-1.01024 |1.31101   |-1.4512  |3.99414  |-1.68128  |
|4620207|Male  |null         |Stockport (pilot)|null     |null     |null     |null     |null     |null     |null      |null     |null     |null      |
|3544231|Male  |Batch_b004   |Glasgow          |-13.7558 |5.64773  |-3.58187 |7.74165  |21.0285  |0.711038 |-0.0382205|0.162054 |1.54023  |-0.545929 |
|3308524|Female|Batch_b055   |Sheffield        |-12.3794 |2.03865  |-0.837131|-0.562303|2.6028

In [3]:
# Read the participant eids to keep (single column, no header row) into a column named 'eid'.
eids_100k = pd.read_csv("eids_keep_100k.csv", header=None, names=['eid'])
# Duplicate eids would duplicate participant rows in the inner join that follows.
assert eids_100k['eid'].is_unique, "eids_keep_100k.csv contains duplicate eids"

In [4]:
# Filter the Spark dataframe to the kept eids: turn the eid list into a small Spark frame,
# mark it for broadcast, and inner-join on eid (rows without a match are dropped).
# The frame is built from plain Python ints rather than spark.createDataFrame(pandas_df):
# that pandas->Spark conversion path calls the deprecated DataFrame.iteritems (FutureWarning;
# removed in pandas 2.0). Python ints are inferred as LongType, as the int64 pandas column was.
eid_rows = [(eid,) for eid in eids_100k['eid'].tolist()]
eids_100k_spark = spark.createDataFrame(eid_rows, schema=['eid'])
eids_100k_broadcasted = broadcast(eids_100k_spark)
covars_100k = df.join(eids_100k_broadcasted, on="eid", how="inner")
# NOTE: prints participant-level rows; clear outputs before sharing.
covars_100k.show(5, truncate=False)

  for column, series in pdf.iteritems():


+-------+------+----------+---------+---------+---------+---------+---------+---------+---------+----------+---------+---------+----------+
|eid    |p31   |p22000    |p54_i0   |p22009_a1|p22009_a2|p22009_a3|p22009_a4|p22009_a5|p22009_a6|p22009_a7 |p22009_a8|p22009_a9|p22009_a10|
+-------+------+----------+---------+---------+---------+---------+---------+---------+---------+----------+---------+---------+----------+
|3544231|Male  |Batch_b004|Glasgow  |-13.7558 |5.64773  |-3.58187 |7.74165  |21.0285  |0.711038 |-0.0382205|0.162054 |1.54023  |-0.545929 |
|1106345|Female|Batch_b007|Liverpool|-12.6667 |4.18819  |-1.96996 |2.85875  |0.6597   |0.795101 |2.07537   |-0.8787  |-2.67951 |2.85529   |
|1458523|Female|Batch_b093|Glasgow  |-13.3813 |3.63312  |-3.34136 |9.06684  |17.796   |-1.28133 |0.80981   |0.550325 |1.7284   |1.5269    |
|4018308|Female|Batch_b004|Edinburgh|-14.3743 |2.25444  |-2.81493 |7.63346  |13.5495  |2.33539  |-1.21718  |0.611471 |0.644174 |0.375372  |
|1438109|Female|Batc

In [5]:
# Sanity-check the joined frame.
# Rows: kept eids that matched a participant (inner join), so this can be below the number read.
# Columns: eid + sex + array + centre + 10 PCs = 14.
n_rows = covars_100k.count()
n_cols = len(covars_100k.columns)
print(n_rows)
print(n_cols)
assert n_cols == 14, f"expected 14 columns, got {n_cols}"

91466
14


In [8]:
# Collect the filtered Spark dataframe into a pandas DataFrame for recoding and export.
covars_100k_pd = covars_100k.toPandas()

In [9]:
# First, recode the genotyping batch (p22000) into a two-level array covariate and sex (p31)
# into 'F'/'M'. Missing values stay missing: a null batch must not raise (None has no
# .startswith), and a null or unexpected sex must not be silently coded as 'M'.
# NOTE(review): UK Biobank describes both the UK BiLEVE and UK Biobank Axiom arrays as
# Affymetrix arrays, so 'Illumina' looks like a misnomer; it is kept because it only serves
# as a category level in the covariate file — confirm.
def _array_from_batch(batch):
    """Map a p22000 batch name to 'Affymetrix' (UKBiLEVE*), 'Illumina' (Batch*), else None."""
    if not isinstance(batch, str):
        return None
    if batch.startswith('UKBiLEVE'):
        return 'Affymetrix'
    if batch.startswith('Batch'):
        return 'Illumina'
    return None


covars_100k_pd['batch'] = covars_100k_pd['p22000'].map(_array_from_batch)
covars_100k_pd['sex'] = covars_100k_pd['p31'].map({'Female': 'F', 'Male': 'M'})

In [10]:
# Next, split the covariates into discrete (categorical) and continuous tables.
# Each table starts with eid twice — an FID/IID pair, as in PLINK/GCTA-style headerless
# covariate files (NOTE(review): assumed downstream consumer — confirm).
# Such readers split fields on any whitespace, so a multi-word centre name such as
# "Stockport (pilot)" would be read as two columns; replace whitespace runs with "_".
pc_cols = [f'p22009_a{pc}' for pc in range(1, 11)]

discrete_pd = covars_100k_pd[['eid', 'eid', 'batch', 'sex', 'p54_i0']].copy()
discrete_pd['p54_i0'] = discrete_pd['p54_i0'].str.replace(r'\s+', '_', regex=True)

cont_pd = covars_100k_pd[['eid', 'eid'] + pc_cols]

In [11]:
# Write headerless, tab-separated covariate files without the pandas index.
# Missing values (e.g. null PCs for participants without genotype data) are written as "NA":
# pandas' default is an empty field, which whitespace-delimited readers collapse,
# shifting every following column of that row.
discrete_pd.to_csv('discrete_covars_100k.tsv', header=False, sep='\t', index=False, na_rep='NA')
cont_pd.to_csv('cont_covars_100k.tsv', header=False, sep='\t', index=False, na_rep='NA')

In [12]:
%%bash
# Upload both covariate files to the project root, in the same order as before.
# NOTE(review): DNAnexus allows several files with the same name in one folder, so
# re-running this cell likely adds duplicates instead of overwriting — confirm before re-running.
for covar_file in discrete_covars_100k.tsv cont_covars_100k.tsv; do
    dx upload "$covar_file" --dest /
done

ID                                file-GqJK1Q8Jg5yP0Zz500g7gY79
Class                             file
Project                           project-GbfkzqQJg5y0KGvVy2ByF23b
Folder                            /
Name                              discrete_covars_100k.tsv
State                             closing
Visibility                        visible
Types                             -
Properties                        -
Tags                              -
Outgoing links                    -
Created                           Fri Sep  6 11:00:17 2024
Created by                        s_shen
 via the job                      job-GqJJj4QJg5y5XgJ89f5JKv5F
Last modified                     Fri Sep  6 11:00:18 2024
Media type                        
archivalState                     "live"
cloudAccount                      "cloudaccount-dnanexus"
ID                                file-GqJK1QQJg5yF78kQxGYByXBX
Class                             file
Project                           project-Gbfkzq