<img src="Tarjeta.png">

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Conectar-área-de-trabajo" data-toc-modified-id="Conectar-área-de-trabajo-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Conectar área de trabajo</a></span></li><li><span><a href="#Preparar-los-datos-para-el-experimento" data-toc-modified-id="Preparar-los-datos-para-el-experimento-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Preparar los datos para el experimento</a></span></li><li><span><a href="#Crear-los-scripts-para-los-pasos-de-la-canalización" data-toc-modified-id="Crear-los-scripts-para-los-pasos-de-la-canalización-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Crear los scripts para los pasos de la canalización</a></span></li><li><span><a href="#Preparar-el-entorno-para-los-pasos-de-la-canalización" data-toc-modified-id="Preparar-el-entorno-para-los-pasos-de-la-canalización-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Preparar el entorno para los pasos de la canalización</a></span></li><li><span><a href="#Crear-y-ejecutar-canalización" data-toc-modified-id="Crear-y-ejecutar-canalización-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Crear y ejecutar canalización</a></span></li><li><span><a href="#Publicar-la-canalización" data-toc-modified-id="Publicar-la-canalización-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Publicar la canalización</a></span></li><li><span><a href="#Llamar-al-endpoint-de-la-canalización" data-toc-modified-id="Llamar-al-endpoint-de-la-canalización-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Llamar al endpoint de la canalización</a></span></li><li><span><a href="#Calendar-la-canalización" data-toc-modified-id="Calendar-la-canalización-8"><span class="toc-item-num">8&nbsp;&nbsp;</span>Calendar la canalización</a></span></li></ul></div>

# Canalizaciones

Se pueden realizar los distintos pasos necesarios (ingerir datos, entrenar un modelo y registrar el modelo) individualmente mediante el SDK de Azure ML para ejecutar experimentos basados en scripts. Sin embargo, en un entorno empresarial es común encapsular la secuencia de pasos necesarios para construir una solución de machine learning en una *canalización* que se puede ejecutar en uno o más instancias de procesos, ya sea bajo demanda por parte de un usuario, desde un proceso de ejecución automatizado o según un cronograma.

## Conectar área de trabajo

In [1]:
import azureml.core
from azureml.core import Workspace

# Cargar el área de trabajo del fichero de configuración
ws = Workspace.from_config()
print('Versión de Azure ML {} y área de trabajo {}'.format(azureml.core.VERSION, ws.name))

Versión de Azure ML 1.32.0 y área de trabajo aml-nuclio


## Preparar los datos para el experimento


In [2]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'wine-dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/winequality.csv'], 
                        target_path='wine-data/', 
                        overwrite=True,
                        show_progress=True)

    # Crear un dataset tabular
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'wine-data/*.csv'))

    # Registrar el dataset tabular
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='wine-dataset',
                                description='datos del vino',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registrado.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset ya existe.')

Dataset ya existe.


## Crear los scripts para los pasos de la canalización

Las canalizaciones consisten en uno o más pasos que pueden ser scripts de python o pasos especiales como por ejemplo transferencia de datos. Cada paso se puede ejecutar en su propio contexto de ejecución. Se va a hacer una canalización que contiene dos scripts de python.

In [15]:
vino_ds = ws.datasets.get("wine-dataset")
vino_ds

{
  "source": [
    "('workspaceblobstore', 'wine-data/*.csv')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ParseDelimited",
    "DropColumns",
    "SetColumnTypes"
  ],
  "registration": {
    "id": "044fff14-42f2-42bc-a9cd-6e12ceb11e9f",
    "name": "wine-dataset",
    "version": 1,
    "description": "datos del vino",
    "tags": {
      "format": "CSV"
    },
    "workspace": "Workspace.create(name='aml-nuclio', subscription_id='87f9793d-5515-43eb-b182-0f27b97da8b3', resource_group='nuclio')"
  }
}

In [3]:
import os
# Crear una carpeta para el experimento
experiment_folder = 'vino_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

vino_pipeline


