In [None]:
import os
from datetime import datetime

import pandas as pd

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.estimator import SKLearn

In [None]:
# Functions
def build_dfp_jobs(jobs):
    """Build a dataframe describing a collection of jobs, most recent first.

    Parameters
    ----------
    jobs : iterable
        Objects exposing a ``describe()`` method returning a dict; each dict
        becomes one row of the dataframe.

    Returns
    -------
    pandas.DataFrame
        One row per job, sorted by the 'CreationTime' column in descending
        order. An empty dataframe is returned when ``jobs`` is empty.
    """
    informations = [job.describe() for job in jobs]
    dfp_jobs = pd.DataFrame(informations)
    # Without any job there is no 'CreationTime' column to sort on: return the
    # empty frame instead of letting sort_values raise a KeyError.
    if not informations:
        return dfp_jobs
    return dfp_jobs.sort_values(['CreationTime'], ascending=False)

def collect_infos_from_output_config(output_config, keyword):
    """Return the first output entry whose name contains ``keyword``.

    Parameters
    ----------
    output_config : dict or str or None
        Output configuration holding an 'Outputs' list of dicts, each with an
        'OutputName' key. The empty string that
        ``collect_output_config_processor`` returns when no job matched (or
        any other empty value) is accepted and treated as "no output".
    keyword : str
        Text searched for in each entry's 'OutputName'.

    Returns
    -------
    dict
        The first matching entry of 'Outputs', or ``{}`` when nothing matches.
    """
    # '' (the "no job found" sentinel) is not subscriptable by key: answer with
    # the usual "not found" value instead of failing on ''['Outputs'].
    if not output_config:
        return {}
    for output in output_config.get('Outputs', []):
        if keyword in output['OutputName']:
            return output
    return {}

# These functions simply return the first completed job whose name matches the prefix; nothing more sophisticated
def collect_output_config_processor(dfp_jobs_processor, prefix):
    """Return the output configuration of the first completed processing job matching ``prefix``.

    Only rows whose 'ProcessingJobStatus' equals 'Completed' are considered,
    in the dataframe's current row order. The 'ProcessingOutputConfig' of the
    first row whose 'ProcessingJobName' contains ``prefix`` is returned; the
    empty string is returned when no row qualifies.
    """
    completed_jobs = dfp_jobs_processor[dfp_jobs_processor['ProcessingJobStatus'] == 'Completed']
    matching_configs = (
        job['ProcessingOutputConfig']
        for _, job in completed_jobs.iterrows()
        if prefix in job['ProcessingJobName']
    )
    # First hit wins; '' is the "nothing found" sentinel
    return next(matching_configs, '')

def collect_output_config_trainer(dfp_jobs_processor, prefix):
    """Return the name and output configuration of the first completed training job matching ``prefix``.

    Parameters
    ----------
    dfp_jobs_processor : pandas.DataFrame
        Dataframe of training jobs with the columns 'TrainingJobStatus',
        'TrainingJobName' and 'OutputDataConfig'. The parameter keeps its
        historical name so existing callers are unaffected.
    prefix : str
        Text that must be contained in the 'TrainingJobName'.

    Returns
    -------
    tuple
        ``(name_train_job, output_config_train_job)`` for the first row, in
        the dataframe's current order, whose status is 'Completed' and whose
        name contains ``prefix``; ``('', '')`` when no row qualifies.
    """
    # Work on the dataframe received as argument rather than on a notebook-level
    # variable, so the function does not depend on which cells were run before.
    dfp_jobs = dfp_jobs_processor
    completed_jobs = dfp_jobs[dfp_jobs['TrainingJobStatus'] == 'Completed']
    for _, row in completed_jobs.iterrows():
        name_train_job = row['TrainingJobName']
        if prefix in name_train_job:
            output_config_train_job = row['OutputDataConfig']
            return name_train_job, output_config_train_job

    return '', ''

In [None]:
# Execution context shared by the jobs: the IAM role they run with and the
# region of the current boto3 session
role = get_execution_role()
region = boto3.Session().region_name

In [None]:
# SKLearn processing operator: fixes the version of sklearn to use, the role
# and the machines (type and count) on which the processing scripts will run
sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
)

In [None]:
# Locations of the datasets to process, both under the S3 location given by the
# AWS_SAGEMAKER_S3_LOCATION environment variable (a KeyError is raised if it is not set)
input_data_ml = os.environ["AWS_SAGEMAKER_S3_LOCATION"] + '/data/dataset_ml.csv'
input_data_to_score = os.environ["AWS_SAGEMAKER_S3_LOCATION"] + '/data/dataset_toscore.csv'

## Process the data

In [None]:
# Human-readable name for the processing job, suffixed with the current UTC timestamp
job_name = '0-process-' + datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")

