# Why use Azure Machine Learning (ML) pipelines to run your flows on the cloud?
In real-world scenarios, flows serve various purposes. For example, consider a flow designed to evaluate the relevance score for a communication session between humans and agents. Suppose you want to trigger this flow every night to assess the day's performance while avoiding peak hours for your LLM (large language model) endpoints. In this common scenario, people often encounter the following needs:
- Handling Large Data Inputs: Running flows with thousands or millions of data inputs at once.
- Scalability and Efficiency: Requiring a scalable, efficient, and resilient platform to ensure success.
- Automation: Automatically triggering batch flows when upstream data is ready or at fixed intervals.

__Azure ML pipelines__ address all these offline requirements effectively. With the integration of prompt flow and Azure ML pipelines, flow users can achieve the goals above with little effort.

In this tutorial, you’ll learn:
- How to use the Python SDK to automatically convert your flow into a 'step' in an Azure ML pipeline.
- How to feed your data into the pipeline to trigger batch flow runs.
- How to build other pipeline steps before or after your prompt flow step, e.g. data preprocessing or result aggregation.
- How to set up a simple scheduler on your pipeline.
- How to deploy your pipeline to an Azure ML batch endpoint so that you can invoke it with new data when needed.

Before you begin, consider the following prerequisites:
- Introduction to Azure ML Platform:
    - [Core site of Azure ML platform](https://learn.microsoft.com/en-us/azure/machine-learning/overview-what-is-azure-machine-learning?view=azureml-api-2).
    - Understand what [Azure ML pipelines](https://learn.microsoft.com/en-us/azure/machine-learning/concept-ml-pipelines?view=azureml-api-2) and [component](https://learn.microsoft.com/en-us/azure/machine-learning/concept-component?view=azureml-api-2) are.
- Azure cloud setup:
    - An Azure account with an active subscription - [Create an account for free](https://azure.microsoft.com/free/?WT.mc_id=A261C142F)
    - Create an Azure ML resource from Azure portal - [Create an Azure ML workspace](https://ms.portal.azure.com/#view/Microsoft_Azure_Marketplace/MarketplaceOffersBlade/searchQuery/machine%20learning)
    - Connect to your workspace, then set up a basic compute cluster - [Configure workspace](../../configuration.ipynb)
- Local environment setup:
    - A Python environment
    - Azure Machine Learning Python SDK v2 installed - [install instructions](../../../README.md) - check the getting started section

# 1. Connect to Azure Machine Learning Workspace

The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.

## 1.1 Import the required libraries

In [27]:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, load_component, Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

## 1.2 Configure credential

We use `DefaultAzureCredential` to get access to the workspace.
`DefaultAzureCredential` should be capable of handling most Azure SDK authentication scenarios.

If it does not work for you, see these references for other available credentials: [configure credential example](../../configuration.ipynb), [azure-identity reference doc](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity?view=azure-python).

In [28]:
try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work
    credential = InteractiveBrowserCredential()

DefaultAzureCredential failed to retrieve a token from the included credentials.
Attempted credentials:
	EnvironmentCredential: EnvironmentCredential authentication unavailable. Environment variables are not fully configured.
Visit https://aka.ms/azsdk/python/identity/environmentcredential/troubleshoot to troubleshoot this issue.
	ManagedIdentityCredential: ManagedIdentityCredential authentication unavailable, no response from the IMDS endpoint.
	SharedTokenCacheCredential: SharedTokenCacheCredential authentication unavailable. No accounts were found in the cache.
	AzureCliCredential: Failed to invoke the Azure CLI
	AzurePowerShellCredential: Az.Account module >= 2.2.0 is not installed
	AzureDeveloperCliCredential: Azure Developer CLI could not be found. Please visit https://aka.ms/azure-dev for installation instructions and then,once installed, authenticate to your Azure account using 'azd auth login'.
To mitigate this issue, please refer to the troubleshooting guidelines here at http

## 1.3 Get a handle to the workspace

We use a config file to connect to your workspace. Check [this notebook](../../configuration.ipynb) to get your config file from the Azure ML workspace portal and paste it into this folder. If the next code block runs without errors, your environment is all set.
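
For reference, the config file is a small JSON document. The sketch below writes one with placeholder values. The three keys are the ones `MLClient.from_config` looks for; the helper name `write_workspace_config` and the placeholder strings are only illustrative, so replace them with the values from your own workspace.

```python
import json
import tempfile
from pathlib import Path


def write_workspace_config(folder, subscription_id, resource_group, workspace_name):
    """Write a config.json with the three workspace identifiers (hypothetical helper)."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(folder) / "config.json"
    path.write_text(json.dumps(config, indent=2))
    return path


# A temporary folder is used here only so the sketch does not overwrite a real config file.
with tempfile.TemporaryDirectory() as tmp:
    config_path = write_workspace_config(
        tmp,
        subscription_id="<your-subscription-id>",  # placeholder, not a real ID
        resource_group="<your-resource-group>",  # placeholder
        workspace_name="<your-workspace-name>",  # placeholder
    )
    loaded = json.loads(config_path.read_text())
    print(sorted(loaded))
```

In practice you would place the file in the folder that contains this notebook (or a parent folder) rather than in a temporary directory.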

In [29]:
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cluster_name = "cpu-cluster"
print(ml_client.compute.get(cluster_name))

Found the config file in: D:\repo\promptflow-1\config.json


enable_node_public_ip: true
id: /subscriptions/ee85ed72-2b26-48f6-a0e8-cb5bcf98fbd9/resourceGroups/alainli-rg/providers/Microsoft.MachineLearningServices/workspaces/alainli-prs-eastus2/computes/cpu-cluster
identity:
  principal_id: a94960a4-23cc-460e-9b1b-66b30a36ea87
  tenant_id: 72f988bf-86f1-41af-91ab-2d7cd011db47
  type: system_assigned
idle_time_before_scale_down: 1800
location: eastus2
max_instances: 4
min_instances: 0
name: cpu-cluster
provisioning_state: Succeeded
size: STANDARD_D2_V2
ssh_public_access_enabled: true
tier: dedicated
type: amlcompute



# 2. Load flow as component

Suppose you already have a flow authored with the Promptflow SDK or the portal. You can find 'flow.dag.yaml' under the flow folder; this flow YAML spec is what we need to load your flow as an Azure ML component.
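
Before loading, it can help to confirm that the flow folder really contains the YAML spec. This is a minimal sketch, assuming a hypothetical helper `find_flow_spec` (not part of any SDK) that returns the path to 'flow.dag.yaml' or raises a clear error. The temporary folder only stands in for a real flow folder.

```python
import tempfile
from pathlib import Path


def find_flow_spec(flow_folder):
    """Return the path to flow.dag.yaml inside flow_folder, or raise if it is missing."""
    spec = Path(flow_folder) / "flow.dag.yaml"
    if not spec.is_file():
        raise FileNotFoundError(f"No flow.dag.yaml found under {flow_folder}")
    return spec


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for a real flow folder; the file content here is a dummy.
    (Path(tmp) / "flow.dag.yaml").write_text("inputs: {}\n")
    spec_name = find_flow_spec(tmp).name

    # A folder without the spec produces an explicit error.
    try:
        find_flow_spec(Path(tmp) / "missing")
        missing_detected = False
    except FileNotFoundError:
        missing_detected = True

print(spec_name, missing_detected)
```

The returned path can then be passed to `load_component`.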

In [30]:
flow_component = load_component("../../flows/standard/web-classification/flow.dag.yaml")

With the 'load_component' function and the flow YAML spec, your flow is automatically converted into a __parallel component__. A [parallel component](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-use-parallel-job-in-pipeline?view=azureml-api-2&tabs=cliv2#why-are-parallel-jobs-needed) is designed for efficient and resilient offline, large-scale parallel processing. This auto-converted component has:
 - Fixed input and output ports:

    ![prompt flow base component image](../../../docs/media/cloud/flow-in-pipeline/pf-base-component.png)
    | port name  |  type  | description |
    | ---------- | ------ | ----------- |
    | data | uri_folder or uri_file | Batch data input to your flow. This port accepts 4 file types: json, jsonl, csv, tsv. Use the 'uri_file' data type if your data is a single file, or use 'uri_folder' with the path of a folder that contains all your files sharing the same schema. The default file type is 'jsonl'; you can override this setting after declaring an instance of this flow component in your pipeline. </br></br> Remark: Your data will be converted into a dataframe, which requires column names for the subsequent input mapping. Please make sure your csv or tsv data has a header line. |
    | flow_outputs | uri_file | The output file containing the returns from every flow run. This single file has a fixed name and extension: 'parallel_run_step.jsonl'. Every line in this file is a JSON object of your flow returns plus an extra column 'line_number' that indicates its position in the original file. |
    | debug_info | uri_folder | Optional output port that only has data if you run your flow component in debug mode. The data is mainly used for debugging and contains all intermediate outputs between your flow steps for each line. |

 - Input parameters, which represent all your flow inputs and the connections associated with your flow steps. Taking the 'web-classification' sample flow as an example:
 
   ![prompt flow base component image](../../../docs/media/cloud/flow-in-pipeline/pf-component-parameters.png)
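
To make the 'flow_outputs' format concrete, the sketch below joins result lines back to their input lines through 'line_number'. The sample records, the `category` field, and the helper `join_by_line_number` are made up for illustration. It also assumes that 'line_number' is zero-based and that result lines may arrive in any order.

```python
import json

# Hypothetical input lines, as they might appear in a jsonl data file.
input_lines = [
    {"url": "https://example.com/a"},
    {"url": "https://example.com/b"},
    {"url": "https://example.com/c"},
]

# Hypothetical content of 'parallel_run_step.jsonl': one JSON object per line,
# each carrying the flow returns plus 'line_number'.
output_text = "\n".join(
    json.dumps(record)
    for record in [
        {"line_number": 2, "category": "News"},
        {"line_number": 0, "category": "App"},
        {"line_number": 1, "category": "Academic"},
    ]
)


def join_by_line_number(inputs, outputs_jsonl):
    """Attach each flow return to the input line it came from."""
    by_line = {}
    for raw in outputs_jsonl.splitlines():
        if raw.strip():
            record = json.loads(raw)
            by_line[record["line_number"]] = record

    joined = []
    for index, row in enumerate(inputs):
        merged = dict(row)
        # Copy every returned field except the bookkeeping column.
        for key, value in by_line.get(index, {}).items():
            if key != "line_number":
                merged[key] = value
        joined.append(merged)
    return joined


joined = join_by_line_number(input_lines, output_text)
for row in joined:
    print(row)
```

Input lines without a matching result are kept unchanged, which makes missing results easy to spot afterwards.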

# 3. Build your pipeline
## 3.1 Declare input and output
To feed your pipeline with data, you need to declare an Input with the `path`, `type`, and `mode` properties.
Declaring a pipeline output is not required. But if you need a customized output path on the cloud, you can follow the example below to set its path on a datastore. Refer to [this doc](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-manage-inputs-outputs-pipeline?view=azureml-api-2&tabs=cli#path-and-mode-for-data-inputsoutputs) for more details on valid path values.


In [31]:
data_input = Input(
    path="../../flows/standard/web-classification/data.jsonl", 
    type=AssetTypes.URI_FILE,
    mode="mount"
)

pipeline_output = Output(
    # Provide custom flow output file path if needed
    # path="azureml://datastores/<data_store_name>/paths/<path>",
    type=AssetTypes.URI_FILE,
    mode="rw_mount",
)

## 3.2.1 Build pipeline with single flow component
As all prompt flow components are based on the Azure ML parallel component, you can use the parallel component __run settings__ to control the parallelization of flow runs. Some useful settings are listed below:

| run settings | description | allowed values | default value |
| ------------ | ----------- | -------------- | ------------- |
| PF_INPUT_FORMAT | When you use `uri_folder` as the input data, this setting defines which file extension is treated as a data file for flow runs. | json, jsonl, csv, tsv | jsonl |
| compute | Define which compute cluster from your Azure ML workspace will be used for this job. | | |
| instance_count | Define how many nodes from your compute cluster will be assigned to this job. | from 1 to node count of compute cluster. | 1 |
| max_concurrency_per_instance | Define how many dedicated worker processes run the flow in parallel on 1 node. Combined with the 'instance_count' setting, the total parallelization of your flow is instance_count*max_concurrency_per_instance.| >= 1 | 1 |
| mini_batch_size | Define the number of lines in each mini-batch. A __mini-batch__ is the basic unit of work when the full data set is processed in parallel. Every worker process takes mini-batches one by one, and all workers run in parallel across different nodes. | > 0 | 1 |
| max_retries | Define the retry count if any mini-batch encounters an inner exception. </br></br> Remark: the retry granularity is the mini-batch. For instance, with the previous setting, you can set 100 lines per mini-batch. When the execution of one line hits a transient issue or an unhandled exception, all 100 lines are retried together even if the other 99 lines succeeded. Additionally, LLM 429 responses are handled inside the flow run for every LLM step and will not trigger a mini-batch failure in most cases. | >= 0 | 3 |
| error_threshold | Define how many failed lines are acceptable. If the count of failed lines exceeds this threshold, the job will be stopped and marked as failed. Set '-1' to disable this failure check. | -1 or >=0 | -1 |
| mini_batch_error_threshold | Define how many failed mini-batches could be acceptable after all retries. Set '-1' to disable this failure check. | -1 or >=0 | -1 |
| logging_level | Define how parallel jobs save logs to disk. Setting 'DEBUG' for a flow component allows the component to output intermediate flow logs into the 'debug_info' port. | INFO, WARNING, DEBUG | INFO |
| timeout | Define the timeout for each mini-batch execution in milliseconds. If a mini-batch runs longer than this threshold, it will be stopped and marked as failed, which triggers the next retry. Consider setting a higher number according to your mini-batch size and the total traffic throughput of your LLM endpoints. | > 0 | 600 |
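
As a rough illustration of how these settings interact, the sketch below estimates the worker count and the number of mini-batches for a job. The helper `parallel_plan` and the 1000-line input are hypothetical; the other numbers mirror the settings used in the pipeline cell that follows. It ignores retries and uneven mini-batch durations, so treat the result as a back-of-the-envelope estimate.

```python
import math


def parallel_plan(total_lines, instance_count, max_concurrency_per_instance, mini_batch_size):
    """Estimate how a batch of lines is spread over workers (hypothetical helper)."""
    # Total parallelization = nodes * worker processes per node.
    workers = instance_count * max_concurrency_per_instance
    # Each mini-batch holds up to mini_batch_size lines.
    mini_batches = math.ceil(total_lines / mini_batch_size)
    # Rounds needed if every worker handles one mini-batch per round.
    waves = math.ceil(mini_batches / workers)
    return {"workers": workers, "mini_batches": mini_batches, "waves": waves}


plan = parallel_plan(
    total_lines=1000,  # assumed input size
    instance_count=3,
    max_concurrency_per_instance=2,
    mini_batch_size=10,
)
print(plan)  # {'workers': 6, 'mini_batches': 100, 'waves': 17}
```

A larger 'mini_batch_size' reduces scheduling overhead, but because retries happen per mini-batch, it also increases the number of lines that are re-run when a single line fails.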


In [32]:
@pipeline()
def pipeline_func_with_flow(data):
    flow_node = flow_component(
        data=data,
        url="${data.url}",
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
            "classify_with_llm": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
        },
    )

    flow_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
        # "AZUREML_COLLIE_ENABLED": "true",
    }
    
    flow_node.compute = "cpu-cluster"
    flow_node.resources = { 'instance_count': 3 }
    flow_node.max_concurrency_per_instance = 2
    flow_node.mini_batch_size = 10
    flow_node.retry_settings = {
       "max_retries": 1,
       "timeout":1200,
    }

    flow_node.error_threshold = -1
    flow_node.mini_batch_error_threshold = -1
    flow_node.logging_level = "DEBUG"

    return {
        "flow_result_folder" : flow_node.outputs.flow_outputs
    }


# create pipeline instance
pipeline_job = pipeline_func_with_flow(data=data_input)
pipeline_job.outputs.flow_result_folder = pipeline_output


In [None]:
data_input = Input(
    path="../../flows/standard/web-classification/data.jsonl", type=AssetTypes.URI_FILE
)

data_prep_component = load_component("./components/data-prep/data-prep.yaml")
result_parser_component = load_component("./components/result-parser/result-parser.yaml")
flow_component = load_component("../../flows/standard/web-classification/flow.dag.yaml")


@pipeline()
def pipeline_func_with_flow(data):
    data_prep_node = data_prep_component(
        input_data_file=data,
    )
    data_prep_node.compute = "cpu-cluster"

    flow_node = flow_component(
        data=data_prep_node.outputs.output_data_folder,
        url="${data.url}",
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
            "classify_with_llm": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
        },
    )

    flow_node.environment_variables = {"PF_INPUT_FORMAT": "csv"}
    flow_node.compute = "cpu-cluster"
    flow_node.resources = { 'instance_count': 3 }
    flow_node.outputs.flow_outputs.mode = "rw_mount"

    result_parser_node = result_parser_component(
        source_data=data_prep_node.outputs.output_data_folder,
        pf_output_data=flow_node.outputs.flow_outputs,
    )
    
    result_parser_node.compute = "cpu-cluster"

# create pipeline instance
pipeline_job = pipeline_func_with_flow(data=data_input)

## 3.3 Submit pipeline job

In [33]:
# submit job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job

| Experiment | Name | Type | Status | Details Page |
| ---------- | ---- | ---- | ------ | ------------ |
| pipeline_samples | polite_pizza_qvfhq41gfj | pipeline | Preparing | Link to Azure Machine Learning studio |


In [None]:
# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

# Next Steps
You can see further examples of running a pipeline job [here](../)

In [None]:
# test section 
import pandas as pd

input_data = pd.read_csv("./processed_data.csv")
output_data = pd.read_json("./parallel_run_step.jsonl", lines=True)
print(output_data.head())

if len(input_data) != len(output_data):
    raise Exception("Index mismatch between data source and result")

input_data = input_data.merge(output_data, how='left', left_index=True, right_on="line_number", suffixes=('', '_predict'))

print(input_data.head())

with open("./processed_data_with_predictions.jsonl", "w") as file:
    file.write(input_data.to_json(orient="records", lines=True))

