# SM08: Preprocessing Script

The code to preprocess the [Insurance Company Benchmark (COIL 2000) dataset](https://archive.ics.uci.edu/ml/datasets/Insurance+Company+Benchmark+%28COIL+2000%29) was developed in post [SM07](). This notebook turns that code into the script for the pipeline.

## Update EC2 instance

Writing the `preprocess.py` script is very similar to writing the `etl.py` script. The major difference is that I want to use a library that isn't installed by default and want to ensure package versions for several libraries. To do this I need to consider the pre-built EC2 instance configurations vs other options.

AWS provides several different pre-built EC2 instance configurations. Unfortunately, there's always one package that needs to be updated to a specific version or isn't included by default. AWS generally recommends the following two solutions:

- Use a `requirements.txt` (only available for estimator instances, not processor instances)
- Create a custom EC2 image, load it to ECR (elastic container registry), and reference it in your pipeline

### `requirements.txt`

When first starting out, we don't want to have to figure out how to convert a transformer to an estimator. We just want to run the Python script and save the outputs to a designated S3 location. So, `requirements.txt` is out. For information on how to use it, see the [Using third-party libraries](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html#using-third-party-libraries) section in the documentation.

### Custom image

The directions to create a custom EC2 image generally involve going into another system, such as the AWS CLI. Our goal is to keep as much together in SageMaker as humanly possible. This rules out creating our own image. For information on how to create an image and load it to ECR, see the [Building your own algorithm container](https://sagemaker-examples.readthedocs.io/en/latest/advanced_functionality/scikit_bring_your_own/scikit_bring_your_own.html#Building-your-own-algorithm-container) section of the documentation or [Pushing a Docker image](https://docs.aws.amazon.com/AmazonECR/latest/userguide/docker-push-ecr-image.html) in the ECR User Guide. 

*Fair warning*: creating a Docker container from within a Docker container (which is what SageMaker Studio is) isn't recommended. To create the container, you'll need to use the AWS CLI or a SageMaker instance (not Studio).

### The solution

Stack Overflow to the rescue. [This answer](https://stackoverflow.com/a/63925135) gave us the information we needed to install or update specific packages. The code is included directly in the Python script and is easy to use. We adapted it so that it can either install or upgrade a package as necessary.

If we get to the point that we frequently need a specific configuration, we'll want to further explore creating our own image to upload to ECR.

The code to install or upgrade a package on the EC2 instance is:

In [None]:
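import subprocess
import sys

def install(package):
    # Run pip with the same interpreter that is executing this script
    subprocess.check_call([sys.executable, "-q", "-m", "pip", "install", package])

def upgrade(package):
    # Same as install, but upgrades the package if it is already present
    subprocess.check_call([sys.executable, "-q", "-m", "pip", "install", package, '--upgrade'])

upgrade('pandas==1.3.5')  # pin pandas to a specific version
upgrade('numpy')
install('category_encoders')  # not installed by default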
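
To confirm that a pin actually took effect, one option is to check the installed version after the pip call returns. The sketch below is an assumption rather than part of the original script: `pip_command` and `installed_version` are hypothetical helpers, and `importlib.metadata` requires Python 3.8 or later.

```python
import sys
from importlib import metadata


def pip_command(package, upgrade=False):
    """Build the argument list for a pip call through the current interpreter."""
    cmd = [sys.executable, "-m", "pip", "install", package]
    if upgrade:
        cmd.append("--upgrade")
    return cmd


def installed_version(distribution):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Hypothetical usage inside preprocess.py:
#   subprocess.check_call(pip_command("pandas==1.3.5", upgrade=True))
#   assert installed_version("pandas") == "1.3.5"
print(pip_command("category_encoders"))
print(installed_version("pip"))
```

The check only reflects what is installed on disk, so it belongs before the `import pandas` line, which is where the script already places its install calls.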

## Create subdirectories

I decided to split the data into train, validate, and test sets in this step. If the preprocessing depends in any way on the values in the features (for example, normalizing numeric values), preprocessing all the data together causes data leakage. If I train and evaluate a model on data that was preprocessed all at once, the combined preprocessing and model process is never evaluated on data it hasn't seen before; it always works with data that was preprocessed alongside the rest. For more information on data leakage due to preprocessing, see:

- [Entry 17: Resampling](https://julielinx.github.io/blog/17_resampling/)
- [Entry 20: Scikit-Learn Pipeline](https://julielinx.github.io/blog/20_sklearn_pipeline/)
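
As a small illustration of the point above, the sketch below standardizes a made-up numeric feature two ways: with statistics computed on the training rows only, and with statistics computed on all rows. The values and the `standardize` helper are hypothetical. The script in this post uses a one-hot encoder rather than scaling, but the fit-on-train-only rule is the same.

```python
from statistics import mean, pstdev

# Made-up feature values; the last two rows play the role of held-out data
values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 50.0, 60.0]
train, held_out = values[:8], values[8:]


def standardize(rows, center, scale):
    """Apply already-computed statistics to a list of values."""
    return [(x - center) / scale for x in rows]


# Correct: statistics come from the training rows only
train_mean, train_std = mean(train), pstdev(train)
held_out_clean = standardize(held_out, train_mean, train_std)

# Leaky: statistics are computed on every row, held-out rows included
all_mean, all_std = mean(values), pstdev(values)
held_out_leaky = standardize(held_out, all_mean, all_std)

print("train-only statistics:", train_mean, train_std)
print("all-row statistics:   ", all_mean, all_std)
```

In the leaky version the held-out rows have already shifted the mean and standard deviation, so the transformed values no longer represent data the process has never seen.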

In splitting the data, I now need to save multiple datasets. In order to use them in the SageMaker pipeline, I've found it's best to save these under different names for easy referencing in different pipeline steps. To do this, I create subdirectories in the `/opt/ml/processing/output` folder. The code to create subdirectories is:

In [None]:
    # exist_ok=True lets each folder be created independently,
    # even if one of the others already exists
    for subdir in ("train", "validate", "test", "encoder"):
        os.makedirs(os.path.join(output_path, subdir), exist_ok=True)

## Split data

There are several options when splitting data. One of the most popular options is to use the `sklearn.model_selection.train_test_split` function. This is a very handy function for splitting data into train and test datasets. It allows me to specify the size of either the train or the test data, set a random state so that the split is reproducible, shuffle the data prior to splitting, and stratify as appropriate. However, it only splits the data into two - train and test.

In order to get train, test, and validate datasets, I could use `train_test_split` twice or I could use `numpy.split`. Several examples in the [amazon-sagemaker-examples repo](https://github.com/aws/amazon-sagemaker-examples) use `numpy.split`, so I opted to use this in my code. The code to split the data is:

In [None]:
    train_data, validation_data, test_data = np.split(
        processed_df.sample(frac=1, random_state=1729),
        [int(0.7 * len(processed_df)), int(0.9 * len(processed_df))],)
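
To make the cut points concrete, here is a minimal sketch of the same 70/20/10 split applied to row positions rather than to a DataFrame. `n_rows` is a stand-in for `len(processed_df)`, and shuffling with `np.random.default_rng` is an assumption (the original shuffles with `DataFrame.sample`). The second argument to `np.split` lists the positions at which to cut, so two cut points produce three pieces.

```python
import numpy as np

n_rows = 100  # stand-in for len(processed_df)

# Shuffle row positions reproducibly, then cut at 70% and 90%
rng = np.random.default_rng(1729)
positions = rng.permutation(n_rows)
train_idx, validate_idx, test_idx = np.split(
    positions, [int(0.7 * n_rows), int(0.9 * n_rows)]
)

print(len(train_idx), len(validate_idx), len(test_idx))
```

The resulting position arrays could then be passed to `DataFrame.iloc` to select the rows for each set.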

## Save encoder

The last of the new functionalities added to this `.py` script is to save the encoder.

The encoder is the trained transformer that actually changes the data. In my case, it's the trained one-hot encoder. The trained one-hot encoder determines how the categorical values are turned into numeric features and ensures that this is reproducible regardless of whether new categories appear in subsequent observations.

Once trained, the encoder needs to be saved. I use `joblib.dump` to do this. Just like the three datasets, I put the encoder in its own folder so I can easily reference it in the SageMaker pipeline.

In [None]:
    joblib.dump(encoder, os.path.join(output_path, 'encoder', encoder_name))
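
A later step would load the saved file with `joblib.load`. The sketch below is a round trip under stated assumptions: a plain dictionary stands in for the fitted encoder, and a temporary directory stands in for `/opt/ml/processing/output`.

```python
import os
import tempfile

import joblib

# Stand-in for the fitted encoder; any picklable Python object works the same way
encoder = {"cols": ["zip_agg Customer Subtype", "zip_agg Customer main type"]}

with tempfile.TemporaryDirectory() as output_path:
    os.makedirs(os.path.join(output_path, "encoder"), exist_ok=True)
    encoder_file = os.path.join(output_path, "encoder", "preprocessor.joblib")

    joblib.dump(encoder, encoder_file)    # save, as in the script
    restored = joblib.load(encoder_file)  # load it back, as a later step would

print(restored == encoder)
```

When loading a real encoder, the library that defines it (here `category_encoders`) needs to be installed in the loading environment as well.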

## Write Script

Now that I've covered the code for the additional functionality I need to make this script work, I can put it all together.

In [2]:
%%writefile preprocess.py

import subprocess
import sys

def install(package):
    subprocess.check_call([sys.executable, "-q", "-m", "pip", "install", package])
def upgrade(package):
    subprocess.check_call([sys.executable, "-q", "-m", "pip", "install", package, '--upgrade'])
    
upgrade('pandas==1.3.5')
upgrade('numpy')
install('category_encoders')

import pandas as pd
import numpy as np
import category_encoders as ce
import joblib
import os

if __name__ == '__main__':
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'
 
    # exist_ok=True lets each folder be created independently,
    # even if one of the others already exists
    for subdir in ("train", "validate", "test", "encoder"):
        os.makedirs(os.path.join(output_path, subdir), exist_ok=True)
    
    cat_cols = ['zip_agg Customer Subtype', 'zip_agg Customer main type']

    ori_df = pd.read_csv(os.path.join(input_path, 'full_data.csv'))
    df = pd.DataFrame(ori_df['Nbr mobile home policies']).merge(ori_df.drop('Nbr mobile home policies', axis=1), left_index=True, right_index=True)
    print('Preprocessing data')
    encoder = ce.OneHotEncoder(cols=cat_cols, use_cat_names=True, handle_missing='return_nan')

    train_data, validation_data, test_data = np.split(
        df.sample(frac=1, random_state=1729),
        [int(0.7 * len(df)), int(0.9 * len(df))],)
    
    train_data = encoder.fit_transform(train_data)
    validation_data = encoder.transform(validation_data)
    test_data = encoder.transform(test_data)
    
    print('Saving dataframe')
    train_data.to_csv(os.path.join(output_path, 'train', 'train_feats.csv'), index=False)
    validation_data.to_csv(os.path.join(output_path, 'validate', 'validate_feats.csv'), index=False)
    test_data.to_csv(os.path.join(output_path, 'test', 'test_feats.csv'), index=False)
                              
    print('Saving preprocessor joblib')
    encoder_name = 'preprocessor.joblib'
    joblib.dump(encoder, os.path.join(output_path, 'encoder', encoder_name))

Overwriting preprocess.py


## Write Pipeline

The last step is to write the pipeline that runs the `.py` script. The only new functionality I add to this pipeline is specifying the locations for multiple outputs.
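
Each output pairs a folder inside the processing container with an S3 destination. The sketch below spells out that mapping in plain Python so it can be checked without the SageMaker SDK. `output_locations` is a hypothetical helper and `my-bucket` is a placeholder bucket name; the pipeline code itself builds the same destinations with `Join`.

```python
import posixpath


def output_locations(bucket, prefix, names,
                     container_root="/opt/ml/processing/output"):
    """Map each output name to its (container source, S3 destination) pair."""
    return {
        name: (
            posixpath.join(container_root, name),
            "s3://" + "/".join([bucket, prefix, "final", name]),
        )
        for name in names
    }


locations = output_locations(
    "my-bucket", "1_ins_dataset", ["train", "validate", "test", "encoder"]
)
for name, (source, destination) in locations.items():
    print(f"{name}: {source} -> {destination}")
```

Keeping one folder per output name is what lets later steps refer to `train`, `validate`, `test`, and `encoder` individually.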

For simplicity, I reference the output saved to S3 from the ETL pipeline rather than combining the two pipelines into a single pipeline at this stage. It's much faster to prototype and troubleshoot when working on a single step.

If I combine steps before debugging, I have to wait for the first step to complete before the second one runs, every single time. Just spinning up an instance on which to run a single step can take five minutes or more, depending on the instance type. This time adds up fast while debugging, especially with pipelines that have many steps.

Multistep pipelines will be covered in [Entry SM12: Multistep pipelines](). For a refresher on the foundations of SageMaker pipelines, see [Entry SM03: ETL Pipeline definition](). 

In [3]:
import sagemaker
import sagemaker.session

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables

from sagemaker.workflow.pipeline import Pipeline

session = sagemaker.session.Session()
region = session.boto_region_name
role = sagemaker.get_execution_role()

bucket = session.default_bucket()
prefix = '1_ins_dataset'
pipeline_name = "InsExample"  # SageMaker Pipeline name
model_package_group_name = "Insurance Co Example"  # Model name in model registry
framework_version = "0.23-1"

input_uri = f's3://{bucket}/{prefix}/clean/full_data.csv'

tags = [
    {"Key": "PLATFORM", "Value": "FO-ML"},
    {"Key": "BUSINESS_REGION", "Value": "GLOBAL"},
    {"Key": "BUSINESS_UNIT", "Value": "MOBILITY"},
    {"Key": "CLIENT", "Value": "MULTI_TENANT"}
   ]

# tags = [
#     {"Key": "DATASET", "Value": "InsCOIL"},
#     {"Key": "SOURCE", "Value": "UCI"}
#    ]

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.t3.medium")
    
input_data = ParameterString(
    name="InputData",
    default_value=input_uri
)

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="ins-example-job"
)

step_preprocess = ProcessingStep(
    name="preprocess_data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input")
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/output/train",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    'final',
                    "train"
                ],
            ),
        ),
        ProcessingOutput(
            output_name="validate",
            source="/opt/ml/processing/output/validate",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    'final',
                    "validate"
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/output/test",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    'final',
                    "test"
                ],
            ),
        ),
        ProcessingOutput(
            output_name="encoder",
            source="/opt/ml/processing/output/encoder",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    'final',
                    'encoder'
                ],
            ),
        ),
    ],
    code="preprocess.py"
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        input_data,
    ],
    steps=[step_preprocess])

pipeline.upsert(role_arn=role, tags=tags)

pipeline.start(execution_display_name="InsPreprocess10")

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:707031497630:pipeline/insexample/execution/bz7vxb9z8uag', sagemaker_session=<sagemaker.session.Session object at 0x7faddbf563d0>)

In [8]:
import sys
!{sys.executable} -q -m pip install category_encoders

Collecting category_encoders
  Using cached category_encoders-2.5.1.post0-py2.py3-none-any.whl (72 kB)
Collecting numpy>=1.14.0
  Using cached numpy-1.22.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.9 MB)
Installing collected packages: numpy, category_encoders
  Attempting uninstall: numpy
    Found existing installation: numpy 1.23.4
    Uninstalling numpy-1.23.4:
      Successfully uninstalled numpy-1.23.4
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
daal4py 2021.3.0 requires daal==2021.2.3, which is not installed.
numba 0.54.1 requires numpy<1.21,>=1.17, but you have numpy 1.22.4 which is incompatible.
Successfully installed category_encoders-2.5.1.post0 numpy-1.22.4

In [9]:
import pandas as pd
import numpy as np
import category_encoders as ce
import joblib
import os
import sagemaker.session

session = sagemaker.session.Session()
region = session.boto_region_name
role = sagemaker.get_execution_role()
bucket = session.default_bucket()
prefix = '1_ins_dataset'

cat_cols = ['zip_agg Customer Subtype', 'zip_agg Customer main type']
ori_df = pd.read_csv(f's3://{bucket}/{prefix}/clean/full_data.csv')

In [10]:
ori_df.head()

Unnamed: 0,zip_agg Customer Subtype,zip_agg Number of houses,zip_agg Avg size household,zip_agg Avg age,zip_agg Customer main type,zip_agg Roman catholic,zip_agg Protestant,zip_agg Other religion,zip_agg No religion,zip_agg Married,...,Nbr private accident ins policies,Nbr family accidents ins policies,Nbr disability ins policies,Nbr fire policies,Nbr surfboard policies,Nbr boat policies,Nbr bicycle policies,Nbr property ins policies,Nbr ss ins policies,Nbr mobile home policies
0,Lower class large families,1,3,2,Family with grown ups,0,5,1,3,7,...,0,0,0,1,0,0,0,0,0,0
1,Mixed small town dwellers,1,2,2,Family with grown ups,1,4,1,4,6,...,0,0,0,1,0,0,0,0,0,0
2,Mixed small town dwellers,1,2,2,Family with grown ups,0,4,2,4,3,...,0,0,0,1,0,0,0,0,0,0
3,"Modern, complete families",1,3,3,Average Family,2,3,2,4,5,...,0,0,0,1,0,0,0,0,0,0
4,Large family farms,1,4,2,Farmers,1,4,1,4,7,...,0,0,0,1,0,0,0,0,0,0


In [4]:


if __name__ == '__main__':
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'
 
    try:
        os.makedirs(os.path.join(output_path, "train"))
        os.makedirs(os.path.join(output_path, "validate"))
        os.makedirs(os.path.join(output_path, "test"))
        os.makedirs(os.path.join(output_path, 'encoder'))
    except:
        pass
    
    cat_cols = ['zip_agg Customer Subtype', 'zip_agg Customer main type']

    ori_df = pd.read_csv(os.path.join(input_path, 'full_data.csv'))
    # A Series has no merge method, so wrap the target column in a DataFrame first
    df = pd.DataFrame(ori_df['Nbr mobile home policies']).merge(ori_df.drop('Nbr mobile home policies', axis=1), left_index=True, right_index=True)
    print('Preprocessing data')
    encoder = ce.OneHotEncoder(cols=cat_cols, use_cat_names=True, handle_missing='return_nan')

    train_data, validation_data, test_data = np.split(
        df.sample(frac=1, random_state=1729),
        [int(0.7 * len(df)), int(0.9 * len(df))],)
    
    train_data = encoder.fit_transform(train_data)
    validation_data = encoder.transform(validation_data)
    test_data = encoder.transform(test_data)
    
    print('Saving dataframe')
    train_data.to_csv(os.path.join(output_path, 'train', 'train_feats.csv'), index=False)
    validation_data.to_csv(os.path.join(output_path, 'validate', 'validate_feats.csv'), index=False)
    test_data.to_csv(os.path.join(output_path, 'test', 'test_feats.csv'), index=False)
                              
    print('Saving preprocessor joblib')
    encoder_name = 'preprocessor.joblib'
    joblib.dump(encoder, os.path.join(output_path, 'encoder', encoder_name))

Overwriting preprocess.py