In [16]:
%%writefile $experiment_folder/prep_vino.py
# Importar librerías
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Conseguir los argumentos del script
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Contexto de la ejecución del experimento
run = Run.get_context()

# Cargar los datos
print("Cargando los datos...")
data = run.input_datasets['raw_data'].to_pandas_dataframe()
# Cambiar formato de la variable
data["alcohol"] = pd.to_numeric(data["alcohol"], errors='coerce')
# Eliminar nulos
data = data.dropna()

# Contar filas y añadir el resultado al log
row_count = (len(data))
run.log('observaciones', row_count)
print('Analizando {} filas de los datos'.format(row_count))

# Normalizar las variables numéricas
scaler = MinMaxScaler()
num_cols = features = ['fixed_acidity', 'volatile_acidity', 'citric_acid', 'residual_sugar', 'chlorides', 'free_sulfur_dioxide', 'total_sulfur_dioxide', 'density', 'ph', 'sulphates', 'alcohol']
data[num_cols] = scaler.fit_transform(data[num_cols])

# Log de las filas  procesadas
row_count = (len(data))
run.log('observaciones_procesadas', row_count)

# Guardar los datos
print("Guardando los datos...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder, 'data.csv')
data.to_csv(save_path, index=False, header=True)

run.complete()

Overwriting vino_pipeline/prep_vino.py


In [17]:
%%writefile $experiment_folder/train_vino.py
# Importar librerías
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Conseguir los argumentos del script
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Contexto de la ejecución del experimento
run = Run.get_context()

# Cargar los datos preparados
print("Cargando datos...")
file_path = os.path.join(training_data,'data.csv')
data = pd.read_csv(file_path)

# Separar features and target
features = ['fixed_acidity', 'volatile_acidity', 'citric_acid', 'residual_sugar', 'chlorides', 'free_sulfur_dioxide', 'total_sulfur_dioxide', 'density', 'ph', 'sulphates', 'alcohol']
X, y = data[features].values, data['top_quality'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Entrenamiento de un arbol de decision
print('Entrenando un decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# Calcular el accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# Calcular AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Visualización curva ROC
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Guardar el modelo entrenado en la carpeta outputs
print("Guardando el modelo...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'vino_model.pkl')
joblib.dump(value=model, filename=model_file)

# Registrar el modelo
print('Registrando el modelo...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'vino_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

Overwriting vino_pipeline/train_vino.py


## Preparar el entorno para los pasos de la canalización

En este caso, usaremos la misma instancia para ejecutar ambos pasos, pero se pueden ejecutar en instancias independientes.

In [18]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "NuclioCluster"

try:
    # Validar si existe
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Cluster de proceso encontrado.')
except ComputeTargetException:
    # Si no existe, se crea
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Cluster de proceso encontrado.


In [19]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

Overwriting vino_pipeline/experiment_env.yml


In [20]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Creamos un entorno de python
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Registramos el entorno
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Creamos un nuevo runconfig para la canalización
pipeline_run_config = RunConfiguration()

# Usar la instancia creada
pipeline_run_config.target = pipeline_cluster

# Asignar el entorno
pipeline_run_config.environment = registered_env

print ("Configuración creada.")

Configuración creada.


## Crear y ejecutar canalización

In [21]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Conseguir el dataset de entrenamiento
vino_ds = ws.datasets.get("wine-dataset")

# Crear un OutputFileDatasetConfig (Data Reference temporal) para pasar los datos del paso 1 a paso 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Paso 1, ejecutar el data prep
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_vino.py",
                                arguments = ['--input-data', vino_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Paso 2, ejecutar el entrenamiento
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_vino.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pasos canalizaciones definidas")

Pasos canalizaciones definidas


In [22]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construir la canalización
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline creado.")

# Crear un experimento y ejecutar la canalización
experiment = Experiment(workspace=ws, name = 'nuclio-vino-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline preparado para ejecución.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline creado.
Created step Prepare Data [4cdbd917][6f926230-0aef-47a1-afa2-6b977e7a874b], (This step will run and generate new outputs)
Created step Train and Register Model [957d0ae1][553ce112-d89d-4106-a46a-3cc6148644cf], (This step will run and generate new outputs)
Submitted PipelineRun 36465eaa-cb34-45b0-848c-fc35434c9b95
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/36465eaa-cb34-45b0-848c-fc35434c9b95?wsid=/subscriptions/87f9793d-5515-43eb-b182-0f27b97da8b3/resourcegroups/nuclio/workspaces/aml-nuclio&tid=e93a2455-92c2-4ff2-add1-a6a39d490ed6
Pipeline preparado para ejecución.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: 36465eaa-cb34-45b0-848c-fc35434c9b95
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/36465eaa-cb34-45b0-848c-fc35434c9b95?wsid=/subscriptions/87f9793d-5515-43eb-b182-0f27b97da8b3/resourcegroups/nuclio/workspaces/aml-nuclio&tid=e93a2455-92c2-4ff2-add1-a6a39d490ed6
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 600472e6-332b-466e-ac83-9823e74a2146
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/600472e6-332b-466e-ac83-9823e74a2146?wsid=/subscriptions/87f9793d-5515-43eb-b182-0f27b97da8b3/resourcegroups/nuclio/workspaces/aml-nuclio&tid=e93a2455-92c2-4ff2-add1-a6a39d490ed6
StepRun( Prepare Data ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_3ad6cc171d687853928a7108b41fa04c36f995cc961ef61c995c5010a12f0978_d.txt
2021-09-01T15:03:16Z Successfully mounted a/an Blobfuse File System at /mnt/batch/tasks/shared/LS_root/jobs/aml-nuclio/azureml/600472e6-332b-466e-ac83-9823e74a2146/mounts/workspaceblobstore

'Finished'

In [23]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

Train and Register Model :
	 Accuracy : 0.8273972602739726
	 AUC : 0.7675103656028722
	 ROC : aml://artifactId/ExperimentRun/dcid.f3d27d00-3d30-433b-944c-9640ec2f73eb/ROC_1630508870.png
Prepare Data :
	 observaciones : 4864
	 observaciones_procesadas : 4864


In [24]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

vino_model version: 5
	 Training context : Pipeline
	 AUC : 0.7675103656028722
	 Accuracy : 0.8273972602739726


vino_model version: 4
	 Training context : Compute cluster
	 AUC : 0.7571058569824701
	 Accuracy : 0.8273972602739726


diabetes_model version: 3
	 Training context : Compute cluster
	 AUC : 0.7571058569824701
	 Accuracy : 0.8273972602739726


vino_model version: 3
	 Training context : Tabular dataset
	 AUC : 0.7700142283878569
	 Accuracy : 0.8068493150684931


vino_model version: 2
	 Training context : Parameterized script
	 AUC : 0.7698252576116318
	 Accuracy : 0.8068493150684931


vino_model version: 1
	 Training context : Script
	 AUC : 0.7697446671335356
	 Accuracy : 0.8047945205479452


diabetes_model version: 2
	 Training context : Parameterized script
	 AUC : 0.8483198169063138
	 Accuracy : 0.774


diabetes_model version: 1
	 Training context : Script
	 AUC : 0.8484929598487486
	 Accuracy : 0.774


AutoML2c70a24fc0 version: 1




## Publicar la canalización

Después de crearlo y probarlo, se puede publicar como servicio REST.

In [25]:
# Publicar la canalización y ejecutarla
published_pipeline = pipeline_run.publish_pipeline(
    name="vino-training-pipeline", description="Entrenamiento modelo vino", version="1.0")

published_pipeline

Name,Id,Status,Endpoint
vino-training-pipeline,894978d8-a257-48dd-be3e-322c5cbf4358,Active,REST Endpoint


Las canalizaciones publicadas tienen un endpoint que se puede ver en **Puntos de conexión** en [Azure Machine Learning studio](https://ml.azure.com)

In [26]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://francecentral.api.azureml.ms/pipelines/v1.0/subscriptions/87f9793d-5515-43eb-b182-0f27b97da8b3/resourceGroups/nuclio/providers/Microsoft.MachineLearningServices/workspaces/aml-nuclio/PipelineRuns/PipelineSubmit/894978d8-a257-48dd-be3e-322c5cbf4358


## Llamar al endpoint de la canalización

Para usar el endpoint, las aplicaciones de cliente necesitan hacer una llamada REST sobre HTTP. Esto necesita autentificación.

In [28]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Autentificación esta preparada.")

Autentificación esta preparada.


Ejecutamos la canalización asincronamente.

In [29]:
import requests

experiment_name = 'nuclio-vino-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

'660a6776-fefc-4934-be39-c23aefc8cf93'

In [30]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 660a6776-fefc-4934-be39-c23aefc8cf93
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/660a6776-fefc-4934-be39-c23aefc8cf93?wsid=/subscriptions/87f9793d-5515-43eb-b182-0f27b97da8b3/resourcegroups/nuclio/workspaces/aml-nuclio&tid=e93a2455-92c2-4ff2-add1-a6a39d490ed6

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '660a6776-fefc-4934-be39-c23aefc8cf93', 'status': 'Completed', 'startTimeUtc': '2021-09-01T15:10:33.120135Z', 'endTimeUtc': '2021-09-01T15:10:34.973718Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.pipelineid': '894978d8-a257-48dd-be3e-322c5cbf4358'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://amlnuclio7384818405.blob.core.windows.net/azureml/ExperimentRun/dcid.660a6776-fefc-4934-be39-c23aefc8cf93/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=BwJ7etk5AVHKSXuKroRoWAK

'Finished'

## Calendar la canalización


In [31]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Ejecutar cada lunes a las 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-vino-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='nuclio-vino-pipeline', 
                                  recurrence=recurrence)
print('Canalización calendada.')

Canalización calendada.


In [32]:
schedules = Schedule.list(ws)
schedules

[Pipeline(Name: weekly-vino-training,
 Id: 68e3f4d6-449b-485b-931c-170447a01f6b,
 Status: Active,
 Pipeline Id: 894978d8-a257-48dd-be3e-322c5cbf4358,
 Pipeline Endpoint Id: None,
 Recurrence Details: Runs at 0:00 on Monday every Week)]

In [33]:
pipeline_experiment = ws.experiments.get('nuclio-vino-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

{'runId': '43f1599a-cb2d-4234-b0fc-250a5adcaa4c',
 'status': 'Completed',
 'startTimeUtc': '2021-09-01T15:12:14.513543Z',
 'endTimeUtc': '2021-09-01T15:12:16.32142Z',
 'properties': {'azureml.runsource': 'azureml.PipelineRun',
  'runSource': 'Unavailable',
  'runType': 'Schedule',
  'azureml.parameters': '{}',
  'azureml.pipelineid': '894978d8-a257-48dd-be3e-322c5cbf4358'},
 'inputDatasets': [],
 'outputDatasets': [],
 'logFiles': {'logs/azureml/executionlogs.txt': 'https://amlnuclio7384818405.blob.core.windows.net/azureml/ExperimentRun/dcid.43f1599a-cb2d-4234-b0fc-250a5adcaa4c/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=pLnOb8yqPerqcvGLPzOG439tABna6v2%2F%2F%2F5QDZQb7QQ%3D&st=2021-09-01T15%3A02%3A33Z&se=2021-09-01T23%3A12%3A33Z&sp=r',
  'logs/azureml/stderrlogs.txt': 'https://amlnuclio7384818405.blob.core.windows.net/azureml/ExperimentRun/dcid.43f1599a-cb2d-4234-b0fc-250a5adcaa4c/logs/azureml/stderrlogs.txt?sv=2019-07-07&sr=b&sig=II1iS9gQDOzCW7oebL0HyUjVGXkB6S6%2Fh2uaxYb91qI%