## Anotações
### **Definição Básica de Pipeline:** <br>

Uma instância de pipeline é composta de um name,parameters, esteps. Os nomes de pipeline devem ser exclusivos dentro de um (account, region)Par. Todos os parâmetros usados nas definições de etapa devem ser definidos no pipeline. As etapas passadas para o pipeline não precisam ser listadas na ordem de execução porque as próprias etapas definem as relações entre elas usando dependências de dados. O serviço SageMaker Pipelines resolve as relações entre as etapas no DAG de dependência de dados para criar uma série de etapas concluídas pela execução. <br>

#### **Estrutura Básica de uma Pipeline:** 
```python
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)
```

### **Passos de uma Pipeline:**<br>
Os Pipelines SageMaker são compostos por etapas. Essas etapas definem as ações que o pipeline executa e as relações entre as etapas usando propriedades.<br>

#### **Tipos de etapas:**<br>

Existem tipos de etapas em um processo de criação de pipeline. Os pipelines de criação de modelos do Amazon SageMaker suportam os seguintes tipos de etapas:<br>
* **Processamento (ProcessingStep):** Esse etapa serve para criar um trabalho de processamento e tratamento  de dadods;
* **Treinamento (TrainingStep):** Essa etapa serve para criar um fluxo de trabalho de treinamento para um modelo.
* **Ajuste (TuningStep):** Etapa de ajuste serve para criar um trabalho de ajuste de hiperparâmetro, também conhecido como otimização de hiperparâmetro (HPO). Um trabalho de ajuste de hiperparâmetro executa vários trabalhos de treinamento, cada um produzindo uma versão de modelo.
* **CreateModel (CreateModelStep):** Etapa de criação de modelo
* **RegisterModel (RegisterModel):** Usa uma etapa de modelo de registro para registrar um SageMaker.model.modelou um SageMaker.Pipeline.PipelineModelcom o registro do modelo do Amazon SageMaker.
* **Transformação (TransformStep):** Use-se uma etapa de transformação para a transformação em lote para executar inferência em um conjunto de dados inteiro.
* **Condição (ConditionStep):** Use uma etapa de condição para avaliar a condição das propriedades da etapa para avaliar qual ação deve ser tomada em seguida no pipeline.
* **Retorno de Chamada ():** Use uma etapa de retorno de chamada para incorporar processos adicionais eAWSem seu fluxo de trabalho que não são fornecidos diretamente pelo Amazon SageMaker Model Building Pipelines.
* **Lambda ():** Você usa uma etapa lambda para executar um AWS Lambda função. Você pode executar uma função existente do Lambda, ou o SageMaker pode criar e executar uma nova função do Lambda.

---
### Definindo as Variáveis constantes 

In [37]:
import boto3
import sagemaker


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = 'pipeline-wine'

model_package_group_name = "PipelineModelWine"
prefix = "pipeline-model-wine"
pipeline_name = "TrainingPipelineForModel"
#input_data_uri = 's3://pipeline-demo-wine/input/wine.data'
input_data_uri = 's3://pipeline-demo-wine/input/raw.csv'

---
### Definindo os parâmetros paro o Pipeline

In [38]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Quantidade de instâncias de processamento
processing_instance_count = ParameterInteger(name='ProcessingInstanceCount', default_value = 1)
# Podemos escolher instâncias para cada tipo de processo que queremos
processing_instance_type  = ParameterString(name='ProcessingInstanceType', default_value='ml.m5.xlarge')
training_instance_type =  ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# Status do modelo
model_approval_status     = ParameterString(name='ModelApprovalStatus', default_value='PendingManualApproval')
# Caminho dos dados brutos
input_data                = ParameterString(name='InputData', default_value=input_data_uri)

---
### Script de Pré-Processamento

In [39]:
!mkdir -p scripts

