In [10]:
from azureml.core import Workspace, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.wrapper import Module, dsl

In [11]:
subscription_id = '4f455bd0-f95a-4b7d-8d08-078611508e0b'
resource_group = 'fundamental'
workspace_name = 'fundamental3'
tenant_id = "72f988bf-86f1-41af-91ab-2d7cd011db47"
# for loading module
namespace = workspace_name
experiment_name = 'parallel'
# Interactive login pinned to the tenant above.
interactive_auth = InteractiveLoginAuthentication(tenant_id=tenant_id)
# Pass the tenant-pinned auth explicitly. Previously it was created but never handed to
# Workspace, so the workspace connection did not use it.
workspace = Workspace(subscription_id, resource_group, workspace_name, auth=interactive_auth)
# Sanity check: echo the resolved workspace and the compute targets it exposes.
print(workspace.name, workspace.resource_group, workspace.location, workspace.subscription_id,
      workspace.compute_targets.keys(), sep='\n')

fundamental3
fundamental
eastasia
4f455bd0-f95a-4b7d-8d08-078611508e0b
dict_keys(['myaks2', 'aml-compute', 'my-compute', 'compute-deploy'])


In [12]:
aml_compute_name = 'aml-compute'
try:
    # Attach to the cluster if it is already provisioned in the workspace.
    aml_compute = AmlCompute(workspace, aml_compute_name)
    print("Found existing compute target: {}".format(aml_compute_name))
except ComputeTargetException:
    # Only a failed compute-target lookup leads to provisioning. The previous bare `except:`
    # also swallowed KeyboardInterrupt and any auth/network failure, then tried to *create*
    # the cluster; other exception types now propagate instead.
    print("Creating new compute target: {}".format(aml_compute_name))

    # Autoscaling cluster of 1-4 STANDARD_D2_V2 nodes.
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2", min_nodes=1, max_nodes=4)
    aml_compute = ComputeTarget.create(workspace, aml_compute_name, provisioning_config)
    # Block (up to 20 minutes) so later cells see a provisioned target.
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found existing compute target: aml-compute


In [13]:
# load data
dataset_name = "THUNews"
char2index_name = 'CharToIndex'


def _get_or_register_file_dataset(ws, name, paths, description, **register_kwargs):
    """Return the workspace dataset `name`, registering it from `paths` first if it is absent.

    Args:
        ws: The Azure ML workspace to look up / register the dataset in.
        name: Registered dataset name, checked against `ws.datasets`.
        paths: List of URLs passed to `Dataset.File.from_files`.
        description: Description stored with the registration.
        **register_kwargs: Extra keyword arguments forwarded to `register`
            (e.g. `create_new_version=True`).

    Returns:
        The object stored under `ws.datasets[name]` after the optional registration.
    """
    if name not in ws.datasets:
        print('Registering {} for fasttext pipeline ...'.format(name))
        file_dataset = Dataset.File.from_files(path=paths)
        file_dataset.register(workspace=ws, name=name, description=description, **register_kwargs)
        print('Registered')
    return ws.datasets[name]


dataset = _get_or_register_file_dataset(
    workspace,
    dataset_name,
    ['https://datastore4fasttext.blob.core.windows.net/data/dataset/'],
    # Implicit string concatenation. The former backslash continuation inside the literal
    # embedded the next line's indentation into the stored description.
    ('THUCNews dataset is generated by screening and filtering historical data of '
     'Sina News RSS subscription channel from 2005 to 2011'),
    create_new_version=True,
)
char2index = _get_or_register_file_dataset(
    workspace,
    char2index_name,
    ['https://datastore4fasttext.blob.core.windows.net/data/map/'],
    'The mapping relationship between character and index',
)

