**Note:** Make sure you run through the [Setup Notebook](mnist-setup.ipynb) once at the start of the program.

In [2]:
# Reload imported modules automatically before running code, so edits made to
# the modules in CODE_FOLDER (e.g. by the %%writefile cell below) are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

import sys
from pathlib import Path

# Folder holding this project's Python modules; preprocessormnist.py is
# written here by a later cell.
CODE_FOLDER = Path("codemnist")
# Make the modules in CODE_FOLDER importable from this notebook.
sys.path.append(f"./{CODE_FOLDER}")

In [3]:
import os
import numpy as np
import json
import numpy as np
import tempfile

from constantsmnist import *
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

In [4]:
%%writefile {CODE_FOLDER}/preprocessormnist.py

import os
import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
# The train and test CSVs are mounted as separate inputs, each in its own
# directory (input_train / input_test).
DATA_FILEPATH_TRAIN = Path(BASE_DIRECTORY) / "input_train" / "mnist_train.csv"
DATA_FILEPATH_TEST = Path(BASE_DIRECTORY) / "input_test" / "mnist_test.csv"



def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        validation_path / "validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", "wb"))


def _save_classes(base_directory, classes):
    """
    Saves the list of classes from the dataset.
    """
    path = Path(base_directory) / "classes"
    path.mkdir(parents=True, exist_ok=True)

    np.asarray(classes).tofile(path / "classes.csv", sep=",")


def _save_baseline(base_directory, df_train, df_test):
    """
    During the data and quality monitoring steps, we will need a baseline
    to compute constraints and statistics. This function will save that
    baseline to the disk.
    """

    for split, data in [("train", df_train), ("test", df_test)]:
        baseline_path = Path(base_directory) / f"{split}-baseline"
        baseline_path.mkdir(parents=True, exist_ok=True)

        df = data.copy().dropna()
        df.to_json(
            baseline_path / f"{split}-baseline.json", orient="records", lines=True
        )


def preprocess(
    base_directory,
    data_filepath_train,
    data_filepath_test,
    random_state=42,
    validation_size=0.2,
):
    """
    Preprocess the MNIST CSV files and save train, validation and test splits.

    The test set is read from ``data_filepath_test`` and kept as provided;
    only the training file is split into train and validation sets. Feature
    columns are imputed (most frequent value) and standardized by a
    scikit-learn pipeline fitted on the train split only. The ``label``
    column is encoded with a ``LabelEncoder`` and appended as the last
    column of every split.

    Written under ``base_directory``: the three splits, the raw train/test
    baselines, the fitted pipeline and the list of classes.

    Args:
        base_directory: Directory where every output is written.
        data_filepath_train: CSV file with a ``label`` column plus feature
            columns.
        data_filepath_test: CSV file with the same columns as the training
            file.
        random_state: Seed for the train/validation split, so repeated runs
            produce the same split. Defaults to 42.
        validation_size: Fraction of the training file held out for
            validation. Defaults to 0.2 (an 80/20 split).

    Raises:
        ValueError: If the test file contains a label that never appears in
            the training file (raised by ``LabelEncoder.transform``).
    """
    df_train = pd.read_csv(data_filepath_train)
    df_test = pd.read_csv(data_filepath_test)

    # Every numeric column except the target is a feature. Selecting all
    # numeric dtypes (rather than only "int") keeps columns that pandas
    # parsed as float because they contain missing values -- exactly the
    # columns the imputer below is for. ColumnTransformer would otherwise
    # drop them silently.
    numeric_features = [
        column
        for column in df_train.select_dtypes(include="number").columns
        if column != "label"
    ]

    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("scaler", StandardScaler()),
        ]
    )

    # There are no categorical columns (every column, including the label,
    # is an integer), so only the numeric branch is needed.
    preprocessor = ColumnTransformer(
        transformers=[
            ("numeric", numeric_transformer, numeric_features),
        ]
    )

    pipeline = Pipeline(steps=[("preprocessing", preprocessor)])

    # Fit the label encoder on the whole training file before splitting, so
    # every label that lands in the validation split is known to the encoder
    # even if the random split leaves a class out of the train split.
    label_encoder = LabelEncoder()
    label_encoder.fit(df_train.label)

    # Split the training file into train/validation. train_test_split shuffles
    # rows; each row is a complete image, so that is safe. The fixed seed makes
    # the split reproducible across runs.
    df_train, df_validation = train_test_split(
        df_train, test_size=validation_size, random_state=random_state
    )

    y_train = label_encoder.transform(df_train.label)
    y_validation = label_encoder.transform(df_validation.label)
    y_test = label_encoder.transform(df_test.label)

    # Baselines keep the raw (unscaled) rows, including the label column.
    _save_baseline(base_directory, df_train, df_test)

    df_train = df_train.drop(["label"], axis=1)
    df_validation = df_validation.drop(["label"], axis=1)
    df_test = df_test.drop(["label"], axis=1)

    # Fit the preprocessing on the train split only to avoid leaking
    # validation/test statistics into the scaler and imputer.
    X_train = pipeline.fit_transform(df_train)
    X_validation = pipeline.transform(df_validation)
    X_test = pipeline.transform(df_test)

    # Append the encoded label as the last column of each split.
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate(
        (X_validation, np.expand_dims(y_validation, axis=1)), axis=1
    )
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)

    _save_splits(base_directory, train, validation, test)
    _save_pipeline(base_directory, pipeline=pipeline)
    _save_classes(base_directory, label_encoder.classes_)