# Launch a new processing job running the code defined in jobs/0_process.py
sklearn_processor.run(
    code='jobs/0_process.py',
    inputs=[
        # Dataset for the ml part and where it will be stored on the container
        ProcessingInput(
            source=input_data_ml,
            destination='/opt/ml/processing/input/ml',
        ),
        # Dataset for the scoring part and where it will be stored on the container
        ProcessingInput(
            source=input_data_to_score,
            destination='/opt/ml/processing/input/toscore',
        ),
    ],
    # Outputs of the processing job: train, test and score datasets
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/train',
            output_name='train_data',
        ),
        ProcessingOutput(
            source='/opt/ml/processing/test',
            output_name='test_data',
        ),
        ProcessingOutput(
            source='/opt/ml/processing/score',
            output_name='score_data',
        ),
    ],
    # Arguments forwarded to the processing script
    arguments=['--test_size', '0.21'],
    job_name=job_name,
)

In [None]:
# Build a pandas dataframe describing the jobs launched through sklearn_processor
# (one row per job, built by build_dfp_jobs from each job's describe() output)
dfp_jobs_processor = build_dfp_jobs(sklearn_processor.jobs)

In [None]:
# Output configuration of the first completed job whose name contains '0-process'
# ('' when there is none); the bare expression displays it as the cell output
output_config_process_job = collect_output_config_processor(dfp_jobs_processor, '0-process')
output_config_process_job

## Train a model

In [None]:
# Build the estimator used for the training jobs: it runs the script jobs/1_train.py
# on an ml.m4.xlarge instance with the execution role of the notebook.
# The sklearn version is pinned explicitly to the one used by sklearn_processor so that
# processing and training do not silently depend on the SDK default.
sklearn_trainer = SKLearn(
    entry_point='jobs/1_train.py',
    framework_version='0.20.0',
    train_instance_type="ml.m4.xlarge",
    role=role)

In [None]:
# Fit the estimator on the 'train_data' output of the processing job selected earlier
infos_train_data = collect_infos_from_output_config(output_config_process_job, 'train_data')
if infos_train_data == {}:
    # No 'train_data' output was found: report it instead of launching a training job
    print(f'Check the variable output_config_process_job that seems to miss some informations:{output_config_process_job}')
else:
    # Human-readable job name suffixed with the current UTC timestamp
    job_name = '1-train-' + datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
    sklearn_trainer.fit(
        {'train': infos_train_data['S3Output']['S3Uri']},
        job_name=job_name,
    )

In [None]:
# Describe the jobs launched through sklearn_trainer, then pick the name and the
# output configuration of a completed job whose name contains '1-train'
# (both are '' when no such job exists)
dfp_jobs_trainer = build_dfp_jobs(sklearn_trainer.jobs)
name_train_job, output_config_train_job = collect_output_config_trainer(dfp_jobs_trainer, '1-train')

In [None]:
# Get the location of the model selected: <S3OutputPath>/<training job name>/output/model.tar.gz
# The trailing '/' of S3OutputPath is stripped before joining so that the URI is well formed
# whether or not the configured output path ends with a slash.
model_data_s3_uri = '{}/{}/{}'.format(
    output_config_train_job['S3OutputPath'].rstrip('/'),
    name_train_job,
    'output/model.tar.gz')

## Evaluate a model

In [None]:
# Evaluate the selected model on the 'test_data' output of the processing job
infos_test_data = collect_infos_from_output_config(output_config_process_job, 'test_data')

# Human-readable job name suffixed with the current UTC timestamp
job_name = f'2-evaluate-{datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")}'
sklearn_processor.run(code='jobs/2_evaluate.py',
                      inputs=[ProcessingInput(# Model archive, and where it will be stored on the container
                                  source=model_data_s3_uri,
                                  destination='/opt/ml/processing/model'),
                              ProcessingInput(# Test dataset, and where it will be stored on the container
                                  source=infos_test_data['S3Output']['S3Uri'],
                                  destination='/opt/ml/processing/test')],
                      outputs=[ProcessingOutput(output_name='evaluation',
                                  source='/opt/ml/processing/evaluation')],
                      job_name=job_name
                     )

## Score some data

In [None]:
# Score the 'score_data' output of the processing job with the selected model
infos_score_data = collect_infos_from_output_config(output_config_process_job, 'score_data')

# Human-readable job name suffixed with the current UTC timestamp
job_name = f'3-score-{datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")}'
sklearn_processor.run(code='jobs/3_score.py',
                      inputs=[ProcessingInput(# Model archive, and where it will be stored on the container
                                  source=model_data_s3_uri,
                                  destination='/opt/ml/processing/model'),
                              ProcessingInput(# Dataset to score, and where it will be stored on the container
                                  source=infos_score_data['S3Output']['S3Uri'],
                                  destination='/opt/ml/processing/score')],
                      outputs=[ProcessingOutput(output_name='predictions',
                                  source='/opt/ml/processing/predictions')],
                      job_name=job_name
                     )

