In [None]:
import azureml.core
from azureml.core import Workspace, Experiment
from azureml.train.automl.run import AutoMLRun
from azureml.widgets import RunDetails

from azureml.core.compute import ComputeTarget
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies
from azureml.data.data_reference import DataReference

from azureml.core import Dataset
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.core import Pipeline, PipelineRun
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineData

from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Report which SDK build this notebook is running against
print("AML SDK version:", azureml.core.VERSION)

# The workspace details are read from a local configuration file
ws = Workspace.from_config()

# Handle to the experiment that holds our auto ml runs
exp = Experiment(workspace=ws, name='HousingModel')

In [None]:
# Materialise every previous run of the experiment into a list
runs = [previous_run for previous_run in exp.get_runs()]

# The head of the list is the latest automl run
# (the tail, runs[-1], would be the very first run instead)
raw_run = runs[0]

# Re-wrap the plain `Run` as an `AutoMLRun`, which exposes extra automl-specific APIs
automl_run = AutoMLRun(experiment=exp, run_id=raw_run.id)

In [None]:
# Ask the automl run for its best output: the winning child run and its fitted model
(best_run, best_model) = automl_run.get_output()

# Register that model in the workspace's Models repository under the name
# the scoring step later uses to look it up
automl_run.register_model(model_name='HousePrices')

In [None]:
# Compute: reuse Spock, the target we created when experimenting with automated ml,
# and block until it is ready
compute = ComputeTarget(workspace=ws, name='Spock')
compute.wait_for_completion(show_output=True)

# Input: look up the AmesHousing dataset and expose it to the pipeline
# under the name 'full_ds'
ds = Dataset.get_by_name(workspace=ws, name='AmesHousing')
full_ds = ds.as_named_input(name='full_ds')

# Output: the location this step writes its result to
fetch_data_param = PipelineData(name="fetched_data")

# The step itself: runs fetch_data/fetch.py on the compute target, handing it
# the output location through the --fetched_data argument
fetch_step = PythonScriptStep(
    name="fetch_data",
    source_directory='./fetch_data',
    script_name="fetch.py",
    compute_target=compute,
    inputs=[full_ds],
    outputs=[fetch_data_param],
    arguments=["--fetched_data", fetch_data_param],
    allow_reuse=False
)


In [None]:
# Make sure to create the directory first: it is the step's source directory
# and the next cell writes fetch_data/fetch.py into it
!mkdir fetch_data

In [None]:
%%writefile fetch_data/fetch.py

from azureml.core import Run

# Retrieve our input from the current run context
ds = Run.get_context().input_datasets['full_ds']
df = ds.to_pandas_dataframe()
print(df)

# Sample 10 houses and make sure to drop the target column
forecast_df = df.sample(10).drop(columns='SalePrice')
print(forecast_df)

# Parse the `fetched_data` argument, this is the location where we should save
# the output
parser = argparse.ArgumentParser()
parser.add_argument('--fetched_data', dest='fetched_data', required=True)
args = parser.parse_args()
print(args.fetched_data)

# Save the output, the AML pipeline infrastructure will take care
# of passing it to the next steps
forecast_df.to_csv(args.fetched_data, index=False)

In [None]:
# Output location for the predictions produced by this step
predictions_param = PipelineData("predictions")

# Hand-built run configuration: run inside Docker on the default CPU image
run_config = RunConfiguration()
docker_settings = run_config.environment.docker
docker_settings.enabled = True
docker_settings.base_image = DEFAULT_CPU_IMAGE

# Dependencies are not user managed: they are declared below as conda dependencies
run_config.environment.python.user_managed_dependencies = False

# Pip packages the scoring script needs
# (it might be a good idea to pin a specific version of the AML SDK here)
conda = CondaDependencies()
for pip_package in ('azureml-sdk[automl]', 'joblib', 'xgboost==0.90'):
    conda.add_pip_package(pip_package)
run_config.environment.python.conda_dependencies = conda

# The scoring step: consumes the fetched data and emits the predictions
# TODO: discuss allow_reuse for the first two steps
run_step = PythonScriptStep(
    name="run",
    source_directory='./run',
    script_name="run.py",
    compute_target=compute,
    runconfig=run_config,
    inputs=[fetch_data_param],
    outputs=[predictions_param],
    arguments=["--fetched_data", fetch_data_param, "--predictions", predictions_param],
    allow_reuse=False
)

