#### Register modules

Follow [this guide](https://msdata.visualstudio.com/AzureML/_git/ModuleDocs?path=%2Fdocs%2Fcli%2Fa-quick-go-through.md&_a=preview) to install the Azure Machine Learning CLI, which is used to register the custom modules.

Register the following three custom modules (each link points to the module's YAML specification):
* https://github.com/hirofumi-s-friends/CustomModules/blob/master/azureml-custom-module-examples/wide-and-deep-recommender/train_mpi.yaml
* https://github.com/hirofumi-s-friends/CustomModules/blob/master/azureml-custom-module-examples/wide-and-deep-recommender/score_parallel.yaml
* https://github.com/hirofumi-s-friends/CustomModules/blob/master/azureml-custom-module-examples/wide-and-deep-recommender/convert_multi_parquet_to_dfd.yaml

#### Imports

In [16]:
from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.core import Experiment, Workspace, Dataset
from azureml.core import Environment
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import CondaDependencies
from azureml.pipeline.core import Pipeline, PipelineData, Module
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import ModuleStep

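#### Definitions

Replace the placeholder values below with your own workspace name, resource group, subscription id, and compute target name before running the notebook.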

In [3]:
# Placeholders: replace each value with the details of your own Azure ML setup.
workspace_name = "Your workspace name"            # Azure ML workspace to run the pipeline in
resource_group_name = "Your resource group name"  # resource group that contains the workspace
subscription_id = "Your subscription id"          # subscription that owns the resource group
compute_name = "Your compute name"                # compute cluster to reuse, or create if missing

#### Get workspace

In [18]:
# Look up the existing workspace identified by the values in the Definitions cell.
ws = Workspace.get(
    name=workspace_name,
    subscription_id=subscription_id,
    resource_group=resource_group_name,
)

#### Get or create compute target

In [19]:
# Reuse the compute cluster named `compute_name` if it exists; if looking it up
# raises ComputeTargetException, provision a new cluster with the settings below.
# (ComputeTargetException must be imported in the Imports cell for the except
# clause to work; without it a missing cluster raises NameError instead.)
VM_SIZE = "STANDARD_NC6"
MIN_NODES = 1
# The MPI training step below requests 'NodeCount': 2, so keep MAX_NODES >= 2.
MAX_NODES = 4
PROVISION_TIMEOUT_MINUTES = 20

try:
    compute = AmlCompute(ws, compute_name)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE,
        min_nodes=MIN_NODES,
        max_nodes=MAX_NODES,
    )
    compute = ComputeTarget.create(ws, compute_name, provisioning_config)
    compute.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=PROVISION_TIMEOUT_MINUTES,
    )
print("Azure Machine Learning Compute attached")

found existing compute target.
Azure Machine Learning Compute attached


#### Get or create input datasets

In [22]:
def get_or_create_file_dataset(ws, name, local_folder):
    """Return the registered dataset ``name``, creating it from ``local_folder`` if needed.

    If the dataset cannot be fetched from the workspace, the contents of
    ``local_folder`` are uploaded to ``/data/<name>`` on the workspace's default
    datastore, registered as a FileDataset under ``name`` (as a new version),
    and then fetched again by name.

    Args:
        ws: Azure ML ``Workspace`` to look up / register the dataset in.
        name: Registered dataset name.
        local_folder: Local directory (``str`` or path-like) uploaded on first use.

    Returns:
        The dataset as returned by ``Dataset.get_by_name(ws, name=name)``.
    """
    try:
        return Dataset.get_by_name(ws, name=name)
    except Exception as lookup_error:
        # Any lookup failure is treated as "not registered yet". Catching
        # ``Exception`` instead of a bare ``except:`` lets KeyboardInterrupt and
        # SystemExit propagate. Printing the error keeps unexpected failures
        # (e.g. authentication problems) visible instead of silently triggering
        # an upload.
        print(f"Dataset {name!r} not available ({lookup_error}); "
              f"uploading {local_folder} and registering it.")

    # Created outside the except block so an upload/registration failure is not
    # reported as "raised while handling" the lookup error.
    path_on_datastore = f'/data/{name}'
    datastore = ws.get_default_datastore()
    datastore.upload(str(local_folder), target_path=path_on_datastore)
    # '/**' matches every file under the uploaded folder.
    datastore_paths = [(datastore, f'{path_on_datastore}/**')]
    dataset = Dataset.File.from_files(path=datastore_paths)
    dataset.register(ws, name=name, create_new_version=True)
    return Dataset.get_by_name(ws, name=name)

In [23]:
# Training data: the sample DataFrameDirectory, mounted as a named input for the train step.
# NOTE(review): both inputs below use the named-input name 'data'. They feed different
# steps, but confirm the modules do not require distinct names.
train_file_dataset = get_or_create_file_dataset(
    ws, name='train_dfd', local_folder='./sample_data/train_data_frame_directory')
data_to_train = train_file_dataset.as_named_input('data').as_mount()

# Scoring data: the sample parquet files, used by both the score and convert steps.
test_file_dataset = get_or_create_file_dataset(
    ws, name='test_pq', local_folder='./sample_data/test_parquet_files')
data_to_score = test_file_dataset.as_named_input('data').as_mount()

#### Define pipeline data

In [24]:
# Intermediate outputs that wire the steps together; all live on the default datastore.
default_datastore = ws.get_default_datastore()

# MPI train step  -> parallel score step
trained_model = PipelineData(name='trained_model', datastore=default_datastore)
# parallel score step -> evaluate step
scored_dfd = PipelineData(name='score_output', datastore=default_datastore)
# convert parquet step -> evaluate step
test_dfd = PipelineData(name='test_dfd', datastore=default_datastore)
# output of the evaluate step
eval_result = PipelineData(name='eval_result', datastore=default_datastore)

In [25]:
def get_module(ws, name, namespace):
    """Fetch a registered module by its namespace-qualified name.

    Args:
        ws: Workspace the module is registered in.
        name: Display name of the module, e.g. ``"Evaluate Recommender"``.
        namespace: Module namespace, e.g. ``"azureml"``.

    Returns:
        The object returned by ``Module.get`` for ``"<namespace>://<name>"``.
    """
    qualified_name = f"{namespace}://{name}"
    return Module.get(ws, name=qualified_name)

#### Define MPI train module step

In [26]:
mpi_train_module = get_module(
    ws,
    name="MPI Train Wide and Deep Recommender",
    namespace='microsoft.com/azureml/samples',
)

# Hyperparameters and MPI layout for the training module (keys are the module's parameter names).
mpi_train_params = {
    # Training schedule
    'Epochs': 12,
    'Batch size': 64,
    # Wide part
    'Wide part optimizer': 'Adagrad',
    'Wide optimizer learning rate': 0.1,
    'Crossed feature dimension': 1000,
    # Deep part
    'Deep part optimizer': 'Adagrad',
    'Deep optimizer learning rate': 0.1,
    'User embedding dimension': 16,
    'Item embedding dimension': 16,
    'Categorical features embedding dimension': 4,
    'Hidden units': '256,128',
    'Activation function': 'ReLU',
    'Dropout': 0.8,
    'Batch Normalization': 'True',
    # MPI distribution: 2 nodes x 1 process per node
    'NodeCount': 2,
    'MpiProcessCountPerNode': 1,
    'Arguments': 'USE_STRUCTURED_ARGUMENTS',
}

mpi_train_step = ModuleStep(
    module=mpi_train_module,
    inputs_map={'Training_dataset_of_user_item_rating_triples': data_to_train},
    outputs_map={'Trained_Wide_and_Deep_recommendation_model': trained_model},
    params=mpi_train_params,
    compute_target=compute,
)

#### Define parallel score module step

In [27]:
parallel_score_module = get_module(
    ws,
    name="Parallel Score Wide and Deep Recommender",
    namespace='microsoft.com/azureml/samples',
)

# Scores `data_to_score` with the model produced by the MPI training step.
parallel_score_step = ModuleStep(
    module=parallel_score_module,
    inputs_map={
        'Trained_Wide_and_Deep_recommendation_model': trained_model,
        'Dataset_to_score': data_to_score,
    },
    outputs_map={'Scored_dataset': scored_dfd},
    params={
        'Recommender prediction kind': 'Rating Prediction',
        'Arguments': 'USE_STRUCTURED_ARGUMENTS',
    },
    compute_target=compute,
)

#### Define the step that converts multiple Parquet files to a DataFrameDirectory

In [28]:
convert_parquet_module = get_module(
    ws,
    name='Convert Multi Parquet Files to DataFrameDirectory',
    namespace='microsoft.com/azureml/samples',
)

# Converts the same data that is scored into `test_dfd`, which the evaluate step consumes.
convert_parquet_step = ModuleStep(
    module=convert_parquet_module,
    inputs_map={'Input_path': data_to_score},
    outputs_map={'Output_path': test_dfd},
    params={'Arguments': 'USE_STRUCTURED_ARGUMENTS'},
    compute_target=compute,
)

#### Define evaluate recommender module step

In [29]:
# Built-in module from the 'azureml' namespace (unlike the three custom sample modules).
evaluate_recommender_module = get_module(
    ws,
    name='Evaluate Recommender',
    namespace='azureml',
)

# Compares the scored output against the converted test data and writes metrics to `eval_result`.
evaluate_recommender_step = ModuleStep(
    module=evaluate_recommender_module,
    inputs_map={
        'Scored_dataset': scored_dfd,
        'Test_dataset': test_dfd,
    },
    outputs_map={'Metric': eval_result},
    params={'Arguments': 'USE_STRUCTURED_ARGUMENTS'},
    compute_target=compute,
)

#### Submit pipeline

In [31]:
# Steps are listed explicitly; the PipelineData objects defined above connect
# their outputs to downstream inputs.
pipeline = Pipeline(ws, steps=[
    mpi_train_step,
    parallel_score_step,
    convert_parquet_step,
    evaluate_recommender_step,
])

experiment = Experiment(ws, name='wide_and_deep_recommender')
# `submit` returns the run of the submitted pipeline, not the Experiment, so name
# it accordingly. Call `pipeline_run.wait_for_completion(show_output=True)` to
# block until the run finishes.
pipeline_run = experiment.submit(pipeline)
exp = pipeline_run  # backward-compatible alias for code that referenced `exp`

pipeline_run