In [23]:
import cloudknot as ck
import itertools
import numpy as np

In [37]:
def create_tfrecs(sites, seed):
    """Build nobrainer TFRecord shards for one site group and one shuffle seed.

    Designed to run as a single cloudknot / AWS Batch job, so every import
    lives inside the function body and helpers are nested.

    Steps:
      1. Return immediately if this (sites, seed) job's ``num_volumes.csv``
         marker already exists on S3.
      2. Read the HBN QC table (``participants.tsv``) and download the NIfTI
         volumes under ``s3://hbn-pod2-deep-learning/b0-tensorfa-dwiqc``.
      3. Keep volumes whose subject has an ``xgb_qc_score`` (used as the
         label). Within ``sites``, subjects with an expert QC score form the
         "report" split; the rest form the "test" split, which is shuffled with
         ``seed`` and divided 80/20 into "train" and "validate".
      4. Verify and write TFRecord shards. Report/test do not depend on the
         seed and are written only by the ``seed == 0`` job (at the site
         level); train/validate go under ``seed-{seed}/``.
      5. Write ``seed-{seed}/num_volumes.csv`` and upload everything to
         ``hbn-pod2-deep-learning/tfrecs/{site_tag}/{site_tag}/``, with the
         CSV marker uploaded last.

    Parameters
    ----------
    sites : list of str
        Values of ``scan_site_id`` to include, e.g. ``["RU", "CUNY"]``.
    seed : int
        ``random_state`` for the train/validate shuffle.

    Raises
    ------
    ValueError
        If any volume in a split that is about to be written fails
        ``nobrainer.io.verify_features_labels``.
    """
    import nobrainer
    import os
    import os.path as op
    import pandas as pd
    import re

    from glob import glob
    from s3fs import S3FileSystem

    site_tag = "_".join(sites)
    bucket = "hbn-pod2-deep-learning"
    s3_output_path = "/".join([bucket, "tfrecs", site_tag])
    # Every file under the local ``tfrecs/{site_tag}/`` directory is uploaded
    # below this prefix with its relative path preserved. This is the layout
    # the skip check just below looks for.
    s3_upload_root = "/".join([s3_output_path, site_tag])

    fs = S3FileSystem()

    # The marker is uploaded only after every shard, so its presence means a
    # previous run of this (sites, seed) job completed.
    if fs.exists("/".join([s3_upload_root, f"seed-{seed}", "num_volumes.csv"])):
        return

    # QC scores for every HBN participant, indexed by subject ID.
    s3_participants = pd.read_csv(
        "s3://fcp-indi/data/Projects/HBN/BIDS_curated/derivatives/qsiprep/participants.tsv",
        sep="\t",
        usecols=["subject_id", "scan_site_id", "expert_qc_score", "xgb_qc_score"],
        index_col="subject_id",
    )

    local_nifti_dir = "niftis"
    local_tfrec_dir = op.join("tfrecs", site_tag)
    os.makedirs(local_nifti_dir, exist_ok=True)
    os.makedirs(local_tfrec_dir, exist_ok=True)

    # Download feature volumes for all sites; site filtering happens after the
    # merge with the QC table.
    # NOTE(review): newer fsspec releases copy *into* an existing target dir
    # (niftis/b0-tensorfa-dwiqc/...), which the non-recursive glob below would
    # miss. Confirm against the fsspec version pinned in the Docker image.
    fs.get("/".join([bucket, "b0-tensorfa-dwiqc"]), local_nifti_dir, recursive=True)

    nifti_files = [
        path
        for path in map(op.abspath, glob(f"{local_nifti_dir}/*.nii.gz"))
        if "irregularsize" not in path
    ]
    # Take the subject ID from the file name only, so directory names can
    # never be mistaken for it.
    sub_id_pattern = re.compile("sub-[a-zA-Z0-9]*")
    subjects = [sub_id_pattern.search(op.basename(path)).group(0) for path in nifti_files]

    # Attach QC columns to each volume. Volumes without an XGB QC score have
    # no label and are dropped; xgb_qc_score becomes the (non-integer) label.
    participants = (
        pd.DataFrame(data=nifti_files, index=subjects, columns=["features"])
        .merge(s3_participants, left_index=True, right_index=True, how="left")
        .dropna(subset=["xgb_qc_score"])
        .rename(columns={"xgb_qc_score": "labels"})
    )

    # Each TFRecord shard holds two batches of ``batch_size`` examples.
    batch_size = 16
    n_channels = 5
    volume_shape = (128, 128, 128, n_channels)

    # Within ``sites``, subjects with an expert QC score form the "report" set.
    # The remaining subjects are denoted "test" because the train and validate
    # sets used with them will always come from different sites.
    in_sites = participants["scan_site_id"].isin(sites)
    has_expert_qc = participants["expert_qc_score"].notna()
    split_dataframes = {
        "report": participants.loc[in_sites & has_expert_qc],
        "test": participants.loc[in_sites & ~has_expert_qc],
    }

    shuffled = split_dataframes["test"].sample(frac=1, random_state=seed)
    train_size = int(0.8 * len(shuffled))
    split_dataframes["train"] = shuffled.iloc[:train_size]
    split_dataframes["validate"] = shuffled.iloc[train_size:]

    filepaths = {
        split: list(df[["features", "labels"]].itertuples(index=False, name=None))
        for split, df in split_dataframes.items()
    }

    # Output directory for each split this job writes. Report/test are
    # seed-independent, so only the seed-0 job writes them.
    seed_dir = op.join(local_tfrec_dir, f"seed-{seed}")
    os.makedirs(seed_dir, exist_ok=True)
    output_dirs = {"train": seed_dir, "validate": seed_dir}
    if seed == 0:
        output_dirs = {"report": local_tfrec_dir, "test": local_tfrec_dir, **output_dirs}

    # Verify every volume that will be written before writing anything.
    for split in output_dirs:
        invalid = nobrainer.io.verify_features_labels(
            filepaths[split], volume_shape=volume_shape, check_labels_int=False
        )
        print(f"{split}, Invalid:", invalid)
        if invalid:
            raise ValueError(f"{split} split has volumes that failed verification: {invalid}")

    for split, out_dir in output_dirs.items():
        fpaths = filepaths[split]
        print(f"Writing {len(fpaths)} {split} TFRecords to {out_dir}")
        nobrainer.tfrecord.write(
            features_labels=fpaths,
            filename_template=op.join(
                out_dir, f"data-{site_tag}-{split}" + "_shard-{shard:03d}.tfrec"
            ),
            examples_per_shard=2 * batch_size,
        )

    volume_numbers = pd.DataFrame(
        [{"split": split, "n_volumes": len(df)} for split, df in split_dataframes.items()]
    ).set_index("split")
    marker_path = op.abspath(op.join(seed_dir, "num_volumes.csv"))
    volume_numbers.to_csv(marker_path)

    # Upload with explicit per-file destinations. A recursive directory put
    # would nest differently depending on whether the remote prefix exists.
    tfrec_root = op.abspath(local_tfrec_dir)

    def _remote_key(local_path):
        """Map a local file under tfrec_root to its S3 key under s3_upload_root."""
        rel = op.relpath(local_path, tfrec_root).replace(os.sep, "/")
        return "/".join([s3_upload_root, rel])

    data_files = sorted(
        op.join(dirpath, name)
        for dirpath, _, names in os.walk(tfrec_root)
        for name in names
        if op.join(dirpath, name) != marker_path
    )
    if data_files:
        fs.put(data_files, [_remote_key(path) for path in data_files])
    # Upload the completion marker last so the skip check never sees a partial job.
    fs.put_file(marker_path, _remote_key(marker_path))

