[ENH] Add cloudknot example #533

Merged
merged 9 commits on Nov 13, 2020
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -77,7 +77,7 @@
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path .
-exclude_patterns = []
+exclude_patterns = ["examples/cloudknot_*"]

# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
165 changes: 165 additions & 0 deletions examples/cloudknot_example.py
@@ -0,0 +1,165 @@
"""
==========================================
Using cloudknot to run pyAFQ on AWS Batch
==========================================
One of the purposes of ``pyAFQ`` is to analyze large-scale openly-available datasets,
such as those in the `Human Connectome Project <https://www.humanconnectome.org/>`_.

To analyze these datasets, large amounts of compute are needed. One way to gain access
to massive computational power is by using cloud computing. Here, we will demonstrate
how to use ``pyAFQ`` in the Amazon Web Services cloud.

We will rely on the `AWS Batch Service <https://aws.amazon.com/batch/>`_, and we will
submit work to AWS Batch using `Cloudknot <https://nrdg.github.io/cloudknot/>`_,
software that our group developed.
"""

##########################################################################
# Import cloudknot and set the AWS region within which computations will take place. Setting a
# region is important, because if the data that you are analyzing is stored in
# `AWS S3 <https://aws.amazon.com/s3/>`_ in a particular region, it is best to run the computation
# in that region as well. That is because AWS charges for inter-region transfer of data.
import cloudknot as ck
ck.set_region('us-east-1')
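
##########################################################################
# If you are not sure which region your data lives in, you can look the bucket
# up before setting the region. This is a small sketch using ``boto3`` (which
# ``Cloudknot`` itself uses under the hood); ``"my_study_bucket"`` is a
# placeholder for your own bucket name::
#
#     import boto3
#     response = boto3.client("s3").get_bucket_location(Bucket="my_study_bucket")
#     # buckets in us-east-1 report a LocationConstraint of None
#     print(response["LocationConstraint"] or "us-east-1")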

##########################################################################
# Define the function to use
# --------------------------
# ``Cloudknot`` uses the single program multiple data paradigm of computing. This means that the same
# function will be run on multiple different inputs. For example, a ``pyAFQ`` processing function can
# be run on multiple different subjects in a dataset.
# Below, we define the function that we will use. Notice that ``Cloudknot`` functions include the
# import statements of the dependencies used. This is necessary so that ``Cloudknot`` knows
# what dependencies to install into AWS Batch to run this function.


def afq_process_subject(subject):
    # define a function that each job will run
    # In this case, each process does a single subject
    import logging
    import s3fs
    # all imports must be at the top of the function
    # cloudknot installs the appropriate packages from pip
    import AFQ.data as afqd
    import AFQ.api as api
    import AFQ.mask as afm

    # set logging level to your choice
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    # Download the given subject to your local machine from s3
    study_ixi = afqd.S3BIDSStudy(
        "my_study",
        "my_study_bucket",
        "my_study_prefix",
        subjects=[subject],
        anon=False)
    study_ixi.download(
        "local_bids_dir",
        include_derivs=["pipeline_name"])

    # you can optionally provide your own segmentation file
    # in this case, we look for a file with suffix 'seg'
    # in the 'pipeline_name' pipeline,
    # and we consider all non-zero labels to be a part of the brain
    brain_mask = afm.LabelledMaskFile(
        'seg', {'scope': 'pipeline_name'}, exclusive_labels=[0])

    # define the api AFQ object
    myafq = api.AFQ(
        "local_bids_dir",
        dmriprep="pipeline_name",
        brain_mask=brain_mask,
        viz_backend='plotly',  # this will generate both interactive html and GIFs
        scalars=["dki_fa", "dki_md"])

    # export_all runs the entire pipeline and creates many useful derivatives
    myafq.export_all()

    # upload the results to some location on s3
    myafq.upload_to_s3(
        s3fs.S3FileSystem(),
        "my_study_bucket/my_study_prefix/derivatives/afq")


##########################################################################
# Here we provide a list of subjects that we have selected to process.
# To randomly select 3 subjects without replacement, instead do:
# subjects = [[1], [2], [3]]
# See the docstring for S3BIDSStudy.__init__ for more information.
subjects = [123456, 123457, 123458]
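
##########################################################################
# If you are not sure which subjects are available, you can list the contents
# of the study bucket before choosing. This is a small sketch that assumes the
# usual BIDS layout of one ``sub-*`` directory per subject under
# ``my_study_bucket/my_study_prefix``::
#
#     import s3fs
#     fs = s3fs.S3FileSystem()
#     print(fs.ls("my_study_bucket/my_study_prefix"))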

##########################################################################
# Defining a ``Knot`` instance
# ---------------------------------
# We instantiate an object of the :class:`ck.Knot` class. This object will be used to run your jobs.
# The object is instantiated with the `'AmazonS3FullAccess'` policy, so that it can write the results
# out to S3, into a bucket that you have write permissions on.
# Setting the `bid_percentage` keyword argument makes AWS Batch use
# `spot EC2 instances <https://aws.amazon.com/ec2/spot/>`_ for the computation.
# This can result in substantial cost savings, as spot compute instances can cost
# much less than on-demand instances. However, note that spot instances can also
# be evicted, so if completing all of the work is very time-sensitive, do not set this
# keyword argument. Using the `image_github_installs` keyword argument will
# install pyAFQ from GitHub. You can also specify other forks and branches to
# install from.
knot = ck.Knot(
    name='afq_process_subject-201009-0',
    func=afq_process_subject,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)
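
##########################################################################
# For example, to test a development version of pyAFQ, you could point
# ``image_github_installs`` at your own fork. ``Cloudknot`` installs these
# URLs with pip, so pip's VCS conventions for selecting a branch should apply
# (the fork and branch names below are placeholders)::
#
#     image_github_installs=(
#         "https://github.com/my_username/pyAFQ.git@my_branch")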

##########################################################################
# Launching the computation
# --------------------------------
# The :meth:`map` method of the :class:`Knot` object maps each of the inputs provided
# as a sequence onto the function and executes the function on each one of them in
# parallel.
result_futures = knot.map(subjects)

##########################################################################
# Once computations have started, you can call the following
# method to view the progress of jobs::
#
# knot.view_jobs()
#
# You can also view the status of a specific job::
#
# knot.jobs[0].status
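
##########################################################################
# For example, to poll from a script rather than a notebook, you could check
# the status in a loop (only a sketch; the exact contents of ``status`` depend
# on the state of the AWS Batch job)::
#
#     import time
#     for _ in range(10):  # check once a minute for ten minutes
#         print(knot.jobs[0].status)
#         time.sleep(60)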


##########################################################################
# When all jobs are finished, remember to use the :meth:`clobber` method to
# destroy all of the AWS resources created by the :class:`Knot`.
result_futures.result()  # waits for futures to resolve, not needed in a notebook
knot.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)

##########################################################################
# In a second :class:`Knot` object, we use a function that takes the resulting profiles of each subject
# and combines them into one CSV file.


def afq_combine_profiles(dummy_argument):
    from AFQ.api import download_and_combine_afq_profiles
    download_and_combine_afq_profiles(
"temp", "my_study_bucket", "my_study_prefix/derivatives/afq")


knot2 = ck.Knot(
    name='afq_combine_subjects-201009-0',
    func=afq_combine_profiles,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)

##########################################################################
# This knot is called with a dummy argument, which is not used within the function itself. The
# `job_type` keyword argument is used to signal to ``Cloudknot`` that only one job is submitted
# rather than the default array of jobs.
result_futures2 = knot2.map(["dummy_argument"], job_type="independent")
result_futures2.result()
knot2.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)
210 changes: 210 additions & 0 deletions examples/cloudknot_hcp_example.py
@@ -0,0 +1,210 @@
"""
==========================
AFQ with HCP data
==========================
This example demonstrates how to use the AFQ API to analyze HCP data.
For this example to run properly, you will need to gain access to the HCP data.
This can be done by following the instructions on the webpage
`here <https://wiki.humanconnectome.org/display/PublicData/How+To+Connect+to+Connectome+Data+via+AWS>`_.
We will use the ``Cloudknot`` library to run our AFQ analysis in the AWS
Batch service (see also
`this example <http://yeatmanlab.github.io/pyAFQ/auto_examples/cloudknot_example.html>`_).
In the following we will use ``Cloudknot`` to run multiple
configurations of pyAFQ on the HCP dataset. Specifically, here we will run
pyAFQ with different tractography seeding strategies.
"""

##########################################################################
# Import cloudknot and set the correct region. The HCP data is stored in `us-east-1`, so it's best
# to analyze it there.
import configparser
import itertools
import os.path as op
import cloudknot as ck
ck.set_region('us-east-1')

##########################################################################
# Define a function to run. This function allows us to pass in the subject ID for the subjects we would
# like to analyze, as well as strategies for seeding tractography (different masks and/or different
# numbers of seeds per voxel).


def afq_process_subject(subject, seed_mask, n_seeds,
                        aws_access_key, aws_secret_key):
    # define a function that each job will run
    # In this case, each process does a single subject
    import logging
    import s3fs
    # all imports must be at the top of the function
    # cloudknot installs the appropriate packages from pip
    from AFQ.data import fetch_hcp
    import AFQ.api as api
    import AFQ.mask as afm

    import numpy as np
    import os.path as op

    # set logging level to your choice
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    # Download the given subject to the AWS Batch machine from s3
    _, hcp_bids = fetch_hcp(
        [subject],
        profile_name=False,
        study="HCP_1200",
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key)

    # We make a new seed mask for each process based off of the
    # seed_mask argument, which is a string.
    # This is to avoid any complications with pickling the masks.
    if seed_mask == "roi":
        seed_mask_obj = afm.RoiMask()
    elif seed_mask == "fa":
        seed_mask_obj = afm.ScalarMask("dti_fa")
    else:
        seed_mask_obj = afm.FullMask()

    # Determine whether n_seeds is the total number of random seeds
    # (large values) or the number of seeds per voxel (small values)
    if n_seeds > 3:
        random_seeds = True
    else:
        random_seeds = False

    # set the tracking_params based off our inputs
    tracking_params = {
        "seed_mask": seed_mask_obj,
        "n_seeds": n_seeds,
        "random_seeds": random_seeds}

    # use segmentation file from HCP to get a brain mask,
    # where everything not labelled 0 is considered a part of the brain
    brain_mask = afm.LabelledMaskFile(
        'seg', {'scope': 'dmriprep'}, exclusive_labels=[0])

    # define the api AFQ object
    myafq = api.AFQ(
        hcp_bids,
        brain_mask=brain_mask,
        tracking_params=tracking_params)

    # export_all runs the entire pipeline and creates many useful derivatives
    myafq.export_all()

    # upload the results to some location on s3
    myafq.upload_to_s3(
        s3fs.S3FileSystem(),
        ("my_study_bucket/my_study_prefix/derivatives_afq_"
         f"{seed_mask}_{n_seeds}"))


##########################################################################
# In this example, we will process the data from the following subjects
subjects = [103818, 105923, 111312]

##########################################################################
# We will test combinations of different conditions:
# subjects, seed masks, and number of seeds
seed_mask = ["fa", "roi"]
n_seeds = [1, 2, 1000000, 2000000]

##########################################################################
# The following call to ``itertools.product`` creates all the combinations of the above lists, such
# that every subject is run with every mask and every number of seeds.
args = list(itertools.product(subjects, seed_mask, n_seeds))
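
##########################################################################
# With 3 subjects, 2 seed masks, and 4 seed counts, this gives
# 3 * 2 * 4 = 24 configurations, and each one becomes its own AWS Batch job:
print(len(args))  # 24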

##########################################################################
# We assume that the credentials for HCP usage are stored in the home directory in a
# `~/.aws/credentials` file. This is where these credentials are stored if the AWS CLI is used to
# configure the profile. We use the standard library ``configparser``
# to get the relevant hcp keys from there.
CP = configparser.ConfigParser()
CP.read_file(open(op.join(op.expanduser('~'), '.aws', 'credentials')))
CP.sections()
aws_access_key = CP.get('hcp', 'AWS_ACCESS_KEY_ID')
aws_secret_key = CP.get('hcp', 'AWS_SECRET_ACCESS_KEY')
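
##########################################################################
# For this to work, your ``~/.aws/credentials`` file needs an ``hcp`` section.
# If you configured the profile with ``aws configure --profile hcp``, the file
# will contain a section that looks like this (the key values below are
# placeholders)::
#
#     [hcp]
#     aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
#     aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx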

##########################################################################
# The following function attaches your AWS keys to each list in a list of lists.
# Here, each list is a list of arguments; we append the AWS keys to each one so
# that we can pass them into the function run on AWS Batch, where they are used
# to download the data onto the AWS Batch machines.


def attach_keys(list_of_arg_lists):
    new_list_of_arg_lists = []
    for args in list_of_arg_lists:
        arg_ls = list(args)
        arg_ls.extend([aws_access_key, aws_secret_key])
        new_list_of_arg_lists.append(arg_ls)
    return new_list_of_arg_lists


##########################################################################
# This calls the function to attach the access keys to each argument list.
args = attach_keys(args)

##########################################################################
# Define the :class:`Knot` object to run your jobs on. See
# `this example <http://yeatmanlab.github.io/pyAFQ/auto_examples/cloudknot_example.html>`_ for more
# details about the arguments to the object.
knot = ck.Knot(
    name='afq_hcp_tractography-201110-0',
    func=afq_process_subject,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)

##########################################################################
# This launches a process for each combination.
# Because `starmap` is `True`, each list in `args` will be unfolded
# and passed into `afq_process_subject` as arguments.
result_futures = knot.map(args, starmap=True)

##########################################################################
# The following method can be called repeatedly in a Jupyter notebook
# to view the progress of jobs::
#
# knot.view_jobs()
#
# You can also view the status of a specific job::
#
# knot.jobs[0].status

##########################################################################
# When all jobs are finished, remember to clobber the knot to destroy all the resources that were
# created in AWS.
result_futures.result() # waits for futures to resolve, not needed in notebook
knot.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)

##########################################################################
# We continue processing to create another knot which takes the resulting profiles of each
# combination and combines them all into one CSV file.


def afq_combine_profiles(seed_mask, n_seeds):
    from AFQ.api import download_and_combine_afq_profiles
    download_and_combine_afq_profiles(
        "temp", "my_study_bucket",
        f"my_study_prefix/derivatives/afq_{seed_mask}_{n_seeds}")


knot2 = ck.Knot(
    name='afq_combine_subjects-201110-0',
    func=afq_combine_profiles,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)

##########################################################################
# The arguments to this call to :meth:`map` are all the different configurations of pyAFQ that we ran.
seed_mask = ["fa", "roi"]
n_seeds = [1, 2, 1000000, 2000000]
args = list(itertools.product(seed_mask, n_seeds))

result_futures2 = knot2.map(args, starmap=True)
result_futures2.result()
knot2.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)