if __name__ == "__main__":
    # Script entry point: read the mounted train/test CSVs and write every
    # output under BASE_DIRECTORY.
    preprocess(
        base_directory=BASE_DIRECTORY,
        data_filepath_train=DATA_FILEPATH_TRAIN,
        data_filepath_test=DATA_FILEPATH_TEST,
    )


Overwriting codemnist/preprocessormnist.py


## Testing the processing script

In [5]:
from preprocessormnist import preprocess

# Smoke-test the processing script locally before running it on SageMaker.
# DATA_FILEPATH_TRAIN / DATA_FILEPATH_TEST are not defined in this notebook;
# presumably they come from `from constantsmnist import *` -- confirm.
# Outputs go to a temporary directory that is deleted when the block exits.
with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        data_filepath_train=DATA_FILEPATH_TRAIN,
        data_filepath_test=DATA_FILEPATH_TEST
    )
    
    # List the top-level output folders the script created.
    print(f"Folders: {os.listdir(directory)}")

Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline', 'classes']


## Pipeline Configuration

In [5]:
# Pipeline parameters (registered on the Pipeline defined further down):
# S3 locations of the raw train/test CSVs, and the S3 prefix where the
# processing step uploads the fitted pipeline and the class list.
# S3_FILEPATH is not defined in this notebook; presumably it comes from
# `from constantsmnist import *` -- confirm.
dataset_location_train = ParameterString(
    name="dataset_location_train",
    default_value=f"{S3_FILEPATH}/mnist_train.csv",  # be careful with how the S3 path (S3_FILEPATH) is defined
)

dataset_location_test = ParameterString(
    name="dataset_location_test",
    default_value=f"{S3_FILEPATH}/mnist_test.csv",
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

# use_custom_job_prefix=True: per the SageMaker SDK docs, job names in the
# pipeline definition are prefixed with the processor's base_job_name.
pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Enable step caching so unchanged steps can reuse previous results.
# NOTE(review): the SageMaker SDK documents expire_after as an ISO 8601
# duration (e.g. "P15D" / "p30d"); confirm that "15d" is accepted.
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

## Setting up the processing step

In [6]:
# Processor that runs the preprocessing script in SageMaker's scikit-learn
# container (framework version "0.23-1") on a single ml.m5.large instance.
# `role` is not defined in this notebook; presumably it comes from
# `from constantsmnist import *` -- confirm.
sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    #instance_type="ml.t3.medium",
    instance_type="ml.m5.large",
    instance_count=1,
    role=role,
)

# Processing step that runs preprocessormnist.py.
# Inputs: the train and test CSVs are mounted in separate local directories,
# matching DATA_FILEPATH_TRAIN / DATA_FILEPATH_TEST in the script.
# Outputs: each ProcessingOutput maps one folder the script writes under
# /opt/ml/processing. "pipeline" and "classes" share the same
# preprocessor_destination prefix; their files (pipeline.pkl, classes.csv)
# have different names.
preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location_train, destination="/opt/ml/processing/input_train"), #LocalPath values must be unique
        ProcessingInput(source=dataset_location_test, destination="/opt/ml/processing/input_test"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline", destination=preprocessor_destination),
        ProcessingOutput(output_name="classes", source="/opt/ml/processing/classes", destination=preprocessor_destination),
        ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline"),
        ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline"),
    ],
    code=f"{CODE_FOLDER}/preprocessormnist.py",
    cache_config=cache_config
)

## Setting up the pipeline

In [7]:
# Assemble the pipeline from its parameters and the single preprocessing
# step, then create or update its definition in SageMaker using `role`.
session1_pipeline_mnist = Pipeline(
    name="mnist-session1-pipeline",
    parameters=[
        dataset_location_train,
        dataset_location_test,
        preprocessor_destination,
    ],
    steps=[
        preprocess_data_step, 
    ],
    pipeline_definition_config=pipeline_definition_config
)

session1_pipeline_mnist.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:759575403360:pipeline/mnist-session1-pipeline',
 'ResponseMetadata': {'RequestId': 'f5ee06ce-52e0-43d8-a3d9-8ce340e6f2d6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f5ee06ce-52e0-43d8-a3d9-8ce340e6f2d6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Fri, 28 Jul 2023 01:29:24 GMT'},
  'RetryAttempts': 0}}

## Running sessions

In [8]:
# Start an execution of the session-1 pipeline; no parameter overrides are
# passed, so the ParameterString defaults are used.
session1_pipeline_mnist.start()
# The pipelines below are not defined in this notebook; they are kept as
# placeholders for later sessions.
# session2_pipeline.start()
# session3_pipeline.start()
# session4_pipeline.start()
# session5_pipeline.start()
# session6_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:759575403360:pipeline/mnist-session1-pipeline/execution/h4vcb55svcv6', sagemaker_session=<sagemaker.session.Session object at 0x7f059d052700>)