![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-schedule-for-a-published-pipeline.png)

# How to Setup a Schedule for a Published Pipeline


In [None]:
import azureml.core
from azureml.core import Workspace

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

# 計算目標
取得預設的Azure機器學習計算

In [5]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.exceptions import ComputeTargetException
aml_compute = ws.get_default_compute_target("CPU")
#compute_target = ws.get_default_compute_target("C")
cpu_cluster_name = "cpucluster"

# Verify that cluster does not exist already
try:
    aml_compute_target = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print("Found existing cpucluster")
except ComputeTargetException:
    print("Creating new cpucluster")

Found existing cpucluster


# 構建和發布管道
建立一個簡單的管道，發布並添加運行時間表。

In [6]:
from azureml.pipeline.steps import PythonScriptStep

trainStep = PythonScriptStep(
    name="Training_Step",
    script_name="train.py", 
    compute_target=aml_compute_target, 
    source_directory='.'
)
print("TrainStep created")

TrainStep created


### Build the pipeline

In [7]:
from azureml.pipeline.core import Pipeline

pipeline1 = Pipeline(workspace=ws, steps=[trainStep])
print ("Pipeline is built")

Pipeline is built


### Publish the pipeline

In [8]:
from datetime import datetime

timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')

pipeline_name = timenow + "-Pipeline"
print(pipeline_name)

published_pipeline1 = pipeline1.publish(
    name=pipeline_name, 
    description=pipeline_name)
print("Newly published pipeline id: {}".format(published_pipeline1.id))

11-07-2019-05-18-Pipeline
Created step Training_Step [827ee528][e4c3736e-7042-4f63-bb79-53b71e7bf10e], (This step will run and generate new outputs)
Newly published pipeline id: 009465ce-62c4-4564-b404-3483dbec30af


## Schedule Operations
計劃操作需要已發布管道的ID。 可以獲取所有已發布管道並對其進行調度操作，或者，如果已經知道已發布管道的ID，則也可以直接使用它。

In [9]:
from azureml.pipeline.core import PublishedPipeline

# You could retrieve all pipelines that are published, or 
# just get the published pipeline object that you have the ID for.

# Get all published pipeline objects in the workspace
all_pub_pipelines = PublishedPipeline.get_all(ws)

# We will iterate through the list of published pipelines and 
# use the last ID in the list for Schelue operations: 
print("Published pipelines found in the workspace:")
for pub_pipeline in all_pub_pipelines:
    print(pub_pipeline.id)
    pub_pipeline_id = pub_pipeline.id

print("Published pipeline id to be used for Schedule operations: {}".format(pub_pipeline_id))



Published pipelines found in the workspace:
009465ce-62c4-4564-b404-3483dbec30af
Published pipeline id to be used for Schedule operations: 009465ce-62c4-4564-b404-3483dbec30af


### Create a schedule for the pipeline using a recurrence
This schedule will run on a specified recurrence interval.

In [10]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

recurrence = ScheduleRecurrence(frequency="Minute", interval=1) # Runs every minute

schedule = Schedule.create(workspace=ws, name="My_Schedule",
                           pipeline_id=pub_pipeline_id, 
                           experiment_name='Schedule_Run',
                           recurrence=recurrence,
                           wait_for_provisioning=True,
                           description="Schedule Run")

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

Provisioning status: Completed
Created schedule with id: cabda549-5829-4024-989b-3f11bfd4a20c


Note: Set the `wait_for_provisioning` flag to False if you do not want to wait for the call to provision the schedule in the backend.

### Get all schedules for a given pipeline
Once you have the published pipeline ID, then you can get all schedules for that pipeline.

In [11]:
schedules = Schedule.get_all(ws, pipeline_id=pub_pipeline_id)

# We will iterate through the list of schedules and 
# use the last recurrence schedule in the list for further operations: 
print("Found these schedules for the pipeline id {}:".format(pub_pipeline_id))
for schedule in schedules: 
    print(schedule.id)
    if schedule.recurrence is not None:
        schedule_id = schedule.id

