In [None]:
# Dependencies for the training scripts in this notebook (keras, tensorflow and h5py are version-pinned).
# scikit-learn is requested under its real distribution name: the "sklearn" name on PyPI
# is only a deprecated placeholder and installing it now aborts the whole command.
!sudo pip3 install --upgrade pip keras==2.1.5 tensorflow==1.13.1 numpy pandas pillow scikit-learn optuna scikit-image  optkeras h5py==2.10.0

In [5]:
#!/usr/bin/env python3
import os
import logging
from pathlib import Path
import requests 
from glob import glob
from zipfile import ZipFile
import pickle
import pandas as pd

logging.basicConfig(level=logging.DEBUG)

#Import Pegasus API
from Pegasus.api import *

#Properties
# Collect the property overrides in one place, then persist them with props.write().
props = Properties()
for prop_key, prop_value in (
    ("dagman.retry", "100"),
    ("pegasus.transfer.arguments", "-m 1"),
):
    props[prop_key] = prop_value
props.write()

#Replica Catalog
rc = ReplicaCatalog()

# Every JPEG in the working directory is a workflow input, in sorted name order.
input_files = sorted(glob('*.jpg'))
in_files = []

# The two checkpoint files are registered as replicas below, so each one must exist
# on disk first. A missing file is seeded with the CSV dump of an empty DataFrame.
checkpoint_file = "checkpoint_file2.hdf5"
hpo_checkpoint_file = 'hpo_checkpoint.pkl'
for placeholder in (checkpoint_file, hpo_checkpoint_file):
    if not os.path.isfile(placeholder):
        df = pd.DataFrame(list())
        df.to_csv(placeholder)

# Absolute location of the notebook's working directory, used to build each physical file name.
workdir = Path(".").resolve()

for jpg_name in input_files:
    in_files.append(File(jpg_name))
    rc.add_replica("local", File(jpg_name), str(workdir / jpg_name))

for ckpt_name in (checkpoint_file, hpo_checkpoint_file):
    rc.add_replica("local", ckpt_name, workdir / ckpt_name)

rc.write()


#Transformation
# Docker image referenced by every transformation declared on the "condorpool" site.
tools_container = Container(
    "tools-container",
    Container.DOCKER,
    image="docker:///ssrujanaa/catsanddogs:latest",
)


def _containerized(name, pfn):
    """Declare a non-stageable transformation on the condorpool site that uses tools_container."""
    return Transformation(
        name,
        site="condorpool",
        pfn=pfn,
        is_stageable=False,
        container=tools_container,
    )


def _stageable_local(name, pfn):
    """Declare a stageable transformation whose executable is registered on the local site."""
    return Transformation(name, site="local", pfn=pfn, is_stageable=True)


pre_process_resize = _stageable_local(
    "preprocess1.py", "/home/scitech/shared-data/CatsAndDogs/preprocess1.py"
)
pre_process_augment = _containerized("Augmentation.py", "/usr/bin/Augmentation.py")
data_split = _containerized("Data_Split.py", "/usr/bin/Data_Split.py")
hpo = _containerized("hpo_checkpointing.py", "/usr/bin/hpo_checkpointing.py")
vgg_model = _containerized("VGG_model.py", "/usr/bin/VGG_model.py")
test_model = _stageable_local(
    "Test.py", "/home/scitech/shared-data/CatsAndDogs/Test.py"
)

# Register the container and all six transformations, then write the catalog out.
tc = (
    TransformationCatalog()
    .add_containers(tools_container)
    .add_transformations(
        pre_process_resize,
        pre_process_augment,
        data_split,
        hpo,
        vgg_model,
        test_model,
    )
    .write()
)

#Workflow
# Job ordering is not declared explicitly: the workflow is created with
# infer_dependencies=True and every job lists its input and output files.
wf = Workflow("Cats_and_Dogs", infer_dependencies=True)

# --- Stage 1: resize. One "resized_<name>" output per input image, plus two text files.
resized_images = File('resized_images.txt')
all_files = [File("resized_{}".format(raw.lfn)) for raw in in_files]
labels = File('labels.txt')

job_preprocess1 = (
    Job(pre_process_resize)
    .add_inputs(*in_files)
    .add_outputs(*all_files, resized_images, labels)
)

