In [1]:
import inspect
from azureml.core import Workspace, Dataset
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.pipeline.core.graph import DataType
from azureml.pipeline.wrapper import Module, dsl, Pipeline
from azureml.core.authentication import InteractiveLoginAuthentication

In [2]:
subscription_id = '4f455bd0-f95a-4b7d-8d08-078611508e0b'
resource_group = 'fundamental'
workspace_name = 'fundamental4'
namespace=workspace_name # for loading module
# set this if you have multiple tenant
tenant_id="72f988bf-86f1-41af-91ab-2d7cd011db47"
interactive_auth = InteractiveLoginAuthentication(tenant_id=tenant_id)

workspace = Workspace(subscription_id, resource_group, workspace_name)
print(workspace.name, workspace.resource_group, workspace.location, workspace.subscription_id, workspace.compute_targets.keys(),sep = '\n')

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.
fundamental4
fundamental
eastus2euap
4f455bd0-f95a-4b7d-8d08-078611508e0b
dict_keys(['myaks', 'myaks2', 'myaks3', 'myaks4', 'myaks5', 'myaks6', 'myaks7', 'myaks8', 'aml-compute'])


In [3]:
# choose compute target
print(workspace.compute_targets)
aml_compute_name = 'aml-compute'
try:
    aml_compute = AmlCompute(workspace, aml_compute_name)
    print("Found existing compute target: {}".format(aml_compute_name))
except:
    print("Creating new compute target: {}".format(aml_compute_name))

    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2",
                                                                min_nodes=1,
                                                                max_nodes=4)
    aml_compute = ComputeTarget.create(workspace, aml_compute_name, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
# print(aml_compute)

{'myaks': AksCompute(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=myaks, id=/subscriptions/4f455bd0-f95a-4b7d-8d08-078611508e0b/resourceGroups/fundamental/providers/Microsoft.MachineLearningServices/workspaces/fundamental4/computes/myaks, type=AKS, provisioning_state=Failed, location=eastus2euap, tags=None), 'myaks2': AksCompute(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=myaks2, id=/subscriptions/4f455bd0-f95a-4b7d-8d08-078611508e0b/resourceGroups/fundamental/providers/Microsoft.MachineLearningServices/workspaces/fundamental4/computes/myaks2, type=AKS, provisioning_state=Failed, location=eastus2euap, tags=None), 'myaks3': AksCompute(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=myaks3, id=/subscriptions/4

In [4]:
# register my own datatype
DataType.create_data_type(workspace, 'MyDirectory', description='', is_directory=True) # won't register repeatedly
DataType.create_data_type(workspace, 'MyFile', description='', is_directory=False)

<azureml.pipeline.core.graph.DataType at 0x1d2dc82e710>

In [5]:
# load data
dataset_name = 'THUCNews_TXT'
char2index_name = 'Char2Index_JSON'

if dataset_name not in workspace.datasets:
    print('Registering a THUCNews dataset for fasttext pipeline ...')
    path = ['https://datastore4fasttext.file.core.windows.net/data4fasttext/THUCNews.txt']
    data = Dataset.File.from_files(path=path)
    data.register(workspace=workspace, name=dataset_name, description='THUCNews dataset is generated by filtering and filtering historical data of Sina News RSS subscription channel from 2005 to 2011')
    print('Registerd')
data = workspace.datasets[dataset_name]

if char2index_name not in workspace.datasets:
    print('Registering a Char2Index_JSON for fasttext pipeline ...')
    path = ['https://datastore4fasttext.file.core.windows.net/data4fasttext/character2index.json']
    data = Dataset.File.from_files(path=path)
    data.register(workspace=workspace, name=char2index_name, description='The mapping relationship between character and index ')
    print('Registerd')
char2index = workspace.datasets[char2index_name]

print(data)
print(char2index)

FileDataset
{
  "source": [
    "('workspaceblobstore', 'UI/07-08-2020_062513_UTC/THUCNews.txt')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ],
  "registration": {
    "id": "88a58e42-425a-4f75-84ba-5a33f40ae804",
    "name": "THUCNews_TXT",
    "version": 1,
    "workspace": "Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental')"
  }
}
FileDataset
{
  "source": [
    "('workspaceblobstore', 'UI/07-08-2020_064227_UTC/character2index.json')"
  ],
  "definition": [
    "GetDatastoreFiles"
  ],
  "registration": {
    "id": "4881be3f-eae9-4d54-870b-fc65b8941049",
    "name": "Char2Index_JSON",
    "version": 1,
    "workspace": "Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental')"
  }
}


In [6]:
# load module
try:
    split_data_txt_module_func = Module.load(workspace=workspace, namespace=namespace, name='Split Data Txt')
    print('found split_data_txt_module')
except:
    print('not found split_data_txt_module, register it now...')
    yaml_file='split_data_txt/split_data_txt.spec.yaml'
    split_data_txt_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)

try:
    split_data_txt_parallel_module_func = Module.load(workspace=workspace, namespace=namespace, name='Split Data Txt Parallel')
    print('found split_data_txt_parallel_module')
except:
    print('not found split_data_txt_parallel_module, register it now...')
    yaml_file='split_data_txt_parallel/split_data_txt_parallel.spec.yaml'
    split_data_txt_parallel_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)

try:
    fasttext_train_module_func = Module.load(workspace=workspace, namespace=namespace, name='FastText Train')
    print('found fasttext_train_module')
except:
    print('not found fasttext_train_module, register it now...')
    yaml_file='fasttext_train/fasttext_train.spec.yaml'
    fasttext_train_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)

try:
    fasttext_evaluation_module_func = Module.load(workspace=workspace, namespace=namespace, name='FastText Evaluation')
    print('found fasttext_evaluation_module')
except:
    print('not found fasttext_evaluation_module, register it now...')
    yaml_file='fasttext_evaluation/fasttext_evaluation.spec.yaml'
    fasttext_evaluation_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)

