In [1]:
from azureml.core import Workspace

#

src_folder = 'steps'
cluster_name = 'cpu-cluster'
env_name = 'data-drift-env'

DATASTORE_NAME = 'workspaceblobstore'
FILE_DATASET_NAME = 'datadrift_file_results'
json_file_path = f'datadrift/metrics/**/output_*.json'


ws = Workspace.from_config()
print('Ready to work with', ws.name)

Ready to work with evolve-ml


 # what is the purpose of this script:
 pipeline which is schudles daily(?) 

 collect data drift detector output, transform
 
 result: dataset that can be used

```json

[{"runid": "mslearn-diabates-drift-scheedule-Monitor-Runs_1649806360074", "drift_threshold": 0.3, "pipeline_starttime": "2022-04-12T23:34:36.163569Z", "run_type": "Scheduler", "datadrift_id": "0eb10f47-1629-4014-9db8-680eb438328c", "datadrift_name": "mslearn-diabates-drift-scheedule", "datadrift_configuration_type": "DatasetBased", "start_date": "2022-04-11", "end_date": "2022-04-12", "baseline_dataset_id": "8ab2e553-c4d6-4ee5-ac83-574ef324496b", "target_dataset_id": "ce614e9a-70d5-4ab0-949a-18cf1ee9b991", "frequency": "Day", "from_dataset": "both", "column_name": "Pregnancies", "metric_category": "statistical_distance", "metric_type": "column", "column_type": "numerical", "name": "wasserstein_distance", "value": 5.950910447009183}... ]

```


In [15]:
%%writefile $src_folder/utils.py

from contextlib import contextmanager
import os
import shutil


@contextmanager
def temp_directory(dir_name = 'temp', **kwds):
    
    os.makedirs(dir_name, exist_ok=True)
    
    try:
        yield dir_name

    except Exception:
        print(f"Unable to create '{dir_name}'")

    finally:
        shutil.rmtree(dir_name)

Writing steps/utils.py


In [8]:
%%writefile $src_folder/transform-data-drift-output.py

import argparse
import json
import bigjson
import os
import utils
from azureml.core import Workspace, Dataset, Datastore, Run
from azureml.core.run import _OfflineRun


TEMP_DIRECTORY = 'temp'

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
    parser.add_argument('--transformed-data', type=str, dest='transformed_data', default='transformed_data', help='Folder for results')
    return parser.parse_args()

args = parse_args()
print(f'Arguments: {args.__dict__}')
save_folder = args.transformed_data
os.makedirs(save_folder, exist_ok=True)

# Get the experiment run context
run = Run.get_context()

# access the ws either wehn running offline or remote run 
ws = Workspace.from_config() if type(run) == _OfflineRun else run.experiment.workspace
dstore = Datastore.get_default(ws)

file_dataset = run.input_datasets['raw_data']

print(file_dataset)

with utils.temp_directory(TEMP_DIRECTORY):
    # Download json files defined by the dataset to temp directory
    json_file_paths = file_dataset.download(f'{TEMP_DIRECTORY}', overwrite=True)

    # Convert json files to jsonl files (in local directory) 
    for json_path in json_file_paths:
        
        # Read json file in streaming mode
        with open(json_path, 'rb') as f:
            json_data = bigjson.load(f)
            # Replace file name extension
            jsonl_path = os.path.splitext(json_path)[0]+'.jsonl'

            # Open jsonl file  
            with open(jsonl_path, 'w') as jsonl_file:
                # Iterates over input json
                for data in json_data:
                    # Converts json to a Python dict  
                    dict_data = data.to_python()
                    
                    # Saves the data to jsonl file
                    jsonl_file.write(json.dumps(dict_data)+"\n")
                    
        # Delete json file
        os.remove(json_path)

    # Upload jsonl files to datastore
    print("Saving Transformed Data...")
    print(os.listdir(f'{TEMP_DIRECTORY}'))
    output_dataset = Dataset.File.upload_directory(f'{TEMP_DIRECTORY}', target=save_folder)
    print(os.listdir(f'{save_folder}'))