print("Schedule id to be used for schedule operations: {}".format(schedule_id))



Found these schedules for the pipeline id 009465ce-62c4-4564-b404-3483dbec30af:
cabda549-5829-4024-989b-3f11bfd4a20c
Schedule id to be used for schedule operations: cabda549-5829-4024-989b-3f11bfd4a20c


### Get all schedules in your workspace
You can also iterate through all schedules in your workspace if needed.

In [12]:
# Use active_only=False to get all schedules including disabled schedules
schedules = Schedule.get_all(ws, active_only=True) 
print("Your workspace has the following schedules set up:")
for schedule in schedules:
    print("{} (Published pipeline: {}".format(schedule.id, schedule.pipeline_id))



Your workspace has the following schedules set up:
cabda549-5829-4024-989b-3f11bfd4a20c (Published pipeline: 009465ce-62c4-4564-b404-3483dbec30af


### Get the schedule

In [17]:
fetched_schedule = Schedule.get(ws, schedule_id)
print("Using schedule with id: {}".format(fetched_schedule.id))

Using schedule with id: cabda549-5829-4024-989b-3f11bfd4a20c


### Disable the schedule

In [16]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
fetched_schedule.disable(wait_for_provisioning=False)
fetched_schedule = Schedule.get(ws, schedule_id)
print("Disabled schedule {}. New status is: {}".format(fetched_schedule.id, fetched_schedule.status))

Disabled schedule cabda549-5829-4024-989b-3f11bfd4a20c. New status is: Disabled


### Reenable the schedule

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
fetched_schedule.enable(wait_for_provisioning=True)
fetched_schedule = Schedule.get(ws, schedule_id)
print("Enabled schedule {}. New status is: {}".format(fetched_schedule.id, fetched_schedule.status))

### Change recurrence of the schedule

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
recurrence = ScheduleRecurrence(frequency="Hour", interval=2) # Runs every two hours

fetched_schedule = Schedule.get(ws, schedule_id)

fetched_schedule.update(name="My_Updated_Schedule", 
                        description="Updated_Schedule_Run", 
                        status='Active', 
                        wait_for_provisioning=True,
                        recurrence=recurrence)

fetched_schedule = Schedule.get(ws, fetched_schedule.id)

print("Updated schedule:", fetched_schedule.id, 
      "\nNew name:", fetched_schedule.name,
      "\nNew frequency:", fetched_schedule.recurrence.frequency,
      "\nNew status:", fetched_schedule.status)

### Create a schedule for the pipeline using a Datastore
This schedule will run when additions or modifications are made to Blobs in the Datastore.
By default, the Datastore container is monitored for changes. Use the path_on_datastore parameter to instead specify a path on the Datastore to monitor for changes. Note: the path_on_datastore will be under the container for the datastore, so the actual path monitored will be container/path_on_datastore. Changes made to subfolders in the container/path will not trigger the schedule.
Note: Only Blob Datastores are supported.

In [None]:
from azureml.core.datastore import Datastore

datastore = Datastore(workspace=ws, name="workspaceblobstore")

schedule = Schedule.create(workspace=ws, name="My_Schedule",
                           pipeline_id=pub_pipeline_id, 
                           experiment_name='Schedule_Run',
                           datastore=datastore,
                           wait_for_provisioning=True,
                           description="Schedule Run")
                          #polling_interval=5, use polling_interval to specify how often to poll for blob additions/modifications. Default value is 5 minutes.
                          #path_on_datastore="file/path") use path_on_datastore to specify a specific folder to monitor for changes.

# You may want to make sure that the schedule is provisioned properly
# before making any further changes to the schedule

print("Created schedule with id: {}".format(schedule.id))

In [None]:
# Set the wait_for_provisioning flag to False if you do not want to wait  
# for the call to provision the schedule in the backend.
schedule.disable(wait_for_provisioning=True)
schedule = Schedule.get(ws, schedule_id)
print("Disabled schedule {}. New status is: {}".format(schedule.id, schedule.status))