### DNAnexus Data Ingestion Code

In [None]:
import os
import dxpy
import dxdata
from distutils.version import LooseVersion

#----------------------------------------------------------------------------------
# retrive location of biobank tabular data
#----------------------------------------------------------------------------------
# compute dataset ID and load dataset containing the main UKB participant data
dataset_id = dxpy.find_one_data_object(typename = "Dataset",
                                       name = "app*.dataset",
                                       folder = "/",
                                       name_mode = "glob")["id"]
dataset = dxdata.load_dataset(id = dataset_id)["participant"]

# define function which helps us retrieve all column names
def fields_by_title_keyword(keyword):
    fields = list(dataset.find_fields(lambda f: keyword.lower() in f.title.lower()))
    return sorted(fields, key = lambda f: LooseVersion(f.name))

# get all columns by using empty string input
all_colnames = [f.name for f in fields_by_title_keyword("")]

#----------------------------------------------------------------------------------
# performed piece-wise loading as the platform generates error for >1000 columns
#----------------------------------------------------------------------------------
# define number of columns to process at any one time
n = 1000

# extract first 1000 columns
df =  dataset.retrieve_fields(names = all_colnames[0:n],
                              engine = dxdata.connect()).toPandas()

# iterate until all columns reached
for i in range(n, len(all_colnames), n):

    # retrive columns from database, convert to pandas
    df_next = dataset.retrieve_fields(names = all_colnames[i:(i+n)],
                                      engine = dxdata.connect()).toPandas()

    # concat with the previous n columns, column concatenation
    df = pd.concat([df, df_next], axis = 1)

    # sanity checks
    print("----- Step " + str(i) + ":" + str(i + 1000) + " Dataframe is: " + 
          str(df_next.shape[0]) + " x " + str(df_next.shape[1]))
    print("Current Dataframe is: " + str(df.shape[0]) + " x " + str(df.shape[1]))

#----------------------------------------------------------------------------------
# write file to output
#----------------------------------------------------------------------------------
# create output file name in-line with current version of dataset (app_datetime)
database_name = dxpy.find_one_data_object(classname = "database", 
                                          name = "app*", 
                                          folder = "/",
                                          name_mode = "glob",
                                          describe = True)["describe"]["name"]
out_name = "ukb_" + database_name + ".csv"

# write to output into local instance
df.to_csv(out_name + '.csv', index = False)
print("Locally Written File Name: " + out_name)

# move from local env into projects folder (visible in UKB RAP interface)
os.system("dx upload " + out_name + " --dest /")
print("Transfered from Local Instance into Projects")

In [None]:
#----------------------------------------------------------------------------------
# returns only columns we defined using R
#----------------------------------------------------------------------------------
# Returns all field names for a given UKB showcase field id
def fields_for_id(field_id):
    field_id = str(field_id)
    fields = dataset.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key = lambda f: LooseVersion(f.name))
def field_names_for_id(field_id):
    return [f.name for f in fields_for_id(field_id)]