In [None]:
# Create the source directory of the `run` step; the next cell writes run/run.py into it
!mkdir run

In [None]:
%%writefile run/run.py

from azureml.core import Run, Model, Workspace
import joblib
import pandas as pd

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--fetched_data', dest='fetched_data', required=True)
parser.add_argument('--predictions', dest='predictions', required=True)
args = parser.parse_args()
print(args.fetched_data)
print(args.predictions)

# Read the input data
df = pd.read_csv(args.fetched_data)
print(df)

# Get the current context's workspace..
ws = Run.get_context().experiment.workspace
print(ws)

# ..in order to be able to retrieve a model from the repository..
model_ws = Model(ws, 'HousePrices')

# ..which we'll then download locally..
pickled_model_name = model_ws.download(exist_ok = True)

# ..and deserialize
model = joblib.load(pickled_model_name)
print(model)

# ..and use to predict the house prices
results = model.predict(df)
print(results)

# The predictions are stored in the `predictions` output path
# so that AML can find them and pass them to other steps
df['PredictedSalePrice'] = results
df.to_csv(args.predictions, index=False)

In [None]:
# Final step: runs save_predictions/save.py, which receives the predictions
# location through the --predictions argument. It declares no outputs.
save_step = PythonScriptStep(
    name="save_predictions",
    source_directory='./save_predictions',
    script_name="save.py",
    compute_target=compute,
    inputs=[predictions_param],
    arguments=["--predictions", predictions_param],
    allow_reuse=False
)

In [None]:
# Create the source directory of the `save_predictions` step; the next cell
# writes save_predictions/save.py into it
!mkdir save_predictions

In [None]:
%%writefile save_predictions/save.py

from azureml.core import Run, Model, Workspace
import pandas as pd
import os

# Parse arguments and print the `predictions` input
parser = argparse.ArgumentParser()
parser.add_argument('--predictions', dest='predictions', required=True)
args = parser.parse_args()
print(args.predictions)

# Read the dataset
df = pd.read_csv(args.predictions)
print(df)

# Get a reference to the workspace's default data store, we'll use this
# to save the predictions
ws = Run.get_context().experiment.workspace
ds = ws.get_default_datastore()

# Create a folder and persist the predictions inside
os.mkdir('./out')
df.to_csv('./out/predictions.csv')

# Upload the folder to the workspace's default data store
ds.upload('./out', target_path='latest_predictions')

In [None]:
# Chain the three steps into a pipeline: fetch the data, score it, save the predictions
pipeline = Pipeline(
    workspace=ws,
    steps=[
        fetch_step,
        run_step,
        save_step,
    ],
)

# Validate the pipeline before sending it off
pipeline.validate()

# Submit it as a run of the 'IRunPipelines' experiment
pipeline.submit(experiment_name='IRunPipelines')

In [None]:
# A schedule refers to a published pipeline by id, so publish first
published_pipeline = pipeline.publish()

# Daily recurrence that fires twice: at 01:30 and at 13:30
recurrence = ScheduleRecurrence(
    frequency="Day",
    interval=1,
    hours=[1, 13],
    minutes=[30],
)

# Attach the recurrence to the published pipeline; the scheduled runs are
# submitted under the 'IRunScheduledPipelines' experiment
recurring_schedule = Schedule.create(
    workspace=ws,
    name="DailySchedule",
    pipeline_id=published_pipeline.id,
    experiment_name='IRunScheduledPipelines',
    recurrence=recurrence,
    description="Twice a day, at 01:30 and 13:30",
)

In [None]:
# Look up the schedules attached to the published pipeline and display them
schedules = Schedule.list(workspace=ws, pipeline_id=published_pipeline.id)
schedules

In [None]:
# Disable/enable all schedules of a pipeline.
# As written, every schedule in `schedules` is disabled; to enable them instead,
# comment out the `disable()` call and uncomment the `enable()` call below.
for schedule in schedules:
    schedule.disable()
    #schedule.enable()