print(os.listdir(f'{save_folder}'))

#TODO: 
## move to util


Overwriting steps/transform-data-drift-output.py


In [7]:
%%writefile $src_folder/save-data-drift-output.py

import argparse
from azureml.core import Dataset, Datastore, Run
from azureml.core.run import _OfflineRun
from azureml.data.dataset_factory import DataType

PARTITION_FORMAT = '{DATADRIFT_ID}/{PARTITION_DATE:yyyy/MM/dd}/output_{RUN_ID}.json'


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--transformed-data", type=str, dest='transformed_data', help='transformed data')
    return parser.parse_args()

args = parse_args()
print(f'Arguments: {args.__dict__}')
transformed_data = args.transformed_data
print(transformed_data)


# Crate TabularDataSet based on converted jsonl files
output_dataset = Dataset.Tabular.from_json_lines_files(path=transformed_data, partition_format=PARTITION_FORMAT)
output_dataset = output_dataset.register_on_complete(name='datadrift_results_pipeline', description = 'datadrift results pipeline')


Overwriting steps/save-data-drift-output.py


In [2]:
if False:
    from azureml.core.environment import Environment
    from azureml.core.conda_dependencies import CondaDependencies

    myenv = Environment(name=env_name)
    conda_dep = CondaDependencies()

    conda_dep.add_pip_package('bigjson')
    conda_dep.add_pip_package('azureml-defaults')

    # Adds dependencies to PythonSection of myenv
    myenv.python.conda_dependencies=conda_dep

    myenv.register(ws).build(ws)

In [3]:
from azureml.core import RunConfiguration, ComputeTarget, Environment

run_config = RunConfiguration()
run_config.environment = Environment.get(ws, env_name)
compute_target = ComputeTarget(ws, cluster_name)


In [4]:
from azureml.pipeline.core import PipelineData, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.core import ComputeTarget, Datastore, Dataset
from azureml.data import OutputFileDatasetConfig

#datadir_param = PipelineData('datadir', is_directory=True)
#evolve_param = PipelineData('evolve')
#azure_param = PipelineData('azure')
#compare_param = PipelineData('compare')

#collection_param = PipelineParameter(name="collection", default_value='test_datasets')
#repo_param = PipelineParameter(name="repo", default_value='test')


collection_param = PipelineParameter(name="collection", default_value='test_datasets')
repo_param = PipelineParameter(name="repo", default_value='test')


# Get data-drift output dataset
dstore = Datastore.get(ws, DATASTORE_NAME)
metrics_ds = Dataset.File.from_files(path=(dstore,json_file_path)) # add filter dataset

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
transformed_data = OutputFileDatasetConfig("transformed_data")

# step 0, collate the data drift outputs of dstore 
## use the path-pattern to load files 
## filter(-1 month)

# Step 1, data transofrm (from json to jsonl), save to dstore as .jsonl files
## future todo: Parallel-ize the json files transformation  
transform_step = PythonScriptStep(
    name='transform data drift output',
    source_directory=src_folder,
    script_name='transform-data-drift-output.py',
    arguments = ['--input-data', metrics_ds.as_named_input('raw_data'),
                '--transformed-data', transformed_data],                
    compute_target=compute_target, 
    runconfig=run_config, 
    allow_reuse=False,    
)

# Step 2, collate the .jsonl file, partition accordingly (time-series), register new version per Output Dataset (Tabular) 
## future - iron out the duplicated / contradictory data points 
save_step = PythonScriptStep(
    name='save data drift output',
    source_directory=src_folder,
    script_name='save-data-drift-output.py',
    arguments = ['--transformed-data', transformed_data.as_input()],
    compute_target=compute_target, 
    runconfig=run_config,    
    allow_reuse=False,
)

print("Pipeline steps defined")

Pipeline steps defined