In [14]:
# Each pipeline step is defined by a YAML module spec; the paths below are relative to the
# notebook's current working directory.
_module_spec_paths = (
    'split_data_txt_for_parallel/split_data_txt_for_parallel.spec.yaml',
    'fasttext_train/fasttext_train.spec.yaml',
    'fasttext_score_parallel/fasttext_score_parallel.spec.yaml',
)
# Load the specs in declaration order and bind each resulting module function by name.
(
    split_data_txt_for_parallel_module_func,
    fasttext_train_module_func,
    fasttext_score_parallel_module_func,
) = [Module.from_yaml(workspace, spec_path) for spec_path in _module_spec_paths]

In [15]:
@dsl.pipeline(name='test parallel', description='Test parallel', default_compute_target=aml_compute.name)
def training_pipeline(epochs):
    """Split the registered text dataset, train a fastText model, and score the test split in parallel.

    All steps default to the `aml_compute` target selected earlier in the notebook.

    Args:
        epochs: Number of training epochs, forwarded to the `fasttext_train` module.

    Returns:
        A dict merging the outputs of the parallel scoring step and the training step.
        The training step's outputs are unpacked last, so they win on any key collision.
    """
    # 70% training / 20% validation. The remaining 10% presumably becomes `test_data_output`
    # (used for scoring below) — TODO confirm against the module spec.
    split_data_txt_parallel = split_data_txt_for_parallel_module_func(
        input_dir=dataset,
        training_data_ratio=0.7,
        validation_data_ratio=0.2
    )

    # Train on the split outputs, using the registered character-to-index mapping.
    fasttext_train = fasttext_train_module_func(
        training_data_dir=split_data_txt_parallel.outputs.training_data_output,
        validation_data_dir=split_data_txt_parallel.outputs.validation_data_output,
        char2index_dir=char2index,
        epochs=epochs
    )

    # Score the held-out test split with the freshly trained model.
    fasttext_score_parallel = fasttext_score_parallel_module_func(
        texts_to_score=split_data_txt_parallel.outputs.test_data_output,
        fasttext_model_dir=fasttext_train.outputs.trained_model_dir,
        char2index_dir=char2index
    )
    # Parallel run settings: 2 nodes x 4 worker processes. mini_batch_size is passed as a
    # string; for a file input it presumably means files per mini-batch — verify against
    # the module's parallel spec.
    fasttext_score_parallel.runsettings.configure(node_count=2, process_count_per_node=4, mini_batch_size="128")

    return {**fasttext_score_parallel.outputs, **fasttext_train.outputs}

In [16]:
# Build the pipeline graph with a single training epoch (keeps the demo run short).
num_epochs = 1
pipeline = training_pipeline(epochs=num_epochs)
# Optional: persist the draft without submitting it.
# pipeline.save(experiment_name=experiment_name)

In [17]:
# validate
# validate() returns a dict like {'result': 'validation passed', 'errors': []}.
# Fail fast here so the submit cell is never reached with a pipeline that has known errors.
validation_result = pipeline.validate()
assert not validation_result.get('errors'), validation_result.get('errors')
validation_result

<IPython.core.display.Javascript object>

ValidateView(container_id='container_id_8a6cf49d-ca9d-4ebc-af01-32270dd86223_widget', env_json='{"subscription…

{'result': 'validation passed', 'errors': []}

In [18]:
# Submit the validated pipeline under the configured experiment.
# regenerate_outputs=True presumably disables reuse of cached step outputs, forcing every
# step to re-run — confirm against the SDK's submit() documentation.
pipeline_run = pipeline.submit(
    experiment_name=experiment_name,
    regenerate_outputs=True,
)
# Uncomment to block the notebook until the run finishes:
# pipeline_run.wait_for_completion()

Submitted PipelineRun 9f29b68d-4391-4f5a-a753-5e49e57ae9d2
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/parallel/runs/9f29b68d-4391-4f5a-a753-5e49e57ae9d2?wsid=/subscriptions/4f455bd0-f95a-4b7d-8d08-078611508e0b/resourcegroups/fundamental/workspaces/fundamental3