try:
    fasttext_score_module_func = Module.load(workspace=workspace, namespace=namespace, name='FastText Score')
    print('found fasttext_score_module')
except:
    print('not found fasttext_score_module, register it now...')
    yaml_file='fasttext_score/fasttext_score.spec.yaml'
    fasttext_score_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)

try:
    fasttext_score_parallel_module_func = Module.load(workspace=workspace, namespace=namespace, name='FastText Score Parallel')
    print('found fasttext_score_parallel_module')
except:
    print('not found fasttext_score_parallel_module, register it now...')
    yaml_file='fasttext_score_parallel/fasttext_score_parallel.spec.yaml'
    fasttext_score_parallel_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)

try:
    compare_two_models_module_func = Module.load(workspace=workspace, namespace=namespace, name='Compare Two Models')
    print('found compare_two_models_module')
except:
    print('not found compare_two_models_module, register it now...')
    yaml_file='compare_two_models/compare_two_models.spec.yaml'
    compare_two_models_module_func = Module.register(workspace=workspace, yaml_file=yaml_file)



# inspect signature
# print(inspect.signature(split_data_txt_module_func))
# print(inspect.signature(split_data_txt_parallel_module_func))
# print(inspect.signature(fasttext_train_module_func))
# print(inspect.signature(fasttext_evaluation_module_func))
# print(inspect.signature(fasttext_score_module_func))
# print(inspect.signature(fasttext_score_parallel_module_func))
# print(inspect.signature(compare2model_module_func))

found split_data_txt_module
found split_data_txt_parallel_module
found fasttext_train_module
found fasttext_evaluation_module
found fasttext_score_module
found fasttext_score_parallel_module
found compare_two_models_module


In [7]:
# connect module
@dsl.pipeline(name='test deploy', description='Test parallel', default_compute_target=aml_compute_name)
def training_pipeline(epochs):
    split_data_txt_parallel = split_data_txt_parallel_module_func(
    input_dir = data,
    training_data_ratio = 0.7,
    validation_data_ratio = 0.1,
    random_split = True,
    seed = 9
    )

    fasttext_train = fasttext_train_module_func(
    training_data_dir = split_data_txt_parallel.outputs.training_data_output,
    validation_data_dir = split_data_txt_parallel.outputs.validation_data_output,
    char2index_dir = char2index,
    epochs = epochs,
    batch_size = 64,
    learning_rate = 0.0005,
    embedding_dim = 128
    )
    
    fasttext_score_parallel = fasttext_score_parallel_module_func(
    texts_to_score = split_data_txt_parallel.outputs.test_data_output,
    fasttext_model = fasttext_train.outputs.trained_model_dir,
    char2index_dir = char2index
    )
    fasttext_score_parallel.runsettings.configure(node_count=4, process_count_per_node=8, mini_batch_size=128)

    return {**fasttext_score_parallel.outputs, **fasttext_train.outputs}


In [8]:
split_data_txt_parallel = split_data_txt_parallel_module_func(
    input_dir = data,
    training_data_ratio = 0.7,
    validation_data_ratio = 0.1,
    random_split = True,
    seed = 8
    )

