In [1]:
import dxpy 
import dxdata 
import pandas as pd
import pyspark
import re

In [2]:
# Initialize Spark.
# getOrCreate() hands back the SparkContext that is already running in this kernel (or starts one
# if there is none), so this cell can be re-run safely. Calling pyspark.SparkContext() a second
# time in the same kernel raises an error instead.
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

In [3]:
# Find the dataset record in the project root: a data object of type "Dataset" whose name
# matches the glob "app*.dataset". Its ID is then used to load the dataset through dxdata.
dispensed_dataset = dxpy.find_one_data_object(
    name="app*.dataset",
    name_mode="glob",
    typename="Dataset",
    folder="/",
)
dispensed_dataset_id = dispensed_dataset["id"]
dataset = dxdata.load_dataset(id=dispensed_dataset_id)

In [4]:
# Look up the 'participant' entry of the loaded dataset; the cells below use it to find
# fields (find_field / find_fields) and to retrieve their values (retrieve_fields).
participant = dataset['participant']

In [5]:
# Load the hypertension case and control cohorts that were saved from the Cohort Browser.
# The two paths are loaded in order: cases first, then controls.
case, cont = (
    dxdata.load_cohort(cohort_path)
    for cohort_path in ("/HTN_ICD10/ICD10_HTN_Cases", "/HTN_ICD10/ICD10_HTN_Control")
)

In [6]:
# UK Biobank field IDs to retrieve. The matching RAP field names are resolved and described
# in the cells below; this cell only declares the IDs.
field_ids = [
    '31',     # Sex
    '22001',  # Genetic sex
    '22006',  # Genetic ethnic grouping
    '22019',  # Sex chromosome aneuploidy
    '22021',  # Genetic kinship to other participants
    '21022',  # Age at recruitment
    '41270',  # Diagnoses - ICD10
]

In [7]:
# Grab all field names (e.g. "p<field_id>_iYYY_aZZZ") that belong to one field ID
def fields_for_id(field_id):
    """Return all ``participant`` fields for one field ID, in natural (numeric-aware) order.

    Args:
        field_id: Field ID as an int or str, e.g. ``31`` or ``'31'``.

    Returns:
        A list of the field objects whose name is ``p<field_id>``, optionally followed by an
        ``_i<instance>`` and/or ``_a<array>`` suffix. Names are ordered so that numeric parts
        compare as numbers (``p31_i2`` before ``p31_i10``). The list is empty when nothing matches.

    Note:
        The ordering used to rely on ``distutils.version.LooseVersion``. ``distutils`` is
        deprecated and no longer ships with Python 3.12+, so a small natural-sort key built
        on ``re`` is used instead.
    """
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))

    def natural_key(field):
        # re.split with a capturing group alternates text / digit-run tokens, starting with text.
        # Odd positions are therefore always digit runs; converting them to int keeps every
        # position type-consistent, so keys never compare a str against an int.
        tokens = re.split(r'(\d+)', field.name)
        return [int(token) if position % 2 else token for position, token in enumerate(tokens)]

    return sorted(fields, key=natural_key)

In [10]:
# Fields to retrieve: the first field found for every requested field ID,
# followed by two fields addressed directly by name (ever smoked at instance 0, participant ID).
fields = [fields_for_id(field_id)[0] for field_id in field_ids]
fields.extend(
    participant.find_field(name=field_name) for field_name in ('p20160_i0', 'eid')
)

# Summary table with one row per selected field: its name, its title and, when the field has
# a coding, the code -> meaning mapping (empty string otherwise).
field_description = pd.DataFrame(
    {
        'Field': [field.name for field in fields],
        'Title': [field.title for field in fields],
        'Coding': ['' if field.coding is None else field.coding.codes for field in fields],
    }
)

# Show the table as the cell output
field_description

  return sorted(fields, key=lambda f: LooseVersion(f.name))


