[ENH] Add cloudknot example #533

Merged: 9 commits, Nov 13, 2020
Changes from 7 commits
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -77,7 +77,7 @@
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
-exclude_patterns = []
+exclude_patterns = ["examples/cloudknot_*"]

# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
121 changes: 121 additions & 0 deletions examples/cloudknot_example.py
@@ -0,0 +1,121 @@
"""
==============================
Using cloudknot to run pyAFQ on AWS batch:
==============================
36000 marked this conversation as resolved.
Show resolved Hide resolved

36000 marked this conversation as resolved.
Show resolved Hide resolved
The following is an example of running tractometry on AWS using a
software called cloudknot: https://nrdg.github.io/cloudknot/
36000 marked this conversation as resolved.
Show resolved Hide resolved
"""

# import cloudknot and set the correct region
import cloudknot as ck
ck.set_region('us-east-1')
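# cloudknot will create its AWS resources in this region; typically you
# would pick the region that hosts your S3 data, to avoid cross-region
# transfer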


def afq_process_subject(subject):
    # define a function that each job will run
    # In this case, each process does a single subject
    import logging
    import s3fs
    # all imports must be at the top of the function;
    # cloudknot installs the appropriate packages from pip
    import AFQ.data as afqd
    import AFQ.api as api
    import AFQ.mask as afm

    # set the logging level to your choice
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    # Download the given subject to your local machine from s3
    study_ixi = afqd.S3BIDSStudy(
        "my_study",
        "my_study_bucket",
        "my_study_prefix",
        subjects=[subject],
        anon=False)
    study_ixi.download(
        "local_bids_dir",
        include_derivs=["pipeline_name"])

    # you can optionally provide your own segmentation file;
    # in this case, we look for a file with suffix 'seg'
    # in the 'pipeline_name' pipeline,
    # and we consider all non-zero labels to be a part of the brain
    brain_mask = afm.LabelledMaskFile(
        'seg', {'scope': 'pipeline_name'}, exclusive_labels=[0])

    # define the api AFQ object
    myafq = api.AFQ(
        "local_bids_dir",
        dmriprep="pipeline_name",
        brain_mask=brain_mask,
        viz_backend='plotly',  # generates both interactive html and GIFs
        scalars=["dki_fa", "dki_md"])

    # export_all runs the entire pipeline and creates many useful derivatives
    myafq.export_all()

    # upload the results to some location on s3
    myafq.upload_to_s3(
        s3fs.S3FileSystem(),
        "my_study_bucket/my_study_prefix/derivatives/afq")


# here we provide a list of subjects that we have selected to process
# to randomly select 3 subjects without replacement, instead do:
# subjects = [[1], [2], [3]]
# see the docstring for S3BIDSStudy.__init__ for more information
subjects = [123456, 123457, 123458]

# define the knot to run your jobs on
# this knot bids for access to spot EC2 resources,
# so its jobs are cheaper to run but may be evicted
# it also installs pyAFQ from github
knot = ck.Knot(
    name='afq_process_subject-201009-0',
    func=afq_process_subject,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)
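# roughly speaking, cloudknot wraps afq_process_subject in a Docker image
# (built on base_image, with pyAFQ pip-installed from github), pushes the
# image to AWS, and sets up the AWS Batch infrastructure to run it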

# launch a process for each subject
result_futures = knot.map(subjects)

##########################################################################
# this function can be called repeatedly in a jupyter notebook
# to view the progress of jobs
# knot.view_jobs()

# you can also view the status of a specific job
# knot.jobs[0].status
##########################################################################


# When all jobs are finished, remember to clobber the knot,
# either using the aws console or this function in a jupyter notebook:
result_futures.result() # waits for futures to resolve, not needed in notebook
knot.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)
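# clobber tears down the AWS resources that the knot created; with these
# flags it also removes the IAM policies and roles (PARS), the container
# repository, and the Docker image, so nothing is left running or stored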

# we create another knot which takes the resulting profiles of each subject
# and combines them into one csv file


def afq_combine_profiles(dummy_argument):
    from AFQ.api import download_and_combine_afq_profiles
    download_and_combine_afq_profiles(
        "temp", "my_study_bucket", "my_study_prefix/derivatives/afq")


knot2 = ck.Knot(
    name='afq_combine_subjects-201009-0',
    func=afq_combine_profiles,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)

result_futures2 = knot2.map(["dummy_argument"], job_type="independent")
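# job_type="independent" submits a regular Batch job per element rather
# than cloudknot's usual array job; with a single dummy argument here,
# an array job is unnecessary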
result_futures2.result()
knot2.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)
183 changes: 183 additions & 0 deletions examples/cloudknot_hcp_example.py
@@ -0,0 +1,183 @@
"""
==============================
Using cloudknot to run pyAFQ with multiple configurations on AWS batch,
using the HCP dataset:
==============================

The following is an example of how to use cloudknot to run multiple
confiugrations of pyAFQ on the HCP dataset. Specifically, here we will run
pyAFQ with different tractography seeding strategies.
"""
36000 marked this conversation as resolved.
Show resolved Hide resolved

# import cloudknot and set the correct region
import configparser
import itertools
import os.path as op

import cloudknot as ck
ck.set_region('us-east-1')


def afq_process_subject(subject, seed_mask, n_seeds,
                        aws_access_key, aws_secret_key):
    # define a function that each job will run
    # In this case, each process does a single subject
    import logging
    import s3fs
    # all imports must be at the top of the function;
    # cloudknot installs the appropriate packages from pip
    from AFQ.data import fetch_hcp
    import AFQ.api as api
    import AFQ.mask as afm

    import numpy as np
    import os.path as op

    # set the logging level to your choice
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    # Download the given subject to your local machine from s3
    _, hcp_bids = fetch_hcp(
        [subject],
        profile_name=False,
        study="HCP_1200",
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key)

    # We make a new seed mask for each process based off of the
    # seed_mask argument, which is a string.
    # This is to avoid any complications with pickling the masks.
    if seed_mask == "roi":
        seed_mask_obj = afm.RoiMask()
    elif seed_mask == "fa":
        seed_mask_obj = afm.ScalarMask("dti_fa")
    else:
        seed_mask_obj = afm.FullMask()

    # Determine whether n_seeds is the total number of random seeds
    # or the number of seeds per voxel
    if n_seeds > 3:
        random_seeds = True
    else:
        random_seeds = False

    # set the tracking_params based off our inputs
    tracking_params = {
        "seed_mask": seed_mask_obj,
        "n_seeds": n_seeds,
        "random_seeds": random_seeds}

    # use the segmentation file from HCP to get a brain mask,
    # where everything not labelled 0 is considered a part of the brain
    brain_mask = afm.LabelledMaskFile(
        'seg', {'scope': 'dmriprep'}, exclusive_labels=[0])

    # define the api AFQ object
    myafq = api.AFQ(
        hcp_bids,
        brain_mask=brain_mask,
        tracking_params=tracking_params)

    # export_all runs the entire pipeline and creates many useful derivatives
    myafq.export_all()

    # upload the results to some location on s3
    myafq.upload_to_s3(
        s3fs.S3FileSystem(),
        ("my_study_bucket/my_study_prefix/derivatives_afq_"
         f"{seed_mask}_{n_seeds}"))


# here we provide a list of subjects that we have selected to process
subjects = [103818, 105923, 111312]

# here we construct lists of everything we want to test:
subjects = [str(i) for i in subjects]
seed_mask = ["fa", "roi"]
n_seeds = [1, 2, 1000000, 2000000]
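# given the n_seeds > 3 heuristic in afq_process_subject, 1 and 2 will be
# treated as per-voxel seed counts, while 1000000 and 2000000 will be
# treated as total numbers of randomly placed seeds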

# and here we mix the above lists, such that every subject is tried with
# every mask and every number of seeds
args = list(itertools.product(subjects, seed_mask, n_seeds))
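# for example, args begins:
# ('103818', 'fa', 1), ('103818', 'fa', 2), ('103818', 'fa', 1000000), ...
# for a total of 3 * 2 * 4 = 24 combinations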

# Use configparser to get the relevant hcp keys
# Requires an hcp entry in your ~/.aws/credentials file
CP = configparser.ConfigParser()
CP.read(op.join(op.expanduser('~'), '.aws', 'credentials'))
aws_access_key = CP.get('hcp', 'AWS_ACCESS_KEY_ID')
aws_secret_key = CP.get('hcp', 'AWS_SECRET_ACCESS_KEY')
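# the credentials file needs an [hcp] section, e.g. as written by
# `aws configure --profile hcp`:
# [hcp]
# aws_access_key_id = <your hcp access key>
# aws_secret_access_key = <your hcp secret key>
# (configparser matches option names case-insensitively by default)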

# This function attaches your keys to each list in a list of lists;
# we use it on args, appending the aws keys to each list of arguments


def attach_keys(list_of_arg_lists):
    new_list_of_arg_lists = []
    for args in list_of_arg_lists:
        arg_ls = list(args)
        arg_ls.extend([aws_access_key, aws_secret_key])
        new_list_of_arg_lists.append(arg_ls)
    return new_list_of_arg_lists
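# for example, ('103818', 'fa', 1) becomes
# ['103818', 'fa', 1, aws_access_key, aws_secret_key]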


# here we attach the access keys to the argument list
args = attach_keys(args)

# define the knot to run your jobs on
# this knot bids for access to spot EC2 resources,
# so its jobs are cheaper to run but may be evicted
# it also installs pyAFQ from github
knot = ck.Knot(
    name='afq_hcp_tractography-201110-0',
    func=afq_process_subject,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)

# launch a process for each combination
# Because starmap is True, each list in args will be unfolded
# and passed into afq_process_subject as arguments
result_futures = knot.map(args, starmap=True)

##########################################################################
# this function can be called repeatedly in a jupyter notebook
# to view the progress of jobs
# knot.view_jobs()

# you can also view the status of a specific job
# knot.jobs[0].status
##########################################################################

# When all jobs are finished, remember to clobber the knot,
# either using the aws console or this function in a jupyter notebook:
result_futures.result() # waits for futures to resolve, not needed in notebook
knot.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)

# we create another knot which takes the resulting profiles of each combination
# and combines them into one csv file


def afq_combine_profiles(seed_mask, n_seeds):
    from AFQ.api import download_and_combine_afq_profiles
    download_and_combine_afq_profiles(
        "temp", "my_study_bucket",
        f"my_study_prefix/derivatives_afq_{seed_mask}_{n_seeds}")


knot2 = ck.Knot(
    name='afq_combine_subjects-201110-0',
    func=afq_combine_profiles,
    base_image='python:3.8',
    image_github_installs="https://github.com/yeatmanlab/pyAFQ.git",
    pars_policies=('AmazonS3FullAccess',),
    bid_percentage=100)

# the args here are all the different configurations of pyAFQ that we ran
seed_mask = ["fa", "roi"]
n_seeds = [1, 2, 1000000, 2000000]
args = list(itertools.product(seed_mask, n_seeds))

result_futures2 = knot2.map(args, starmap=True)
result_futures2.result()
knot2.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)