# BHP Positive Comms Proof-of-Concept sample pipeline notebook 

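This notebook outlines the process for building an Azure Machine Learning Service (AMLS) pipeline that filters and transcribes the audio recordings of the battle scenario, then applies NLP analysis and comms classification to the transcripts.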

The actions to be performed are:
 - Importing Python packages required to build a pipeline
 - Creating/Connecting to an AMLS workspace
 - Creating the experiment for the pipeline to use
 - Building the pipeline
 - Deploying the pipeline
 - Monitoring the results

# Import Python packages

In [1]:
import warnings
# Silence all warnings to keep the notebook log readable.
# NOTE(review): this also hides azureml deprecation warnings - consider narrowing the filter.
warnings.filterwarnings('ignore')

# setup the current paths
import os, sys
from dotenv import load_dotenv, find_dotenv

# `currentDir` holds the PARENT of the working directory (name and value kept for any later use).
# Log both paths under correct labels instead of calling the parent the working directory.
currentDir = os.path.dirname(os.getcwd())
print(f'Current working directory: {os.getcwd()}')
print(f'Parent directory: {currentDir}')
# allow the `common` package below to be imported from either the parent or the working directory
sys.path.append('../')
sys.path.append('./')

# azure ML core services
import azureml.core
from azureml.core import Datastore, Experiment, Workspace

# Azure ML pipeline
from azureml.pipeline.core import Pipeline, PipelineEndpoint, PipelineParameter, PublishedPipeline
from azureml.pipeline.core.schedule import Schedule, ScheduleRecurrence
# PythonScriptStep builds every pipeline step; import it explicitly instead of relying on
# it leaking through the wildcard imports below.
from azureml.pipeline.steps import PythonScriptStep

# import from common setups & environments
# NOTE(review): these wildcard imports presumably supply the UPPER_CASE constants,
# AzureMLConfiguration and GeneraltUtilities used later - explicit imports would show their origin.
from common.constants import *
from common.azureml_configuration import *
from common.general_utilities import *

# load the env variables from the hidden file (prints True when a .env file was found and loaded)
print('Loading environmental variables', load_dotenv(find_dotenv('./common/.env')))

Current working directory: /mnt/batch/tasks/shared/LS_root/mounts/clusters/nej-compute-private4/code/Users/nejhdeh.ghevondian/bhp-positive-comms-poc/positive-comms-experimentation/src
Azure ML version: 1.37.0
Loading environmental variables True


# Creating/Connecting to the AMLS workspace

The next step is to create (or connect to an existing) AMLS workspace where the experiment pipeline will be deployed.

In [None]:
# 1. configure the azure ml workspace
#------------------------------------
# Get the tenant and subscription ids from the .env file loaded earlier; the other constants come from the common modules.
TENANT_ID = os.environ.get('TENANT_ID')
SUBSCRIPTION_ID = os.environ.get('SUBSCRIPTION_ID')

# Fail fast with an actionable message; a missing id would otherwise be passed on as None
# and likely surface later as a less obvious Azure authentication/lookup error.
missing_env_vars = [name for name, value in (('TENANT_ID', TENANT_ID), ('SUBSCRIPTION_ID', SUBSCRIPTION_ID)) if not value]
if missing_env_vars:
    raise EnvironmentError(f"Missing environment variable(s) {', '.join(missing_env_vars)}; check ./common/.env")

DOCKER_FILE = 'docker_dependencies.yml'
DOCKER_FILE_PATH = f'../{PIPELINE_SCRIPT_PATH}/{DOCKER_FILE}'

# create temp directories to store the inference results, one sub-folder per pipeline stage
#------------------------------------------------------------------------------------------
utilConfig = GeneraltUtilities()

print('Configuring the Azure ML services and workspace, please wait...')

dsp_inference_folders = f'{INFERENCE_PATH}{RESULTS_RECORDINGS_INFERENCE_DSP_PATH}'
transcripts_inference_folder = f'{INFERENCE_PATH}{RESULTS_RECORDINGS_INFERENCE_TRANSCRIBE_PATH}'
assessed_inference_folder = f'{INFERENCE_PATH}{RESULTS_RECORDINGS_INFERENCE_ASSESSED_PATH}'

for results_folder in (dsp_inference_folders, transcripts_inference_folder, assessed_inference_folder):
    utilConfig.createTmpDir(results_folder)

# initialise the azureml config class
azuremlConfig = AzureMLConfiguration(workspace=WORKSPACE_NAME, tenant_id=TENANT_ID, subscription_id=SUBSCRIPTION_ID,
                                     resource_group=RESOURCE_GROUP, location=LOCATION)
# configure Azure ML services
#----------------------------
# configure Azure ML workspace
azuremlConfig.configWorkspace()

# configure the azure ML compute
azuremlConfig.configCompute()

# configure the experiment(s)
azuremlConfig.configExperiment(experiment_name=EXPERIMENT_NAME)

# configure the environment - conda
azuremlConfig.configEnvironment(environment_name=ENVIRONMENT_NAME)

# configure and register the datastore(s) with Azure ML pipelines
inference_datastore = azuremlConfig.configDataStore(datastore=INFERENCE_DATASTORE_NAME, container_name=INFERENCE_CONTAINER_NAME)
processed_datastore = azuremlConfig.configDataStore(datastore=DSP_INFERENCE_DATASTORE_NAME, container_name=DSP_INFERENCE_CONATINER_NAME)
transcribed_datastore = azuremlConfig.configDataStore(datastore=TRANSCRIBED_INFERENCE_DATASTORE_NAME, container_name=TRANSCRIBED_INFERENCE_CONATINER_NAME)
assessed_datastore = azuremlConfig.configDataStore(datastore=ASSESSED_INFERENCE_DATASTORE_NAME, container_name=ASSESSED_INFERENCE_CONATINER_NAME)

# register the datasets associated with the datastore - recordings
inference_recordings_datasets = azuremlConfig.configDatasets(datastore=inference_datastore, file_path=INFERENCE_RECORDINGS_FOLDER,
                                                             dataset_name=INFERENCE_RECORDINGS_DATASET_NAME, description='inference recordings datasets')

# register the datasets associated with the datastore - key phrases
key_phrases_datasets = azuremlConfig.configDatasets(datastore=inference_datastore, file_path=KEY_PHRASES_FOLDER,
                                                    dataset_name=INFERENCE_KEY_PHRASES_DATASET_NAME, description='inference key phrases datasets')

# register the datasets associated with the assessed datastore - assessed (NLP) results
assessed_datasets = azuremlConfig.configDatasets(datastore=assessed_datastore, file_path=ASSESSED_FOLDER,
                                                 dataset_name=INFERENCE_ASSESSED_DATASET_NAME, description='inference assessed datasets')

# configure the outputs passed between pipeline steps, each written to the root of its own datastore
processed_ds = azuremlConfig.configPipelineOutputData(name='processed_data', destination=(processed_datastore, '/'))
transcribed_ds = azuremlConfig.configPipelineOutputData(name='transcribed_data', destination=(transcribed_datastore, '/'))
assessed_ds = azuremlConfig.configPipelineOutputData(name='nlp_data', destination=(assessed_datastore, '/'))

# Configure the Azure ML environment to run the experiment in

In [3]:
# Pipeline run settings: docker file path plus the conda and pip package lists from the constants module.
azuremlConfig.configPipeline(
    docker_path=DOCKER_FILE_PATH,
    conda_packages=CONDA_PACKAGES,
    pip_packages=PIP_PACKAGES,
)

# Bind the recordings folder on the inference datastore as the pipeline's input;
# `datapath_input` is what the filter step consumes.
datapath_pipeline_param, datapath_input = azuremlConfig.configPipelineInputData(
    datastore=inference_datastore,
    path_on_datastore=INFERENCE_RECORDINGS_FOLDER,
)

# Run-time parameters exposed to callers of the pipeline; both feed the transcribe step.
pipeline_lang_param = azuremlConfig.configPipelineParameter(
    description='language used to translate the Speech to Text. (e.g. en-AU, en-GB, en-US)',
    default_value='en-US',
)
pipeline_validate_param = azuremlConfig.configPipelineParameter(
    description='If truth validation file exists, set to True.',
    default_value=False,
)

In [4]:
# Explicit import so this cell does not depend on PythonScriptStep leaking through a wildcard import.
from azureml.pipeline.steps import PythonScriptStep

# The steps' source_directory (PIPELINE_SOURCE_PATH) is resolved relative to the directory two levels up,
# so move there while the steps are built. The next cell submits the pipeline and then chdirs back to `current_dir`.
# NOTE(review): re-running this cell without running the next one moves up another two levels.
current_dir = os.getcwd()
os.chdir('../..')

try:
    # Settings shared by every step: same source tree, compute target, run configuration and reuse policy.
    common_step_settings = dict(
        source_directory=PIPELINE_SOURCE_PATH,
        compute_target=azuremlConfig.compute_target,
        runconfig=azuremlConfig.aml_run_config,
        allow_reuse=True,
    )

    # Step 1: filter the raw recordings into `processed_ds`.
    filter_step = PythonScriptStep(name='Step 1 - Filter Audio Files',
                                   script_name=FILTER_SCRIPT_FILENAME,
                                   arguments=['--raw_input_datapath', datapath_input, '--processed_dir', processed_ds],
                                   inputs=[datapath_input],
                                   outputs=[processed_ds],
                                   **common_step_settings)

    # Step 2: transcribe the filtered audio (step 1's output, mounted) using the key-phrases dataset.
    transcribe_step = PythonScriptStep(name='Step 2 - Transcribe Filtered Audio Files',
                                       script_name=TRANSCRIBE_SCRIPT_FILENAME,
                                       arguments=['--validate', pipeline_validate_param,
                                                  '--language', pipeline_lang_param,
                                                  '--processed_data', processed_ds.as_input('filtered_audio_files_ds').as_mount(),
                                                  '--key_phrases_path', key_phrases_datasets.as_mount(),
                                                  '--transcribed_data', transcribed_ds],
                                       outputs=[transcribed_ds],
                                       **common_step_settings)

    # Step 3: NLP analysis of the transcripts (step 2's output, mounted) into `assessed_ds`.
    # (The step name's spelling is kept unchanged so it matches earlier runs.)
    nlp_step = PythonScriptStep(name='Step 3 - NLP Anaysis',
                                script_name=NLP_SCRIPT_FILENAME,
                                arguments=['--transcribed_data', transcribed_ds.as_input('transcribed_audio_files_ds').as_mount(),
                                           '--nlp_data', assessed_ds],
                                outputs=[assessed_ds],
                                **common_step_settings)

    # Step 4: comms classification over the NLP results (step 3's output, mounted); it declares no outputs.
    ML_step = PythonScriptStep(name='Step 4 - Comms Classification',
                               script_name=ML_SCRIPT_FILENAME,
                               arguments=['--nlp_data', assessed_ds.as_input('nlp_ds').as_mount()],
                               **common_step_settings)
except Exception:
    # Don't leave the kernel stranded two directories up if a step fails to build.
    os.chdir(current_dir)
    raise

# Build the pipeline and run the experiment


In [5]:
# list of steps to run; data dependencies between them come from the PipelineData inputs/outputs
pipeline_steps = [filter_step, transcribe_step, nlp_step, ML_step]

try:
    # build the pipeline
    pipeline = azuremlConfig.buildPipeline(azuremlConfig.ws, script_steps=pipeline_steps)

    # Submit the run. regenerate_outputs=True forces every step to regenerate its outputs,
    # so the steps' allow_reuse=True has no effect on this run.
    run = azuremlConfig.experiment.submit(pipeline, regenerate_outputs=True)
finally:
    # Always revert to the directory saved before the steps were built, even if build/submit fails.
    os.chdir(current_dir)


Created step Step 1 - Filter Audio Files [bffee21f][db58e25a-186d-4431-b799-9600327f8dfa], (This step will run and generate new outputs)
Created step Step 2 - Transcribe Filtered Audio Files [d5befb72][5cd3fe3e-cae0-4721-be38-e64acb6b85c1], (This step will run and generate new outputs)Created step Step 3 - NLP Anaysis [d0a9fcdb][d1109603-dcd3-4fc1-af51-61421704f1bc], (This step will run and generate new outputs)

Created step Step 4 - Comms Classification [dea6e69c][653feb66-1cbc-4ba1-aa4c-fb05d2630ab6], (This step will run and generate new outputs)
Created data reference inference_datastore_16635bfa for StepId [86525019][889f1187-009b-475e-993e-138004c32738], (Consumers of this data will generate new runs.)
Submitted PipelineRun a5eb09cc-e37f-483d-a514-364103a88a81
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/a5eb09cc-e37f-483d-a514-364103a88a81?wsid=/subscriptions/26256615-d27f-4067-92b9-53a9d6126c57/resourcegroups/it-aue1-pov-arg-poscomms/workspaces/pos-comms-poc

# Publish Pipeline
Publishing a pipeline allows it to be run by other users with different inputs.

In [6]:
from azureml.pipeline.core import Pipeline

# Publish the pipeline built above as version 1.0 so it can be re-run with different inputs.
# continue_on_step_failure=False: other steps do not keep running once a step fails.
published_pipeline = pipeline.publish(
    name='Positive Comms Classification',
    description=("Takes raw audio files as input. "
                 "Then filters, transcribes, applies NLP (Natural Language Processing) "
                 "and comms analysis to create a dataframe output."),
    version="1.0",
    continue_on_step_failure=False,
)

# Create a versioned pipeline endpoint

You can create a Pipeline Endpoint with multiple published pipelines behind it. This technique gives you a fixed REST endpoint as you iterate on and update your ML pipelines.

In [None]:
from azureml.pipeline.core import PipelineEndpoint

PIPELINE_ENDPOINT_NAME = 'Positive Comms Classification'

# Reuse the pipeline published in the previous cell rather than publishing a second, duplicate copy.
published = published_pipeline

try:
    # Endpoint already exists: make this published pipeline its new default version.
    pipeline_endpoint = PipelineEndpoint.get(workspace=azuremlConfig.ws, name=PIPELINE_ENDPOINT_NAME)
except Exception:
    # First run: the endpoint does not exist yet, so create it with this pipeline as the default.
    # NOTE(review): assumes get() raising means "endpoint not found"; narrow to the SDK's exception type
    # if a genuine auth/network error should not fall through to publish().
    pipeline_endpoint = PipelineEndpoint.publish(workspace=azuremlConfig.ws,
                                                 name=PIPELINE_ENDPOINT_NAME,
                                                 pipeline=published,
                                                 description='Versioned endpoint for the Positive Comms Classification pipeline')
else:
    pipeline_endpoint.add_default(published)

# Trigger a pipeline
Time-based schedules can be used to take care of routine tasks, such as monitoring for data drift. Change-based schedules can be used to react to irregular or unpredictable changes, such as new data being uploaded or old data being edited.

In [None]:
import azureml.core
from azureml.core import Datastore, Workspace
from azureml.core.experiment import Experiment
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.pipeline.core.schedule import Schedule, ScheduleRecurrence

# Workspace loaded from the local config.json (separate from azuremlConfig.ws above)
ws = Workspace.from_config()

# list the experiments in the workspace
experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

# List published pipelines. Use a distinct loop variable so this does not overwrite
# `published_pipeline` from the Publish cell.
published_pipelines = PublishedPipeline.list(ws)
for listed_pipeline in published_pipelines:
    print(f"{listed_pipeline.name},'{listed_pipeline.id}'")

# NOTE(review): hard-coded experiment name / pipeline id from an earlier run - consider published_pipeline.id
experiment_name = "testpipeline1"
pipeline_id = "74d1d177-2d58-4ab2-9331-4b6b1885223d"

# Create a change-based schedule that watches '/radio-check' on the raw-data datastore.
# A data_path_parameter_name="input_data" argument can be added to pass the changed path to the pipeline.
datastore = Datastore(workspace=ws, name="dev_raw_datastore")
reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                                    pipeline_id=pipeline_id, experiment_name=experiment_name,
                                    datastore=datastore, path_on_datastore='/radio-check')

# To disable the schedule (as written, this disables it immediately after creating it)
reactive_schedule.update(status='Disabled')

# List all schedules (active and disabled) for this pipeline.
# Schedule.list's second positional parameter is `active_only`, so pipeline_id must be passed by keyword.
pipeline_schedules = Schedule.list(ws, active_only=False, pipeline_id=pipeline_id)
pipeline_schedules


In [11]:
# Switch into the project's `engine` source directory.
# Named `engine_dir` to avoid shadowing the builtin `dir()` for the rest of the session.
# NOTE(review): absolute, user-specific path - consider deriving it from the repository root instead.
engine_dir = '/home/azureuser/cloudfiles/code/Users/nejhdeh.ghevondian/bhp-positive-comms-poc/positive-comms-experimentation/src/engine'
os.chdir(engine_dir)



# Submit a job to a pipeline endpoint

You can submit a job to the default version of a pipeline endpoint:

In [None]:
from azureml.pipeline.core import PipelineEndpoint

# Submit to the default version of the endpoint created in "Create a versioned pipeline endpoint".
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name='Positive Comms Classification')

# submit() returns a PipelineRun object, not an id string
endpoint_run = pipeline_endpoint_by_name.submit("testpipeline1")
run_id = endpoint_run.id
print(run_id)

# or target a specific version:
# endpoint_run = pipeline_endpoint_by_name.submit("testpipeline1", pipeline_version="0")