In [38]:
# Package create_tfrecs into a cloudknot Docker image on a python:3.8 base,
# replacing any earlier build of the same image.
di = ck.DockerImage(
    overwrite=True,
    base_image="python:3.8",
    func=create_tfrecs,
)



In [39]:
# Inspect the local build directory cloudknot generated for this image.
di.build_path

'/Users/richford/projects/neuro/hbn/hbn-pod2/qc/hbn-pod2-qc/notebooks/cloudknot_docker_create-tfrecs_7sfstj38'

In [40]:
# Build the image locally under a date-stamped tag.
di.build(tags=["hbn-pod2-tfrecs-20220103"])

In [41]:
# Target ECR repository, named by cloudknot's configured default (ck.get_ecr_repo()).
repo = ck.aws.DockerRepo(name=ck.get_ecr_repo())

In [42]:
# Push the built image to the ECR repository so AWS Batch jobs can pull it.
# The very first time you run this, this command could take a few minutes
di.push(repo=repo)

In [7]:
# Run on Spot instances, bidding up to 100% of the On-Demand price.
# Each create_tfrecs job downloads the full b0-tensorfa-dwiqc NIfTI set and
# writes TFRecord shards to local disk before uploading, so the instance
# volume (volume_size) is sized generously.
# Create this knot only after the DockerImage cell above has been (re)built
# and pushed, so it uses the current image.
knot = ck.Knot(
    name="hbn-pod2-tfrecs-20220103-1",
    docker_image=di,
    pars_policies=("AmazonS3FullAccess",),
    bid_percentage=100,
    memory=8000,
    job_def_vcpus=8,
    volume_size=100,
    max_vcpus=512,
    retries=1,
    aws_resource_tags={"Project": "HBN-FCP-INDI"},
)