fasttext_train = fasttext_train_module_func(
    training_data_dir = split_data_txt_parallel.outputs.training_data_output,
    validation_data_dir = split_data_txt_parallel.outputs.validation_data_output,
    char2index_dir = char2index,
    epochs = 1,
    batch_size = 64,
    learning_rate = 0.0005,
    embedding_dim = 128
    )
type(fasttext_train)

azureml.pipeline.wrapper._module.Module

In [9]:
# pipeline
pipeline = training_pipeline(epochs=1)

In [10]:
# visualization
pipeline.validate()

<IPython.core.display.Javascript object>

ValidateView(container_id='container_id_8a2321f0-cf17-48a4-90c4-fb6f17c7e1d9_widget', env_json='{"subscription…

{'result': 'validation passed', 'errors': []}

In [11]:
# pipeline_draft = pipeline.save(
#     experiment_name='my test',
# )
# pipeline_draft

In [12]:
# run
run = pipeline.submit(experiment_name='parallel', pipeline_parameters={'epochs':1})
run.wait_for_completion()
run
#%


Submitted PipelineRun fc22254c-9793-4643-8bbb-7465087a9075
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/parallel/runs/fc22254c-9793-4643-8bbb-7465087a9075?wsid=/subscriptions/4f455bd0-f95a-4b7d-8d08-078611508e0b/resourcegroups/fundamental/workspaces/fundamental4
PipelineRunId: fc22254c-9793-4643-8bbb-7465087a9075
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/parallel/runs/fc22254c-9793-4643-8bbb-7465087a9075?wsid=/subscriptions/4f455bd0-f95a-4b7d-8d08-078611508e0b/resourcegroups/fundamental/workspaces/fundamental4


<IPython.core.display.Javascript object>

ValidateView(container_id='container_id_fb1b8a08-24be-4727-87d4-7f091fc15a56_widget', env_json='{}', graph_jso…

Experiment,Id,Type,Status,Details Page,Docs Page
parallel,fc22254c-9793-4643-8bbb-7465087a9075,azureml.PipelineRun,Completed,Link to Azure Machine Learning studio,Link to Documentation


In [13]:
type(run)

azureml.pipeline.wrapper._pipeline_run.PipelineRun

In [14]:
# model = run.register_model(model_name='fasttext_model',
#                            model_path='Trained_model_dir') #Trained_model_dir/BestModel
# print(model.name, model.id, model.version, sep='\t')

pipeline.outputs.trained_model_dir.AVAILABLE_MODE



type(run)




azureml.pipeline.wrapper._pipeline_run.PipelineRun

In [15]:
# model = run.register_model(model_name='fasttext_model',
#                            model_path='Trained_model_dir') #Trained_model_dir/BestModel

In [16]:
# run.pipeline.outputs.trained_model_dir.configure()

In [17]:
aaa=[]
for cr in run.get_children():
    aaa.append(cr)
    print(cr,'\n')

Run(Experiment: parallel,
Id: 7c2424d9-5ac4-45ff-b1fe-35397b540046,
Type: azureml.StepRun,
Status: Completed) 

Run(Experiment: parallel,
Id: 1236e2be-ef37-460b-9989-1587cf6146d4,
Type: azureml.StepRun,
Status: Completed) 

Run(Experiment: parallel,
Id: 989218ee-3cbd-4b32-851e-9ecd0a724c4f,
Type: azureml.StepRun,
Status: Completed) 



In [18]:
# run.child_run(name='FastText Train').register_model(model_name='BestModel', model_path='Trained_model_dir') #Trained_model_dir/BestModel
run.get_file_names()

['logs/azureml/executionlogs.txt',
 'logs/azureml/stderrlogs.txt',
 'logs/azureml/stdoutlogs.txt']

In [19]:
aaa[-2].get_file_names()

['Trained_model_dir',
 'azureml-logs/55_azureml-execution-tvmps_fb8f9aa0452e048f8b7501b72370916a334146e00208516abd44e37cb542dd04_d.txt',
 'azureml-logs/65_job_prep-tvmps_fb8f9aa0452e048f8b7501b72370916a334146e00208516abd44e37cb542dd04_d.txt',
 'azureml-logs/70_driver_log.txt',
 'azureml-logs/75_job_post-tvmps_fb8f9aa0452e048f8b7501b72370916a334146e00208516abd44e37cb542dd04_d.txt',
 'azureml-logs/process_info.json',
 'azureml-logs/process_status.json',
 'logs/azureml/executionlogs.txt',
 'logs/azureml/stderrlogs.txt',
 'logs/azureml/stdoutlogs.txt']

In [20]:
type(aaa[-2])

azureml.pipeline.core.run.StepRun

In [21]:
# aaa[-2].register_model(model_name='BestModel', model_path='Trained_model_dir', tags={'my_tags': 'fasttext'})

In [22]:
run.get_children()

<generator object Run._rehydrate_runs at 0x000001D2DDF5A888>

In [23]:
# register env for deployment to workspace
# from azureml.core.environment import Environment
# myenv = Environment.from_conda_specification(name = 'env_for_deployment',
#                                              file_path = 'deployment/env_for_deployment.yaml')
# myenv.register(workspace=workspace)

In [24]:
# from azureml.core.model import Model
# model = Model.get_model_path(model_name='fasttext', version=2)
# Model.deploy(workspace=workspace, name='my_deployment', models=[model], inference_config=None, deployment_config=None, deployment_target=None, overwrite=False)

In [25]:
# 3. Define inference configuration
# from azureml.core.environment import Environment
# from azureml.core.model import InferenceConfig


# myenv = Environment.get(workspace=workspace, name='env_for_deployment', version='2')
# inference_config = InferenceConfig(entry_script='deployment/scoring_for_deployment.py',
#                                    environment=myenv)
# inference_config                                   

In [26]:
# 创建k8s资源
# from azureml.core.compute import AksCompute, ComputeTarget

# # Use the default configuration (you can also provide parameters to customize this).
# # For example, to create a dev/test cluster, use:
# # prov_config = AksCompute.provisioning_configuration(cluster_purpose = AksCompute.ClusterPurpose.DEV_TEST)
# prov_config = AksCompute.provisioning_configuration()
# aks_name = 'myaks'
# # Create the cluster
# aks_target = ComputeTarget.create(workspace = workspace,
#                                     name = aks_name,
#                                     provisioning_configuration = prov_config)

# # Wait for the create process to complete
# aks_target.wait_for_completion(show_output = True)


In [27]:
# # Attach an existing AKS cluster
# from azureml.core.compute import AksCompute, ComputeTarget
# # Set the resource group that contains the AKS cluster and the cluster name
# resource_group = 'myresourcegroup'
# cluster_name = 'myexistingcluster'

# # Attach the cluster to your workgroup. If the cluster has less than 12 virtual CPUs, use the following instead:
# # attach_config = AksCompute.attach_configuration(resource_group = resource_group,
# #                                         cluster_name = cluster_name,
# #                                         cluster_purpose = AksCompute.ClusterPurpose.DEV_TEST)
# attach_config = AksCompute.attach_configuration(resource_group = resource_group,
#                                          cluster_name = cluster_name)
# aks_target = ComputeTarget.attach(ws, 'myaks', attach_config)

In [30]:
#####################################################
# register env for deployment to workspace
from azureml.core.environment import Environment
myenv = Environment.from_conda_specification(name = 'env_for_deployment',
                                             file_path = 'deployment/env_for_deployment.yaml')
myenv.register(workspace=workspace)



{
    "databricks": {
        "eggLibraries": [],
        "jarLibraries": [],
        "mavenLibraries": [],
        "pypiLibraries": [],
        "rcranLibraries": []
    },
    "docker": {
        "arguments": [],
        "baseDockerfile": null,
        "baseImage": "mcr.microsoft.com/azureml/intelmpi2018.3-ubuntu16.04:20200423.v1",
        "baseImageRegistry": {
            "address": null,
            "password": null,
            "registryIdentity": null,
            "username": null
        },
        "enabled": false,
        "platform": {
            "architecture": "amd64",
            "os": "Linux"
        },
        "sharedVolumes": true,
        "shmSize": null
    },
    "environmentVariables": {
        "EXAMPLE_ENV_VAR": "EXAMPLE_VALUE"
    },
    "inferencingStackVersion": null,
    "name": "env_for_deployment",
    "python": {
        "baseCondaEnvironment": null,
        "condaDependencies": {
            "channels": [
                "defaults"
            ],
         

In [34]:

from azureml.core.model import Model
# model = Model.get_model_path(model_name='BestModel', version=2)
Model.list(workspace)
# Model.deploy(workspace=workspace, name='my_deployment', models=[model], inference_config=None, deployment_config=None, deployment_target=None, overwrite=False)
    

[Model(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=sklearn_regression_model, id=sklearn_regression_model:1, version=1, tags={'area': 'diabetes', 'type': 'regression'}, properties={}),
 Model(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=BestModel, id=BestModel:2, version=2, tags={'my_tags': 'fasttext'}, properties={}),
 Model(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=BestModel, id=BestModel:1, version=1, tags={}, properties={}),
 Model(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=fasttext_model, id=fasttext_model:1, version=1, tags={}, properties={}),
 Model(workspace=Workspace.create(name='fundamental4', subs

In [41]:
# model = Model.get_model_path(model_name='BestModel', version=2, _workspace=workspace)  # 会把模型下载到本地的azureml-models目录下
# model

'azureml-models\\BestModel\\2\\Trained_model_dir'

In [42]:
# 3. Define inference configuration
from azureml.core.environment import Environment
from azureml.core.model import InferenceConfig


myenv = Environment.get(workspace=workspace, name='env_for_deployment', version='2')
inference_config = InferenceConfig(entry_script='deployment/scoring_for_deployment.py',
                                   environment=myenv)
inference_config      

InferenceConfig(entry_script=deployment/scoring_for_deployment.py, runtime=None, conda_file=None, extra_docker_file_steps=None, source_directory=None, enable_gpu=None, base_image=None, base_image_registry=<azureml.core.container_registry.ContainerRegistry object at 0x000001D2DFB19B70>)

In [45]:
model = aaa[-2].register_model(model_name='BestModel', model_path='Trained_model_dir', tags={'my_tags': 'fasttext'})
model

Model(workspace=Workspace.create(name='fundamental4', subscription_id='4f455bd0-f95a-4b7d-8d08-078611508e0b', resource_group='fundamental'), name=BestModel, id=BestModel:3, version=3, tags={'my_tags': 'fasttext'}, properties={})

In [46]:
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import Model

deployment_config = AciWebservice.deploy_configuration(cpu_cores = 1, memory_gb = 1)
service = Model.deploy(workspace=workspace, name="my-deployment", models=[model], inference_config=inference_config, deployment_config=deployment_config)
service.wait_for_deployment(show_output = True)
print(service.state)

Running......................................................................................................................................................................................................................................................................................................................................................................................................................
TimedOut
ERROR - Service deployment polling reached non-successful terminal state, current service state: Unhealthy
Operation ID: 7c31321a-4935-4709-abfe-65609391f7b0
More information can be found using '.get_logs()'
Error:
{
  "code": "DeploymentTimedOut",
  "statusCode": 504,
  "message": "The deployment operation polling has TimedOut. The service creation is taking longer than our normal time. We are still trying to achieve the desired state for the web service. Please check the webservice state for the current webservice health. You can run print(service.state) from the pytho

WebserviceException: WebserviceException:
	Message: Service deployment polling reached non-successful terminal state, current service state: Unhealthy
Operation ID: 7c31321a-4935-4709-abfe-65609391f7b0
More information can be found using '.get_logs()'
Error:
{
  "code": "DeploymentTimedOut",
  "statusCode": 504,
  "message": "The deployment operation polling has TimedOut. The service creation is taking longer than our normal time. We are still trying to achieve the desired state for the web service. Please check the webservice state for the current webservice health. You can run print(service.state) from the python SDK to retrieve the current state of the webservice."
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Service deployment polling reached non-successful terminal state, current service state: Unhealthy\nOperation ID: 7c31321a-4935-4709-abfe-65609391f7b0\nMore information can be found using '.get_logs()'\nError:\n{\n  \"code\": \"DeploymentTimedOut\",\n  \"statusCode\": 504,\n  \"message\": \"The deployment operation polling has TimedOut. The service creation is taking longer than our normal time. We are still trying to achieve the desired state for the web service. Please check the webservice state for the current webservice health. You can run print(service.state) from the python SDK to retrieve the current state of the webservice.\"\n}"
    }
}

In [47]:
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import Model

deployment_config = AciWebservice.deploy_configuration(cpu_cores = 1, memory_gb = 1)
service = Model.deploy(workspace=workspace, name="my-deployment", models=[model], inference_config=inference_config, deployment_config=deployment_config)
service.wait_for_deployment(show_output = True)
print(service.state)

ERROR - Error, there is already a service with name my-deployment found in workspace fundamental4



WebserviceException: WebserviceException:
	Message: Error, there is already a service with name my-deployment found in workspace fundamental4
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Error, there is already a service with name my-deployment found in workspace fundamental4"
    }
}

In [None]:
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import Model

deployment_config = AciWebservice.deploy_configuration(cpu_cores = 2, memory_gb = 2)
service = Model.deploy(workspace=workspace, name="my-deployment2", models=[model], inference_config=inference_config, deployment_config=deployment_config)
service.wait_for_deployment(show_output = True)
print(service.state)