# Create a batch pipeline that runs an R model with a change-based trigger

## Check Azure ML SDK

In [None]:
import azureml.core

# Report which Azure ML SDK version this kernel is running against.
sdk_version = azureml.core.VERSION
print("Azure ML SDK Version: ", sdk_version)

## Get Azure ML Workspace

In [None]:
from azureml.core import Workspace

# Load the workspace from its local configuration file.
ws = Workspace.from_config()

# Print one workspace detail per line.
workspace_details = [
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
]
print(*workspace_details, sep='\n')

## Get Compute Cluster

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Name of the AML compute cluster that every pipeline step below runs on.
cluster_name = "cpu-cluster"
# Used only when the cluster has to be created; adjust to your quota/needs.
cluster_vm_size = 'STANDARD_D2_V2'
cluster_max_nodes = 4

try:
    # Reuse the cluster if it already exists in the workspace.
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target')
except ComputeTargetException:
    # Provision the cluster instead of only reporting it missing: later cells
    # reference `compute_target` and would otherwise fail with NameError.
    print('Compute target not found')
    compute_config = AmlCompute.provisioning_configuration(vm_size=cluster_vm_size,
                                                           max_nodes=cluster_max_nodes)
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
    compute_target.wait_for_completion(show_output=True)

## Configure the environment
The R environment is built from a `Dockerfile` taken from this [Azure ML sample notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-commandstep-r.ipynb).

In [None]:
from azureml.core import Environment
import os

# Dockerfile (in the current directory) that defines the R runtime image.
dockerfile_path = 'Dockerfile'
env = Environment.from_dockerfile(name='r_env', dockerfile=dockerfile_path)

## Configure the Pipeline Steps

In [None]:
from azureml.core import ScriptRunConfig, Datastore
from azureml.pipeline.core import PipelineParameter
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.steps import CommandStep, PythonScriptStep
from azureml.data import OutputFileDatasetConfig
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

source_directory = '.'

# Datastore holding the raw R inputs (replace the placeholder with yours) and
# the workspace default datastore, which receives the pipeline outputs.
datastore = Datastore.get(workspace=ws, datastore_name="<your-data-store>")
datastore_output = ws.get_default_datastore()


def _mounted_datapath_param(param_name, path_on_datastore):
    """Build a DataPath pipeline parameter on `datastore`, bound in mount mode.

    Returns ``(default_datapath, (PipelineParameter, DataPathComputeBinding))``.
    The DataPath is the parameter's default value. The tuple is the form the
    pipeline steps take as an input and command-line argument.
    """
    default_datapath = DataPath(datastore=datastore, path_on_datastore=path_on_datastore)
    parameter = PipelineParameter(name=param_name, default_value=default_datapath)
    return default_datapath, (parameter, DataPathComputeBinding(mode='mount'))


# RDS input data parameter.
input_rds_datapath, input_rds_data_pipeline_param = _mounted_datapath_param(
    "input_rds_data", 'r-pipeline-data/rds/accidents.Rd')

# MP3 input data parameter; defaults to the mp3 folder on the datastore.
input_mp3_datapath, input_mp3_data_pipeline_param = _mounted_datapath_param(
    "input_mp3_data", 'r-pipeline-data/mp3')

from azureml.data import OutputFileDatasetConfig

# Where the R step writes its results: the default datastore, under a
# per-run folder built from the {run-id}/{output-name} placeholders.
output_destination = (datastore_output, "output_data/{run-id}/{output-name}")
output_data = OutputFileDatasetConfig(name="output_data",
                                      destination=output_destination).as_upload()

# Command line for the R script. The pipeline parameters and the output config
# stand in for paths that are filled in when the step runs.
# The first element is written as one literal, equivalent to the original's
# implicitly concatenated pair 'Rscript Pipeline-Test.R ' '--input_rds_data'.
r_command = [
    'Rscript Pipeline-Test.R --input_rds_data', input_rds_data_pipeline_param,
    '--input_mp3_data', input_mp3_data_pipeline_param,
    '--output_data', output_data,
]

train_config = ScriptRunConfig(
    source_directory=source_directory,
    command=r_command,
    compute_target=compute_target,
    environment=env,
)

# R model step: runs the command configured in `train_config`.
train = CommandStep(
    name='train',
    runconfig=train_config,
    inputs=[input_rds_data_pipeline_param, input_mp3_data_pipeline_param],
    outputs=[output_data],
    # Never reuse a cached result; re-run the R script on every submission.
    allow_reuse=False,
)

# Persist data (python script): runs persist_data.py with pandas and pyodbc
# available, presumably to load the R output into the database whose
# credentials are stored in Key Vault further down — verify in persist_data.py.
run_config = RunConfiguration()
persist_conda = CondaDependencies.create(conda_packages=['pandas', 'pyodbc'])
run_config.environment.python.conda_dependencies = persist_conda

