# [Lab1] SageMaker Processing

In [1]:
# Restore every variable previously persisted with %store.
# Later cells use these names without defining them: bucket, prefix, region,
# mlflow_arn, mlflow_name, experiment_name, user_profile_name
# (presumably stored by an earlier notebook in this lab - confirm it has been run first).
%store -r

In [2]:
import sagemaker
import boto3
from time import gmtime, strftime

# boto3 session built from the default configuration (not referenced in the cells visible below).
boto_session = boto3.Session()
# SageMaker session: used below to upload the input CSV, call the SageMaker API
# for the MLflow presigned URL, and read the processed output back from S3.
sess = sagemaker.Session()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


## 1. Preparing for the Processing Job

### Dependencies

In [3]:
%mkdir -p './processing/requirements/'

In [4]:
%%writefile './processing/requirements/requirements.txt'
mlflow==2.13.2
sagemaker-mlflow==0.1.0

Writing ./processing/requirements/requirements.txt


### Processing Script

In [5]:
%%writefile processing/preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os
import mlflow
from sklearn.preprocessing import OrdinalEncoder

# Value of the USER environment variable (None when unset); it is written to the
# 'mlflow.user' tag of the run in the main block.
user_profile_name = os.getenv('USER')

def _parse_args():

    parser = argparse.ArgumentParser()

    # Data, model, and output directories
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    parser.add_argument('--categorical_features', type=str, default='y, job, marital, education, default, housing, loan, contact, month, day_of_week, poutcome')

    return parser.parse_known_args()

if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()

    # Attach to the tracking server and to the run that the launching notebook
    # passes in through environment variables (KeyError if either is missing).
    mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_ARN'])
    mlflow.autolog()

    with mlflow.start_run(run_id=os.environ['MLFLOW_RUN_ID']) as run:
        # Load data
        df = pd.read_csv(os.path.join(args.filepath, args.filename))

        # Replace '.' with '_' inside string *values*, then strip a trailing '_'.
        # DataFrame.replace does not touch column names, so the dotted column
        # names dropped below are still valid.
        df = df.replace(regex=r'\.', value='_')
        df = df.replace(regex=r'\_$', value='')

        # Add two new indicators
        df["no_previous_contact"] = (df["pdays"] == 999).astype(int)
        df["not_working"] = df["job"].isin(["student", "retired", "unemployed"]).astype(int)
        df = df.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)

        # One-hot encode the categorical features.
        # dtype=int keeps the indicators as 0/1: pandas >= 2.0 defaults to bool,
        # which would be written to the CSV files as True/False.
        df = pd.get_dummies(df, dtype=int)

        # Train, validation, test split: shuffle deterministically, then take the
        # first 70%, the next 20% and the last 10% of the rows.
        shuffled = df.sample(frac=1, random_state=42)
        train_end = int(0.7 * len(df))
        validation_end = int(0.9 * len(df))
        train_data = shuffled.iloc[:train_end]
        validation_data = shuffled.iloc[train_end:validation_end]
        test_data = shuffled.iloc[validation_end:]

        mlflow.log_params(
            {
                "train": train_data.shape,
                "validate": validation_data.shape,
                "test": test_data.shape
            }
        )

        mlflow.set_tags(
            {
                'mlflow.user':user_profile_name,
                'mlflow.source.type':'JOB'
            }
        )

        # Make sure every output sub-directory exists before writing into it, so
        # the script also works with a custom --outputpath.
        for subdir in ('train', 'validation', 'test', 'baseline'):
            os.makedirs(os.path.join(args.outputpath, subdir), exist_ok=True)

        # Local store: label (y_yes) first, features after, no header and no index
        pd.concat([train_data['y_yes'], train_data.drop(['y_yes','y_no'], axis=1)], axis=1).to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
        pd.concat([validation_data['y_yes'], validation_data.drop(['y_yes','y_no'], axis=1)], axis=1).to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
        test_data['y_yes'].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
        test_data.drop(['y_yes','y_no'], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)

        # For logging artifacts: features of the full dataset (no labels)
        df.drop(['y_yes','y_no'], axis=1).to_csv(os.path.join(args.outputpath, 'baseline/baseline.csv'), index=False, header=False)
        mlflow.log_artifact(local_path=os.path.join(args.outputpath, 'baseline/baseline.csv'))

    print("## Processing complete. Exiting.")

Writing processing/preprocessing.py


## 2. Input / Output Settings

### Input Path

