# Batch processing with Azure pipelines
Azure Machine Learning pipelines can be created either in the designer or with the Python azureml SDK.
In this lab we create a simple Azure Machine Learning pipeline for batch processing. The pipeline consists of two steps: preprocessing and scoring.
Be aware that we use experimental features of azureml, which should not be used in a production environment.
Let's first import all needed packages:

In [41]:
import os
import pandas as pd
from azureml.core.model import Model
from azureml.core import Workspace
from azureml.core import Experiment

from azureml.core.dataset import Dataset
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.data.output_dataset_config import OutputFileDatasetConfig
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core import RunConfiguration

## Connect to workspace, set up dataset and compute
To have a more realistic setting, we do not use our registered dataset but read the CSV file with the raw credit data directly. Be aware that with this setup we are using our training data for prediction. This is acceptable only for demonstration purposes; it is not something you would do in production. We create a DatasetConsumptionConfig as the data input at the beginning of the pipeline. Two OutputFileDatasetConfig objects serve as the intermediate and final locations for the output files. The result_data output is registered as a new dataset (batch-scoring-results) once the run completes, which is what register_on_complete does.

In [42]:
ws = Workspace.from_config()
datastore = ws.get_default_datastore()
dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, 'german_credit_dataset.csv')])
input_data = DatasetConsumptionConfig("input_dataset", dataset)
intermediate_data = OutputFileDatasetConfig(name='intermediate_dataset', destination=(datastore, 'intermediate/{run-id}'))
result_data = OutputFileDatasetConfig(name='result_dataset', destination=(datastore, 'result/{run-id}')).register_on_complete('batch-scoring-results')
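
Both destinations above contain the {run-id} placeholder, which Azure Machine Learning replaces with the ID of the run that writes the output, so repeated pipeline runs land in separate folders instead of overwriting each other. The snippet below only mimics that substitution with plain string replacement to show the resulting folder layout; resolve_destination is a hypothetical helper, not part of the SDK, and the run IDs are made up.

```python
def resolve_destination(template: str, run_id: str) -> str:
    """Mimic the {run-id} placeholder substitution (illustration only)."""
    return template.replace("{run-id}", run_id)


# Hypothetical run IDs for two consecutive pipeline runs
for run_id in ["run-a", "run-b"]:
    print(resolve_destination("intermediate/{run-id}", run_id))
```

Each run gets its own folder (intermediate/run-a, intermediate/run-b), which keeps earlier results available for inspection.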




If the compute target "batch-comp" does not exist in your workspace yet, it is created.

In [43]:
compute_name = 'batch-comp'

# checks to see if compute target already exists in workspace, else create it
if compute_name in ws.compute_targets:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
else:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_DS11_V2",
                                                   vm_priority="lowpriority",
                                                   min_nodes=1,
                                                   max_nodes=2)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
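
The cell above follows a get-or-create pattern: look the resource up by name and only provision it when the lookup fails, so re-running the notebook does not try to create a second cluster. A stdlib-only sketch of the same control flow, where a dictionary stands in for ws.compute_targets and create_compute is a hypothetical factory (no Azure calls are made):

```python
from typing import Callable, Dict


def get_or_create(registry: Dict[str, dict], name: str,
                  factory: Callable[[str], dict]) -> dict:
    """Return the existing entry for `name`, creating it only if missing."""
    if name in registry:
        return registry[name]  # reuse the existing target
    registry[name] = factory(name)  # provision exactly once
    return registry[name]


created = []


def create_compute(name: str) -> dict:
    # Hypothetical stand-in for ComputeTarget.create(...)
    created.append(name)
    return {"name": name, "vm_size": "STANDARD_DS11_V2", "min_nodes": 1, "max_nodes": 2}


targets: Dict[str, dict] = {}
first = get_or_create(targets, "batch-comp", create_compute)
second = get_or_create(targets, "batch-comp", create_compute)
print(first is second, created)  # True ['batch-comp']
```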

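Next, we create a run configuration from conda dependencies: scikit-learn 0.22 is added as a pip package on top of the defaults (Python and azureml-defaults, as the output shows). The "script" and "target" values in the printed configuration are defaults; the pipeline steps below set their own script and compute target.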

In [44]:
conda_dep = CondaDependencies()
conda_dep.add_pip_package("scikit-learn==0.22")
config = RunConfiguration(conda_dependencies=conda_dep)
config

{
    "script": "train.py",
    "arguments": [],
    "target": "local",
    "framework": "Python",
    "communicator": "None",
    "maxRunDurationSeconds": null,
    "nodeCount": 1,
    "environment": {
        "name": null,
        "version": null,
        "environmentVariables": {
            "EXAMPLE_ENV_VAR": "EXAMPLE_VALUE"
        },
        "python": {
            "userManagedDependencies": false,
            "interpreterPath": "python",
            "condaDependenciesFile": null,
            "baseCondaEnvironment": null,
            "condaDependencies": {
                "name": "project_environment",
                "dependencies": [
                    "python=3.6.2",
                    {
                        "pip": [
                            "azureml-defaults",
                            "scikit-learn==0.22"
                        ]
                    }
                ],
                "channels": [
                    "anaconda",
                    "conda-forge"

## Prepare the pipeline steps
We create two PythonScriptStep objects, each of which needs a Python script. The scripts are prepared in the batch_scripts folder; we print them here only to have a look at them. An overview of the available pipeline step types can be found in the [azureml.pipeline.steps documentation](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps?view=azure-ml-py).

In [45]:
with open("batch_scripts/preprocessing_step.py", "r") as f:
    print(f.read())

import os
import pickle
import joblib
import argparse
from azureml.core.run import Run
from azureml.core.model import Model

# load data
run = Run.get_context()
data = run.input_datasets['input_dataset'].to_pandas_dataframe()

# load model
ws = run.experiment.workspace
# model = Model(ws, 'german-credit-local-model').download(exist_ok=True)
pipeline_path = Model.get_model_path('german-credit-local-model')
pipeline = joblib.load(pipeline_path)

# preprocess
data.drop("Sno", axis=1, inplace=True)
X_raw = data.drop('Risk', axis=1)
out = pipeline['preprocessor'].transform(X_raw)

# save intermediate file
parser = argparse.ArgumentParser("preprocess")
parser.add_argument("--intermediate-data-path", type=str)
args = parser.parse_args()
if args.intermediate_data_path is not None:
    os.makedirs(args.intermediate_data_path, exist_ok=True)
    print(f"{args.intermediate_data_path} created")
pickle.dump(out, open(f"{args.intermediate_data_path}/preprocessed_features.pkl", "wb" ) )
data.to_csv(f
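
Two details in this script are worth tightening if you adapt it: the pickle file is opened without being closed, and pickle.dump runs even when --intermediate-data-path is missing (the if only guards the directory creation). A minimal stdlib sketch of the output-writing part, assuming the preprocessed features are any picklable object; write_intermediate and the features list are hypothetical placeholders, not the lab's code:

```python
import argparse
import os
import pickle
import tempfile


def parse_args(argv=None):
    parser = argparse.ArgumentParser("preprocess")
    # required=True fails fast instead of writing to a "None/..." path
    parser.add_argument("--intermediate-data-path", type=str, required=True)
    return parser.parse_args(argv)


def write_intermediate(features, out_dir):
    """Pickle `features` into out_dir and return the file path."""
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "preprocessed_features.pkl")
    with open(path, "wb") as f:  # the context manager closes the file
        pickle.dump(features, f)
    return path


# Usage with an explicit argument list; inside the pipeline step you would
# call parse_args() without arguments so that sys.argv is used.
tmp_dir = tempfile.mkdtemp()
args = parse_args(["--intermediate-data-path", os.path.join(tmp_dir, "intermediate")])
features = [[0.1, 0.2], [0.3, 0.4]]  # hypothetical stand-in for the transformed features
written = write_intermediate(features, args.intermediate_data_path)
```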

In [46]:
with open("batch_scripts/scoring_step.py", "r") as f:
    print(f.read())

import os
import pickle
import joblib
import argparse
import pandas as pd
from azureml.core.run import Run
from azureml.core.model import Model

# load arguments
parser = argparse.ArgumentParser("preprocess")
parser.add_argument("--intermediate-data-path", type=str)
parser.add_argument("--result-data-path", type=str)
args = parser.parse_args()

# load data
run = Run.get_context()
print(args.intermediate_data_path)
features = pickle.load(open(f'{args.intermediate_data_path}/preprocessed_features.pkl', "rb"))
data = pd.read_csv(f'{args.intermediate_data_path}/preprocessed_data.csv')

# load model
ws = run.experiment.workspace
# model = Model(ws, 'german-credit-local-model').download(exist_ok=True)
pipeline_path = Model.get_model_path('german-credit-local-model')
pipeline = joblib.load(pipeline_path)

# score
out = pipeline['classifier'].predict(features)

# save result file

if args.result_data_path is not None:
    os.makedirs(args.result_data_path, exist_ok=True)
    print(f"{args.resu
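
The scoring script mirrors the preprocessing step: it reads what the first step wrote into --intermediate-data-path and writes predictions into --result-data-path. A stdlib-only sketch of that read, score, write sequence, where predict is a hypothetical stand-in for pipeline['classifier'].predict and the results are written with the csv module instead of pandas to keep the example self-contained. The input file name follows the script above; predictions.csv is an assumed name, because the original line is truncated.

```python
import csv
import os
import pickle
import tempfile


def score(intermediate_dir, result_dir, predict):
    """Load pickled features, predict, and write one prediction per row."""
    with open(os.path.join(intermediate_dir, "preprocessed_features.pkl"), "rb") as f:
        features = pickle.load(f)
    predictions = predict(features)
    os.makedirs(result_dir, exist_ok=True)
    out_path = os.path.join(result_dir, "predictions.csv")  # hypothetical file name
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["prediction"])
        writer.writerows([p] for p in predictions)
    return out_path


# Self-contained usage: fake an intermediate output, then score it.
base = tempfile.mkdtemp()
intermediate = os.path.join(base, "intermediate")
os.makedirs(intermediate)
with open(os.path.join(intermediate, "preprocessed_features.pkl"), "wb") as f:
    pickle.dump([[5.0], [0.5]], f)

# Hypothetical classifier: "bad" if the single feature exceeds 1.0
result = score(intermediate, os.path.join(base, "result"),
               lambda rows: ["bad" if r[0] > 1.0 else "good" for r in rows])
```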

The two scripts, together with the data locations and the compute target, are passed to the PythonScriptStep constructors. The allow_reuse flag lets a step reuse the results of an earlier run if the step's script, parameters and inputs have not changed since that run. Reuse is enabled by default, so setting allow_reuse=True on the preprocessing step mainly makes the intent explicit.

In [47]:
preprocessing_step = PythonScriptStep(
    script_name="preprocessing_step.py",
    name='preprocessing_step',
    arguments=['--intermediate-data-path', intermediate_data],
    compute_target=compute_target,
    runconfig=config,
    inputs=[input_data],
    outputs=[intermediate_data],
    source_directory='./batch_scripts',
    allow_reuse=True
)
scoring_step = PythonScriptStep(
    script_name="scoring_step.py",
    name='scoring_step',
    arguments=['--intermediate-data-path', intermediate_data, '--result-data-path', result_data],
    compute_target=compute_target,
    runconfig=config,
    inputs=[intermediate_data],
    outputs=[result_data],
    source_directory='./batch_scripts'
)
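
Conceptually, step reuse behaves like a cache keyed on everything that defines the step: if the script, its arguments and its inputs are unchanged, the previous output is returned instead of running the step again. The sketch below illustrates that idea with a SHA-256 fingerprint; it is not how Azure Machine Learning computes reuse internally, and fingerprint and run_step are hypothetical names.

```python
import hashlib
import json


def fingerprint(script_source, arguments, input_ids):
    """Hash everything that defines a step run (illustration only)."""
    payload = json.dumps(
        {"script": script_source, "args": arguments, "inputs": input_ids},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


cache = {}
executions = []


def run_step(script_source, arguments, input_ids, allow_reuse=True):
    key = fingerprint(script_source, arguments, input_ids)
    if allow_reuse and key in cache:
        return cache[key]  # reuse the previous output
    executions.append(key)  # record that the step actually ran
    output = f"output-{len(executions)}"  # placeholder for real work
    cache[key] = output
    return output


a = run_step("print('prep')", ["--intermediate-data-path", "x"], ["input_dataset"])
b = run_step("print('prep')", ["--intermediate-data-path", "x"], ["input_dataset"])
c = run_step("print('prep v2')", ["--intermediate-data-path", "x"], ["input_dataset"])
print(a, b, c, len(executions))  # output-1 output-1 output-2 2
```

Changing the script text produces a new fingerprint, so the third call runs the step again while the second call is served from the cache.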

## Run the pipeline
We combine the steps into a pipeline and submit it as a new run of the batch-score experiment. You can find all logs in your workspace. The intermediate and final output files are stored in the workspace's default datastore, the Azure Blob Storage account that was created automatically with the workspace.

In [55]:
scoring_pipeline = Pipeline(workspace=ws, steps=[preprocessing_step, scoring_step])
pipeline_run = Experiment(ws, 'batch-score').submit(scoring_pipeline)
pipeline_run.wait_for_completion(show_output=False)

Created step preprocessing_step [c6418e91][7844aa30-ce1b-4ceb-801a-67f52d2f27a3], (This step will run and generate new outputs)
Created step scoring_step [848b1189][0decd8a9-c3f6-4294-8a82-157f3df0682c], (This step will run and generate new outputs)
Submitted PipelineRun cc0f09ef-e18a-4e66-8eeb-32cef1630041
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/batch-score/runs/cc0f09ef-e18a-4e66-8eeb-32cef1630041?wsid=/subscriptions/823af982-da0d-47e1-8124-3c00e4053556/resourcegroups/jrie_test/workspaces/aml_test
PipelineRunId: cc0f09ef-e18a-4e66-8eeb-32cef1630041
{'runId': 'cc0f09ef-e18a-4e66-8eeb-32cef1630041', 'status': 'Completed', 'startTimeUtc': '2021-01-18T15:41:49.074019Z', 'endTimeUtc': '2021-01-18T15:45:33.278511Z', 'properties': {'az

'Finished'
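
The position of a step in steps=[...] is not what sequences the two steps: Azure Machine Learning orders steps by their data dependencies, here the fact that intermediate_data is an output of preprocessing_step and an input of scoring_step. The sketch below reproduces that reasoning with graphlib.TopologicalSorter (Python 3.9+) on a hand-written description of the two steps; the dictionary layout is an assumption for illustration, not an SDK structure.

```python
from graphlib import TopologicalSorter

# Hypothetical description of the two steps: which named data each reads/writes
steps = {
    "scoring_step": {"inputs": ["intermediate_dataset"], "outputs": ["result_dataset"]},
    "preprocessing_step": {"inputs": ["input_dataset"], "outputs": ["intermediate_dataset"]},
}

# Map each data name to the step that produces it
producers = {out: name for name, spec in steps.items() for out in spec["outputs"]}

# A step depends on the producers of its inputs (external inputs have no producer)
graph = {
    name: {producers[i] for i in spec["inputs"] if i in producers}
    for name, spec in steps.items()
}

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['preprocessing_step', 'scoring_step']
```

Even though scoring_step is listed first in the dictionary, it can only run after the step that produces intermediate_dataset.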

As with pipelines built in the designer, you can monitor the pipeline while it runs in the Experiments section of your workspace (open the specific run).
<img src="images/pipeline-steps.jpg" alt="Pipeline" style="width: 800px;"/>

## Results
Let us have a look at the resulting data. We can access the results directly through the registered dataset: the output was automatically registered as batch-scoring-results, as defined when we created the output location above. For comparison, we also open the original credit risk dataset that we registered in lab 3. The results contain the additional column "prediction". In a real-life scenario you would score unlabeled data, so the "Risk" column would not be present.

In [56]:
dataset = Dataset.get_by_name(ws, name='batch-scoring-results', version="latest")
df_path = dataset.download('data/batch_scoring_results', overwrite=True)
pd.read_csv(df_path[0]).head()

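| Unnamed: 0 | Age | Sex | Job | Housing | Saving accounts | Checking account | Credit amount | Duration | Purpose | Risk | prediction |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 67 | male | 2 | own |  | little | 1169 | 6 | radio/TV | good | good |
| 1 | 22 | female | 2 | own | little | moderate | 5951 | 48 | radio/TV | bad | bad |
| 2 | 49 | male | 1 | own | little |  | 2096 | 12 | education | good | good |
| 3 | 45 | male | 2 | free | little | little | 7882 | 42 | furniture/equipment | good | bad |
| 4 | 53 | male | 2 | free | little | little | 4870 | 24 | car | bad | good |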
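
Because this demo scores the labeled training data, the prediction column can be compared with Risk. A quick sketch of that comparison using only the five rows displayed above, copied in by hand so it runs without Azure access; with the full downloaded file you would read df_path[0] instead. Five rows say nothing about model quality; this only shows the mechanics.

```python
import csv
import io

# The Risk and prediction columns of the five rows shown above
shown = """Risk,prediction
good,good
bad,bad
good,good
good,bad
bad,good
"""

rows = list(csv.DictReader(io.StringIO(shown)))
matches = sum(r["Risk"] == r["prediction"] for r in rows)
accuracy = matches / len(rows)
print(f"{matches}/{len(rows)} match, accuracy = {accuracy:.2f}")  # 3/5 match, accuracy = 0.60
```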


In [57]:
dataset = Dataset.get_by_name(ws, name='german_credit_dataset', version="latest")
ds_df = dataset.to_pandas_dataframe()
ds_df.head()


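| Unnamed: 0 | Sno | Age | Sex | Job | Housing | Saving accounts | Checking account | Credit amount | Duration | Purpose | Risk |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 67 | male | 2 | own |  | little | 1169 | 6 | radio/TV | good |
| 1 | 1 | 22 | female | 2 | own | little | moderate | 5951 | 48 | radio/TV | bad |
| 2 | 2 | 49 | male | 1 | own | little |  | 2096 | 12 | education | good |
| 3 | 3 | 45 | male | 2 | free | little | little | 7882 | 42 | furniture/equipment | good |
| 4 | 4 | 53 | male | 2 | free | little | little | 4870 | 24 | car | bad |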