# define all variables we are interested in
field_ids = [12292,12336,12338,12340,12673,12679,12682,12685,12686,12702,
             20150,20151,20153,20154,20156,20258,21003,
             22330,22332,22333,22334,22335,22336,22337,22338,22400,22401,22402,22403,22404,22405,22406,22407,22408,22409,22410,22420,22421,22423,22424,22425,22426,22427,22670,22671,22672,22673,22674,22675,22676,22677,22678,22679,22680,22681,22682,
             23098,23099,23100,23101,23102,23104,23106,23107,23108,23109,23110,23111,23112,23113,23114,23115,23116,23117,23118,23119,23120,23121,23122,23123,23124,23125,23126,23127,23128,23129,23130,23200,23201,23202,23203,23204,23205,23206,23208,23209,23210,23212,23213,23214,23215,23216,23217,23218,23219,23220,23221,23222,23223,23224,23225,23226,23227,23228,23229,23230,23231,23232,23233,23234,23235,23236,23237,23238,23239,23240,23241,23242,23243,23244,23245,23246,23247,23248,23249,23250,23251,23252,23253,23254,23255,23256,23257,23258,23259,23260,23261,23262,23263,23264,23265,23266,23267,23268,23269,23270,23271,23272,23273,23274,23275,23276,23277,23278,23279,23280,23281,23282,23283,23284,23285,23286,23287,23288,23289,23290,23291,23292,23293,23295,23296,23297,23298,23299,23300,23301,23302,23304,23305,23306,23307,23308,23309,23310,23311,23312,23313,23314,23315,23316,23317,23318,23320,
             25000,25001,25002,25003,25004,25005,25006,25007,25008,25009,25010,25011,25012,25013,25014,25015,25016,25017,25018,25019,25020,25021,25022,25023,25024,25025,25026,25027,25028,25029,25030,25031,25032,25033,25034,25035,25036,25037,25038,25039,25056,25057,25058,25059,25060,25061,25062,25063,25064,25065,25066,25067,25068,25069,25070,25071,25072,25073,25074,25075,25076,25077,25078,25079,25080,25081,25082,25083,25084,25085,25086,25087,25088,25089,25090,25091,25092,25093,25094,25095,25096,25097,25098,25099,25100,25101,25102,25103,25104,25105,25106,25107,25108,25109,25110,25111,25112,25113,25114,25115,25116,25117,25118,25119,25120,25121,25122,25123,25124,25125,25126,25127,25128,25129,25130,25131,25132,25133,25134,25135,25136,25137,25138,25139,25140,25141,25142,25143,25144,25145,25146,25147,25148,25149,25150,25151,25152,25153,25154,25155,25156,25157,25158,25159,25160,25161,25162,25163,25164,25165,25166,25167,25168,25169,25170,25171,25172,25173,25174,25175,25176,25177,25178,25179,25180,25181,25182,25183,25184,25185,25186,25187,25188,25189,25190,25191,25192,25193,25194,25195,25196,25197,25198,25199,25200,25201,25202,25203,25204,25205,25206,25207,25208,25209,25210,25211,25212,25213,25214,25215,25216,25217,25218,25219,25220,25221,25222,25223,25224,25225,25226,25227,25228,25229,25230,25231,25232,25233,25234,25235,25236,25237,25238,25239,25240,25241,25242,25243,25244,25245,25246,25247,25248,25249,25250,25251,25252,25253,25254,25255,25256,25257,25258,25259,25260,25261,25262,25263,25264,25265,25266,25267,25268,25269,25270,25271,25272,25273,25274,25275,25276,25277,25278,25279,25280,25281,25282,25283,25284,25285,25286,25287,25288,25289,25290,25291,25292,25293,25294,25295,25296,25297,25298,25299,25300,25301,25302,25303,25304,25305,25306,25307,25308,25309,25310,25311,25312,25313,25314,25315,25316,25317,25318,25319,25320,25321,25322,25323,25324,25325,25326,25327,25328,25329,25330,25331,25332,25333,25334,25335,25336,25337,25338,25339,25340,25341,25342,25343,25344,25345,25346,25347,25348,25349,25350,25351,25352,25353,25354,25355,25356,25357,25358,25359,25360,25361,25362,25363,25364,25365,25366,25367,25368,25369,25370,25371,25372,25373,25374,25375,25376,25377,25378,25379,25380,25381,25382,25383,25384,25385,25386,25387,25388,25389,25390,25391,25392,25393,25394,25395,25396,25397,25398,25399,25400,25401,25402,25403,25404,25405,25406,25407,25408,25409,25410,25411,25412,25413,25414,25415,25416,25417,25418,25419,25420,25421,25422,25423,25424,25425,25426,25427,25428,25429,25430,25431,25432,25433,25434,25435,25436,25437,25438,25439,25440,25441,25442,25443,25444,25445,25446,25447,25448,25449,25450,25451,25452,25453,25454,25455,25456,25457,25458,25459,25460,25461,25462,25463,25464,25465,25466,25467,25468,25469,25470,25471,25472,25473,25474,25475,25476,25477,25478,25479,25480,25481,25482,25483,25484,25485,25486,25487,25488,25489,25490,25491,25492,25493,25494,25495,25496,25497,25498,25499,25500,25501,25502,25503,25504,25505,25506,25507,25508,25509,25510,25511,25512,25513,25514,25515,25516,25517,25518,25519,25520,25521,25522,25523,25524,25525,25526,25527,25528,25529,25530,25531,25532,25533,25534,25535,25536,25537,25538,25539,25540,25541,25542,25543,25544,25545,25546,25547,25548,25549,25550,25551,25552,25553,25554,25555,25556,25557,25558,25559,25560,25561,25562,25563,25564,25565,25566,25567,25568,25569,25570,25571,25572,25573,25574,25575,25576,25577,25578,25579,25580,25581,25582,25583,25584,25585,25586,25587,25588,25589,25590,25591,25592,25593,25594,25595,25596,25597,25598,25599,25600,25601,25602,25603,25604,25605,25606,25607,25608,25609,25610,25611,25612,25613,25614,25615,25616,25617,25618,25619,25620,25621,25622,25623,25624,25625,25626,25627,25628,25629,25630,25631,25632,25633,25634,25635,25636,25637,25638,25639,25640,25641,25642,25643,25644,25645,25646,25647,25648,25649,25650,25651,25652,25653,25654,25655,25656,25657,25658,25659,25660,25661,25662,25663,25664,25665,25666,25667,25668,25669,25670,25671,25672,25673,25674,25675,25676,25677,25678,25679,25680,25681,25682,25683,25684,25685,25686,25687,25688,25689,25690,25691,25692,25693,25694,25695,25696,25697,25698,25699,25700,25701,25702,25703,25704,25705,25706,25707,25708,25709,25710,25711,25712,25713,25714,25715,25716,25717,25718,25719,25720,25721,25722,25723,25724,25725,25726,25727,25728,25729,25730,25731,25732,25733,25734,25735,25736,25738,25781,25782,25783,25784,25785,25786,25787,25788,25789,25790,25791,25792,25793,25794,25795,25796,25797,25798,25799,25800,25801,25802,25803,25804,25805,25806,25807,25808,25809,25810,25811,25812,25813,25814,25815,25816,25817,25818,25819,25820,25821,25822,25823,25824,25825,25826,25827,25828,25829,25830,25831,25832,25833,25834,25835,25836,25837,25838,25839,25840,25841,25842,25843,25844,25845,25846,25847,25848,25849,25850,25851,25852,25853,25854,25855,25856,25857,25858,25859,25860,25861,25862,25863,25864,25865,25866,25867,25868,25869,25870,25871,25872,25873,25874,25875,25876,25877,25878,25879,25880,25881,25882,25883,25884,25885,25886,25887,25888,25889,25890,25891,25892,25893,25894,25895,25896,25897,25898,25899,25900,25901,25902,25903,25904,25905,25906,25907,25908,25909,25910,25911,25912,25913,25914,25915,25916,25917,25918,25919,25920,
             30000,30010,30020,30030,30040,30050,30060,30070,30080,30090,30100,30110,30120,30130,30140,30150,30160,30170,30180,30190,30200,30210,30220,30230,30240,30250,30260,30270,30280,30290,30300,30500,30510,30520,30530,30600,30610,30620,30630,30640,30650,30660,30670,30680,30690,30700,30710,30720,30730,30740,30750,30760,30770,30780,30790,30800,30810,30820,30830,30840,30850,30860,30870,30880,30890,31,
             4079,4080,43,54,
             6150,6153,6177,6218,6219,6220,6221,6222]