# --- Stage 2: augmentation. Three outputs per resized image, named
# "Aug_<stem>_<i><ext>" for i in 0..2 (the stem of the resized name is substituted).
aug_images_txt = File('augmentation.txt')
aug_labels_txt = File('aug_labels.txt')
augmented_files = [
    File(
        str(resized).replace(
            os.path.splitext(str(resized))[0],
            "Aug_{}_{}".format(os.path.splitext(str(resized))[0], i),
        )
    )
    for resized in all_files
    for i in range(3)
]

job_preprocess2 = (
    Job(pre_process_augment)
    .add_inputs(*all_files, labels)
    .add_outputs(aug_images_txt, aug_labels_txt, *augmented_files)
)

# --- Stage 3: split the augmented images into training / testing / validation lists.
training_data = File('training.pkl')
testing_data = File('testing.pkl')
val_data = File('validation.pkl')

job_data_split = (
    Job(data_split)
    .add_inputs(*augmented_files, labels)
    .add_outputs(training_data, testing_data, val_data)
)

# Inputs shared by the hyper-parameter search and the final training job.
training_inputs = (*augmented_files, training_data, testing_data, val_data)

# --- Stage 4: hyper-parameter search, with its own checkpoint file staged out.
# NOTE(review): maxwalltime=1 presumably caps each attempt so the checkpoint/retry path is exercised - confirm.
model = File('model.h5')
output_file = File('hpo_results.pkl')
job_hpo = (
    Job(hpo)
    .add_checkpoint(File(hpo_checkpoint_file), stage_out=True)
    .add_inputs(*training_inputs)
    .add_profiles(Namespace.PEGASUS, key="maxwalltime", value=1)
    .add_outputs(output_file)
)

# --- Stage 5: train the VGG model; it additionally consumes the HPO results.
job_vgg_model = (
    Job(vgg_model)
    .add_args("-epochs", 6, "--batch_size", 2)
    .add_checkpoint(File(checkpoint_file), stage_out=True)
    .add_inputs(*training_inputs, output_file)
    .add_profiles(Namespace.PEGASUS, key="maxwalltime", value=1)
    .add_outputs(model)
)

# --- Stage 6: evaluate the trained model on the testing split.
results_file = File('Result_Metrics.txt')
job_test_model = (
    Job(test_model)
    .add_inputs(*augmented_files, testing_data, model)
    .add_outputs(results_file)
)

wf.add_jobs(
    job_preprocess1,
    job_preprocess2,
    job_data_split,
    job_hpo,
    job_vgg_model,
    job_test_model,
)

<Pegasus.api.workflow.Workflow at 0x7fe8b300eac8>

In [6]:
# Plan and submit the workflow, block until it finishes, then run the analyzer and
# statistics steps. On a client failure, print the output attached to the error.
try:
    (
        wf.plan(submit=True)
        .wait()
        .analyze()
        .statistics()
    )
except PegasusClientError as client_error:
    print(client_error.output)


################
# pegasus-plan #
################
[main] WARN  schema.JsonMetaSchema  - Unknown keyword $defs - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[main] WARN  schema.JsonMetaSchema  - Unknown keyword additionalItems - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[main] WARN  schema.JsonMetaSchema  - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2020.11.17 20:16:06.625 UTC:
2020.11.17 20:16:06.631 UTC:   -----------------------------------------------------------------------
2020.11.17 20:16:06.638 UTC:   File for submitting this DAG to HTCondor           : Cats_and_Dogs-0.dag.condor.sub
2020.11.17 20:16:06.645 UTC:   Log of DAGMan debugging messages                 : Cats_and_Dogs-0.dag.dagman.out
2020.11.17 20:16:06.651 UTC:   Log of 

