# Pipelines

En Azure ML, las tareas se ejecutan como experimentos utilizando recursos informáticos y datos.

Para los procesos empresariales de ciencia de datos, se recomienda dividir el proceso en tareas individuales y orquestarlas con Pipelines (secuencias de pasos conectados).

**Los Pipelines son clave para implementar una solución MLOps efectiva.**

- Aclaración sobre el término "Pipeline"
    
    El término "Pipeline" se usa mucho en ML con significados diferentes.

    - **Scikit-learn**: enlaza preprocesamiento de datos con algoritmos de entrenamiento.
    - **Azure DevOps**: automatiza tareas de compilación y configuración de software.
    
    Es posible tener un Pipeline de Azure DevOps que ejecute un Pipeline de Azure ML, el cual puede incluir pasos para que entrene un modelo basado en un Pipeline de Scikit-learn.

1. Concepto de Pipeline
    - Es un flujo de trabajo compuesto de tareas de aprendizaje automático.
    - Cada tarea se implementa como un paso (secuencial o paralelo).
    - Permite construir lógica de flujo sofisticada para organizar operaciones de aprendizaje automático.
    
2. Ejecución de pasos
    - Cada paso se ejecuta en un objetivo de cómputo específico.
    - Se pueden combinar diferentes tipos de procesamiento para lograr un objetivo general.

3. Ejecución del Pipeline
    - Se ejecuta como un experimento.
    - Cada paso se ejecuta en su objetivo asignado como parte del experimento.

4. Componentes de un Pipeline
    - Consta de uno o más pasos que realizan tareas.
    - Azure ML admite muchos tipos de pasos:
      - `PythonScriptStep`: Ejecuta un script de Python específico.
      - `DataTransferStep`: Copia datos entre almacenes mediante Azure Data Factory.
      - `DataBrickStep`: Ejecuta un script de notebook o un JAR compilado en un cluster de Databricks.
      - `AdlaStep`: Ejecuta un trabajo de SQL en Azure Data Lake Analytics.
      - `ParallelRunStep`: Ejecuta un script de Python como una tarea distribuida en múltiples nodos de cómputo.
      - Ver la documentación para una lista completa de tipos de pasos compatibles.

5. Creación de un Pipeline
   - Se requiere definir cada paso primero.
   - Luego se crea un pipeline que incluye los pasos.
   - La configuración específica de cada paso depende del tipo.
     - Ejemplo: definir dos pasos de script de Python para preparar datos y entrenar un modelo.
   - Una vez definidos los pasos, se asignan al pipeline y se ejecutan como un experimento.
  
6. Flujo de trabajo
    - Un pipeline suele tener pasos que dependen de la salida de pasos anteriores.
      - Ejemplo: un script preprocesando datos (paso 1) usados luego para entrenar un modelo (paso 2).

7. Objeto de configuración de conjunto de datos de archivo de salida
    - Objeto especial que referencia una ubicación para almacenamiento intermedio de datos.
    - Crea una dependencia de datos entre pasos del pipeline.
    - Actúa como un almacenamiento intermedio para pasar datos entre pasos.

8. Pasando datos entre los pasos
   - Se usa el objeto de configuración de conjunto de datos de archivo de salida.
   - Debes definir un objeto con nombre que referencie una ubicación en un almacén de datos.
   - Si no se especifica un almacén, se usa el predeterminado.
   - Pasa el objeto como argumento de script en pasos que ejecutan scripts.
   - Incluye código en esos scripts para escribir o leer datos del objeto de argumento.

#### 1. OutputFileDatasetConfig pasos entre Inputs y Outputs

Para utilizar un objeto `OutputFileDatasetConfig` para pasar datos entre los pasos, se debe:

1. Definir un objeto `OutputFileDatasetConfig` con un nombre que haga referencia a una ubicación en un almacén de datos. Si no se especifica un almacén de datos explicitamente, se utilizará el almacén de datos predeterminado.
2. Pasar el objeto `OutputFileDatasetConfig` como argumento en los pasos que ejecutan scripts.
3. Incluya código en esos scripts para escribir en el argumento `OutputFileDatasetConfig` como salida o leerlo como entrada.

Por ejemplo, el siguiente código define un objeto `OutputFileDatasetConfig` que para los datos preprocesados que deben pasarse entre los pasos.

In [None]:
# Input

from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

data_store = ws.get_default_datastore()                                                        
prepped_data = OutputFileDatasetConfig('prepped')                                               # Creamos un objeto OutputFileDatasetConfig con la ubicación de salida


step1 = PythonScriptStep(name = 'prepare data',                                                 # Creamos el primer paso del pipeline para ejecutar data_prep.py
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         
                         # Los argumentos del script incluyen PipelineData
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),            # Pasamos el conjunto de datos como argumento
                                      '--out_folder', prepped_data])                            # Pasamos la ubicación de salida como argumento 


step2 = PythonScriptStep(name = 'train model',                                                  # Creamos el segundo paso del pipeline para ejecutar train_model.py                                                             
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         
                         # Pasamos como argumento del script
                         arguments=['--training-data', prepped_data.as_input()])                # Pasamos la ubicación de salida como argumento

En los propios scripts, se puede obtener una referencia al objeto `OutputFileDatasetConfig` desde el argumento, y utilizarlo como una carpeta local.

In [None]:
# Output