In [6]:
# Upload the local raw CSV to s3://{bucket}/{prefix}/input_data/ and keep the
# returned S3 URI; it is the input of the processing job.
input_source = sess.upload_data('./bank-additional/bank-additional-full.csv', bucket=bucket, key_prefix=f'{prefix}/input_data')
input_source

's3://sagemaker-ap-northeast-2-185567426878/sagemaker/DEMO-xgboost-dm/input_data/bank-additional-full.csv'

In [7]:
%store input_source

Stored 'input_source' (str)


### Output Path

In [8]:
# S3 destinations, one per ProcessingOutput of the job configured below.
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"
baseline_path = f"s3://{bucket}/{prefix}/baseline"

In [9]:
%store train_path
%store validation_path
%store test_path
%store baseline_path

Stored 'train_path' (str)
Stored 'validation_path' (str)
Stored 'test_path' (str)
Stored 'baseline_path' (str)


### Processing with Experimental Setup

- Experiment Start : `mlflow.start_run()`
  
- **Processing | Training | Evaluating / ...**
  
- Experiment End : `mlflow.end_run()`

In [11]:
import mlflow

# Point the MLflow client at the tracking server identified by mlflow_arn.
mlflow.set_tracking_uri(mlflow_arn)
# Select the experiment by name; per the log output below it is created when it does not exist yet.
experiment = mlflow.set_experiment(experiment_name=experiment_name)

2024/10/20 12:17:10 INFO mlflow.tracking.fluent: Experiment with name 'end-to-end-experiment-20-11-52-06' does not exist. Creating a new experiment.


In [12]:
# Timestamp suffix (day-hour-minute-second, UTC) to distinguish runs by name.
run_suffix = strftime('%d-%H-%M-%S', gmtime())
run_name = f"container-processing-{run_suffix}"

# Start the run from the notebook and keep only its id: the id is handed to the
# processing container so the script logs into this same run. The run stays
# active in this kernel until mlflow.end_run() is called in a later cell.
run_id = mlflow.start_run(run_name=run_name, description="feature-engineering with a processing job").info.run_id

In [13]:
from sagemaker.sklearn import SKLearn
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role


# Processor that runs the script in the SKLearn framework container.
sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="1.2-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name='sm-immday-skprocessing',
    # Environment variables read by processing/preprocessing.py
    env={
        'MLFLOW_TRACKING_ARN': mlflow_arn,
        'MLFLOW_RUN_ID': run_id,
        'USER': user_profile_name
    }
)

# The input is a single CSV file that the script reads by name, so every
# instance needs its own full copy. With "ShardedByS3Key" only one instance
# would receive the file if instance_count were ever raised above 1.
processing_inputs = [
    ProcessingInput(
        source=input_source,
        destination="/opt/ml/processing/input",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated"
    )
]

# Each local output directory written by the script is uploaded to its own S3 prefix.
processing_outputs = [
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/output/train",
            destination=train_path,
        ),
        ProcessingOutput(
            output_name="validation_data",
            source="/opt/ml/processing/output/validation",
            destination=validation_path
        ),
        ProcessingOutput(
            output_name="test_data",
            source="/opt/ml/processing/output/test",
            destination=test_path
        ),
        ProcessingOutput(
            output_name="baseline_data",
            source="/opt/ml/processing/output/baseline",
            destination=baseline_path
        )
]

In [14]:
# Launch the processing job and block until it finishes (wait=True).
# The requirements file passed as a dependency is pip-installed in the container
# (see the job log below) so the script can import mlflow.
sklearn_processor.run(
    inputs=processing_inputs,
    code='processing/preprocessing.py',
    outputs=processing_outputs,
    dependencies=['processing/requirements/requirements.txt'],
    # Optional script arguments; left disabled, so the script's argparse defaults apply.
    # arguments = [
    #     '--filename', 'bank-additional-full.csv',
    #     '--categorical_features', 'y, job, marital, education, default, housing, loan, contact, month, day_of_week, poutcome'
    # ],
    wait=True
)

INFO:sagemaker:Creating processing-job with name sm-immday-skprocessing-2024-10-20-12-17-41-181


