# Azure Machine Learning and Synapse integration (Private preview)
### Tutorial: Productionize a machine learning pipeline with Spark backed by Synapse

In this notebook, you learn how to:
* Use Spark pools as the compute target for an Azure ML pipeline run
* Use Spark pools as the compute target for an Azure ML experiment run

**Contents**:
* Package installation
* Get compute targets for the pipeline run
* Submit a two-step pipeline run: big data processing on a Spark pool, then training on an Azure ML compute cluster
* Submit an experiment job to a Spark pool

**For first-time setup (for example, linking a Synapse workspace and attaching Synapse Spark pools to Azure ML), refer to the "Run ML flow E2E in notebook" notebook.**

#### Step 1: Install/update Azure ML packages

Note: In private preview, please install the packages below. In public preview, this step will be removed. 

In [None]:
pip install -U "azureml-core<0.1.10" --index-url https://azuremlsdktestpypi.azureedge.net/SynapseInAml/ --extra-index-url https://pypi.python.org/simple

In [None]:
pip install -U "azureml-pipeline-core<0.1.10" --index-url https://azuremlsdktestpypi.azureedge.net/SynapseInAml/ --extra-index-url https://pypi.python.org/simple

In [None]:
pip install -U "azureml-pipeline-steps<0.1.10" --index-url https://azuremlsdktestpypi.azureedge.net/SynapseInAml/ --extra-index-url https://pypi.python.org/simple

**Please restart the kernel once installation completes.**
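After the restart, it can help to confirm which package versions the kernel actually sees. The snippet below is a minimal sketch that uses only the standard library (`importlib.metadata`, Python 3.8+); `installed_versions` and `PREVIEW_PACKAGES` are hypothetical names introduced here for illustration, not part of the Azure ML SDK.

```python
from importlib.metadata import PackageNotFoundError, version

# Packages installed in Step 1 of this notebook.
PREVIEW_PACKAGES = ["azureml-core", "azureml-pipeline-core", "azureml-pipeline-steps"]


def installed_versions(packages):
    """Map each package name to its installed version, or None if it is missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Print one line per package so a missing or stale install is easy to spot.
for name, ver in installed_versions(PREVIEW_PACKAGES).items():
    print(f"{name}: {ver or 'not installed'}")
```

If a package shows as not installed, rerun the matching `pip install` cell and restart the kernel again.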

#### Step 2: Get the attached Spark pool and compute cluster

In [None]:
import datetime
from azureml.core import Workspace, Experiment, Dataset, Environment, Datastore, LinkedWorkspace

ws = Workspace.from_config()

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# retrieve attached Synapse spark pool
synapse_compute = ws.compute_targets['<Synapse Spark pool alias in AML>']
synapse_compute

# Choose a name for your CPU cluster
cpu_cluster_name = "cpucluster"

# Reuse the cluster if it already exists; otherwise create it
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

cpu_cluster.wait_for_completion(show_output=True)

#### Step 3: Prepare the pipeline input data

In [None]:
from azureml.core import Dataset
from azureml.data.dataset_factory import DataType

dataset_name = "blob_ds"
try:
    dataset = Dataset.get_by_name(workspace=ws, name=dataset_name)
    print('Found existing dataset, use it.')
except Exception:
    # Dataset not registered yet: create a TabularDataset from a delimited file
    # behind a public web URL and convert column "Survived" to boolean
    web_path = 'https://dprepdata.blob.core.windows.net/demo/Titanic.csv'
    titanic_ds = Dataset.Tabular.from_delimited_files(path=web_path, set_column_types={'Survived': DataType.to_bool()})
    dataset = titanic_ds.register(ws, name=dataset_name)

#### Step 4: Start a pipeline run
View run logs and output on the run details page in Azure Machine Learning studio.

In [None]:
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep, SynapseSparkStep
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

# Add training config including dependencies
train_run_config = RunConfiguration()
conda = CondaDependencies.create(
    pip_indexurl='https://azuremlsdktestpypi.azureedge.net/sdk-release/master/588E708E0DF342C4A80BD954289657CF',
    pip_packages=['azureml-sdk<0.1.1', 'azureml-dataprep[fuse,pandas]>=1.1.19', 'azureml-telemetry'],
    pin_sdk_version=False
)


conda.set_pip_option('--pre')
train_run_config.environment.python.conda_dependencies = conda

In [None]:
from azureml.data import HDFSOutputDatasetConfig

ds = Dataset.get_by_name(ws,name='blob_ds')
input1 = ds.as_named_input('synapseinput')

output1 = HDFSOutputDatasetConfig(
    "synapse_step_output", destination=(ws.datastores['<datastore name>'],"<folder name>")).register_on_complete(name="<registered dataset name>")

input2 = output1.as_input("input2").as_download()


step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'pyspark_job_pipeline.py',
                          source_directory=".", 
                          #arguments=[myinput, output1],
                          inputs=[input1],
                          outputs=[output1],
                          compute_target = synapse_compute,
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1)

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[input2],
                          inputs=[input2],
                          #outputs=[output2],
                          compute_target=cpu_cluster_name,
                          runconfig = train_run_config,
                          source_directory=".",
                          allow_reuse=False)

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('two_steps', regenerate_outputs=True)
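`train.py` is referenced by `step_2` but is not shown in this notebook. As a rough sketch of how such a script might pick up its input: because `input2` is passed in `arguments` with `as_download()`, the script should receive a local folder path as its first command-line argument. The example below assumes (the notebook does not confirm this) that the Spark step wrote CSV part files, each with a header row. `count_rows` and `main` are hypothetical names, and real training logic would replace the row count.

```python
import argparse
import csv
import sys
from pathlib import Path


def count_rows(input_dir):
    """Count data rows across all CSV files under input_dir (assumed layout)."""
    total = 0
    for csv_path in sorted(Path(input_dir).rglob("*.csv")):
        with csv_path.open(newline="") as handle:
            reader = csv.reader(handle)
            next(reader, None)  # skip the assumed header row
            total += sum(1 for _ in reader)
    return total


def main(argv):
    """Parse the downloaded input folder from argv and report what was found."""
    parser = argparse.ArgumentParser(description="Sketch of a training entry point")
    parser.add_argument("input_dir", help="local folder holding the Spark step output")
    args = parser.parse_args(argv)
    rows = count_rows(args.input_dir)
    print(f"Found {rows} data rows under {args.input_dir}")
    return rows


# Run only when a path was actually passed on the command line.
if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv[1:])
```

Keeping the file handling in a plain function makes it possible to try the script locally against a small sample folder before submitting the pipeline.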
      

#### Other option: start an experiment run
View run logs and output on the run details page in Azure Machine Learning studio.

In [None]:
from azureml.core import RunConfiguration
from azureml.data import HDFSOutputDatasetConfig
output = HDFSOutputDatasetConfig(
    "synapse_step_output",
    destination=(ws.datastores['<datastore name>'],"<folder name>")).register_on_complete(name="<registered dataset name>")

run_config = RunConfiguration(framework="pyspark")
run_config.output_data = {output.name: output}

run_config.target = '<Spark pool alias in AML>'

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

from azureml.core import ScriptRunConfig 

script_run_config = ScriptRunConfig(source_directory = '.', 
                                    script= 'pyspark_job_exp.py', 
                                    arguments = ['args1','args2'], 
                                    run_config = run_config) 

from azureml.core import Experiment 
exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run
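Both `pipeline.submit` and `exp.submit` return as soon as the run is queued. The run object's `wait_for_completion` method blocks until the run finishes. If you would rather poll yourself, for example to enforce your own timeout, a small loop over `get_status()` is one option. The helper below is a sketch, not part of the SDK: `wait_until_terminal` is a hypothetical name, and the set of terminal status strings is an assumption to check against your SDK version.

```python
import time

# Assumed terminal status strings; verify against your SDK version.
TERMINAL_STATES = frozenset({"Completed", "Failed", "Canceled"})


def wait_until_terminal(run, poll_seconds=30.0, timeout_seconds=3600.0, sleep=time.sleep):
    """Poll run.get_status() until a terminal state is reported or the timeout elapses.

    `run` is any object with a get_status() method that returns a status string.
    `sleep` is injectable so the loop can be exercised without real waiting.
    """
    waited = 0.0
    status = run.get_status()
    while status not in TERMINAL_STATES:
        if waited >= timeout_seconds:
            raise TimeoutError(f"Run still '{status}' after {waited:.0f} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
        status = run.get_status()
    return status
```

For example, calling `wait_until_terminal(run, poll_seconds=60)` after the cell above returns the final status string, which you can compare with `"Completed"` before reading the registered output dataset.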