Commit 34077b3

Fixed stuff...

jrussell9000 committed Mar 30, 2024
1 parent bb68ede commit 34077b3
Showing 21 changed files with 1,777 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@
__pycache__/
*.py[cod]
*$py.class
abcd/config.py
1 change: 0 additions & 1 deletion abcd

This file was deleted.

4 changes: 4 additions & 0 deletions abcd/.license
@@ -0,0 +1,4 @@
deretzlaff@wisc.edu
56145
*CmnfUG3ssjKQ
FSV0KOKap6at2
211 changes: 211 additions & 0 deletions abcd/ABCDfastPipe_Docker.py
@@ -0,0 +1,211 @@
# fmt:off
import os
# FastSurfer (actually numpy) crashes when run with multiple threads
# https://github.com/Deep-MI/FastSurfer/issues/371
# Temporary fix is to set all possible numpy threads variables before loading numpy
nThreads = "4"
os.environ["OMP_NUM_THREADS"] = nThreads
os.environ["OPENBLAS_NUM_THREADS"] = nThreads
os.environ["MKL_NUM_THREADS"] = nThreads
os.environ["VECLIB_MAXIMUM_THREADS"] = nThreads
os.environ["NUMEXPR_NUM_THREADS"] = nThreads

import argparse
import config
import keyring
import sys
from dataclasses import dataclass
from ndaDownload import Downloader
from fastSurferShell import FastSurfer
# from subSegment import SubSegment
from utils import s3Upload, xzPack, setup_logger, HelpFormatter
from pathlib import Path
# fmt:on

####################
#    ARGUMENTS     #
####################

# For k8s, the only required argument is "-s"
def make_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="ABCD FastSurfer Pipeline v1.0",
formatter_class=HelpFormatter,
description="""This pipeline automagically downloads minimally-processed T1 (and optionally, T2) \
NIFTI files from the NDA ABCD Study repository, processes them using FastSurfer, optionally sub-segments \
subcortical regions using FreeSurfer's tools, then uploads to compressed output to an S3 bucket. \
\
Specifically, using the credentials for a pre-created miNDAR package containing minimally-processed scans, \
this pipeline connects to the NDA database server to get the data table inside that package. The S3 paths \
to each file are read from the data table. NIFTI file(s) for each subject-timepoint pairing are downloaded \
from the NDA repository to local storage. The T1 scan file is then parcellated and segmented using FastSurfer. \
Optionally, FreeSurfer's various subcortial segmentation tools may be run on the FastSurfer output, which may \
include the T2 scan file if available. Finally, the output directory is compressed and uploaded to an \
AWS bucket specified by the user.""")

parser.add_argument(
"-s", "--sbjid_time",
dest="rawsubjses_str",
required=True,
help="""The underscore separated pairing of the subject ID and timepoint (e.g., NDARINV00XXXXY_baselineYear1Arm1) representing
the subject-timepoint pair to be processed.""")

parser.add_argument(
'-i', '--input',
dest='input_dir',
default='/input',
help="""Path to the input directory where NIFTIs from NDA will be stored. Optional (default: /input)""")

parser.add_argument(
"-l", "--log",
dest="log_dir",
default="/log",
help="""Optional path where a running logfile will be written. Optional (default: /log)""")

parser.add_argument(
"-u", "--username",
dest="NDA_username",
default="jrusse10",
help="""NDA username (NOT miNDAR or Login.gov).""")

# For some reason I can't currently figure out, including this argument causes
# miNDAR_packageID to be set to None when passed to the Downloader
# parser.add_argument(
# "-p", "--packageID",
# dest="miNDAR_packageID",
# required=False,
# help="""Package ID for the miNDAR containing the scans to be processed. \
# This is equivalent to the last seven characters of the miNDAR username.""")

# parser.add_argument(
# "-t", "--table",
# dest="miNDAR_tablename",
# default="fmriresults01",
# help="""Optional name of the miNDAR package's data table (default: fmriresults01).""")

parser.add_argument(
"--nThreads",
default="4",
help="""Number of threads to assign to FastSurfer instance (default: 4).""")

parser.add_argument(
"-so", "--segonly",
dest="seg_only",
action="store_true",
help="""Only run the volumetric segmentation portion of FastSurfer.
Note: Subcortical subfield segmentation cannot be performed if this \
option is selected (default: False).""")

parser.add_argument(
"-sc", "--subseg",
dest="run_subseg",
action="store_true",
help="""Run the subcortical subfield segmentation portion of FastSurfer \
(default: False).""")

parser.add_argument(
"-s3", "--s3bucket",
dest="aws_s3bucket",
default="brc.abcd",
help="""Name of the S3 bucket that will hold the compressed output files. \
(e.g., brc.abcd)""")

parser.add_argument(
"--aws_accesskey",
dest="aws_access_key_id",
required=False,
help="""AWS access key for the S3 bucket.""")

parser.add_argument(
"--aws_secretkey",
dest="aws_secret_access_key",
required=False,
help="""AWS secret key for the S3 bucket.""")

return parser
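
# Example invocation (hypothetical subject ID; flags as defined above):
#   python ABCDfastPipe_Docker.py -s NDARINV00XXXXY_baselineYear1Arm1 \
#       --nThreads 4 -s3 brc.abcd --aws_accesskey <key> --aws_secretkey <secret>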

@dataclass
class FastPipe:
rawsubjses_str: str
input_dir: str
log_dir: str
nThreads: str
seg_only: bool
run_subseg: bool
NDA_username: str
miNDAR_packageID: str
aws_access_key_id: str
aws_secret_access_key: str
aws_s3_bucket: str
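
# Note: instantiating FastPipe immediately runs the entire pipeline
# (setup, download, FastSurfer, compress, upload) via __post_init__.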

def __post_init__(self):
self.pipeSetup()
self.download()
self.fastSurfer()
self.compress()
self.upload()

# Creating directories
def pipeSetup(self):
Path(self.input_dir).mkdir(parents=True, exist_ok=True)
Path(self.log_dir).mkdir(parents=True, exist_ok=True)

# Downloading files specific to this subject_ses from NDA
def download(self):
Downloader(rawsubjses_str=self.rawsubjses_str, input_dir=self.input_dir,
NDA_username=self.NDA_username, miNDAR_packageID=self.miNDAR_packageID,
nThreads=self.nThreads)

# Running FastSurfer segmentation and parcellation (optional)
def fastSurfer(self):
FastSurfer(self.rawsubjses_str, self.input_dir, self.nThreads, self.seg_only)

# def subSegment(self):
# # Starting the logger
# setup_logger(self.rawsubjses_str, self.log_dir)
# # Running subcortical subfield segmentation
# logTitle(f'Running Subcortical Subfield Segmentation for {self.rawsubjses_str}')
# SubSegment(self.rawsubjses_str, self.input_dir, self.nThreads, MCRdir)
# return self.rawsubjses_str

# Packing up the results
def compress(self):
xzPack(self.rawsubjses_str)

# Uploading to S3
def upload(self):
s3Upload(rawsubjses_str=self.rawsubjses_str, access_key_id=self.aws_access_key_id,
secret_access_key=self.aws_secret_access_key, s3bucket=self.aws_s3_bucket)


def main(
rawsubjses_str: str = None,
input_dir: str = '/input',
log_dir: str = '/log',
nThreads: str = "1",
seg_only: bool = False,
run_subseg: bool = False,
NDA_username: str = config.nda_username,
miNDAR_packageID: str = "1226193",
aws_access_key_id: str = config.aws_access_key_id,
aws_secret_access_key: str = config.aws_secret_access_key,
aws_s3bucket: str = "brc.abcd"):


# Need to replace with args
keyring.set_password('nda-tools', config.nda_username, config.nda_password)

try:
setup_logger(rawsubjses_str, log_dir)
FastPipe(rawsubjses_str, input_dir, log_dir, nThreads, seg_only, run_subseg, NDA_username,
miNDAR_packageID, aws_access_key_id, aws_secret_access_key, aws_s3bucket)
except RuntimeError as e:
print(e)
return e.args[0]

return 0


if __name__ == '__main__':

arguments = make_parser().parse_args()
sys.exit(main(**vars(arguments)))
157 changes: 157 additions & 0 deletions abcd/ABCDfastPipe_Docker_queue.py
@@ -0,0 +1,157 @@
# fmt:off
import os
# FastSurfer (actually numpy) crashes when run with multiple threads
# https://github.com/Deep-MI/FastSurfer/issues/371
# Temporary fix is to set all possible numpy threads variables before loading numpy
nThreads = "4"
os.environ["OMP_NUM_THREADS"] = nThreads
os.environ["OPENBLAS_NUM_THREADS"] = nThreads
os.environ["MKL_NUM_THREADS"] = nThreads
os.environ["VECLIB_MAXIMUM_THREADS"] = nThreads
os.environ["NUMEXPR_NUM_THREADS"] = nThreads

import argparse
import keyring
import sys
from dataclasses import dataclass
from ndaDownload import Downloader
from fastSurferShell import FastSurfer
from subSegment import SubSegment
from utils import s3Upload, xzPack, setup_logger, logTitle, HelpFormatter
from pathlib import Path
# fmt:on

@dataclass
class FastPipe:
rawsubjses_str: str
input_dir: str
log_dir: str
nThreads: str
seg_only: bool
run_subseg: bool
NDA_username: str
miNDAR_packageID: str
aws_access_key_id: str
aws_secret_access_key: str
aws_s3_bucket: str

def __post_init__(self):
self.pipeSetup()
self.download()
self.fastSurfer()
if self.run_subseg:
self.subSegment()
self.compress()
self.upload()

def pipeSetup(self):
# Creating directories
Path(self.input_dir).mkdir(parents=True, exist_ok=True)
Path(self.log_dir).mkdir(parents=True, exist_ok=True)

def download(self):
# Starting the logger
setup_logger(self.rawsubjses_str, self.log_dir)
# Downloading files specific to this subject_ses from NDA
logTitle(f'Downloading and Unpacking Scan Files for {self.rawsubjses_str}')
try:
Downloader(rawsubjses_str=self.rawsubjses_str, input_dir=self.input_dir,
NDA_username=self.NDA_username, miNDAR_packageID=self.miNDAR_packageID,
nThreads=self.nThreads)
except RuntimeError as e:
print(e)
return self.rawsubjses_str
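# NOTE: returning here does not stop __post_init__; the remaining pipeline
# stages will still run even if the download failed.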

def fastSurfer(self):
# Starting the logger
setup_logger(self.rawsubjses_str, self.log_dir)
# Running FastSurfer segmentation
logTitle(f'Running FastSurfer Segmentation for {self.rawsubjses_str}')
FastSurfer(self.rawsubjses_str, self.input_dir, self.nThreads, self.seg_only)
return self.rawsubjses_str

def subSegment(self):
# Starting the logger
setup_logger(self.rawsubjses_str, self.log_dir)
# Running subcortical subfield segmentation
logTitle(f'Running Subcortical Subfield Segmentation for {self.rawsubjses_str}')
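# NOTE: MCRdir (presumably the directory of the MATLAB Compiler Runtime used
# by FreeSurfer's subfield tools) is not defined anywhere in this file and
# must be supplied before this call will run.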
SubSegment(self.rawsubjses_str, self.input_dir, self.nThreads, MCRdir)
return self.rawsubjses_str

def compress(self):
# Starting the logger
setup_logger(self.rawsubjses_str, self.log_dir)
# Packing up the results
logTitle(f'Compressing Output files for {self.rawsubjses_str}')
xzPack(self.rawsubjses_str)

def upload(self):
# Starting the logger
setup_logger(self.rawsubjses_str, self.log_dir)
# Uploading to S3
logTitle(f'Uploading Output files for {self.rawsubjses_str} to S3')
s3Upload(rawsubjses_str=self.rawsubjses_str, access_key_id=self.aws_access_key_id,
secret_access_key=self.aws_secret_access_key, s3bucket=self.aws_s3_bucket)


def fastmain(rawsubjses_str):

input_dir: str = '/input'
log_dir: str = '/log'
nThreads: str = "1"
seg_only: bool = False
run_subseg: bool = False
NDA_username: str = "jrusse10"
miNDAR_packageID: str = "1226193"
aws_access_key_id: str = 'AKIAQMJTZW6JRMGTOUCW'
aws_secret_access_key: str = 'Qv3kvwius5Z/QnyQH7K2CIkkHEhaO0IpjIiqAomP'
aws_s3bucket: str = "brc.abcd"


# Need to replace with args
keyring.set_password('nda-tools', 'jrusse10', '19Ireland61')

try:
FastPipe(rawsubjses_str, input_dir, log_dir, nThreads, seg_only, run_subseg, NDA_username,
miNDAR_packageID, aws_access_key_id, aws_secret_access_key, aws_s3bucket)
except RuntimeError as e:
print(e)
return e.args[0]

return 0

import pika

def main():
    # Consume subject-timepoint strings from the 'fastqueue' queue on a
    # local RabbitMQ broker and run the pipeline on each one
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='fastqueue')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")
        # Message bodies arrive as bytes; decode to the string fastmain expects
        fastmain(body.decode())

    channel.basic_consume(queue='fastqueue', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    return 0


if __name__ == '__main__':
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
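
For reference, a minimal publisher sketch (an assumption, not part of this commit) that would enqueue a subject-timepoint string for the consumer above:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# queue_declare is idempotent and matches the consumer's declaration
channel.queue_declare(queue='fastqueue')
# the consumer decodes the message body, so a plain UTF-8 string works here
channel.basic_publish(exchange='', routing_key='fastqueue',
                      body='NDARINV00XXXXY_baselineYear1Arm1')
connection.close()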