In [None]:
import itertools
import sagemaker
import pandas as pd
from sagemaker.pytorch import PyTorch
import os
from sagemaker.inputs import TrainingInput
import json
import time 

# Shared SageMaker handles for the rest of the notebook: the API session (also
# used below to look up the region) and the IAM execution role for the jobs.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role(sagemaker_session=sagemaker_session)

In [None]:
# Algorithm label used in the S3 output paths, the job names and the jobs JSON file name.
algorithm: str = "Coral"

In [None]:
# Task to train; only "Parking" is supported by the configuration cell below.
task_name: str = "Parking"

In [None]:
# Experiment-setting label appended to the final training-job names.
setting_name: str = "Tune100Exp3"

In [None]:
# Data must come from S3 in the same region as the notebook instance.
region = sagemaker_session.boto_region_name

# region -> (training CSV for the "seattle" input channel, bucket for model outputs)
region_data_sources = {
    "eu-west-1": (
        's3://vwfs-pred-park-irland/input/open_data/seattle/train_data_with_trans_100_with_transaction.csv',
        'vwfs-pred-park-irland',
    ),
    "eu-central-1": (
        's3://bucket-vwfs-pred-park-global-model-serving-dev/input/open_data/seattle/train_data_with_trans_100_with_transaction.csv',
        'bucket-vwfs-pred-park-global-model-serving-dev',
    ),
}

if region not in region_data_sources:
    raise NotImplementedError("Region must be eu-west-1 or eu-central-1")
seattle, bucket_name = region_data_sources[region]

In [None]:
# The tuning results used below only exist in eu-west-1, and only the Parking task is configured.
if region != "eu-west-1":
    raise NotImplementedError("Hyperparameter tuning was performed in eu-west-1.")

if task_name != "Parking":
    raise NotImplementedError("Task must be Parking")

# Tuning job(s) whose best hyperparameters are re-trained below.
tuner_job_list = ["Parking-coral-with-a-210912-1012"]
# Input channels for estimator.fit: channel name -> S3 CSV (Seattle open data).
training_input = {'seattle': seattle}
# Static hyperparameters copied from each tuning job's training-job definition.
job_specific_params = ["source_only"]

In [None]:
# Free training-instance slots per SageMaker instance type, consumed by
# select_available_resource. (Despite the name, these are instance types, not S3 objects.)
s3_resources = dict.fromkeys(['ml.g4dn.xlarge', 'ml.g4dn.2xlarge'], 10)

In [None]:
# Tuned hyperparameters whose best values are read from the tuning job's results.
tune_params = [
    "lr",
    "batch_size",
    "optimizer",
    "include_pbp",
    "use_batchnorm",
    "hidden_dim",
    "output_dim",
    "lambda_coral",
]

# Hyperparameters fixed by this notebook; in get_training_job_parameters they
# take precedence over the tuned values.
fixed_params = dict(
    sm_mode=1,
    # per the original note: job -> train.py -> n_experiments runs -> up to max_epoch epochs each
    n_experiments=20,
    max_epoch=200,
    patience=10,
    early_stop=1,
)

In [None]:
# Same training code as in the other notebook. The source directory defaults to
# the original notebook-instance path but can be overridden with the
# CORAL_SOURCE_DIR environment variable, so the notebook is not tied to one machine.
entry_script = "train.py"
source_dir = os.environ.get(
    "CORAL_SOURCE_DIR",
    '/home/ec2-user/SageMaker/mobility-predpark-global-ML/research/PyTorch-Deep-CORAL',
)

In [None]:
def start_single_training_job(instance, params, entry_script, job_name, output_path, training_input, source_dir):
    """Submit one SageMaker PyTorch training job and return without waiting for it.

    Args:
        instance: SageMaker instance type for the job, e.g. ``'ml.g4dn.xlarge'``.
        params: Hyperparameters passed to the training script.
        entry_script: Training script inside ``source_dir``.
        job_name: Passed as ``base_job_name``; the final job name is derived from it by SageMaker.
        output_path: S3 prefix for the job's output.
        training_input: Input channels handed to ``fit`` (channel name -> S3 URI).
        source_dir: Local directory containing the training code.

    Returns:
        The PyTorch estimator whose job has just been submitted.
    """
    estimator_config = dict(
        entry_point=entry_script,
        source_dir=source_dir,
        role=role,
        framework_version="1.4.0",
        py_version="py3",
        instance_type=instance,
        instance_count=1,
        hyperparameters=params,
        base_job_name=job_name,
        output_path=output_path,
    )
    pytorch_estimator = PyTorch(**estimator_config)
    # wait=False: submit and return immediately so several jobs can run side by side.
    pytorch_estimator.fit(training_input, wait=False)
    return pytorch_estimator