In [33]:
# One cloudknot job per (site group, seed) pair, site-group-major order.
# The seed drives the train/validate shuffle; the site group selects which
# scan sites feed the splits.
seeds = np.arange(8)
sites = [["RU"], ["CBIC"], ["RU", "CUNY"], ["CBIC", "CUNY"]]
args = [(site_group, seed) for site_group in sites for seed in seeds]
args

[(['RU'], 0),
 (['RU'], 1),
 (['RU'], 2),
 (['RU'], 3),
 (['RU'], 4),
 (['RU'], 5),
 (['RU'], 6),
 (['RU'], 7),
 (['CBIC'], 0),
 (['CBIC'], 1),
 (['CBIC'], 2),
 (['CBIC'], 3),
 (['CBIC'], 4),
 (['CBIC'], 5),
 (['CBIC'], 6),
 (['CBIC'], 7),
 (['RU', 'CUNY'], 0),
 (['RU', 'CUNY'], 1),
 (['RU', 'CUNY'], 2),
 (['RU', 'CUNY'], 3),
 (['RU', 'CUNY'], 4),
 (['RU', 'CUNY'], 5),
 (['RU', 'CUNY'], 6),
 (['RU', 'CUNY'], 7),
 (['CBIC', 'CUNY'], 0),
 (['CBIC', 'CUNY'], 1),
 (['CBIC', 'CUNY'], 2),
 (['CBIC', 'CUNY'], 3),
 (['CBIC', 'CUNY'], 4),
 (['CBIC', 'CUNY'], 5),
 (['CBIC', 'CUNY'], 6),
 (['CBIC', 'CUNY'], 7)]

In [34]:
# Sanity-check the job count before submitting: one job per (site group, seed).
assert len(args) == len(sites) * len(seeds)
len(args)

32

In [43]:
# Submit one AWS Batch job per (sites, seed) tuple; starmap=True unpacks each
# tuple into create_tfrecs(sites, seed).
results = knot.map(args, starmap=True)

In [44]:
# Show the ID, name, and status of each submitted job.
knot.view_jobs()

Job ID              Name                        Status   
---------------------------------------------------------
02c76da4-b906-4a67-a6ea-229723fc383b        hbn-pod2-tfrecs-20220103-1-4        SUBMITTED
c82573e7-88fa-4476-97da-e05c463ac634        hbn-pod2-tfrecs-20220103-1-1        FAILED   
828c7c06-01c8-4b77-8000-e1f0625c4429        hbn-pod2-tfrecs-20220103-1-3        FAILED   
a0e5893c-4aa1-4a3b-8455-8ecd6283a56c        hbn-pod2-tfrecs-20220103-1-0        FAILED   
0f6f838a-7ed3-4024-8eab-70573121119a        hbn-pod2-tfrecs-20220103-1-2        SUCCEEDED


In [45]:
# Tear down this knot's AWS resources; clobber_pars=True also removes its
# PARS (presumably the IAM roles / networking cloudknot created — confirm).
# Run only once all jobs have finished: queued or running jobs may be lost.
knot.clobber(clobber_pars=True)