from azureml.core import Run
import argparse
import os

run = Run.get_context()

parser = argparse.ArgumentParser()                                                              # Creamos un objeto ArgumentParser para manejar los argumentos del script
parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id')
parser.add_argument('--out_folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()                                   # Obtenemos el conjunto de datos por su nombre y lo convertimos a un DataFrame de pandas

# code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']]

# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)

#### 2. Reutilizar pasos de pipeline

Las canalizaciones con varios pasos de larga duración pueden tardar mucho tiempo en completarse. Azure ML incluye algunas características de almacenamiento en caché y reutilización para reducir estos tiempos.

- **Gestión de la reutilización de la salida de pasos**
  
    De forma predeterminada, la salida del paso de una ejecución de pipeline anterior se reutiliza sin volver a ejecutar el paso, siempre que el script, el directorio de origen y otros parámetros del paso no hayan cambiado. La reutilización de pasos puede reducir el tiempo que se tarda en ejecutar cada pipeline, pero tambien puede dar lugar a resultados obsoletos cuando no se han tenido en cuenta los cambios en los datos posteriores.

    Para controlar la reutilización de un paso individual, puede establecer el  parámetro `allow_reuse` en la configuración del paso, de la siguiente manera:

In [None]:
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         inputs=[raw_ds.as_named_input('raw_data')],
                         outputs=[prepped_data],
                         arguments = ['--folder', prepped_data]),
                         # Disable step reuse
                         allow_reuse = False)

- **Forzar la ejecución de todos los pasos**
    Cuando tiene varios pasos, podemos forzar la ejecución de todos ellos independientemente de la configuración de reutilización individual, estableciendo el parámetro `regenerate_outputs` al enviar el experimento de pipeline:

In [None]:
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)

#### 3. Publicar pipelines

Después de crear una canalización, puede publicarla para crear un punto de conexión REST a través del cual se pueda ejecutar la canalización a petición.

- **Publicación de una canalización**

    Para publicar una canalización, puede llamar a su  método de publicación:

In [None]:
published_pipeline = pipeline.publish(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')

Como alternativa, puede llamar al método de publicación en una ejecución correcta de la canalización:

In [None]:
# Get the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]

# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')

Una vez publicada la canalización, puede verla en Azure Machine Learning Studio. También puede determinar el URI de su punto de conexión de la siguiente manera:

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

- **Uso de una canalización publicada**

    Para iniciar un punto de conexión publicado, realice una solicitud HTTP a su punto de conexión REST, pasando un encabezado de autorización con un token para una entidad de servicio con permiso para ejecutar la canalización y una carga JSON que especifique el nombre del experimento. La canalización se ejecuta de forma asincrónica, por lo que la respuesta de una llamada REST correcta incluye el identificador de ejecución. Puede usarlo para realizar un seguimiento de la ejecución en Azure Machine Learning Studio.

Por ejemplo, el siguiente código de Python realiza una solicitud REST para ejecutar una canalización y muestra el identificador de ejecución devuelto.

In [None]:
import requests

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})
run_id = response.json()["Id"]
print(run_id)

#### 4. Uso de parámetros de pipeline

Podemos aumentar la flexibilidad de una canalización definiendo parámetros.

- **Definición de parámetros para una canalización**

    Para definir parámetros para una canalización, cree un  objeto PipelineParameter para cada parámetro y especifique cada parámetro en al menos un paso.

    NOTA: Debemos definir los parámetros de una canalización antes de publicarla!

Por ejemplo, puede usar el siguiente código para incluir un parámetro para una tasa de regularización en el script utilizado por un estimador:

In [None]:
from azureml.pipeline.core.graph import PipelineParameter

reg_param = PipelineParameter(name='reg_rate', default_value=0.01)

...

step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # Pass parameter as script argument
                         arguments=['--in_folder', prepped_data,
                                    '--reg', reg_param],
                         inputs=[prepped_data])

- **Ejecución de una canalización con un parámetro**

Después de publicar una canalización con parámetros, puede pasar valores de parámetro en la carga JSON para la interfaz REST:

In [None]:
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline",
                               "ParameterAssignments": {"reg_rate": 0.1}})

#### 5. Programar Pipelines

Después de publicar una canalización, puede iniciarla a petición a través de su punto de conexión REST, o puede hacer que la canalización se ejecute automáticamente en función de una programación periódica o en respuesta a actualizaciones de datos.

- **Programación de una canalización para intervalos periódicos**

    Para programar una canalización para que se ejecute a intervalos periódicos, debe definir un ScheduleRecurrence que determine la frecuencia de ejecución y usarlo para crear un Schedule.

Por ejemplo, el código siguiente programa una ejecución diaria de una canalización publicada.

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws, name='Daily Training',
                                        description='trains model every day',
                                        pipeline_id=published_pipeline.id,
                                        experiment_name='Training_Pipeline',
                                        recurrence=daily)

- **Desencadenar una ejecución de canalización en los cambios de datos**

    Para programar una canalización para que se ejecute cada vez que cambien los datos, debe crear una programación que supervise una ruta de acceso especificada en un almacén de datos, como se muestra a continuación:

In [None]:
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='trains model on data change',
                                    pipeline_id=published_pipeline_id,
                                    experiment_name='Training_Pipeline',
                                    datastore=training_datastore,
                                    path_on_datastore='data/training')