In [1]:
import logging, os, sys
import numpy as np
import urllib.request
import shutil
import subprocess
import zipfile
import pickle
from pathlib import Path
import pandas as pd
from Pegasus.api import *

In [2]:
from util_workflow import add_input_wf_files
from util_workflow import create_file_objects, create_file_objects_postfix, create_file_objects_postfix_range

from util_workflow import download_data, unzip_flatten_data, return_corrupted_files, return_input_files
from util_workflow import split_data_filenames, add_input_tune_model,create_tar_and_pkl, create_pkl

# --- Import Pegasus API ---
from Pegasus.api import *
# Emit DEBUG-level log records for everything that runs in this notebook.
logging.basicConfig(level=logging.DEBUG)
# Pegasus properties for this workflow; both values are supplied as strings.
props = Properties()
props["dagman.retry"] = "2"  # presumably the per-job retry count used by DAGMan -- confirm in Pegasus docs
props["pegasus.transfer.arguments"] = "-m 1"  # extra arguments for the transfer tool -- TODO confirm what "-m 1" selects
# Persist the properties; no path is given, so the API's default target is used.
props.write()

In [3]:
# --- Dataset source -------------------------------------------------------
# URL handed to download_data() when DOWNLOAD_DATA is enabled.
dataset_link = "https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip"
# Archive name and extraction target; not referenced by the cells shown here.
zip_data = "kagglecatsanddogs_3367a.zip"
directory_to_extract_to = "."

# --- Run configuration ----------------------------------------------------
DOWNLOAD_DATA = False   # set to True to call download_data(dataset_link)
DATASET_SIZE = 12       # forwarded to return_input_files() when picking inputs
DATA_DIR = "dev_data/"  # local directory holding the workflow's input images
UTILS_DIR = "utils/"

arch_names = ["basicnet", "densenet121", "vgg16"]
CATS = "PetImages/Cat"
DOGS = "PetImages/Dog"
LABELS = {CATS: 0, DOGS: 1}  # class directory -> integer label
NUM_EPOCHS = 4
NUM_TRIALS = 3
NUM_WORKERS = 8         # number of worker ids that get split into train/val/test

# exist_ok=True makes this safe to re-run and avoids the check-then-create race
# of a separate os.path.exists() test.
os.makedirs(DATA_DIR, exist_ok=True)

if DOWNLOAD_DATA:
    download_data(dataset_link)

In [4]:
# Avoid corrupted files for now
# Avoid corrupted files for now: collect the names recorded in
# "corrupted_files.txt" (presumably one known-bad image per entry -- confirm
# against util_workflow.return_corrupted_files).
corrupted_files = return_corrupted_files("corrupted_files.txt")
# Get names of image files that will serve as inputs to the workflow.
# The helper receives the corrupted list, the requested dataset size, the
# local data directory and the class-label mapping.
input_file_names = return_input_files(corrupted_files, DATASET_SIZE, DATA_DIR, LABELS)

In [5]:
# Partition the input image names into train / validation / test subsets
# (intended proportions per the original note: 70% / 10% / 20%).
(
    train_filenames,
    val_filenames,
    test_filenames,
    files_split_dict,
) = split_data_filenames(input_file_names)

# Run the worker ids 0..NUM_WORKERS-1 through the same splitter so each subset
# gets its own group of workers; the fourth return value is not needed here.
workers_ids = list(range(NUM_WORKERS))
train_workers, val_workers, test_workers, _ = split_data_filenames(workers_ids)

In [6]:
rc = ReplicaCatalog()

# The train, validation and test subsets are processed separately, always in
# that order.

# Raw inputs: Pegasus File objects are created and added to the replica catalog.
input_preprocess1_train, input_preprocess1_val, input_preprocess1_test = [
    add_input_wf_files(names, DATA_DIR, rc)
    for names in (train_filenames, val_filenames, test_filenames)
]

# Outputs of preprocessing step 1: File objects carrying the "_proc1.jpg" postfix.
postfix_job1 = "_proc1.jpg"
output_preprocess1_train, output_preprocess1_val, output_preprocess1_test = [
    create_file_objects_postfix(names, postfix_job1)
    for names in (train_filenames, val_filenames, test_filenames)
]