In [None]:
def select_available_resource(s3_resources):
    """Reserve one slot of the first instance type that still has capacity.

    Args:
        s3_resources: Mapping of instance type -> number of free slots. Modified in
            place: the chosen type's count is decremented by one.

    Returns:
        The chosen instance type, or ``-1`` when every count is zero or below.
    """
    for resource_type, free_slots in s3_resources.items():
        if free_slots > 0:
            # Overwriting an existing key does not resize the dict, so this is safe mid-iteration.
            s3_resources[resource_type] = free_slots - 1
            return resource_type
    return -1

In [None]:
def get_training_job_parameters(tuner_job, tune_params, job_specific_params, fixed_params):
    """Assemble the hyperparameters for a final training run from a finished tuning job.

    Args:
        tuner_job: Name of an existing SageMaker hyperparameter tuning job.
        tune_params: Names of tuned hyperparameters to copy from the best training
            job of ``tuner_job``.
        job_specific_params: Names of static hyperparameters to copy from the tuning
            job's training-job definition (e.g. ``["source_only"]``).
        fixed_params: Hyperparameters set by this notebook. They override the tuned
            values, but are themselves overridden by ``job_specific_params``.

    Returns:
        dict: hyperparameter name -> value, with the literal double quotes SageMaker
        leaves around string values removed, and the integer-valued parameters
        (``batch_size``, ``use_batchnorm``, ``hidden_dim``, ``output_dim``, ``source_only``)
        cast to ``int`` when present.

    Raises:
        ValueError: if no training job of ``tuner_job`` reported a final objective value.
    """
    def _unquote(value):
        # SageMaker stores string hyperparameters JSON-encoded, leaving literal quotes behind.
        return value.replace('"', '') if isinstance(value, str) else value

    tuner = sagemaker.tuner.HyperparameterTuner.attach(tuner_job)
    description = tuner.describe()

    # Static hyperparameters shared by every training job of this tuning run.
    static_params = description['TrainingJobDefinition']['StaticHyperParameters']
    job_values = {name: static_params[name] for name in job_specific_params}
    print('job_specific_params', job_values)
    if 'source_only' in job_values:
        job_values['source_only'] = int(_unquote(job_values['source_only']))
    fixed_params = {**fixed_params, **job_values}

    # Respect the tuning objective's direction; the original code always maximised,
    # which picks the worst job for a 'Minimize' objective (e.g. a loss).
    objective = (description.get('HyperParameterTuningJobConfig', {})
                 .get('HyperParameterTuningJobObjective', {}))
    minimize = objective.get('Type') == 'Minimize'

    # Reset the index so row labels equal row positions for the .loc lookup below.
    results = tuner.analytics().dataframe().reset_index(drop=True)
    for column in results.columns:
        results[column] = results[column].map(_unquote)

    if results.empty or 'FinalObjectiveValue' not in results.columns:
        raise ValueError(f"Tuning job {tuner_job!r} has no training job results")
    # Failed/stopped jobs have no objective value; ignore them.
    scores = pd.to_numeric(results['FinalObjectiveValue'], errors='coerce').dropna()
    if scores.empty:
        raise ValueError(f"Tuning job {tuner_job!r} has no training job with a final objective value")
    best_row = scores.idxmin() if minimize else scores.idxmax()
    best_params = results.loc[best_row, tune_params].to_dict()

    # Analytics report tuned values as strings or floats; the training script expects ints here.
    for name in ('batch_size', 'use_batchnorm', 'hidden_dim', 'output_dim'):
        if name in best_params:
            best_params[name] = int(float(best_params[name]))

    final_params = {**best_params, **fixed_params}
    final_params = {key: _unquote(value) for key, value in final_params.items()}
    print(final_params)
    return final_params

