In [None]:
# Please type in the following variables.
# NOTE(review): storage_account_name and container_name are not read by any cell in
# this notebook; the Datastore `datastore_name` is expected to already be registered
# against this account/container — confirm they match.
storage_account_name = "amldbxadls"     # ADLS account
container_name = "data"                 # ADLS container, make sure it exists
path_to_raw_data = 'nyctlcraw'          # path in ADLS container holding the raw input
path_to_cleaned_data = 'nyctlccleaned'  # path in ADLS container receiving the cleaned output
# Legacy, misspelled name kept as an alias so code that still refers to it keeps working.
path_to_cleaned_date = path_to_cleaned_data
datastore_name = 'adlsgen2store'        # Name of Datastore representing ADLS in Azure ML
adbcluster_name = 'dbxcluster'          # Name of databricks cluster registered in AML
amlcluster_name = 'amlcluster'          # Name of AML cluster

experiment_name = 'my-experiment'       # Name of the experiment for tracking in AML

In [None]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Environment
from azureml.core.compute import ComputeTarget, DatabricksCompute
from azureml.core.datastore import Datastore
from azureml.core.runconfig import RunConfiguration, JarLibrary, PyPiLibrary
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import DatabricksStep, PythonScriptStep
from azureml.data.data_reference import DataReference
from azureml.data import OutputFileDatasetConfig

# Record the installed Azure ML SDK version in the cell output so runs can be reproduced.
print("SDK version:", azureml.core.VERSION)
# Connect to the Azure ML workspace described by the local workspace config file
# (presumably a config.json found from the current working directory — confirm).
ws = Workspace.from_config()

In [None]:
# Create connections.
# Look up the Datastore registered in the workspace under `datastore_name`.
adlsstore = ws.datastores[datastore_name]

# The paths come from the configuration cell rather than being hard-coded here, so
# editing path_to_raw_data / path_to_cleaned_date there actually takes effect.
# Raw data consumed by the Databricks step, referenced by the name "input".
rawdata = DataReference(datastore=adlsstore, path_on_datastore=path_to_raw_data, data_reference_name="input")
# NOTE(review): not passed to any pipeline step in this notebook; kept for compatibility.
cleaneddata = DataReference(datastore=adlsstore, path_on_datastore=path_to_cleaned_date, data_reference_name="output")
# Output of the Databricks step, named "output" and uploaded to the cleaned-data path
# on the same datastore.
aggdata = OutputFileDatasetConfig(name="output", destination=(adlsstore, path_to_cleaned_date)).as_upload()


In [None]:
# Create the databricks step.
# Runs ./databricks_step/main.py on a Databricks cluster reached through the compute
# target `adbcluster_name`. It reads `rawdata` and writes `aggdata` (per the run name:
# the aggregated data is stored as parquet in the cleaned dataset).
step1 = DatabricksStep(
    # How the step is labelled in the AML pipeline and in the Databricks run.
    name="Data Aggregation",
    run_name='Aggregate data and store in cleaned dataset as parquet',

    # Local script that is shipped to the cluster and executed there.
    source_directory='./databricks_step',
    python_script_name='main.py',

    # Data wiring: raw input reference in, uploaded output dataset out.
    inputs=[rawdata],
    outputs=[aggdata],

    # Cluster definition. Always pin spark_version: the SDK default is a very old runtime.
    compute_target=ws.compute_targets[adbcluster_name],
    spark_version='11.3.x-scala2.12',
    # Optional; remove to let the default node type be used.
    # NOTE(review): Azure sizes are usually spelled 'Standard_D4a_v4' — confirm the id matches.
    node_type='Standard_D4A_v4',
    num_workers=1,  # 1-2 workers are enough here; use a larger cluster for large data
    # To run on an already existing cluster instead, set existing_cluster_id and drop
    # spark_version / node_type / num_workers (the two options are not meant to be combined).
    # existing_cluster_id='',

    # Extra PyPI packages to install on the cluster for the script.
    pypi_libraries=[PyPiLibrary('azureml-mlflow', '')],

    # Reuse the previous results when the pipeline is rerun without changes to this step.
    allow_reuse=True,
)

In [None]:
# Create the azureml step.
# Directory uploaded to the AML compute; main.py and requirements.txt are read from it.
source_directory = './azureml_step'
runconfig = RunConfiguration()
# Build the step's Python environment from its pip requirements file. os.path.join
# is used instead of manual '/' concatenation so the path is assembled portably.
runconfig.environment = Environment.from_pip_requirements(
    'myenv', os.path.join(source_directory, 'requirements.txt')
)

# Consumes the Databricks step's output (`aggdata`), passed to main.py as --input.
step2 = PythonScriptStep(
    name='DoSomething',
    script_name='main.py',
    source_directory=source_directory,
    arguments=['--input', aggdata.as_input()],
    compute_target=ws.compute_targets[amlcluster_name],
    runconfig=runconfig,
    allow_reuse=False,  # always rerun this step
)

In [None]:
# Create the pipeline and submit.
# Set to True to also run the AzureML step (step2), which consumes step1's output.
# Defaults to False, so only the Databricks step is submitted.
run_azureml_step = False

pipeline_steps = [step1]
if run_azureml_step:
    pipeline_steps.append(step2)

pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
pipeline_run = Experiment(ws, experiment_name).submit(pipeline)
# Block until the submitted pipeline run finishes.
pipeline_run.wait_for_completion()