# Extracting proteomics data using Spark
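Spark is used because the dataset (in particular, the number of fields being extracted) is too large for a single-node instance. The pipeline was adapted from the DNAnexus UKB_RAP example notebook:
https://github.com/dnanexus/UKB_RAP/blob/main/proteomics/0_extract_phenotype_protein_data.ipynb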

## 1. Importing dependencies and preparing project/dataset IDs

In [None]:
# Import packages
# dxpy allows python to interact with the platform storage
# Note: This notebook is using spark since the size of the dataset we're extracting
# (i.e. the number of fields) is too large for a single node instance.
import dxpy
import pandas as pd
import subprocess
import glob
import os
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [None]:
# Location of the proteomics_results folder under /mnt/project (only read
# permissions there). Not referenced by the cells below; results are written
# locally and pushed with `dx upload` instead.
output_dir = os.path.join("/mnt/project", "proteomics_results", "")

In [None]:
# Automatically discover the dispensed dataset record (name matched by the glob
# "app*.dataset" in folder "/") and keep its record ID.
_dataset_query = {
    "typename": "Dataset",
    "name": "app*.dataset",
    "name_mode": "glob",
    "folder": "/",
}
dispensed_dataset = dxpy.find_one_data_object(**_dataset_query)
dispensed_dataset_id = dispensed_dataset["id"]

In [None]:
# Take the project ID from the dataset search result above, so the
# "project:record" reference built next names the project that actually
# contains the dataset. dxpy.find_one_project() with no filters returns any one
# project visible to the current token, which need not be that project when
# several are accessible.
project_id = dispensed_dataset["project"]

In [None]:
# Fully qualified dataset reference in "<project-id>:<record-id>" form.
dataset = f"{project_id}:{dispensed_dataset_id}"

In [None]:
# Dump the dataset's dictionary files (-ddd) as comma-delimited CSVs into the
# working directory; the *.data_dictionary.csv read in the next section comes
# from this step.
# Note: This cell can only be run once. Otherwise, you'll need to delete the
# existing data tables in order to re-run.
cmd = ["dx", "extract_dataset", dataset, "-ddd"]
cmd.extend(["--delimiter", ","])
subprocess.check_call(cmd)

## 2. Getting field names and extracting protein data

In [None]:
# Absolute path of the notebook's working directory (where dx extract_dataset
# writes its output files).
path = os.path.abspath(os.curdir)

In [None]:
# Load the data dictionary written by `dx extract_dataset ... -ddd`.
# Fail with an actionable message (instead of an opaque IndexError) when it is
# missing, and sort the matches so the chosen file is deterministic.
_dict_matches = sorted(glob.glob(os.path.join(path, "*.data_dictionary.csv")))
if not _dict_matches:
    raise FileNotFoundError(
        f"No *.data_dictionary.csv found in {path}; "
        "run the `dx extract_dataset ... -ddd` cell first."
    )
data_dict_csv = _dict_matches[0]
data_dict_df = pd.read_csv(data_dict_csv)
data_dict_df.head()

In [None]:
# Protein fields: every field listed under the olink_instance_0 entity in the
# data dictionary.
in_olink = data_dict_df["entity"].eq("olink_instance_0")
field_names = data_dict_df.loc[in_olink, "name"].tolist()
print(len(field_names))

In [None]:
# Confirm the age field exists in the data dictionary.
# p21003_i0 = "Age when attended assessment centre", at baseline (instance 0).
age_fields = data_dict_df.loc[data_dict_df["name"].eq("p21003_i0"), "name"].tolist()

age_fields

# To browse related fields, e.g.:
#   data_dict_df[data_dict_df["name"].str.contains("21003")].head(5)

In [None]:
# Confirm the sex field exists in the data dictionary (p31 = "Sex").
sex_fields = data_dict_df.loc[data_dict_df["name"].eq("p31"), "name"].tolist()

sex_fields

In [None]:
# Qualify each protein field with its entity ("olink_instance_0.<field>") and
# join them into the comma-separated list passed to `dx extract_dataset --fields`.
field_names_str = ["olink_instance_0." + name for name in field_names]
field_names_query = ",".join(field_names_str)

In [None]:
# Export the protein field-name list (one name per line) to field_names.txt.
# Alternatively, instead of using dx extract_dataset you can use the Table
# Exporter app: this list of field names can be used as its input, and then the
# remaining cells in this notebook can be skipped.
# A context manager guarantees the file is closed even if a write fails.
with open("field_names.txt", "w") as fh:
    fh.writelines(item + "\n" for item in field_names)

In [None]:
# Raise Kryo's maximum serializer buffer to 128m; without this the toPandas()
# call below fails with an error.
_kryo_settings = [("spark.kryoserializer.buffer.max", "128m")]
conf = pyspark.SparkConf().setAll(_kryo_settings)

In [None]:
# Start (or reuse) the Spark context with the configuration above.
# getOrCreate() makes this cell safe to re-run: constructing SparkContext(...)
# directly raises if a context is already active in this kernel. Note that when
# a context already exists, it is returned as-is and `conf` is not re-applied.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = pyspark.sql.SparkSession(sc)
# Legacy SQLContext handle; not referenced elsewhere in this notebook
# (queries go through spark.sql), kept for compatibility.
sqlContext = SQLContext(sc)

In [None]:
# Ask dx for the SQL query (--sql) that selects the protein fields, rather than
# the data itself. The query is written to extracted_data.sql and executed
# through Spark in the following cells.
cmd = ["dx", "extract_dataset", dataset]
cmd += ["--fields", field_names_query]
cmd += ["--delimiter", ","]
cmd += ["--output", "extracted_data.sql", "--sql"]
subprocess.check_call(cmd)

In [None]:
# Read the SQL query generated by `dx extract_dataset --sql`.
# Lines are joined with a single space rather than concatenated directly, so a
# query split across lines cannot fuse adjacent tokens ("...a\nFROM" -> "aFROM").
# Blank lines are skipped so no trailing whitespace ends up after the final ";".
with open("extracted_data.sql", "r") as file:
    retrieve_sql = " ".join(line.strip() for line in file if line.strip())

In [None]:
# Run the generated query on Spark. strip(";") drops the statement terminator
# (presumably because spark.sql() rejects a trailing ";" — confirm).
sql_query = retrieve_sql.strip(";")
temp_df = spark.sql(sql_query)

In [None]:
# Collect the Spark result to the driver as a pandas DataFrame (this is the
# call that needs the larger Kryo buffer set in the SparkConf cell).
pdf = temp_df.toPandas()

In [None]:
# Sanity check: (rows, columns) of the protein table and a preview of its first rows.
print(pdf.shape)
pdf.head()

In [None]:
# Persist the protein table locally as tab-separated text, without the index column.
proteomics_out = "complete_proteomics_df.txt"
pdf.to_csv(proteomics_out, index=False, sep="\t")

In [None]:
# Upload the protein table to the project's proteomics_results/ folder.
# subprocess.check_call (as used for the extract cells) raises on a non-zero
# exit status; a `!` shell escape would only print the error and let later
# cells proceed after a failed upload.
subprocess.check_call(
    ["dx", "upload", "complete_proteomics_df.txt", "--destination", "proteomics_results/"]
)

In [None]:
# List the project's proteomics_results/ folder in long format to confirm the upload.
!dx ls -l proteomics_results/

## 3. Extracting phenotype information

In [None]:
# Write every field name available in the dataset (output of
# `dx extract_dataset --list-fields`) to all_fields.txt.
import subprocess

cmd = ["dx", "extract_dataset", dataset, "--list-fields"]
with open("all_fields.txt", "w") as f:
    subprocess.run(cmd, stdout=f, check=True)

In [None]:
## Now, extract phenotype information: participant ID, age at baseline (p21003_i0), sex (p31)
field_names = ["participant.eid", "participant.p21003_i0", "participant.p31"]
field_names_query = ",".join(field_names)

# NOTE(review): this overwrites the protein list written to field_names.txt
# earlier; use a different file name if that list is still needed locally.
with open("field_names.txt", "w") as fh:
    fh.writelines(item + "\n" for item in field_names)

# Remove an SQL file left over from a previous run before regenerating it.
# Guarded so the first run does not error (`rm` on a missing file does).
if os.path.exists("extracted_data2.sql"):
    os.remove("extracted_data2.sql")

# Generate the SQL query for the phenotype fields; it is run on the Spark
# session created earlier in this notebook.
cmd = [
    "dx",
    "extract_dataset",
    dataset,
    "--fields",
    field_names_query,
    "--delimiter",
    ",",
    "--output",
    "extracted_data2.sql",
    "--sql",
]
subprocess.check_call(cmd)

# Join lines with a space (skipping blanks) so a multi-line query cannot fuse
# adjacent tokens, and no trailing whitespace hides the final ";".
with open("extracted_data2.sql", "r") as file:
    retrieve_sql = " ".join(line.strip() for line in file if line.strip())

temp_df = spark.sql(retrieve_sql.strip(";"))

pdf = temp_df.toPandas()

In [None]:
# Sanity check: (rows, columns) of the phenotype table and a preview of its first rows.
print(pdf.shape)
pdf.head()

In [None]:
# Save the phenotype table as tab-separated text, then upload it together with
# the full field list (all_fields.txt) to the project's proteomics_results/ folder.
pdf.to_csv("age_sex_proteomics_df.txt", sep="\t", index=False)

# subprocess.check_call (as in the extract cells) raises on a failed upload;
# a `!` shell escape would only print the error and carry on.
for upload_file in ("age_sex_proteomics_df.txt", "all_fields.txt"):
    subprocess.check_call(
        ["dx", "upload", upload_file, "--destination", "proteomics_results/"]
    )

In [None]:
# List the project's proteomics_results/ folder in long format to confirm all uploads.
!dx ls -l proteomics_results/