# use column query to return and collate all columns with matching substring IDs
all_colnames = sum([field_names_for_id(field_id) for field_id in field_ids], [])

# insert patient ID at front
all_colnames.insert(0, "eid")

### Code to Preprocess UKB Raw Dataset (.csv Downloaded from AMS)

In [1]:
# import library
import pyspark
from pyspark.sql import SparkSession

# define data path
path_in = "../../ukb51139_v2.csv"
path_out = "../../ukb_subset_v2.csv"

# set up PySpark session
spark = SparkSession.builder.master('local[*]') \
                            .config("spark.driver.memory", "50g") \
                            .appName("UKB").getOrCreate()

22/12/16 15:46:42 WARN Utils: Your hostname, ccrfai resolves to a loopback address: 127.0.1.1; using 163.1.212.155 instead (on interface eno8303)
22/12/16 15:46:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/16 15:46:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Example: Load, Manipulate, Save Dataset

In [2]:
# Load ukb as spark data frame
df = spark.read.csv(path_in, header=True)

# Filter missing blood pressure rows
df = df.filter("(`X4080.2.0` is NOT NULL OR `X93.2.0` is NOT NULL) AND \
                (`X4079.2.0` is NOT NULL OR `X94.2.0` is NOT NULL)")

# Show blood pressure columns
df.select(df["`X4080.2.0`"], df["`X4079.2.0`"], 
          df["`X93.2.0`"], df["`X94.2.0`"]).limit(10).show()

# Save data to file for analysis somewhere else (takes about 15 mins)
#   not using coalesce takes about 3 mins as mutliple workers are used
#   but outputs multiple files
df.coalesce(1).write.csv(path = path_out, header = "true",
                         mode = "overwrite", sep = ",")

# register data frame as a sql table to run queries
df.registerTempTable('ukb')

# run SQL queries
spark.sql('SELECT COUNT(*) FROM ukb').show()
#spark.sql('SELECT COUNT(*) FROM ukb \
#           WHERE `X4080.2.0` > 160 AND `X4079.2.0` > 100').show()

+---------+---------+-------+-------+
|X4080.2.0|X4079.2.0|X93.2.0|X94.2.0|
+---------+---------+-------+-------+
|      116|       58|   null|   null|
|      157|      108|   null|   null|
|      151|       86|   null|   null|
|     null|     null|    139|     82|
|      144|       83|   null|   null|
|      152|       76|   null|   null|
|      145|       75|   null|   null|
|      140|       74|   null|   null|
|      161|       92|   null|   null|
|      112|       62|   null|   null|
+---------+---------+-------+-------+

22/12/16 15:46:56 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/12/16 15:46:56 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