# The R step's output is handed to the script as an input named 'raw_data'.
persist_arguments = ["--raw_data", output_data.as_input('raw_data')]

persist_data = PythonScriptStep(
    script_name="persist_data.py",
    arguments=persist_arguments,
    # NOTE(review): output_data is already consumed via as_input() above;
    # confirm this extra `inputs` entry is needed/accepted by the SDK.
    inputs=[output_data],
    compute_target=compute_target,
    source_directory=source_directory,
    runconfig=run_config,
)

## Submit the pipeline

In [None]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Two-step pipeline: the R model, then the Python persistence step.
pipeline = Pipeline(workspace=ws, steps=[train, persist_data])

# Parameters to test our pipeline: explicit inputs overriding the defaults.
test_rds_path = DataPath(datastore=datastore,
                         path_on_datastore='r-pipeline-data/rds/accidents.Rd')
test_mp3_path = DataPath(datastore=datastore,
                         path_on_datastore='r-pipeline-data/mp3/file_example_MP3_5MG_01.mp3')

experiment_name = 'R-batch-scoring'
experiment = Experiment(ws, experiment_name)
test_parameters = {
    "input_rds_data": test_rds_path,
    "input_mp3_data": test_mp3_path,
}
pipeline_run = experiment.submit(pipeline, pipeline_parameters=test_parameters)

In [None]:
from azureml.widgets import RunDetails

# Notebook widget for monitoring the submitted pipeline run.
run_widget = RunDetails(pipeline_run)
run_widget.show()

## Store the database connection settings as secrets in Key Vault

In [None]:
# The workspace's default Key Vault.
keyvault = ws.get_default_keyvault()

# Database connection settings. Replace the placeholders before running, and
# avoid committing real credentials in the notebook.
username = '<your-user-name>'
password = '<your-password>'
database = '<your-database>'
server = '<your-server>'

# Secret name -> value. Pairing each name with its own value keeps the
# "password" secret from being set to the username.
db_secrets = {
    "username": username,
    "password": password,
    "database": database,
    "server": server,
}
for secret_name, secret_value in db_secrets.items():
    keyvault.set_secret(name=secret_name, value=secret_value)

## Helper functions to disable all schedules and published pipelines

In [None]:
def stop_by_schedule_id(ws, schedule_id):
    """Disable the schedule whose id is ``schedule_id`` in workspace ``ws``.

    Only schedules returned by ``Schedule.list(ws)`` are searched.

    Returns:
        The Schedule object that was disabled.

    Raises:
        ValueError: if no listed schedule has that id.
    """
    # Imported locally so this cell works before the cell that imports
    # Schedule at top level has been run.
    from azureml.pipeline.core.schedule import Schedule

    schedule = next((s for s in Schedule.list(ws) if s.id == schedule_id), None)
    if schedule is None:
        raise ValueError(f"No schedule with id {schedule_id!r} found")
    schedule.disable()
    return schedule


def disable_all_pipelines(ws):
    """Disable every published pipeline in workspace ``ws``, printing each one."""
    # Imported locally for the same reason as in stop_by_schedule_id.
    from azureml.pipeline.core import PublishedPipeline

    for published_pipeline in PublishedPipeline.list(ws):
        published_pipeline.disable()
        print(f"{published_pipeline.name},'{published_pipeline.id}' disabled")

## Disable all the schedules and pipelines (only if necessary)

In [None]:
from azureml.pipeline.core import PublishedPipeline
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Disable every schedule returned by Schedule.list, then every published pipeline.
schedules = Schedule.list(ws)
schedule_ids = [listed.id for listed in schedules]
for schedule_id in schedule_ids:
    stop_by_schedule_id(ws, schedule_id)

disable_all_pipelines(ws)

## Publish the pipeline

In [None]:
# Publish the pipeline; the schedule below refers to it by `published_pipeline.id`.
publish_name = 'pipeline-batch-score'
published_pipeline = pipeline.publish(
    name=publish_name,
    description='R batch pipeline',
)

## Create the change-based schedule

In [None]:
from azureml.data.datapath import DataPath

# Watch the same datastore the pipeline reads its inputs from (`datastore`,
# fetched in the pipeline-steps cell). Re-fetching it here under a separate
# placeholder name risks pointing the trigger at a different store than the
# one the `input_mp3_data` parameter reads from.
reactive_schedule = Schedule.create(ws,
                                    name="R-Schedule",
                                    description="Based on input file change.",
                                    pipeline_id=published_pipeline.id,
                                    experiment_name=experiment_name,
                                    datastore=datastore,
                                    # Polling interval in minutes.
                                    polling_interval=1,
                                    # The changed path is passed to the run as this parameter.
                                    data_path_parameter_name="input_mp3_data",
                                    path_on_datastore='r-pipeline-data/mp3/'
                                   )

## Get schedule list

In [None]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Print each schedule, followed by a separator line.
ss = Schedule.list(ws)
for listed_schedule in ss:
    print(f"{listed_schedule}\n....")