# Azure Machine Learning and Pipeline SDK-specific imports

In [1]:
import os

from azureml.core import Dataset
from azureml.core import VERSION, Workspace
from azureml.core.datastore import Datastore
from azureml.core.environment import Environment
from azureml.data import HDFSOutputDatasetConfig
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import SynapseSparkStep

# Print the version of the azureml.core SDK imported by this kernel.
print(f"SDK: v{VERSION}")

SDK: v1.42.0


In [3]:
# Obtain the workspace handle via Workspace.from_config() and show which
# workspace was picked up: its name first, then its resource group.
ws = Workspace.from_config()
print(ws.name)
print(ws.resource_group)

mldevorgrunprod
AZ-RG-DSL-MLDEVORGRUNPROD


## Prepare data

In [5]:
# Look up the datastore named "workspaceblobstore" in the workspace.
# NOTE(review): the lookup is by hard-coded name rather than through the
# workspace's default-datastore setting -- confirm this is the intended store.
def_blob_store = Datastore(ws, "workspaceblobstore")
print(f"Datastore {def_blob_store.name} will be used")

# Upload the sample file from ./data so it can be used as the data source.
# overwrite=False leaves an already-uploaded copy in place (the upload is skipped).
# The trailing ";" stops the call's return value from being echoed as cell output.
file_name = "Titanic.csv"
def_blob_store.upload_files(files=[f"./data/{file_name}"], overwrite=False);

Datastore workspaceblobstore will be used
Uploading an estimated of 1 files
Target already exists. Skipping upload for Titanic.csv
Uploaded 0 files


$AZUREML_DATAREFERENCE_workspaceblobstore

## Configure datasets

In [7]:
# Dataset and HDFSOutputDatasetConfig are imported in the notebook's first cell;
# they are not re-imported here.
#
# NOTE(review): input1, input2 and output are not referenced again in the cells
# visible here -- the "Run pipeline" cell builds equivalent objects under the
# step1_* names. Confirm whether this cell is still needed.

# Tabular dataset read from `file_name` on the datastore, exposed as "tabular_input".
print("tabular dataset as input")
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(def_blob_store, file_name)])
input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# File dataset over the same path, exposed as "file_input" via as_hdfs().
print("file dataset as input")
titanic_file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])
input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

# Output written under "test" on the datastore and registered as
# "registered_dataset" through register_on_complete().
print("register output as file dataset")
output = HDFSOutputDatasetConfig(destination=(def_blob_store, "test")).register_on_complete(name="registered_dataset")

tabular dataset as input
file dataset as input
register output as file dataset


### Run the pipeline

In [None]:
# Tabular view of `file_name` on the datastore, handed to step 1 as "tabular_input".
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(def_blob_store, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# File view of the same path, handed to step 1 as "file_input" via as_hdfs().
titanic_file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

# Step 1 output: destination "test" on the datastore, registered as
# "registered_dataset" through register_on_complete().
step1_output = (
    HDFSOutputDatasetConfig(destination=(def_blob_store, "test"))
    .register_on_complete(name="registered_dataset")
)

# Download-mode input derived from the step 1 output.
# NOTE(review): no step visible in this notebook consumes step2_input.
step2_input = step1_output.as_input("step2_input").as_download()

# Environment passed to the Spark step; azureml-core is added as a pip dependency.
# Environment is imported in the notebook's first cell.
# NOTE(review): the pin below (1.38.0) differs from the SDK version shown in the
# first cell's recorded output (v1.42.0) -- confirm the mismatch is intentional.
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.38.0")

# Single Spark step that runs ./code/dataprep.py on the "SparkPoolSmall" compute target.
step_1 = SynapseSparkStep(
    name="synapse-spark",
    # Script location and the command-line flags passed to it.
    source_directory="./code",
    file="dataprep.py",
    arguments=[
        "--tabular_input", step1_input1,
        "--file_input", step1_input2,
        "--output_dir", step1_output,
    ],
    # Datasets wired into and out of the step.
    inputs=[step1_input1, step1_input2],
    outputs=[step1_output],
    # Compute target and Spark driver/executor sizing.
    compute_target="SparkPoolSmall",
    driver_memory="7g",
    driver_cores=4,
    executor_memory="7g",
    executor_cores=2,
    num_executors=1,
    environment=env,
)

# Build a one-step pipeline and submit it as "synapse-pipeline",
# passing regenerate_outputs=True.
pipeline = Pipeline(workspace=ws, steps=[step_1])
pipeline_run = pipeline.submit("synapse-pipeline", regenerate_outputs=True)

# Wait for the run to complete, then print each child run's name and metrics.
pipeline_run.wait_for_completion()
for step_run in pipeline_run.get_children():
    step_metrics = step_run.get_metrics()
    print(f"{step_run.name}: {step_metrics}")