# Azure Machine Learning Pipeline with Databricks Step
Sample notebook demonstrating creation of an AML pipeline with a `DatabricksStep` that accepts two file datasets as inputs (one dataset is read from, the other written to).

### Import required packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DatabricksCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter, PipelineData
from azureml.pipeline.core.pipeline_output_dataset import PipelineOutputTabularDataset
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
from azureml.pipeline.steps import DatabricksStep
import logging
import os

### Connect to AML workspace, create compute cluster, and get reference to default datastore

In [None]:
# Load the AML workspace from its saved configuration
ws = Workspace.from_config()

# Name of the AML compute cluster used by the Python script steps
cpu_cluster_name = 'cpucluster'

# Reuse the cluster if the workspace already has one with this name;
# otherwise provision a single-node cluster and wait until provisioning completes.
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D3_V2',
        min_nodes=1,
        max_nodes=1,
    )
    cpu_cluster = ComputeTarget.create(
        workspace=ws,
        name=cpu_cluster_name,
        provisioning_configuration=provisioning_config,
    )
    cpu_cluster.wait_for_completion(show_output=True)

# Keep a handle on the workspace's default datastore for staging pipeline data
default_ds = ws.get_default_datastore()

### Select a curated AML environment for pipeline execution
Retrieve an existing curated environment from AML and attach to a `RunConfiguration` to support pipeline execution. Alternatively, custom environments can be created and attached here. 

In [None]:
# Run configuration shared by the Python script steps; its environment is
# the curated environment fetched from the workspace by name.
run_config = RunConfiguration()
run_env = Environment.get(workspace=ws, name='AzureML-sklearn-1.0-ubuntu20.04-py38-cpu')
run_config.environment = run_env

### Define output datasets
Here we define two datasets, `input_data` and `output_data`, and specify where their data should reside within our default datastore. These essentially become locations where data can be staged throughout pipeline execution, and the provided path ensures that outputs will be overwritten on each pipeline execution.

In [None]:
# (datastore, relative path) destinations inside the default datastore
input_destination = (default_ds, 'sfct/input_data')
output_destination = (default_ds, 'sfct/output_data')

# File dataset configurations that pipeline steps write to and read from
input_data = OutputFileDatasetConfig(name='input', destination=input_destination)
output_data = OutputFileDatasetConfig(name='output', destination=output_destination)

### Define DatabricksCompute
`DatabricksCompute` can be defined using the code below. See [this document](https://learn.microsoft.com/en-us/python/api/azureml-core/azureml.core.compute.databrickscompute?view=azure-ml-py) for more details. 

In [None]:
# Databricks connection settings. Each value is read from an environment variable and
# falls back to a placeholder string that must be replaced before the cell can succeed.
db_compute_name = os.getenv("DATABRICKS_COMPUTE_NAME", "<YOUR-DATABRICKS-COMPUTE-NAME>")  # Databricks compute name
db_resource_group = os.getenv("DATABRICKS_RESOURCE_GROUP", "<YOUR-RESOURCE-GROUP>")  # Databricks resource group
db_workspace_name = os.getenv("DATABRICKS_WORKSPACE_NAME", "<YOUR-DATABRICKS-WORKSPACE-NAME>")  # Databricks workspace name
db_access_token = os.getenv("DATABRICKS_ACCESS_TOKEN", "<YOUR-DATABRICKS-ACCESS-TOKEN>")  # Databricks access token

# Reuse the Databricks compute target if it is already attached to the workspace;
# otherwise attach it with the settings above and wait for the attach to finish.
try:
    databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)
    print('Compute target {} already exists'.format(db_compute_name))
except ComputeTargetException:
    print('Compute not found, will use below parameters to attach new one')
    print('db_compute_name {}'.format(db_compute_name))
    print('db_resource_group {}'.format(db_resource_group))
    print('db_workspace_name {}'.format(db_workspace_name))
    # Never echo the token itself: cell output is stored in the notebook file and
    # would leak the secret to anyone the notebook is shared with.
    print('db_access_token {}'.format('<redacted>' if db_access_token else '<not set>'))

    config = DatabricksCompute.attach_configuration(
        resource_group=db_resource_group,
        workspace_name=db_workspace_name,
        access_token=db_access_token)
    databricks_compute = ComputeTarget.attach(ws, db_compute_name, config)
    databricks_compute.wait_for_completion(True)

### Define pipeline steps
Below we define the steps that are executed over the course of our pipeline run. Two `PythonScriptStep`s are run (Step One and Step Three), with a Databricks notebook executed in between (Step Two).

In [None]:
# Step One: runs step_one.py on the AML cluster. Both dataset configurations are passed
# to the script as arguments and registered as outputs of this step.
step_one = PythonScriptStep(
    name='Step One',
    script_name='step_one.py',
    arguments=['--input_data', input_data, '--output_data', output_data],
    outputs=[input_data, output_data],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)

# Step Two: runs a Databricks notebook on an existing interactive cluster, with both
# datasets supplied as named inputs ('input' and 'output').
# num_workers is intentionally not passed: per the DatabricksStep documentation, the
# parameters that size a new cluster must not be combined with existing_cluster_id.
step_two = DatabricksStep(
    name="Step Two - Databricks",
    inputs=[input_data.as_input(name='input'), output_data.as_input(name='output')],
    notebook_path='<PATH-TO-NOTEBOOK>',
    run_name='DBX_Notebook',
    compute_target=databricks_compute,
    existing_cluster_id='<DATABRICKS_CLUSTER_ID>',
    permit_cluster_restart=False,
    allow_reuse=False
)

# Step Three: runs step_three.py on the AML cluster with output_data downloaded
# to ./output_data on the compute.
step_three = PythonScriptStep(
    name='Step Three',
    script_name='step_three.py',
    inputs=[output_data.as_input(name='output_data').as_download('./output_data')],
    compute_target=cpu_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=True,
    runconfig=run_config
)

# Step Three's only declared input is an output of Step One, and Step Two declares no
# outputs, so the ordering after the Databricks step has to be stated explicitly.
step_three.run_after(step_two)

### Define pipeline
Below is the syntax for creating and running an Azure ML pipeline.

In [None]:
# Assemble the three steps into a pipeline bound to the workspace
steps = [step_one, step_two, step_three]
pipeline = Pipeline(workspace=ws, steps=steps)

# Submit the pipeline under the 'DBX_Test' experiment
experiment = Experiment(workspace=ws, name='DBX_Test')
pipeline_run = experiment.submit(pipeline)

# Wait for the submitted run to complete
pipeline_run.wait_for_completion()