[[1;32m##################################################[0m] 100.0% ..Success ([1;32mCompleted: 32[0m, [1;33mQueued: 0[0m, [1;36mRunning: 0[0m, [1;31mFailed: 0[0m)



####################
# pegasus-analyzer #
####################
Your database is compatible with Pegasus version: 5.0.0dev

************************************Summary*************************************

Submit Directory   : /home/scitech/shared-data/CatsAndDogs/scitech/pegasus/Cats_and_Dogs/run0003
Total jobs         :     32 (100.00%)
# jobs succeeded   :     32 (100.00%)
# jobs failed      :      0 (0.00%)
# jobs held        :      0 (0.00%)
# jobs unsubmitted :      0 (0.00%)



######################
# pegasus-statistics #
######################
Your database is compatible with Pegasus version: 5.0.0dev

#
# Pegasus Workflow Management System - http://pegasus.isi.edu
#
# Workflow summary:
#   Summary of the workflow execution. It shows total
#   tasks/jobs/sub workflows run, how many succeeded/failed etc.
#   In case of hierarchical workflow the calculation shows the
#   statistics across all the sub workflows.It shows the following
#   statistics about tasks, jobs and sub workf

In [None]:
pwd


In [None]:
#!/usr/bin/env python3
# coding: utf-8
import pickle
import signal
from os import listdir
from numpy import asarray
from numpy import save
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
import numpy as np
from keras import layers
from keras.layers import Input,Dense,BatchNormalization,Flatten,Dropout,GlobalAveragePooling2D
from keras.models import Model, load_model
from keras.utils import layer_utils
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from keras.callbacks import CSVLogger
import keras.backend as K
import traceback
from keras.applications.vgg16 import VGG16
from keras.models import Model,load_model
import pandas as pd
import h5py
import sys
import joblib
import argparse
import keras
import tensorflow as tf

def parse_args(args):
    """Parse the training script's command-line options.

    Args:
        args: List of argument strings to parse (typically ``sys.argv[1:]``).
            ``None`` makes argparse fall back to ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``epochs`` (int, default 5), ``batch_size``
        (int, default 16) and ``f`` (str or None).
    """
    parser = argparse.ArgumentParser(description='Cat and Dog image classification using Keras')
    # Each option is accepted with one or two leading dashes: the workflow definition
    # invokes this script with "-epochs" but "--batch_size".
    parser.add_argument('-epochs', '--epochs', metavar='num_epochs', type=int, default = 5,
                        help = "Number of training epochs")
    parser.add_argument('-batch_size', '--batch_size', metavar='batch_size', type=int, default = 16,
                        help = "Batch Size")
    # NOTE(review): presumably absorbs the "-f <connection file>" argument an IPython
    # kernel is started with, so the script also runs inside a notebook - confirm.
    parser.add_argument('-f')
    # Parse the list that was passed in; previously the parameter was ignored and
    # sys.argv was always parsed instead.
    return parser.parse_args(args)

#get training, testing and validation data from the saved pickle files.
def get_data():
    """Load the training, testing and validation images listed in the split pickles.

    Reads ``training.pkl``, ``testing.pkl`` and ``validation.pkl`` from the current
    working directory; each is expected to unpickle to an iterable of image paths.

    Returns:
        Tuple ``(train_photos, train_labels, test_photos, test_labels,
        val_photos, val_labels)`` of numpy arrays. A label is 1.0 when the
        path contains ``'Cat'`` and 0.0 otherwise.
    """
    split_paths = []
    for pickle_name in ('training.pkl', 'testing.pkl', 'validation.pkl'):
        # NOTE(review): pickle.load can execute arbitrary code; only use split files
        # produced by this workflow.
        with open(pickle_name, 'rb') as f:
            split_paths.append(pickle.load(f))

    # All three lists are read before any image is decoded, so a missing split file
    # fails fast.
    (train_photos, train_labels), (test_photos, test_labels), (val_photos, val_labels) = [
        _load_split(paths) for paths in split_paths
    ]
    return train_photos,train_labels,test_photos,test_labels,val_photos,val_labels


def _load_split(paths):
    """Decode every image in ``paths`` and derive its label from the path text."""
    photos, labels = list(), list()
    for path in paths:
        photos.append(img_to_array(load_img(path)))
        # Case-sensitive substring match anywhere in the path, not just the file name.
        labels.append(1.0 if 'Cat' in path else 0.0)
    return asarray(photos), asarray(labels)

#Definition of the VGG16 model and changing the output layer according to our requirements.
#i.e., 2 output classes
def get_model():
    """Build and compile the VGG16-based two-class classifier.

    The output activation and the optimizer are taken from the best trial of the
    study stored in ``hpo_results.pkl``.

    Returns:
        Tuple ``(model, optimizer_optuna)``: the compiled Keras model and the
        optimizer value read from the study, so callers can recompile with it.

    Raises:
        KeyError: if the best trial has no ``'activation'`` or ``'optimizer'`` parameter.
    """
    nb_classes = 2

    # NOTE(review): joblib.load unpickles the file and can execute arbitrary code;
    # only load study files produced by this workflow.
    study = joblib.load('hpo_results.pkl')
    best_params = study.best_trial.params
    activation_optuna = best_params['activation']
    optimizer_optuna = best_params['optimizer']

    # Convolutional base without its classifier head; a new head is stacked on top.
    vgg16_model = VGG16(weights = 'imagenet', include_top = False)
    x = vgg16_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    predictions = Dense(nb_classes, activation = activation_optuna)(x)
    # Use the current keyword names; the singular "input"/"output" spellings are the
    # legacy form of this constructor.
    model = Model(inputs = vgg16_model.input, outputs = predictions)

    # Freeze the base before compiling so only the new head is trained.
    for layer in vgg16_model.layers:
        layer.trainable = False
    model.compile(optimizer = optimizer_optuna,loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])
    return model,optimizer_optuna


def main():
    """Train the classifier, resuming from ``checkpoint_file2.hdf5`` when possible.

    The weights checkpoint also carries an ``epochs`` dataset holding the number of
    epochs already finished. Training runs one epoch per ``fit`` call so that counter
    can be updated after every epoch, then the final model is saved to ``model.h5``.

    Reads the module-level ``EPOCHS`` and ``BATCH_SIZE`` settings.

    Returns:
        0 on completion.
    """
    train_photos,train_labels,test_photos,test_labels,val_photos,val_labels = get_data()
    model,optimizer_optuna = get_model()

    #checkpoint file that saves the weights after each epoch - weights are overwritten to the same file
    checkpoint_file = 'checkpoint_file2.hdf5'
    checkpoint = ModelCheckpoint(checkpoint_file, monitor='loss', verbose=1, mode='auto',
                                 save_weights_only = True, period=1)

    # Only the checkpoint *load* is guarded. An OSError here means there is no usable
    # checkpoint, so training starts at epoch 0. Errors raised later, during training,
    # propagate instead of silently restarting from scratch.
    start_epoch = 0
    try:
        model.load_weights(checkpoint_file,skip_mismatch=True)
        with h5py.File(checkpoint_file, "r") as file:
            finished = file.get('epochs')
            # A weights file without the counter (e.g. the job stopped between the
            # weights write and the counter write) keeps the loaded weights but
            # records no progress, instead of crashing on None.
            if finished is not None:
                start_epoch = int(finished[...].tolist())
    except OSError:
        start_epoch = 0

    # The tuned optimizer is used for fresh and resumed runs alike; a fresh start
    # previously recompiled with a hard-coded 'rmsprop'.
    model.compile(optimizer = optimizer_optuna,loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])

    # The upper bound is the EPOCHS setting (the resume path previously referenced an
    # undefined name and raised NameError).
    for epoch in range(start_epoch, EPOCHS):
        model.fit(x=train_photos, y=train_labels,batch_size=BATCH_SIZE , epochs=1, verbose=1,
                  validation_data=(val_photos,val_labels), callbacks = [checkpoint])

        #saving the number of finished epochs to the same hdf5 file
        with h5py.File(checkpoint_file, "a") as file:
            # h5py refuses to assign over an existing name, so drop a stale counter first.
            if 'epochs' in file:
                del file['epochs']
            # Store the count of finished epochs (index + 1) so a resume continues with
            # the next epoch. Checkpoints written by the earlier code stored the index,
            # which only makes a resume repeat one epoch.
            file['epochs'] = epoch + 1

    model.save('model.h5')
    return 0
    
if __name__ == '__main__':
    # EPOCHS and BATCH_SIZE are module-level settings read by main(); assigning them
    # here at module scope already makes them global.
    cli_options = parse_args(sys.argv[1:])
    EPOCHS, BATCH_SIZE = cli_options.epochs, cli_options.batch_size
    main()