In [1]:
#!/usr/bin/env python3
import os
import logging
from pathlib import Path
import requests 
from glob import glob
from zipfile import ZipFile
import pickle
import pandas as pd
from sklearn.model_selection import train_test_split
from workflow_utils import parallelize_jobs,create_data_split
import sys
import argparse

logging.basicConfig(level=logging.DEBUG)

#Import Pegasus API
from Pegasus.api import *

def parse_args(args):
    parser = argparse.ArgumentParser(description='Cat and Dog image classification using Keras')
    parser.add_argument('-workers',  metavar='num_workers', type=int, default = 2, help = "Number of available workers")
    parser.add_argument('-f')
    return parser.parse_args()

args = parse_args(sys.argv[1:])
WORKERS = args.workers

props = Properties()
props["pegasus.mode"] = "development"
props.write()

#Replica Catalog
rc = ReplicaCatalog()
input_files = glob('*.jpg')
input_files.sort()
in_files=[]

hpo_checkpoint_file = 'hpo_checkpoint.pkl'
if not os.path.isfile(hpo_checkpoint_file):
    df = pd.DataFrame(list())
    df.to_csv(hpo_checkpoint_file)
    
checkpoint_file = 'checkpoint_file.hdf5'
if not os.path.isfile(checkpoint_file):
    df = pd.DataFrame(list())
    df.to_csv(checkpoint_file)
    
for file in input_files:
    in_files.append(File(file))
    rc.add_replica("local", File(file), str(Path(".").resolve() / file))  
rc.add_replica("local", checkpoint_file, Path(".").resolve() / checkpoint_file)
rc.add_replica("local", hpo_checkpoint_file, Path(".").resolve() / hpo_checkpoint_file)
rc.write()

#Transformation
pre_process_resize = Transformation( "resize.py",
            site="local",
            pfn="/home/scitech/shared-data/CatsAndDogs2/resize.py",
            is_stageable=True
            ) 

pre_process_augment = Transformation( "augment.py",
            site="local",
            pfn="/home/scitech/shared-data/CatsAndDogs2/augment.py",
            is_stageable=True
            )
hpo  = Transformation( "hpo_checkpointing.py",
            site="local",
            pfn="/home/scitech/shared-data/CatsAndDogs2/hpo_checkpointing.py",
            is_stageable=True
            )
vgg_model  = Transformation( "VGG_model.py",
            site="local",
            pfn="/home/scitech/shared-data/CatsAndDogs2/VGG_model.py",
            is_stageable=True
            )
test_model =  Transformation( "Test.py",
            site="local",
            pfn="/home/scitech/shared-data/CatsAndDogs2/Test.py",
            is_stageable=True
            )
tc = TransformationCatalog()\
    .add_transformations(pre_process_resize, pre_process_augment, hpo, vgg_model)\
    .write()

#Workflow
wf = Workflow("Cats_and_Dogs", infer_dependencies=True)

#Run the resize job in parallel by specifying the number of workers available.
parallel_resize = parallelize_jobs(WORKERS,pre_process_resize, in_files, 'Resize')

all_files = list()
for job in parallel_resize:
    all_files.extend(job.get_outputs())
    
input_images = []
for f in all_files:
    input_images.append(f.lfn)

#Split the resized images into train, test and validation dataset
train_data, test_data, validation_data = create_data_split(input_images)

#Run the augmentation job on train dataset by specifying the number of workers available.
parallel_augment = parallelize_jobs(WORKERS,pre_process_augment, train_data, 'Augment')

augmented_files = list()
for job in parallel_augment:
    augmented_files.extend(job.get_outputs())
    
#Train the model and find the best hyperparameters using optuna.
hpo_file = File('hpo_results.pkl')
job_hpo = Job(hpo)\
                    .add_checkpoint(File(hpo_checkpoint_file), stage_out=True)\
                    .add_inputs(*augmented_files,*validation_data)\
                    .add_profiles(Namespace.PEGASUS, key="maxwalltime", value=1)\
                    .add_outputs(hpo_file)

#Train the model using the hyperparameters obtained in the previous job.
model_json = File('model.json')
model_file = File('model.h5')
job_vgg_model = Job(vgg_model)\
                    .add_args("-epochs",6, "--batch_size",2)\
                    .add_checkpoint(File(checkpoint_file), stage_out=True)\
                    .add_inputs(*augmented_files,*validation_data,hpo_file)\
                    .add_profiles(Namespace.PEGASUS, key="maxwalltime", value=1)\
                    .add_outputs(model_json,model_file)

#Test the model to get various evalution metrics
results_file = File('Result_Metrics.txt')
job_test_model = Job(test_model)\
                    .add_inputs(*test_data,model_json,model_file)\
                    .add_outputs(results_file)


wf.add_jobs(*parallel_resize,*parallel_augment,job_hpo,job_vgg_model)  

<Pegasus.api.workflow.Workflow at 0x7f618a5074e0>

In [2]:
try:
     wf.plan(submit=True)\
        .wait()\
        .analyze()\
        .statistics()
except PegasusClientError as e:
    print(e.output)


################
# pegasus-plan #
################
[main] WARN  schema.JsonMetaSchema  - Unknown keyword $defs - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[main] WARN  schema.JsonMetaSchema  - Unknown keyword additionalItems - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[main] WARN  schema.JsonMetaSchema  - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021.05.06 22:27:05.718 UTC:
2021.05.06 22:27:05.724 UTC:   -----------------------------------------------------------------------
2021.05.06 22:27:05.730 UTC:   File for submitting this DAG to HTCondor           : Cats_and_Dogs-0.dag.condor.sub
2021.05.06 22:27:05.735 UTC:   Log of DAGMan debugging messages                 : Cats_and_Dogs-0.dag.dagman.out
2021.05.06 22:27:05.741 UTC:   Log of 

[[1;32m##################################################[0m] 100.0% ..Success ([1;32mCompleted: 28[0m, [1;33mQueued: 0[0m, [1;36mRunning: 0[0m, [1;31mFailed: 0[0m)



####################
# pegasus-analyzer #
####################
Your database is compatible with Pegasus version: 5.0.0dev

************************************Summary*************************************

Submit Directory   : /home/scitech/shared-data/CatsAndDogs2/scitech/pegasus/Cats_and_Dogs/run0015
Total jobs         :     28 (100.00%)
# jobs succeeded   :     28 (100.00%)
# jobs failed      :      0 (0.00%)
# jobs held        :      0 (0.00%)
# jobs unsubmitted :      0 (0.00%)



######################
# pegasus-statistics #
######################
Your database is compatible with Pegasus version: 5.0.0dev

#
# Pegasus Workflow Management System - http://pegasus.isi.edu
#
# Workflow summary:
#   Summary of the workflow execution. It shows total
#   tasks/jobs/sub workflows run, how many succeeded/failed etc.
#   In case of hierarchical workflow the calculation shows the
#   statistics across all the sub workflows.It shows the following
#   statistics about tasks, jobs and sub work