In [None]:
def create_names(algorithm, source_only, bucket, setting=None):
    """Build the S3 output prefix and base training-job name for a final run.

    Args:
        algorithm: Algorithm label, e.g. ``"Coral"``; used in the path and the job name.
        source_only: Truthy for a source-only run, falsy for a domain-adaptation run.
            String flags (possibly JSON-quoted, as SageMaker stores them) are parsed,
            so ``"0"``, ``"false"`` and ``""`` count as False instead of being truthy.
        bucket: S3 bucket that receives the model output.
        setting: Experiment-setting label appended to the job name; defaults to the
            notebook-level ``setting_name``.

    Returns:
        tuple[str, str]: ``(output_path, training_job_name)``. Spaces and underscores in
        the job name become hyphens, since SageMaker job names only allow
        alphanumerics and hyphens.
    """
    if setting is None:
        setting = setting_name
    if isinstance(source_only, str):
        source_only = source_only.strip().strip('"').lower() not in ('', '0', 'false')

    mode = "source-only" if source_only else "domain-adaptation"
    output_path = f"s3://{bucket}/research/{algorithm}/saved_model/{mode}"
    training_job_name = f"{algorithm}-final-{mode}-{setting}".replace(' ', '-').replace('_', '-')
    return output_path, training_job_name

In [None]:
def wait_for_resource(estimator_list, poll_interval=60, max_polls=600):
    """Block until one launched training job stops, then return its instance type.

    The finished job's estimator is removed from ``estimator_list`` (in place), so
    its instance is handed out only once.

    Args:
        estimator_list: Estimators whose training jobs were started by this notebook.
        poll_interval: Seconds to sleep between polling rounds (default 60).
        max_polls: Maximum number of polling rounds before giving up (default 600,
            i.e. about 10 hours with the default interval).

    Returns:
        str: ``ResourceConfig.InstanceType`` of the first job found in a terminal state.

    Raises:
        ValueError: if ``estimator_list`` is empty, so no instance can ever be freed.
        TimeoutError: if no job reaches a terminal state within ``max_polls`` rounds
            (previously ``None`` was returned and later used as an instance type).
    """
    # A job releases its instance whenever it stops, not only on success.
    terminal_states = {'Completed', 'Failed', 'Stopped'}
    if not estimator_list:
        raise ValueError("No running training jobs to wait for; no resource can become available.")

    for poll in range(max_polls):
        for estimator in estimator_list:
            description = estimator.latest_training_job.describe()  # one API call per job per round
            if description['TrainingJobStatus'] in terminal_states:
                estimator_list.remove(estimator)  # safe: we return right after mutating
                return description['ResourceConfig']['InstanceType']
        if poll < max_polls - 1:
            time.sleep(poll_interval)

    raise TimeoutError(
        f"No training job finished within {max_polls} polls of {poll_interval}s each.")

In [None]:
# Start one final training job per tuning job. The S3 locations of all launched
# jobs are collected in a JSON file, which is rewritten after every launch.
estimator_list = []
name_dict = {'estimators': []}
for tuner_job in tuner_job_list:
    params = get_training_job_parameters(tuner_job, tune_params, job_specific_params, fixed_params)
    print(params)

    resource = select_available_resource(s3_resources)
    if resource == -1:
        # Every instance slot is taken: wait for one of our jobs to stop and reuse its type.
        resource = wait_for_resource(estimator_list)
    if resource is None:
        # Never launch with instance_type=None (wait_for_resource may give up without a result).
        raise RuntimeError(f"No training instance became available for tuning job {tuner_job}")

    output_path, training_job_name = create_names(algorithm, params['source_only'], bucket_name)
    print('output_path')
    print(output_path)
    print('training_job_name')
    print(training_job_name)

    estimator = start_single_training_job(resource, params, entry_script, training_job_name,
                                          output_path, training_input, source_dir)
    estimator_list.append(estimator)

    # Record "<output_path>/<actual job name>" (presumably where SageMaker puts this job's output).
    output_path_name = estimator.output_path
    estimator_job_name = estimator.latest_training_job.describe()['TrainingJobName']
    estimator_path = '/'.join([output_path_name, estimator_job_name])
    name_dict['estimators'].append(estimator_path)
    with open(f'{task_name}-{algorithm}-jobs.json', 'w') as f:
        json.dump(name_dict, f)