## Loop the jobs

In [None]:
# Launch the four jobs (process, train, evaluate, score) ten times in a row.
# NOTE(review): every iteration reuses infos_train_data, infos_test_data, infos_score_data and
# model_data_s3_uri computed by the earlier cells, so the train/evaluate/score jobs do not consume
# the outputs of the jobs launched in the same iteration - confirm this is intended.
for i in range(10):
    print(f'Execution {i}')
    # Job names have to be unique: a date-only stamp collides with the jobs created by a previous
    # run of this cell on the same day, so the time of day is part of the stamp as well.
    run_stamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")

    # Step 0: process the raw datasets into train, test and score datasets
    sklearn_processor.run(code='jobs/0_process.py',
                      inputs=[
                          ProcessingInput(# Define the location of the dataset for the ml part, and where it will be stored on the container
                              source=input_data_ml,
                              destination='/opt/ml/processing/input/ml'),
                          ProcessingInput(# Define the location of the dataset for the scoring part, and where it will be stored on the container
                              source=input_data_to_score,
                              destination='/opt/ml/processing/input/toscore')],
                      outputs=[# Define the various outputs of the processing job (train, test and score datasets)
                          ProcessingOutput(
                              source='/opt/ml/processing/train',
                              output_name='train_data'),
                          ProcessingOutput(
                              source='/opt/ml/processing/test',
                              output_name='test_data'),
                          ProcessingOutput(
                              source='/opt/ml/processing/score',
                              output_name='score_data')],
                      arguments=['--test_size', '0.21'],# Define some arguments to processing job
                      job_name=f'0-process-{run_stamp}-execution{i}')# Build a human understandable name for the job

    # Step 1: train a model on the train dataset
    sklearn_trainer.fit(
        {'train': infos_train_data['S3Output']['S3Uri']},
        job_name=f'1-train-{run_stamp}-execution{i}'
    )

    # Step 2: evaluate the model on the test dataset
    sklearn_processor.run(code='jobs/2_evaluate.py',
                      inputs=[ProcessingInput(
                                  source=model_data_s3_uri,
                                  destination='/opt/ml/processing/model'),
                              ProcessingInput(
                                  source=infos_test_data['S3Output']['S3Uri'],
                                  destination='/opt/ml/processing/test')],
                      outputs=[ProcessingOutput(output_name='evaluation',
                                  source='/opt/ml/processing/evaluation')],
                      job_name=f'2-evaluate-{run_stamp}-execution{i}')

    # Step 3: score the dataset to score with the model
    sklearn_processor.run(code='jobs/3_score.py',
                      inputs=[ProcessingInput(
                                  source=model_data_s3_uri,
                                  destination='/opt/ml/processing/model'),
                              ProcessingInput(
                                  source=infos_score_data['S3Output']['S3Uri'],
                                  destination='/opt/ml/processing/score')],
                      outputs=[ProcessingOutput(output_name='predictions',
                                  source='/opt/ml/processing/predictions')],
                      job_name=f'3-score-{run_stamp}-execution{i}')
    
    
    

In [None]:
# Summarize the jobs launched through sklearn_processor so far
# (one row per job, sorted by build_dfp_jobs with the most recent first)
dfp_process = build_dfp_jobs(sklearn_processor.jobs)

## Check the output

In [None]:
# Rebuild the dataframe of processing jobs so that it also contains the
# evaluate and score jobs launched through sklearn_processor since it was first built
dfp_jobs_processor = build_dfp_jobs(sklearn_processor.jobs)

In [None]:
# Read the metrics.csv written under the 'evaluation' output of a completed
# '2-evaluate' job and display it
output_config_evaluate_job = collect_output_config_processor(dfp_jobs_processor, '2-evaluate')
infos_evaluation = collect_infos_from_output_config(output_config_evaluate_job, 'evaluation')
dfp_evaluation = pd.read_csv(f"{infos_evaluation['S3Output']['S3Uri']}/metrics.csv")
dfp_evaluation

In [None]:
# Read the predictions.csv written under the 'predictions' output of a completed
# '3-score' job and display its first rows
output_config_score_job = collect_output_config_processor(dfp_jobs_processor, '3-score')
infos_scoring = collect_infos_from_output_config(output_config_score_job, 'predictions')
dfp_score = pd.read_csv(f"{infos_scoring['S3Output']['S3Uri']}/predictions.csv")
dfp_score.head()

## Debug

In [None]:
# Debug: S3 URI of the 'score_data' output used as input of the scoring job
infos_score_data['S3Output']['S3Uri']

In [None]:
# Debug: display the dataframe describing the processing jobs
dfp_jobs_processor

In [None]:
# Debug: display the S3 location of the dataset to score
input_data_to_score