# Using Spark/pyspark to extract datasets from Apollo

If our query takes longer than 2 minutes to execute, `dx extract_dataset` will not run. In that case, we will start up a Spark instance of JupyterLab and run the Spark SQL directly in the notebook. The workflow has eight steps:

1. Extract the relevant dictionary information
2. Connect to our Spark Cluster using `pyspark`
3. Extract the dataset SQL using the `--sql` option with `dx extract_dataset` and save output to a file
4. Load query from file, clean the SQL
5. Use `spark.sql` to load the dataset as a Spark DataFrame
6. Convert Spark DataFrame to Pandas
7. Use `.to_csv()` to save Pandas `DataFrame` as CSV file
8. Upload our CSV file back to project storage using `dx upload`.

## Import Necessary Packages

We'll need to import a number of Python packages, including `dxpy`. When in doubt, update to the latest version of `dxpy` using `pip install --upgrade dxpy`.

In [2]:
import subprocess
import dxpy
import pandas as pd
import os
import glob
pd.set_option('display.max_columns', None)

## Use `dx extract_dataset` to extract Data Dictionary Files

Now we'll extract the dictionary files using the `-ddd` option of `dx extract_dataset`.

In [4]:
dataset = 'record-G5Ky4Gj08KQYQ4P810fJ8qPp'

In [None]:
cmd = ["dx", "extract_dataset", dataset, "-ddd", "--delimiter", ","]
subprocess.check_call(cmd)

If we load up the data dictionary and the codings files, we'll be able to decode the categorical data.

In [7]:
path = os.getcwd()

data_dict_csv = glob.glob(os.path.join(path, "*.data_dictionary.csv"))[0]
data_dict_df = pd.read_csv(data_dict_csv)

codings_csv = glob.glob(os.path.join(path, "*.codings.csv"))[0]
codings_df = pd.read_csv(codings_csv)

Unnamed: 0,entity,name,type,primary_key_type,coding_name,concept,description,folder_path,is_multi_select,is_sparse_coding,linkout,longitudinal_axis_type,referenced_entity_field,relationship,title,units
0,participant,p22608_a24,integer,,data_coding_493,,,Online follow-up > Work environment > Employme...,,,http://biobank.ctsu.ox.ac.uk/crystal/field.cgi...,,,,Workplace very hot | Array 24,
1,participant,p2784_i1,integer,,data_coding_100349,,,UK Biobank Assessment Centre > Touchscreen > S...,,,http://biobank.ctsu.ox.ac.uk/crystal/field.cgi...,,,,Ever taken oral contraceptive pill | Instance 1,
2,participant,p102780_i4,integer,,data_coding_100001,,,Online follow-up > Diet by 24-hour recall > Br...,,,http://biobank.ctsu.ox.ac.uk/crystal/field.cgi...,,,,Other grain intake | Instance 4,Serving
3,participant,p41217,integer,,data_coding_228,,,Health-related outcomes > Hospital inpatient >...,yes,,http://biobank.ctsu.ox.ac.uk/crystal/field.cgi...,,,,Mental categories,
4,participant,p22704_a7,integer,,,,,Additional exposures > Local environment > Hom...,,,http://biobank.ctsu.ox.ac.uk/crystal/field.cgi...,,,,Home location - north co-ordinate (rounded) | ...,metre-grid
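As a rough illustration of what decoding means here, the sketch below maps raw codes to their meanings by looking up a field's `coding_name` in the data dictionary and then the matching rows in the codings table. The helper `decode_field()` is hypothetical, the two small DataFrames are toy stand-ins rather than real extract output, and the codings column names `code` and `meaning` are assumed to match the codings CSV.

```python
import pandas as pd

# Toy stand-ins for the two files loaded above (values are illustrative only).
# Assumption: the codings CSV has `coding_name`, `code`, and `meaning` columns.
codings_df = pd.DataFrame({
    "coding_name": ["data_coding_9", "data_coding_9", "data_coding_7"],
    "code": ["0", "1", "1"],
    "meaning": ["Female", "Male", "Yes"],
})
data_dict_df = pd.DataFrame({
    "entity": ["participant", "participant"],
    "name": ["p31", "p21022"],
    "coding_name": ["data_coding_9", None],
})


def decode_field(values, entity, name, data_dict_df, codings_df):
    """Map raw codes to meanings; return values unchanged if the field has no coding."""
    row = data_dict_df[(data_dict_df["entity"] == entity) & (data_dict_df["name"] == name)]
    coding_name = row["coding_name"].iloc[0]
    if pd.isna(coding_name):
        return values
    lookup = codings_df[codings_df["coding_name"] == coding_name]
    # Compare codes as strings so integer and text codes line up
    mapping = dict(zip(lookup["code"].astype(str), lookup["meaning"]))
    return values.astype(str).map(mapping)


raw = pd.Series([0, 1, 0], name="participant.p31")
decoded = decode_field(raw, "participant", "p31", data_dict_df, codings_df)
print(decoded.tolist())
```

Codes are compared as strings, so a column that pandas has read as floats (for example `1.0`) would need to be converted to integers before the lookup.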


## Specify Field Names and Generate Spark SQL

Here we specify the entity/field names as a list, and then join them with commas to build our actual `dx extract_dataset` command.

In [25]:
entity_field2 = ["participant.p31", "participant.p21022", "participant.p100240_i1"]
entity_field2.append('participant.eid')
entity_field2

['participant.p31',
 'participant.p21022',
 'participant.p100240_i1',
 'participant.eid']

Now that we have our list of fields, we can extract the Spark SQL using the `--sql` option.

The SQL is returned as a text file, and we can specify its name using the `-o` option. Here we have called it `cohort.sql`.

In [26]:
cmd = ["dx", "extract_dataset", dataset, "--fields", ','.join(entity_field2), 
       "--sql", "-o", "cohort.sql"]

subprocess.check_call(cmd)

0
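If the same command is needed for several field lists, the argument list can be built in one place. The following is a minimal sketch; `build_extract_cmd()` is a hypothetical helper, not part of `dxpy`, and it only assembles the arguments already used above without running anything.

```python
def build_extract_cmd(dataset, fields, output="cohort.sql"):
    """Return the argument list for `dx extract_dataset ... --sql -o <output>`."""
    if not fields:
        raise ValueError("at least one entity.field name is required")
    return ["dx", "extract_dataset", dataset,
            "--fields", ",".join(fields),
            "--sql", "-o", output]


fields = ["participant.eid", "participant.p31", "participant.p21022"]
cmd = build_extract_cmd("record-G5Ky4Gj08KQYQ4P810fJ8qPp", fields)
print(cmd)
# To run it in the notebook: subprocess.check_call(cmd)
```

Passing the command as a list keeps the comma-joined field string as a single argument, so no shell quoting is needed.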

## Connect to Spark 

Now that we have the Spark SQL, we can execute it as a query on our dataset.

Make sure you only run this code block once, as Spark doesn't like to be initialized twice. 

If you accidentally do that, use **Kernel > Restart Kernel** in the JupyterLab menu and start rerunning the code blocks from scratch.

In [14]:
import pyspark
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
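One way to make this cell safe to re-run is `SparkSession.builder.getOrCreate()`, which returns the already-active session if there is one instead of constructing a second context. The sketch below wraps it in a hypothetical helper, `get_spark()`. Whether it picks up the cluster configuration in the same way as `pyspark.SparkContext()` on this JupyterLab image is an assumption and has not been verified here.

```python
def get_spark():
    """Return the active SparkSession, creating one only if none exists yet."""
    # Imported lazily so that defining the helper does not itself start Spark
    from pyspark.sql import SparkSession
    return SparkSession.builder.getOrCreate()


# Usage in the notebook (safe to call more than once):
# spark = get_spark()
```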

## Load SQL and then retrieve as Spark DataFrame

We now load our SQL from the file and clean it a little bit. 

Then we can run our query using `spark.sql()`. Running this returns a Spark DataFrame.

In [27]:
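with open("cohort.sql", "r") as file:
    # Join lines with a space so tokens on adjacent lines don't run together
    retrieve_sql = " ".join(line.strip() for line in file)

# spark.sql() takes a single statement, so drop the trailing semicolon
df = spark.sql(retrieve_sql.strip().rstrip(";"))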
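The cleaning step can also be written as a small function that is easy to check on its own, without a Spark session. This is a sketch: `clean_sql()` is a hypothetical helper, and the sample statement is made up for illustration rather than taken from real `dx extract_dataset` output.

```python
def clean_sql(text):
    """Collapse a multi-line SQL string to one line and drop the trailing semicolon."""
    # Skip blank lines and join the rest with single spaces
    one_line = " ".join(line.strip() for line in text.splitlines() if line.strip())
    return one_line.rstrip(";").strip()


# Made-up statement, shaped like a query that spans several lines
sample = (
    "SELECT `participant_1`.`eid` AS `participant.eid`\n"
    "FROM `db`.`participant` AS `participant_1`;\n"
)
cleaned = clean_sql(sample)
print(cleaned)
```

The result can then be passed to `spark.sql(cleaned)`.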



## Convert to Pandas DataFrame

Now we need to convert our Spark DataFrame to a Pandas one. We use the built-in `.toPandas()` method to do this.

In [28]:
df_pandas = df.toPandas()
df_pandas.head()

Unnamed: 0,participant.p31,participant.p21022,participant.p100240_i1,participant.eid
0,0,43,1.0,sample_100_116
1,0,60,,sample_100_142
2,0,53,0.0,sample_100_285
3,0,62,,sample_100_290
4,0,67,,sample_100_304


## Save as CSV file in JupyterLab Storage

In [29]:
df_pandas.to_csv("sql_output.csv")
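By default, `to_csv()` also writes the DataFrame index as an unnamed first column, which shows up as `Unnamed: 0` when the file is read back. If the row index carries no information, passing `index=False` leaves it out. The sketch below compares the two headers using a toy DataFrame and in-memory buffers instead of real files.

```python
import io

import pandas as pd

# Toy frame shaped like the extract above (values are illustrative only)
df_pandas = pd.DataFrame({
    "participant.eid": ["sample_100_116", "sample_100_142"],
    "participant.p31": [0, 0],
})

with_index = io.StringIO()
df_pandas.to_csv(with_index)                   # default: index is written
without_index = io.StringIO()
df_pandas.to_csv(without_index, index=False)   # index is left out

header_with = with_index.getvalue().splitlines()[0]
header_without = without_index.getvalue().splitlines()[0]
print(repr(header_with))
print(repr(header_without))
```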

## Upload CSV files to Project

In [31]:
%%bash

dx upload *.csv --destination /users/tladeras/

ID                    file-GK7x6V80F5X8gxkp4v9qQXB6
Class                 file
Project               project-G3fz4600F5X7FkJz6qyZfb3g
Folder                /users/tladeras
Name                  cohort_data.csv
State                 closing
Visibility            visible
Types                 -
Properties            -
Tags                  -
Outgoing links        -
Created               Tue Dec  6 23:41:57 2022
Created by            tladeras
 via the job          job-GK7vJPQ0F5X7Y33Z4k4vkK39
Last modified         Tue Dec  6 23:41:57 2022
Media type            
archivalState         "live"
cloudAccount          "cloudaccount-dnanexus"
ID                    file-GK7x6VQ0F5XKpVb44yZPZbz5
Class                 file
Project               project-G3fz4600F5X7FkJz6qyZfb3g
Folder                /users/tladeras
Name                  extracted_data_with_code_meanings.csv
State                 closing
Visibility            visible
Types                 -
Properties            -
Tags                