In [40]:
%%writefile scripts/preprocessing.py
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    print('lendo a base')
    #df = pd.read_csv(f"{base_dir}/input/wine.data", header=None)
    df = pd.read_csv(f"{base_dir}/input/raw.csv")
    #Separando os atributos e a target
    print(df.columns)
    print(df['0'].value_counts)
    print('separando x e y')
    y, x = df.iloc[:, 0], df.iloc[:, 1:]
    #Separando entre treino e teste
    X_train, X_test, y_train, y_test = train_test_split(x, y,random_state=0, test_size=0.2)
    #Criando pipeline para normalizar os dados
    print('Criando pipeline')
    pipe = Pipeline([('scaler', StandardScaler())])
    X_train_processed = pipe.fit_transform(X_train)
    X_test_processed = pipe.transform(X_test)
    y_train = y_train.to_numpy().reshape(len(y_train), 1)
    y_test = y_test.to_numpy().reshape(len(y_test), 1)
    print('Consolidando os dados de treino, teste e validação')
    #Consolidando os dados de treino, teste e validação
    df_train = pd.DataFrame(np.concatenate((y_train, X_train_processed), axis=1))
    df_test = np.concatenate((y_test, X_test_processed), axis=1)
    #Dividindo entre teste e validação
    print('Dividindo entre teste e validação')
    test, validation = np.split(df_test, 2)
    #Salvando os dados de treino, teste e validação
    print('Salvando os dados de treino, teste e validação')
    pd.DataFrame(df_train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting scripts/preprocessing.py


---
### Definindo Processamento

In [41]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-wine-process",
    role=role
)

In [42]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.steps import CacheConfig
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
step_process = ProcessingStep(
    name="WineProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="scripts/preprocessing.py",
    cache_config=cache_config
)

---
### Treinamento do Modelo

In [43]:
from sagemaker.estimator import Estimator


model_path = f"s3://{default_bucket}/WineTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.2-2",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    disable_profiler=True
)
num_labels = 3
xgb_train.set_hyperparameters(
    max_depth = 5,
    eta = 0.2,
    gamma = 4,
    min_child_weight = 6,
    objective = "multi:softprob",
    num_class = num_labels,
    num_round = 10
)

In [44]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="WineTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

---
### Script de Evolução do Modelo

In [45]:
%%writefile scripts/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score


if __name__ == "__main__":
    print('lendo o modelo')
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
        
    model = pickle.load(open("xgboost-model", "rb"))
    print('lendo base de teste')
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    #df.drop(columns=[0], inplace=True)
    x = df.iloc[:, 1:]
    X_test = xgboost.DMatrix(x.values)

    predictions = model.predict(X_test)
    print(predictions)
    predictions = np.array(predictions)
    y_pred = []
    for i in predictions:
        pred = np.where(i == max(i))[0][0]
        y_pred.append(pred)
    print('calculo acurácia')
    acuracia = accuracy_score(y_test, y_pred)
    #f1 = f1_score(y_test, predictions)
    #precisao = precision_score(y_test, predictions)
    #recall = recall_score(y_test, predictions)
    print(acuracia)
    report_dict = {
        "classification_wine_metrics": {
            "acuracia": {"value":acuracia,"standard_deviation": "NaN"}
        }
    }
    print('Salvando as métricas')
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting scripts/evaluation.py


---
### Etapa de evolução do modelo

In [46]:
from sagemaker.processing import ScriptProcessor

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-wine-evaluation",
    role=role,
)

In [47]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="WineEvaluation",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")
    ],
    code="scripts/evaluation.py",
    property_files=[evaluation_report]
)

---
### Etapa de Registrar o Modelo

In [48]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
step_register = RegisterModel(
    name="WineRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics 
)

---
### Etapa Condicional

In [49]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_wine_metrics.acuracia.value",
    ),
    right=0.7,
)

step_cond = ConditionStep(
    name="WineAccuracyCond",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

### Criando Pipeline

In [50]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"WinePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        processing_instance_count,
        model_approval_status,
        input_data
    ],
    steps=[step_process, step_train, step_eval, step_cond]
)

In [51]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


[{'StepName': 'WineRegisterModel',
  'StartTime': datetime.datetime(2022, 1, 9, 14, 8, 16, 342000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 1, 9, 14, 8, 17, 434000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-2:130962303082:model-package/pipelinemodelwine/3'}}},
 {'StepName': 'WineAccuracyCond',
  'StartTime': datetime.datetime(2022, 1, 9, 14, 8, 15, 372000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 1, 9, 14, 8, 15, 705000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Condition': {'Outcome': 'True'}}},
 {'StepName': 'WineEvaluation',
  'StartTime': datetime.datetime(2022, 1, 9, 14, 3, 51, 816000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 1, 9, 14, 8, 10, 565000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:130962303082:pro

---