In [9]:
from azureml.core import Experiment
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [transform_step, save_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'data-drift-output-exeriment')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True) 
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)


Pipeline is built.
Created step transform data drift output [35f82fb6][ce0e88fe-c9da-4a89-a050-ff4599488974], (This step will run and generate new outputs)
Created step save data drift output [751e847b][e69bc83a-b74c-42d3-84b4-4c84dfe0c8d0], (This step will run and generate new outputs)
Submitted PipelineRun d979f672-c1bd-4f3f-ac0c-8a180e6c831d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/d979f672-c1bd-4f3f-ac0c-8a180e6c831d?wsid=/subscriptions/06c3d5bb-46a2-4d92-9508-c018d06f6452/resourcegroups/evolve-team-rg/workspaces/evolve-ml&tid=72a43063-967e-43c8-8121-0823266b2701
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: d979f672-c1bd-4f3f-ac0c-8a180e6c831d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/d979f672-c1bd-4f3f-ac0c-8a180e6c831d?wsid=/subscriptions/06c3d5bb-46a2-4d92-9508-c018d06f6452/resourcegroups/evolve-team-rg/workspaces/evolve-ml&tid=72a43063-967e-43c8-8121-0823266b2701
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 708ccaca-90d9-4a93-bb48-eadcab1cc401
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/708ccaca-90d9-4a93-bb48-eadcab1cc401?wsid=/subscriptions/06c3d5bb-46a2-4d92-9508-c018d06f6452/resourcegroups/evolve-team-rg/workspaces/evolve-ml&tid=72a43063-967e-43c8-8121-0823266b2701
StepRun( transform data drift output ) Status: Running

