In [3]:
import logging
from pathlib import Path
from Pegasus.api import *
import glob
import os
import pandas as pd
# Emit DEBUG-level log records for this session.
logging.basicConfig(level=logging.DEBUG)

# --- Write Properties ---------------------------------------------------------
# Settings are collected as (key, value) pairs and copied into the Properties
# object before it is written out.
props = Properties()
for prop_name, prop_value in (
    ("dagman.retry", "100"),
    ("pegasus.transfer.arguments", "-m 1"),
):
    props[prop_name] = prop_value
props.write()


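# Optional: download the dataset through the Kaggle API.
# Never hard-code credentials in the notebook; supply them through the
# environment or ~/.kaggle/kaggle.json instead.
# NOTE: an API key was previously committed on these lines. Treat it as
# compromised and revoke it.
# import os
# os.environ['KAGGLE_USERNAME'] = "<your-kaggle-username>"
# os.environ['KAGGLE_KEY'] = "<your-kaggle-api-key>"

# import kaggle
# kaggle.api.dataset_download_files('nikhilpandey360/chest-xray-masks-and-labels/download', path='.', unzip=True)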

# --- Write TransformationCatalog ----------------------------------------------
def _local_script(name, script):
    """Describe *script* as a stageable transformation on the "local" site.

    name:   transformation name that jobs refer to.
    script: file name of the executable, looked up in the current working
            directory (the physical path is made absolute).
    """
    return Transformation(
        name,
        site="local",
        pfn=str(Path(".").resolve() / script),
        is_stageable=True,
    )


preprocess = _local_script("preprocess", "preprocess.py")
data_split = _local_script("data_split", "data_split.py")
train_model = _local_script("train_model", "train_model.py")
# The transformation is called "predict_masks" but the script is prediction.py.
predict_masks = _local_script("predict_masks", "prediction.py")

# Build exactly one catalog and keep `tc` bound to it. Previously a first
# catalog was created and discarded, and `tc` ended up holding whatever
# write() returned instead of the catalog itself.
tc = TransformationCatalog()
tc.add_transformations(preprocess, data_split, train_model, predict_masks)
tc.write()

# --- Write ReplicaCatalog -----------------------------------------------------
# file_list:   raw input images handed to the preprocess job.
# output_list: files the preprocess job produces; filled in further down once
#              the output names have been derived.
file_list = []
output_list = []

rc = ReplicaCatalog()

# Register every PNG of the three input directories under its bare file name.
# os.path.basename() replaces the former `file.replace("./<dir>/", '')`, which
# only yielded a bare name when glob returned the prefix spelled exactly that
# way; one loop also replaces three copy-pasted ones.
for input_dir in ("./train_images/", "./train_masks/", "./test/"):
    resolved_dir = Path(input_dir).resolve()
    for match in glob.glob(input_dir + "*.png"):
        lfn = os.path.basename(match)
        file_list.append(File(lfn))
        rc.add_replica("local", File(lfn), resolved_dir / lfn)

checkpoint_file = "study_checkpoint.pkl"

# When no checkpoint exists yet, create a placeholder (a pickled empty
# DataFrame) so that the replica registered below points at a real file.
if not os.path.isfile(checkpoint_file):
    pd.DataFrame(list()).to_pickle(checkpoint_file)

rc.add_replica("local", checkpoint_file, Path(".").resolve() / checkpoint_file)

rc.write()

# --- Derive file names and define the jobs ------------------------------------
def _png_stems(pattern):
    """Return the file names matching *pattern* with their extension removed.

    Path.stem drops only the final extension. The earlier
    `name.strip(".png")` removed any run of the characters '.', 'p', 'n', 'g'
    from BOTH ends of the name, so "png_scan.png" turned into "_sca".
    """
    return [Path(match).stem for match in glob.glob(pattern)]


# Glob each directory once and reuse the result for every derived list, so the
# names used as job outputs and as downstream job inputs cannot drift apart.
image_stems = _png_stems("./train_images/*.png")
mask_stems = _png_stems("./train_masks/*.png")
test_stems = _png_stems("./test/*.png")

# NOTE(review): these names must agree with what preprocess.py and
# prediction.py actually write — confirm those scripts derive names the same way.

# Everything the preprocess job emits: one "<stem>_norm.png" per input image.
output_list.extend(
    File(stem + "_norm.png") for stem in image_stems + mask_stems + test_stems
)

wf = Workflow("preprocess")

# Normalizes all raw images (training images, training masks, test images).
job_preprocess = (
    Job(preprocess)
    .add_inputs(*file_list)
    .add_outputs(*output_list)
)

data_split_file = File("data_split.pkl")

# Consumes every normalized image and produces the data split description.
job_data_split = (
    Job(data_split)
    .add_inputs(*output_list)
    .add_outputs(data_split_file)
)

model = File("model.h5")

# Training only sees the normalized training images and masks, not the test set.
train_files = [File(stem + "_norm.png") for stem in image_stems + mask_stems]

# NOTE(review): maxwalltime=1 combined with the checkpoint file and the high
# dagman.retry value presumably makes the job restart from its checkpoint
# repeatedly — confirm the unit and the intent against the Pegasus docs.
job_train = (
    Job(train_model)
    .add_checkpoint(File(checkpoint_file), stage_out=True)
    .add_inputs(*train_files, data_split_file)
    .add_profiles(Namespace.PEGASUS, key="maxwalltime", value=1)
    .add_outputs(model)
)

# Prediction reads the normalized test images and writes one mask per image.
test_list = [File(stem + "_norm.png") for stem in test_stems]
mask_output = [File(stem + "_norm_mask.png") for stem in test_stems]

job_predict = (
    Job(predict_masks)
    .add_inputs(model, *test_list, data_split_file)
    .add_outputs(*mask_output)
)


# Register all four jobs with the workflow.
wf.add_jobs(job_preprocess, job_data_split, job_train, job_predict)

# Plan and submit, block until the run finishes, then run the analyzer and
# the statistics report. Each step is invoked on the object returned by the
# previous one. A client error is reported by printing its captured output.
try:
    submitted = wf.plan(submit=True)
    finished = submitted.wait()
    finished.analyze().statistics()
except PegasusClientError as err:
    print(err.output)


################
# pegasus-plan #
################
2021.01.25 23:28:14.708 UTC:
2021.01.25 23:28:14.714 UTC:   -----------------------------------------------------------------------
2021.01.25 23:28:14.720 UTC:   File for submitting this DAG to HTCondor           : preprocess-0.dag.condor.sub
2021.01.25 23:28:14.725 UTC:   Log of DAGMan debugging messages                 : preprocess-0.dag.dagman.out
2021.01.25 23:28:14.731 UTC:   Log of HTCondor library output                     : preprocess-0.dag.lib.out
2021.01.25 23:28:14.737 UTC:   Log of HTCondor library error messages             : preprocess-0.dag.lib.err
2021.01.25 23:28:14.743 UTC:   Log of the life of condor_dagman itself          : preprocess-0.dag.dagman.log
2021.01.25 23:28:14.749 UTC:
2021.01.25 23:28:14.755 UTC:   -no_submit given, not submitting DAG to HTCondor.  You can do this with:
2021.01.25 23:28:14.767 UTC:   -----------------------------------------------------------------------
2021.01.25 23:28:16.064 UTC:  

[##################################################] 100.0% ..Success (Completed: 25, Queued: 0, Running: 0, Failed: 0)



####################
# pegasus-analyzer #
####################
Your database is compatible with Pegasus version: 5.0.0dev

************************************Summary*************************************

Submit Directory   : /home/scitech/shared-data/UNet_wf/scitech/pegasus/preprocess/run0017
Total jobs         :     25 (100.00%)
# jobs succeeded   :     25 (100.00%)
# jobs failed      :      0 (0.00%)
# jobs held        :      0 (0.00%)
# jobs unsubmitted :      0 (0.00%)



######################
# pegasus-statistics #
######################
Your database is compatible with Pegasus version: 5.0.0dev

#
# Pegasus Workflow Management System - http://pegasus.isi.edu
#
# Workflow summary:
#   Summary of the workflow execution. It shows total
#   tasks/jobs/sub workflows run, how many succeeded/failed etc.
#   In case of hierarchical workflow the calculation shows the
#   statistics across all the sub workflows.It shows the following
#   statistics about tasks, jobs and sub workflows.
#

In [16]:
import logging
from pathlib import Path
from Pegasus.api import *
import glob
import os
import sys
import pandas as pd
from argparse import ArgumentParser

# Emit DEBUG-level log records for this session.
logging.basicConfig(level=logging.DEBUG)

# Command-line parsing is currently disabled; the input directories are
# hard-coded in the constants below instead.
# parser = ArgumentParser(description="Generates and runs lung instance segmentation workflow")
# parser.add_argument(
#     "--lung-img-dir",
#     default=Path(".") / "inputs/train_images",
#     help="Path to directory containing lung images for training and validation"
# )
# parser.add_argument(
#     "--lung-mask-img-dir",
#     default=Path(".") / "inputs/train_masks",
#     help="Path to directory containing lung mask images for training and validation"
# )
# parser.add_argument(
#     "--test-img-dir",
#     default=Path(".") / "inputs/test_images",
#     help="Path to directory containing test lung images."
# )
# parser.add_argument(
#     "--num-process-jobs",
#     default=1,
#     type=int,
#     help="""Number of pre-processing jobs. Input files are divided evenly amongst
#     the number of pre-processing jobs. If this value exceeds the number of input files,
#     extra jobs will not be added. If the number of input files cannot be divided evenly
#     amongst the process jobs, one process job will be assigned extra files to process.
#     For example, given 3 input files and num-process-jobs=2, 2 process jobs will
#     be created, where one job gets a single file and the other job gets 2 files."""
# )
# args = parser.parse_args(sys.argv[1:])
# if args.num_process_jobs < 1:
#     raise ValueError("--num-process-jobs must be >= 1")


# Input directories: training lung images, their masks, and the test images.
LUNG_IMG_DIR, LUNG_MASK_IMG_DIR, TEST_IMG_DIR = (
    Path(location)
    for location in ("./train_images", "./train_masks", "./test")
)


# --- Write Properties ---------------------------------------------------------
# Settings are collected as (key, value) pairs and copied into the Properties
# object before it is written out.
props = Properties()
for prop_name, prop_value in (
    ("dagman.retry", "100"),
    ("pegasus.transfer.arguments", "-m 1"),
):
    props[prop_name] = prop_value
props.write()


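# Optional: download the dataset through the Kaggle API.
# Never hard-code credentials in the notebook; supply them through the
# environment or ~/.kaggle/kaggle.json instead.
# NOTE: an API key was previously committed on these lines. Treat it as
# compromised and revoke it.
# import os
# os.environ['KAGGLE_USERNAME'] = "<your-kaggle-username>"
# os.environ['KAGGLE_KEY'] = "<your-kaggle-api-key>"

# import kaggle
# kaggle.api.dataset_download_files('nikhilpandey360/chest-xray-masks-and-labels/download', path='.', unzip=True)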

# --- Write TransformationCatalog ----------------------------------------------
# Every executable is a script in the current working directory. Each one is
# registered on the "local" site with an absolute path and marked stageable.
preprocess = Transformation(
    "preprocess",
    site="local",
    pfn=str(Path(".").resolve().joinpath("preprocess.py")),
    is_stageable=True,
)

# The data_split transformation is currently disabled:
# data_split = Transformation(
#     "data_split",
#     site="local",
#     pfn=str(Path(".").resolve().joinpath("data_split.py")),
#     is_stageable=True,
# )

train_model = Transformation(
    "train_model",
    site="local",
    pfn=str(Path(".").resolve().joinpath("train_model.py")),
    is_stageable=True,
)

# The transformation is called "predict_masks" but the script is prediction.py.
predict_masks = Transformation(
    "predict_masks",
    site="local",
    pfn=str(Path(".").resolve().joinpath("prediction.py")),
    is_stageable=True,
)

tc = TransformationCatalog()
tc.add_transformations(preprocess, train_model, predict_masks)

# log.info("writing tc with transformations: {}, containers: {}".format([k for k in tc.transformations], [k for k in tc.containers]))
tc.write()

# --- Write ReplicaCatalog -----------------------------------------------------
# Lung images and their masks both go to the training list; test images are
# collected separately.
training_input_files = []
test_input_files = []

rc = ReplicaCatalog()

sources = (
    (LUNG_IMG_DIR, training_input_files),
    (LUNG_MASK_IMG_DIR, training_input_files),
    (TEST_IMG_DIR, test_input_files),
)
for source_dir, bucket in sources:
    for entry in source_dir.iterdir():
        # Only entries whose name ends in ".png" are registered.
        if not entry.name.endswith(".png"):
            continue
        bucket.append(File(entry.name))
        rc.add_replica(site="local", lfn=entry.name, pfn=entry.resolve())

# Checkpoint file for the train job. If none exists yet, an empty one is
# created (a pickled empty DataFrame) so that there is something to register.
checkpoint_path = Path(".").resolve() / "study_checkpoint.pkl"
if not checkpoint_path.exists():
    pd.DataFrame(list()).to_pickle(checkpoint_path.name)

checkpoint = File(checkpoint_path.name)
rc.add_replica(site="local", lfn=checkpoint, pfn=checkpoint_path.resolve())

# log.info("writing rc with {} files collected from: {}".format(len(training_input_files) + len(test_input_files), [LUNG_IMG_DIR, LUNG_MASK_IMG_DIR, TEST_IMG_DIR]))
rc.write()

# --- Generate and run Workflow ------------------------------------------------
def _normalized(lfn):
    """Return *lfn* with each ".png" occurrence replaced by "_norm.png"."""
    return lfn.replace(".png", "_norm.png")


wf = Workflow("lung-instance-segmentation-wf")

# Every raw image (training images, masks and test images) gets pre-processed.
input_files = training_input_files + test_input_files

# min(1, ...) yields a single pre-processing job, or none when there are no
# input files at all.
num_process_jobs = min(1, len(input_files))
process_jobs = [Job(preprocess) for _ in range(num_process_jobs)]

# Hand the files out round-robin; each job declares the "_norm" counterpart of
# every file it receives as one of its outputs.
for position, raw_file in enumerate(input_files):
    assigned_job = process_jobs[position % num_process_jobs]
    assigned_job.add_inputs(raw_file)
    assigned_job.add_outputs(File(_normalized(raw_file.lfn)))

wf.add_jobs(*process_jobs)
# log.info("generated {} process jobs".format(num_process_jobs))

# Normalized lung images and masks, used for training/validation.
processed_training_files = [
    File(_normalized(raw_file.lfn)) for raw_file in training_input_files
]

# Normalized test images, used for prediction.
processed_test_files = [
    File(_normalized(raw_file.lfn)) for raw_file in test_input_files
]

# Training job: normalized training files in, model out, with a checkpoint file.
# log.info("generating train_model job")
model = File("model.h5")
train_job = (
    Job(train_model)
    .add_inputs(*processed_training_files)
    .add_outputs(model)
    .add_checkpoint(checkpoint)
)

wf.add_jobs(train_job)

# Prediction job: one "<name>_mask.png" output per normalized test image.
# log.info("generating prediction job; using {} test lung images".format(len(processed_test_files)))
predicted_masks = [
    File(norm_file.lfn.replace(".png", "_mask.png"))
    for norm_file in processed_test_files
]
predict_job = (
    Job(predict_masks)
    .add_inputs(model, *processed_test_files)
    .add_outputs(*predicted_masks)
)

wf.add_jobs(predict_job)

# Plan and submit the workflow, wait for it to finish, then run the analyzer
# and the statistics report. Each step is invoked on the object returned by
# the previous one. A client error is reported by printing its captured output.
try:
    run = wf.plan(submit=True)
    run = run.wait()
    run = run.analyze()
    run.statistics()
except PegasusClientError as client_error:
    print(client_error.output)

# run workflow
# log.info("begin workflow execution")
# wf.plan(submit=True, dir="runs")\
#     .wait()\
#     .analyze()\
#     .statistics()


# tc = TransformationCatalog().add_transformations(preprocess, data_split, train_model, predict_masks).write()

# file_list = []
# output_list = []

# rc = ReplicaCatalog()

# for file in glob.glob("./train_images/*.png"):
#     f = file.replace("./train_images/", '')
#     file_list.append(File(f))
#     rc.add_replica("local", File(f), Path("./train_images/").resolve() / f)
    
# for file in glob.glob("./train_masks/*.png"):
#     f = file.replace("./train_masks/", '')
#     file_list.append(File(f))
#     rc.add_replica("local", File(f), Path("./train_masks/").resolve() / f)
    
# for file in glob.glob("./test/*.png"):
#     f = file.replace("./test/", '')
#     file_list.append(File(f))
#     rc.add_replica("local", File(f), Path("./test/").resolve() / f)
    

    
# checkpoint_file = "study_checkpoint.pkl"

# if not os.path.isfile(checkpoint_file):
#     df = pd.DataFrame(list())
#     df.to_pickle(checkpoint_file)

# rc.add_replica("local", checkpoint_file, Path(".").resolve() / checkpoint_file)
    
# rc.write()

# for filename in glob.glob("./train_images/*.png"):
#     f = filename.replace("./train_images/", '').strip(".png")+"_norm.png"
#     output_list.append(File(f))


# for filename in glob.glob("./train_masks/*.png"):
#     f = filename.replace("./train_masks/", '').strip(".png")+"_norm.png"
#     output_list.append(File(f))

# for filename in glob.glob("./test/*.png"):
#     f = filename.replace("./test/", '').strip(".png")+"_norm.png"
#     output_list.append(File(f))

        
# wf = Workflow("preprocess")
    
# job_preprocess = Job(preprocess)\
#                     .add_inputs(*file_list)\
#                     .add_outputs(*output_list)

# data_split_file = File("data_split.pkl")

# job_data_split = Job(data_split)\
#                     .add_inputs(*output_list)\
#                     .add_outputs(data_split_file)

# model = File("model.h5")

# train_files = []

# for filename in glob.glob("./train_images/*.png"):
#     f = filename.replace("./train_images/", '').strip(".png")+"_norm.png"
#     train_files.append(File(f))


# for filename in glob.glob("./train_masks/*.png"):
#     f = filename.replace("./train_masks/", '').strip(".png")+"_norm.png"
#     train_files.append(File(f))

# job_train = Job(train_model)\
#                     .add_checkpoint(File(checkpoint_file), stage_out=True)\
#                     .add_inputs(*train_files, data_split_file)\
#                     .add_profiles(Namespace.PEGASUS, key="maxwalltime", value=1)\
#                     .add_outputs(model)

# test_list = []
# mask_output = []

# for filename in glob.glob("./test/*.png"):
#     f = filename.replace("./test/", '').strip(".png")+"_norm.png"
#     test_list.append(File(f))

# for filename in glob.glob("./test/*.png"):
#     f = filename.replace("./test/", '').strip(".png")+"_norm_mask.png"
#     mask_output.append(File(f))

# job_predict = Job(predict_masks)\
#                     .add_inputs(model, *test_list, data_split_file)\
#                     .add_outputs(*mask_output)


# wf.add_jobs(job_preprocess, job_data_split, job_train, job_predict)

# try:
#     wf.plan(submit=True)\
#         .wait()\
#         .analyze()\
#         .statistics()
# except PegasusClientError as e:
#     print(e.output)  


################
# pegasus-plan #
################
2021.02.08 20:11:13.963 UTC:
2021.02.08 20:11:13.969 UTC:   -----------------------------------------------------------------------
2021.02.08 20:11:13.975 UTC:   File for submitting this DAG to HTCondor           : lung-instance-segmentation-wf-0.dag.condor.sub
2021.02.08 20:11:13.980 UTC:   Log of DAGMan debugging messages                 : lung-instance-segmentation-wf-0.dag.dagman.out
2021.02.08 20:11:13.988 UTC:   Log of HTCondor library output                     : lung-instance-segmentation-wf-0.dag.lib.out
2021.02.08 20:11:13.995 UTC:   Log of HTCondor library error messages             : lung-instance-segmentation-wf-0.dag.lib.err
2021.02.08 20:11:14.002 UTC:   Log of the life of condor_dagman itself          : lung-instance-segmentation-wf-0.dag.dagman.log
2021.02.08 20:11:14.009 UTC:
2021.02.08 20:11:14.015 UTC:   -no_submit given, not submitting DAG to HTCondor.  You can do this with:
2021.02.08 20:11:14.026 UTC:   -------

[##################################################] 100.0% ..Success (Completed: 20, Queued: 0, Running: 0, Failed: 0)



####################
# pegasus-analyzer #
####################
Your database is compatible with Pegasus version: 5.0.0dev

************************************Summary*************************************

Submit Directory   : /home/scitech/shared-data/UNet_wf/scitech/pegasus/lung-instance-segmentation-wf/run0005
Total jobs         :     20 (100.00%)
# jobs succeeded   :     20 (100.00%)
# jobs failed      :      0 (0.00%)
# jobs held        :      0 (0.00%)
# jobs unsubmitted :      0 (0.00%)



######################
# pegasus-statistics #
######################
Your database is compatible with Pegasus version: 5.0.0dev

#
# Pegasus Workflow Management System - http://pegasus.isi.edu
#
# Workflow summary:
#   Summary of the workflow execution. It shows total
#   tasks/jobs/sub workflows run, how many succeeded/failed etc.
#   In case of hierarchical workflow the calculation shows the
#   statistics across all the sub workflows.It shows the following
#   statistics about tasks, jobs a