............[34mFound existing installation: typing 3.7.4.3[0m
[34mUninstalling typing-3.7.4.3:
  Successfully uninstalled typing-3.7.4.3[0m
[34mCollecting mlflow==2.13.2 (from -r requirements.txt (line 1))
  Downloading mlflow-2.13.2-py3-none-any.whl.metadata (29 kB)[0m
[34mCollecting sagemaker-mlflow==0.1.0 (from -r requirements.txt (line 2))
  Downloading sagemaker_mlflow-0.1.0-py3-none-any.whl.metadata (3.3 kB)[0m
[34mCollecting alembic!=1.10.0,<2 (from mlflow==2.13.2->-r requirements.txt (line 1))
  Downloading alembic-1.13.3-py3-none-any.whl.metadata (7.4 kB)[0m
[34mCollecting cachetools<6,>=5.0.0 (from mlflow==2.13.2->-r requirements.txt (line 1))
  Downloading cachetools-5.5.0-py3-none-any.whl.metadata (5.3 kB)[0m
[34mCollecting cloudpickle<4 (from mlflow==2.13.2->-r requirements.txt (line 1))
  Downloading cloudpickle-3.1.0-py3-none-any.whl.metadata (7.0 kB)[0m
[34mCollecting docker<8,>=4.0.0 (from mlflow==2.13.2->-r requirements.txt (line 1))
  Downloading dock

In [15]:
# Record the SageMaker console URL of the processing job as the run's source,
# so the MLflow run links back to the job that produced it.
mlflow.set_tags(
    {
        'mlflow.source.name':f'https://{region}.console.aws.amazon.com/sagemaker/home?region={region}#/processing-jobs/{sklearn_processor.latest_job.name}',
    }
)

# Close the run that was started in this notebook before the job was launched.
mlflow.end_run()

In [16]:
# Resolve the experiment id once: it is needed for the run search and for the UI link.
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id

# Most recently started run of the experiment.
last_run_id = mlflow.search_runs(
    experiment_ids=[experiment_id],
    max_results=1,
    order_by=["attributes.start_time DESC"]
)['run_id'][0]

# Short-lived presigned URL for the MLflow UI: it must be opened within
# 60 seconds; the resulting UI session is valid for 30 minutes.
presigned_url = sess.sagemaker_client.create_presigned_mlflow_tracking_server_url(
    TrackingServerName=mlflow_name,
    ExpiresInSeconds=60,
    SessionExpirationDurationInSeconds=1800
)['AuthorizedUrl']

# Deep link to the run in the MLflow UI. The experiment id must be the real one;
# a hard-coded "1" points at the wrong experiment on any server with more than one.
mlflow_run_link = f"{presigned_url.split('/auth')[0]}/#/experiments/{experiment_id}/runs/{last_run_id}"

In [17]:
from IPython.display import Javascript

# Open the MLflow UI in a new browser window through the presigned URL.
# The new window can be closed again afterwards.
display(Javascript('window.open("{}");'.format(presigned_url)))

<IPython.core.display.Javascript object>

### Checking the results

In [18]:
import pandas as pd
import io

# Read the training CSV produced by the processing job back from S3.
# The result is passed to io.StringIO below, i.e. it is handled as text.
train_data = sess.read_s3_file(
    bucket=bucket,
    key_prefix=f"{prefix}/train/train.csv"
)

# The script wrote the file without a header row, hence header=None.
# Column 0 is the label (y_yes); the remaining columns are features.
df = pd.read_csv(io.StringIO(train_data), header=None)
# Limit the printed width; the middle columns are elided with "...".
pd.set_option('display.max_columns', 10)
print(df.head(10))
print(df.tail(10))

   0   1   2    3   4   ...  55  56  57  58  59
0   0  57   1  999   1  ...   0   0   1   0   0
1   0  55   2  999   0  ...   0   0   0   1   0
2   0  33   1  999   1  ...   0   0   1   0   0
3   0  36   4  999   0  ...   0   0   0   1   0
4   0  27   2  999   0  ...   0   0   0   1   0
5   0  58   1  999   0  ...   0   0   0   1   0
6   0  48   1  999   0  ...   0   1   0   1   0
7   0  51   7  999   0  ...   0   0   0   1   0
8   0  24   4  999   0  ...   0   1   0   1   0
9   0  36   4  999   0  ...   0   0   0   1   0

[10 rows x 60 columns]
       0   1   2    3   4   ...  55  56  57  58  59
28821   0  31   3  999   0  ...   0   0   0   1   0
28822   0  45   5  999   0  ...   0   0   0   1   0
28823   0  27   1  999   0  ...   0   0   0   1   0
28824   1  32   1    6   1  ...   0   1   0   0   1
28825   0  54   1  999   0  ...   0   0   0   1   0
28826   0  23   1  999   0  ...   0   1   0   1   0
28827   0  43   5  999   0  ...   1   0   0   1   0
28828   0  43   2  999   0  ... 