# Outputs of preprocessing step 2: range_num is passed to the helper and is
# later used as the number of output files attached per step-1 image.
range_num = 4
postfix = "_proc2_"
output_preprocess2_train, output_preprocess2_val, output_preprocess2_test = [
    create_file_objects_postfix_range(names, prefix, postfix, range_num)
    for prefix, names in (
        ("train_", train_filenames),
        ("val_", val_filenames),
        ("test_", test_filenames),
    )
]

rc.write()

In [7]:
# Create and add our transformations to the TransformationCatalog.
# Build the TransformationCatalog that registers the workflow's executables.
tc = TransformationCatalog()

# Both scripts are looked up under ./bin of the current working directory
# (Path(".").parent is still ".", so resolving "." yields the same absolute path).

# Data preprocessing part 1
preprocess_tc1 = Transformation(
    "preprocess1",
    site="local",
    pfn=str(Path(".").resolve() / "bin" / "data_preprocessing1.py"),
    is_stageable=True,
)

# Data preprocessing part 2
preprocess_tc2 = Transformation(
    "preprocess2",
    site="local",
    pfn=str(Path(".").resolve() / "bin" / "data_preprocessing2_args.py"),
    is_stageable=True,
)

tc.add_transformations(preprocess_tc1, preprocess_tc2)
tc.write()

In [8]:
def add_job_attributes(jobs_list, input_files, output_files):
    """Attach input/output file pairs to jobs in round-robin order.

    The file at position ``k`` of ``input_files`` and the file at position
    ``k`` of ``output_files`` are both added to job ``k % len(jobs_list)``.

    Args:
        jobs_list: Jobs exposing ``add_inputs`` and ``add_outputs``; mutated in place.
        input_files: Sequence of input files, one per work item.
        output_files: Sequence of output files, indexed in parallel with
            ``input_files``.

    Returns:
        The same ``jobs_list`` object that was passed in.
    """
    job_count = len(jobs_list)
    for position, in_file in enumerate(input_files):
        target = jobs_list[position % job_count]
        target.add_inputs(in_file)
        target.add_outputs(output_files[position])
    return jobs_list


def add_job_attributes_range(jobs_list, input_files, output_files,range_num,args):
    """Attach each input and its block of ``range_num`` outputs to a job, round-robin.

    Input ``i`` goes to job ``i % len(jobs_list)`` together with the slice
    ``output_files[i*range_num : (i+1)*range_num]``.

    ``args`` is added to a job only the first time that job receives an input.
    Previously it was appended once per input file, so a job handling several
    inputs ended up with the same argument repeated on its command line.
    Jobs that receive no input are left untouched, as before.

    Args:
        jobs_list: Jobs exposing ``add_inputs``, ``add_args`` and
            ``add_outputs``; mutated in place.
        input_files: Sequence of input files, one per work item.
        output_files: Flat sequence of output files, ``range_num`` consecutive
            entries per input file.
        range_num: Number of output files belonging to each input file.
        args: Argument passed to ``add_args`` of every job that gets work.

    Returns:
        The same ``jobs_list`` object that was passed in.
    """
    job_count = len(jobs_list)
    # Indices of jobs that already had ``args`` added.
    configured = set()
    for i, input_file in enumerate(input_files):
        curr = i % job_count
        job = jobs_list[curr]
        job.add_inputs(input_file)
        if curr not in configured:
            job.add_args(args)
            configured.add(curr)
        start = i * range_num
        # A short output_files list yields a shorter (possibly empty) slice here.
        job.add_outputs(*output_files[start:start + range_num])
    return jobs_list

In [9]:
# Create the preprocess1 jobs
# One preprocess1 Job per worker id in each split; the id itself is unused.
preprocess1_jobs_train = [Job(preprocess_tc1) for _ in train_workers]
preprocess1_jobs_val = [Job(preprocess_tc1) for _ in val_workers]
preprocess1_jobs_test = [Job(preprocess_tc1) for _ in test_workers]

In [10]:
# Distribute the raw images and their "_proc1" outputs over the preprocess1
# jobs of each split (round-robin inside add_job_attributes).
preprocess1_jobs_train = add_job_attributes(
    preprocess1_jobs_train,
    input_preprocess1_train,
    output_preprocess1_train,
)

preprocess1_jobs_val = add_job_attributes(
    preprocess1_jobs_val,
    input_preprocess1_val,
    output_preprocess1_val,
)

preprocess1_jobs_test = add_job_attributes(
    preprocess1_jobs_test,
    input_preprocess1_test,
    output_preprocess1_test,
)