StepRun(transform data drift output) Execution Summary
StepRun( transform data drift output ) Status: Finished
{'runId': '708ccaca-90d9-4a93-bb48-eadcab1cc401', 'target': 'cpu-cluster', 'status': 'Completed', 'startTimeUtc': '2022-04-17T06:33:57.756798Z', 'endTimeUtc': '2022-04-17

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "{'code': data-capability.DatasetMountSession:input_7c0bc4da.ExecutionError, 'message': \nError Code: ScriptExecution.StreamAccess.NotFound\n, 'target': , 'category': UserError, 'error_details': [{'key': NonCompliantReason, 'value': \nError Code: ScriptExecution.StreamAccess.NotFound\nFailed Step: 6e7dc11a-b948-4ac1-b654-2fbdf1a594e6\nError Message: ScriptExecutionException was caused by StreamAccessException.\n  StreamAccessException was caused by NotFoundException.\n    Found no resources for the input provided: 'https://evolveml3040211544.blob.core.windows.net/azureml-blobstore-899eb723-9471-475e-9ea2-abcad8a41de5/dataset/708ccaca-90d9-4a93-bb48-eadcab1cc401/transformed_data/'\n| session_id=1d79f0f3-0532-417f-95b5-11e7caadde3f}, {'key': StackTrace, 'value':   File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/data_capability/capability_session.py\", line 47, in start\n    (data_path, sub_data_path) = session.start()\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/data_capability/data_sessions.py\", line 182, in start\n    if self._is_single_file:\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/data_capability/data_sessions.py\", line 130, in _is_single_file\n    path = dataflow._to_pyrecords()[0][temp_column]\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/dataflow.py\", line 758, in _to_pyrecords\n    intermediate_files = _write_preppy_with_fallback('Dataflow.to_pyrecords', self, span_context=to_dprep_span_context(span.get_context()))\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_dataframereader.py\", line 192, in _write_preppy_with_fallback\n    _execute_with_fallback(activity, dataflow_to_execute, force_clex=force_clex, span_context=span_context)\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_dataframereader.py\", line 240, in _execute_with_fallback\n    clex_execute()\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_dataframereader.py\", line 221, in clex_execute\n    span_context=span_context\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_aml_helper.py\", line 38, in wrapper\n    return send_message_func(op_code, message, cancellation_token)\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/engineapi/api.py\", line 159, in execute_anonymous_activity\n    response = self._message_channel.send_message('Engine.ExecuteActivity', message_args, cancellation_token)\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/engineapi/engine.py\", line 291, in send_message\n    raise_engine_error(response['error'])\n\n  File \"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/errorhandlers.py\", line 10, in raise_engine_error\n    raise ExecutionError(error_response)\n}",
        "messageParameters": {},
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"{'code': data-capability.DatasetMountSession:input_7c0bc4da.ExecutionError, 'message': \\nError Code: ScriptExecution.StreamAccess.NotFound\\n, 'target': , 'category': UserError, 'error_details': [{'key': NonCompliantReason, 'value': \\nError Code: ScriptExecution.StreamAccess.NotFound\\nFailed Step: 6e7dc11a-b948-4ac1-b654-2fbdf1a594e6\\nError Message: ScriptExecutionException was caused by StreamAccessException.\\n  StreamAccessException was caused by NotFoundException.\\n    Found no resources for the input provided: 'https://evolveml3040211544.blob.core.windows.net/azureml-blobstore-899eb723-9471-475e-9ea2-abcad8a41de5/dataset/708ccaca-90d9-4a93-bb48-eadcab1cc401/transformed_data/'\\n| session_id=1d79f0f3-0532-417f-95b5-11e7caadde3f}, {'key': StackTrace, 'value':   File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/data_capability/capability_session.py\\\", line 47, in start\\n    (data_path, sub_data_path) = session.start()\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/data_capability/data_sessions.py\\\", line 182, in start\\n    if self._is_single_file:\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/data_capability/data_sessions.py\\\", line 130, in _is_single_file\\n    path = dataflow._to_pyrecords()[0][temp_column]\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/dataflow.py\\\", line 758, in _to_pyrecords\\n    intermediate_files = _write_preppy_with_fallback('Dataflow.to_pyrecords', self, span_context=to_dprep_span_context(span.get_context()))\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_dataframereader.py\\\", line 192, in _write_preppy_with_fallback\\n    _execute_with_fallback(activity, dataflow_to_execute, force_clex=force_clex, span_context=span_context)\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_dataframereader.py\\\", line 240, in _execute_with_fallback\\n    clex_execute()\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_dataframereader.py\\\", line 221, in clex_execute\\n    span_context=span_context\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/_aml_helper.py\\\", line 38, in wrapper\\n    return send_message_func(op_code, message, cancellation_token)\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/engineapi/api.py\\\", line 159, in execute_anonymous_activity\\n    response = self._message_channel.send_message('Engine.ExecuteActivity', message_args, cancellation_token)\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/engineapi/engine.py\\\", line 291, in send_message\\n    raise_engine_error(response['error'])\\n\\n  File \\\"/opt/miniconda/envs/data-capability/lib/python3.7/site-packages/azureml/dataprep/api/errorhandlers.py\\\", line 10, in raise_engine_error\\n    raise ExecutionError(error_response)\\n}\",\n        \"messageParameters\": {},\n        \"details\": []\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\"\n}"
    }
}

In [None]:
from azureml.pipeline.core import PipelineRun
from azureml.core import Experiment

# Publish the pipeline from the run
submitted_pipeline_run = PipelineRun(experiment=Experiment(experiment, run_id=pipeline_run.id))
published_pipeline = submitted_pipeline_run.publish_pipeline(name='data-drift-output-pipeline',
    description='collect, transform and save datadrift output into dataset',
    version='1.0',
    continue_on_step_failure=False)

print('Pipeline scheduled.')

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Schedules a daily run of a published pipeline
daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws, name='data_drift_output_schedule',
                                        description='update data drift output every day',
                                        pipeline_id=published_pipeline.id,
                                        experiment_name='schedule_data_drift_output_pipeline',
                                        recurrence=daily)

print('Pipeline scheduled.')