### Setup Libraries

In [1]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.widgets import RunDetails
 
from azureml.core import Dataset
 
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core import PipelineRun, StepRun, PortDataReference
from azureml.pipeline.steps import PythonScriptStep
 
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
 
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
 
from azureml.core.model import Model
 
# Report the installed AzureML core SDK version so reruns can be matched to it.
print("SDK version: {}".format(azureml.core.VERSION))

SDK version: 1.12.0


### Initialize AzureML Workspace

In [2]:
# Connect to the AzureML workspace described by the local workspace config file
# (typically config.json). `ws` is used by every cell below.
ws = Workspace.from_config()

### Create a compute target or reuse an existing one

In [4]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Name of the CPU cluster to reuse, or to create if it does not exist yet.
# The name is kept in its own variable so `aml_compute` only ever holds the
# ComputeTarget object that later cells pass as compute_target / runconfig target.
cluster_name = "automatecluster"

# Reuse the cluster if the workspace already has it; otherwise provision a new one.
try:
    aml_compute = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    print('Cluster not found, creating a new one.')
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=4)
    aml_compute = ComputeTarget.create(ws, cluster_name, compute_config)

# Block until the cluster reports a finished provisioning state.
aml_compute.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


### Configure the training run's environment

In [37]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

# Run configuration shared by both PythonScriptSteps in this pipeline.
aml_run_config = RunConfiguration()

# Execute on the compute target selected above.
aml_run_config.target = aml_compute

# Run inside a Docker container built from this base image.
aml_run_config.environment.docker.enabled = True
aml_run_config.environment.docker.base_image = "mcr.microsoft.com/azureml/base:0.2.1"

# AzureML builds the conda environment inside the image from the dependencies below
# (user_managed_dependencies=False: we do not supply a pre-built environment).
aml_run_config.environment.python.user_managed_dependencies = False
aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=[
        'pandas',
        'scikit-learn',
    ],
    pip_packages=[
        'azureml-dataset-runtime[fuse]',
        'packaging',
        'numpy==1.16.2',
        'azureml-sdk',
    ],
)

print("Run configuration created.")

Run configuration created.


### Configure Datasets and Datastore

In [None]:
# ### Upload data to datastore and register datasets.
# One-time setup cell, intentionally left commented out: later cells fetch the
# datastore ("kaggledatabook") and dataset ("bank_dataset") by name.
# NOTE(review): registering the tabular dataset under the name 'bank_dataset'
# is not shown here. TODO: add the registration step or document where it happens.
# SECURITY: never hardcode the storage account key in the notebook. Supply it via
# the BLOB_ACCOUNT_KEY environment variable (or a secret store). The key previously
# committed in this cell must be treated as leaked and rotated in the storage account.
# from azureml.core import Datastore
# blob_datastore_name='kaggledatabook' # Name of the datastore to workspace
# container_name=os.getenv("BLOB_CONTAINER", "opendata") # Name of Azure blob container
# account_name=os.getenv("BLOB_ACCOUNTNAME", "kaggledatabook") # Storage account name
# account_key=os.environ["BLOB_ACCOUNT_KEY"] # Storage account access key (required, no default)

# blob_datastore = Datastore.register_azure_blob_container(workspace=ws, 
#                                                          datastore_name=blob_datastore_name, 
#                                                          container_name=container_name, 
#                                                          account_name=account_name,
#                                                          account_key=account_key)

# blob_datastore.upload_files(files = ['./Bank.csv'], overwrite = True, show_progress = True)
# from azureml.core.dataset import Dataset
# bank_dataset = Dataset.Tabular.from_delimited_files(path=blob_datastore.path('Bank.csv'))
# bank_dataset

In [38]:
# Fetch the blob datastore already registered in the workspace as "kaggledatabook".
# The pipeline's intermediate outputs (cleansed_data, model_file) are written to it.
blob_datastore = Datastore.get(ws, "kaggledatabook")