In [11]:
# One preprocess2 Job per train worker id; the id itself is unused.
preprocess2_jobs_train = [Job(preprocess_tc2) for _ in train_workers]
# The validation and test variants are currently disabled:
#preprocess2_jobs_val   = [Job(preprocess_tc2) for i in val_workers]
#preprocess2_jobs_test  = [Job(preprocess_tc2) for i in test_workers]

In [13]:
# Feed the step-1 outputs of the train split into the preprocess2 jobs; each
# input is paired with range_num step-2 outputs and the jobs get the "train" argument.
preprocess2_jobs_train = add_job_attributes_range(
    preprocess2_jobs_train,
    output_preprocess1_train,
    output_preprocess2_train,
    range_num,
    "train",
)

# The validation and test variants are currently disabled:
#preprocess2_jobs_val = add_job_attributes_range(preprocess2_jobs_val, \
#                                            output_preprocess1_val,output_preprocess2_val,range_num,"val")

#preprocess2_jobs_test = add_job_attributes_range(preprocess2_jobs_test, \
#                                            output_preprocess1_test,output_preprocess2_test,range_num,"test")

In [15]:
# infer_dependencies=True: job ordering is derived from which files each job
# declares as inputs and outputs.
wf = Workflow("catVsdog-test-wf", infer_dependencies=True)
# Only the train split has preprocess2 jobs; the val/test ones are not created.
wf.add_jobs(
    *preprocess1_jobs_train,
    *preprocess1_jobs_val,
    *preprocess1_jobs_test,
    *preprocess2_jobs_train,
)


<Pegasus.api.workflow.Workflow at 0x7f41e1052978>

In [16]:
# Plan and submit the workflow, then chain wait -> analyze -> statistics on it.
# A PegasusClientError from any step is reported by printing its captured output.
try:
    (
        wf.plan(submit=True)
        .wait()
        .analyze()
        .statistics()
    )
except PegasusClientError as e:
    print(e.output)


################
# pegasus-plan #
################
2021.02.10 06:36:58.543 UTC:
2021.02.10 06:36:58.549 UTC:   -----------------------------------------------------------------------
2021.02.10 06:36:58.554 UTC:   File for submitting this DAG to HTCondor           : catVsdog-test-wf-0.dag.condor.sub
2021.02.10 06:36:58.559 UTC:   Log of DAGMan debugging messages                 : catVsdog-test-wf-0.dag.dagman.out
2021.02.10 06:36:58.564 UTC:   Log of HTCondor library output                     : catVsdog-test-wf-0.dag.lib.out
2021.02.10 06:36:58.569 UTC:   Log of HTCondor library error messages             : catVsdog-test-wf-0.dag.lib.err
2021.02.10 06:36:58.574 UTC:   Log of the life of condor_dagman itself          : catVsdog-test-wf-0.dag.dagman.log
2021.02.10 06:36:58.580 UTC:
2021.02.10 06:36:58.585 UTC:   -no_submit given, not submitting DAG to HTCondor.  You can do this with:
2021.02.10 06:36:58.595 UTC:   -----------------------------------------------------------------------


[[1;32m########################[0m------------]  66.7% ..Running ([1;32mCompleted: 18[0m, [1;33mQueued: 0[0m, [1;36mRunning: 3[0m, [1;31mFailed: 0[0m)


####################
# pegasus-analyzer #
####################



Cancelling Client.wait(). Your workflow is still running and can be monitored with pegasus-status


Your database is compatible with Pegasus version: 5.1.0dev

************************************Summary*************************************

Submit Directory   : /home/scitech/shared-data/pegasus-catdog-wf-master/scitech/pegasus/catVsdog-test-wf/run0012
Total jobs         :     27 (100.00%)
# jobs succeeded   :     18 (66.67%)
# jobs failed      :      0 (0.00%)
# jobs held        :      3 (11.11%)
# jobs unsubmitted :      9 (33.33%)

*******************************Held jobs' details*******************************


submit file            : preprocess2_ID0000011.sub
last_job_instance_id   : 20
reason                 :  Error from slot3@cae6425c697f: STARTER at 127.0.0.1 failed to send file(s) to <127.0.0.1:9618>: error reading from /var/lib/condor/execute/dir_126176/train_Cat_8224_proc2_2.jpg: (errno 2) No such file or directory; SHADOW failed to receive file(s) from <127.0.0.1:37557>


submit file            : preprocess2_ID0000010.sub
last_job_instance_id   : 17
reason             


pegasus-monitord still running. Please wait for it to complete.