Unnamed: 0,Field,Title,Coding
0,p31,Sex,"{'0': 'Female', '1': 'Male'}"
1,p22001,Genetic sex,"{'0': 'Female', '1': 'Male'}"
2,p22006,Genetic ethnic grouping,{'1': 'Caucasian'}
3,p22019,Sex chromosome aneuploidy,{'1': 'Yes'}
4,p22021,Genetic kinship to other participants,{'-1': 'Participant excluded from kinship infe...
5,p21022,Age at recruitment,
6,p41270,Diagnoses - ICD10,{'Chapter I': 'Chapter I Certain infectious an...
7,p20160_i0,Ever smoked | Instance 0,"{'1': 'Yes', '0': 'No'}"
8,eid,Participant ID,


In [11]:
# Open one Spark-backed connection and reuse it for both cohorts, so cases and controls are
# retrieved with identical settings instead of two separately configured connections.
engine = dxdata.connect(
    dialect="hive+pyspark",  # Hive through PySpark
    connect_args={
        'config': {
            'spark.kryoserializer.buffer.max': '256m',    # upper limit of the Kryo serializer buffer
            'spark.sql.autoBroadcastJoinThreshold': '-1',  # -1 turns automatic broadcast joins off
        }
    },
)


def _retrieve_cohort_df(cohort, engine):
    """Retrieve the selected ``fields`` for one cohort as a pandas DataFrame.

    Args:
        cohort: A loaded cohort; its ``sql`` attribute is used as the row filter.
        engine: The dxdata connection to run the query on.

    Returns:
        A pandas DataFrame with the ``fields`` columns for the participants in the cohort.
    """
    return participant.retrieve_fields(
        fields=fields,
        filter_sql=cohort.sql,
        engine=engine,
    ).toPandas()


# Same fields, same engine; only the cohort filter differs
case_df = _retrieve_cohort_df(case, engine)
cont_df = _retrieve_cohort_df(cont, engine)


In [12]:
# Stack cases and controls into one table.
# ignore_index=True gives the result a fresh 0..n-1 index; without it the row labels of
# case_df and cont_df are carried over unchanged and can repeat in the combined frame.
df = pd.concat([case_df, cont_df], ignore_index=True)

In [14]:
# Sanity checks on the combined table.
# Every participant must appear exactly once; a repeated eid would mean the same participant
# was retrieved more than once (e.g. overlapping case and control cohorts).
assert df['eid'].is_unique, "duplicate eid values in the combined case/control table"

# Expected size: (502137, 9)
df.shape

(502137, 9)

In [18]:
# Case/control indicator: 1 when the participant's eid is among the retrieved cases, 0 otherwise
df['HTN_cc'] = df['eid'].isin(case_df['eid']).astype('int64')

# Tally controls (0) and cases (1) -- roughly 162k cases are expected
df['HTN_cc'].value_counts()

0    339876
1    162261
Name: HTN_cc, dtype: int64

In [19]:
# Sample-level QC: keep a participant only when all four criteria hold
df_qced = df.loc[
    df['p31'].eq(df['p22001'])    # reported sex agrees with genetic sex
    & df['p22006'].eq(1)          # genetic ethnic grouping coded 1 (white British ancestry subset)
    & df['p22019'].isna()         # no sex chromosome aneuploidy recorded
    & df['p22021'].eq(0)          # kinship code 0 (no kinship found)
]

In [20]:
# Controls (0) and cases (1) remaining after sample QC
df_qced['HTN_cc'].value_counts()

0    188930
1     87132
Name: HTN_cc, dtype: int64

In [21]:
# Give the columns readable names and shape the table for regenie.
# regenie needs both an FID and an IID column; FID is simply a copy of IID here.
df_qced = (
    df_qced
    .rename(columns={
        'eid': 'IID',
        'p31': 'sex',
        'p21022': 'age',
        'p20160_i0': 'ever_smoked',
        'p22006': 'ethnic_group',
        'p22019': 'sex_chromosome_aneuploidy',
        'p22021': 'kinship_to_other_participants',
    })
    .assign(FID=lambda renamed: renamed['IID'])
)

# Phenotype table: the two ID columns first, then the case/control status and the covariates
df_phenotype = df_qced[[
    'FID',
    'IID',
    'HTN_cc',
    'sex',
    'age',
    'ethnic_group',
    'ever_smoked',
]]

In [22]:
# Path to the PLINK .fam file that ships with the exome PLINK files
# (plain string: there is nothing to interpolate, so no f-prefix is needed)
path_to_family_file = '/mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, PLINK format - final release/ukb23158_cY_b0_v1.fam'

# Read the .fam file into a DataFrame.
# sep=r'\s+' splits on any run of whitespace. The previous '\s' was a non-raw string with an
# invalid escape and matched exactly ONE whitespace character, so consecutive blanks would have
# produced empty columns. pandas handles r'\s+' without needing the slower python engine.
plink_fam_df = pd.read_csv(
    path_to_family_file,
    sep=r'\s+',
    dtype='object',  # keep every column as read, without numeric conversion
    names=['FID', 'IID', 'Father ID', 'Mother ID', 'sex', 'Pheno'],
)

# Inner-join on IID so only QC-passed participants that also appear in the .fam file remain,
# then discard the .fam columns. Overlapping names coming from the .fam side carry the '_fam'
# suffix. errors='ignore' tolerates any listed column that is absent from the joined frame.
HTN_wes_500k_df = df_phenotype.join(
    plink_fam_df.set_index('IID'),
    on='IID',
    rsuffix='_fam',
    how='inner',
).drop(
    columns=['FID_fam', 'Father ID', 'Mother ID', 'sex_fam', 'Pheno'],
    errors='ignore',
)


In [23]:
# Write both phenotype tables as tab-separated files: missing values become 'NA', the index
# is omitted, and quoting=3 (the value of csv.QUOTE_NONE) means no field is ever quoted.

# Participants that passed QC and are present in the exome .fam file
HTN_wes_500k_df.to_csv(
    'HTN_wes_500k.phe',
    sep='\t',
    index=False,
    na_rep='NA',
    quoting=3,
)

# All participants that passed QC
df_phenotype.to_csv(
    'HTN500k_wes.phe',
    sep='\t',
    index=False,
    na_rep='NA',
    quoting=3,
)