In [82]:
from azureml.core.dataset import Dataset
# Load the registered bank marketing dataset; it is the input of the cleansing step.
bank_dataset = Dataset.get_by_name(workspace=ws, name='bank_dataset')

### Construct your pipeline steps

##### Data Prep Step

In [76]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Folder containing the data-preparation script (prep.py); uploaded as the
# step's source_directory.
prepare_data_folder = './scripts/prepdata'

# Intermediate output of the cleansing step, stored on the blob datastore and
# consumed by the training step as a dataset.
cleansed_data = PipelineData("cleansed_data", datastore=blob_datastore).as_dataset()

print('Cleanse script is in {}.'.format(os.path.realpath(prepare_data_folder)))

# Cleansing step: reads the registered dataset as the named input 'bank_dataset'
# and writes its result to `cleansed_data`, whose location is passed to the
# script via --output_cleanse. See prep.py in prepare_data_folder for how the
# input and output are consumed.
cleansingStep = PythonScriptStep(
    name="Cleanse Bank Marketing Data",
    script_name="prep.py", 
    arguments=["--output_cleanse", cleansed_data],
    inputs=[bank_dataset.as_named_input('bank_dataset')],
    outputs=[cleansed_data],
    compute_target=aml_compute,
    runconfig=aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("cleansingStep created.")


Cleanse script is in /mnt/batch/tasks/shared/LS_root/mounts/clusters/adscompute/code/Users/prsing/BankMarketingAnalysis/ML_Pipelines/scripts/prepdata.
cleansingStep created.


##### Model training step

In [77]:
# Output location on the blob datastore for the trained model (passed to train.py as --model).
model_file = PipelineData(name="model_file", datastore=blob_datastore)

In [78]:
# Folder containing train.py; uploaded as the step's source_directory.
train_model_folder = './scripts/trainmodel'

# Hyperparameters forwarded to train.py as "--<name> <value>" pairs, in this order.
# They are kept in one place so they can be tuned without touching the step definition.
# NOTE(review): the names look like gradient-boosting parameters; confirm against train.py.
train_hyperparameters = {
    'learning_rate': 0.01,
    'n_estimators': 600,
    'max_depth': 9,
    'min_samples_split': 1200,
    'min_samples_leaf': 60,
    'subsample': 0.85,
    'random_state': 10,
    'max_features': 7,
}

# Flatten to ['--learning_rate', 0.01, '--n_estimators', 600, ...], then append the
# model output location (same argument list as before, just not hand-written).
train_arguments = [token
                   for param_name, param_value in train_hyperparameters.items()
                   for token in ('--' + param_name, param_value)]
train_arguments += ['--model', model_file]

# Training step: consumes the cleansing step's output (read as parquet files) and
# writes the trained model to `model_file`.
trainmodel = PythonScriptStep(name="train_step",
                              script_name="./train.py",
                              arguments=train_arguments,
                              inputs=[cleansed_data.parse_parquet_files(file_extension=None)],
                              outputs=[model_file],
                              compute_target=aml_compute,
                              runconfig=aml_run_config,
                              source_directory=train_model_folder,
                              allow_reuse=True)

##### Combine steps and submit the pipeline

In [79]:
# Steps that make up the pipeline; trainmodel consumes cleansingStep's `cleansed_data` output.
steps = [cleansingStep,trainmodel]

In [80]:
# Assemble the pipeline object in this workspace from the steps listed above.
pipeline1 = Pipeline(workspace=ws, steps=steps)

In [81]:
# Submit the pipeline under the 'Bank_Marketing' experiment. With regenerate_outputs=False,
# steps declared with allow_reuse=True may reuse earlier outputs (see the log below).
pipeline_run1 = Experiment(workspace=ws, name='Bank_Marketing').submit(
    pipeline1,
    regenerate_outputs=False,
)

Created step Cleanse Bank Marketing Data [5d082efb][091249ef-40a6-4d2e-a29f-79c3d68a49ac], (This step is eligible to reuse a previous run's output)
Created step train_step [f4cb3f05][03c8531d-4a30-46c1-8a16-de577be0a842], (This step will run and generate new outputs)
Submitted PipelineRun 62f1d8fb-feb3-4824-bceb-3ff7d24cbad7
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Bank_Marketing/runs/62f1d8fb-feb3-4824-bceb-3ff7d24cbad7?wsid=/subscriptions/ab8f5415-63b3-4fd4-8a8a-9213316abb6e/resourcegroups/ADS_Book/workspaces/ADS_AMLworkspace


##### Publishing the pipeline

In [87]:
# Publish the submitted pipeline so it can be triggered and scheduled by its id.
published_pipeline1 = pipeline_run1.publish_pipeline(
    name="BankMarketing_GB_Pipeline",
    description="Predict Term Deposit pipeline on bank marketing dataset",
    version="1.0",
)

In [88]:
# Id of the published pipeline; this is the pipeline the schedule below targets.
published_pipeline1.id

'3834a1a6-70ca-4146-b559-0665c88d5afe'

##### Schedule a pipeline

In [91]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Trigger the published pipeline every 5 minutes under the 'Bank_Marketing' experiment.
# The pipeline id comes from the object published above rather than a hardcoded GUID.
# Re-running the publish cell produces a new published pipeline, and a pasted id would
# keep scheduling the stale one.
# NOTE: this schedule keeps submitting runs until it is disabled (see the cells below).
recurrence = ScheduleRecurrence(frequency="Minute", interval=5)
recurring_schedule = Schedule.create(ws, name="bankmarketing_recurring5min",
                                     description="Based on time",
                                     pipeline_id=published_pipeline1.id,
                                     experiment_name="Bank_Marketing",
                                     recurrence=recurrence)

##### Deactivate the Pipeline schedule

In [92]:
# Print each schedule returned for the workspace (name, id, status, pipeline id, recurrence).
for schedule in Schedule.list(ws):
    print(schedule)

Pipeline(Name: bankmarketing_recurring5min,
Id: 544dffab-e172-422f-a99e-9f7ae537b367,
Status: Active,
Pipeline Id: 3834a1a6-70ca-4146-b559-0665c88d5afe,
Recurrence Details: Runs every 5 Minutes)


In [93]:
def stop_by_schedule_id(ws, schedule_id):
    """Disable the pipeline schedule with the given id and return it.

    Args:
        ws: AzureML workspace whose schedules are searched.
        schedule_id: id of the schedule to disable.

    Returns:
        The matching Schedule object, after ``disable()`` has been called on it.

    Raises:
        ValueError: if ``Schedule.list(ws)`` returns no schedule with ``schedule_id``.
    """
    schedule = next((candidate for candidate in Schedule.list(ws)
                     if candidate.id == schedule_id), None)
    if schedule is None:
        # A bare next() would surface as an opaque StopIteration; report the id instead.
        # NOTE(review): Schedule.list may return only active schedules by default, so an
        # already-disabled schedule would also land here. Confirm.
        raise ValueError("No schedule with id {!r} found in the workspace.".format(schedule_id))
    schedule.disable()
    return schedule

# Disable the schedule created above. Its id comes from the Schedule object, not a
# pasted GUID, so this still works after a fresh Restart & Run All.
stop_by_schedule_id(ws, recurring_schedule.id)

Name,Id,Status,Pipeline Id,Recurrence Details
bankmarketing_recurring5min,544dffab-e172-422f-a99e-9f7ae537b367,Disabled,3834a1a6-70ca-4146-b559-0665c88d5afe,Runs every 5 Minutes


In [94]:
# List the schedules again to check the effect of disabling.
# NOTE(review): Schedule.list may return only active schedules by default, in which
# case the disabled schedule no longer appears here. Confirm.
for schedule in Schedule.list(ws):
    print(schedule)