# Data Cleaning

## in parallel with ParallelRunStep (PRS)

### Step 1: Create your local Conda environment

In [1]:
%%writefile ./hydroqc-env.yml
# Conda specification for the local development environment of this notebook.
name: hydroqc-env
# Channels are searched in the order listed.
channels:
    - conda-forge
    - defaults
dependencies:
    - python=3.8
    - numpy
    - pandas
    - holidays
    # ipykernel lets the environment be registered as a Jupyter kernel
    - ipykernel
    - matplotlib
    - jupyterlab
    - scikit-learn
    - pip
    # Packages below are installed with pip inside the conda environment
    - pip:
        - tensorflow
        # Azure ML SDK and the notebook widgets imported by this notebook
        - azureml-sdk
        - azureml-widgets

Overwriting ./hydroqc-env.yml


Create and activate your Conda environment, then register it as a Jupyter kernel for VS Code (run these commands in a terminal, not in a notebook cell):

`conda env create -f hydroqc-env.yml`

`conda activate hydroqc-env`

`python -m ipykernel install --user --name hydroqc-env --display-name "hydroqc-env"`

### Step 2: Connect to the AzureML Workspace

In [2]:
!pip install azureml




In [3]:
from azureml.core import Workspace

# Load the workspace described by the local Azure ML config file
ws = Workspace.from_config()

# Handle on the workspace's default key vault (for secrets)
default_kv = ws.get_default_keyvault()

# Handle on the workspace's default datastore (used later for the PRS output)
default_dstore = ws.get_default_datastore()

We register the **hqasa** storage account as a datastore.

In [4]:
# Display the workspace object to confirm which workspace the config resolved to
ws

Workspace.create(name='mlops-AML-WS', subscription_id='d71e4214-ad22-4df0-8289-acbc0d88408d', resource_group='mlops-RG')

In [5]:
# NOTE: hqasa contains the Hydro-Quebec synthetic data
import os
from azureml.core import Keyvault
from azureml.core import Datastore
from azureml.exceptions import UserErrorException

# Define the ASA (Azure Storage Account) parameters
blob_datastore_name = 'hydroqc'
account_name = 'hqasa'
container_name = 'hq-data'

# SECURITY: credentials must never be committed with the notebook.
# The SAS token is read from the HQASA_SAS_TOKEN environment variable and is
# only required the first time the datastore is registered. The workspace key
# vault (default_kv.set_secret / default_kv.get_secret) is an alternative store.
# NOTE(review): the token previously hardcoded in this cell has been exposed
# and should be revoked/rotated.
SAS_TOKEN = os.environ.get('HQASA_SAS_TOKEN')

try:
    # Reuse the datastore if it is already registered in the workspace
    blob_datastore = Datastore.get(workspace=ws, datastore_name=blob_datastore_name)
    print(f"Found Blob Datastore with name: {blob_datastore_name}")
except UserErrorException:
    # Not registered yet: a credential is now mandatory
    if not SAS_TOKEN:
        raise RuntimeError(
            f"Datastore '{blob_datastore_name}' is not registered and no SAS token "
            "was provided. Set the HQASA_SAS_TOKEN environment variable and re-run "
            "this cell.")
    blob_datastore = Datastore.register_azure_blob_container(
        workspace=ws,
        datastore_name=blob_datastore_name,
        account_name=account_name,
        container_name=container_name,
        sas_token=SAS_TOKEN)
    print(f"Registered blob datastore with name: {blob_datastore_name}")

Found Blob Datastore with name: hydroqc


We create and register a file dataset from the Hydro-Quebec raw data.

In [6]:
from azureml.core.dataset import Dataset
from azureml.exceptions import UserErrorException

# Fetch the registered raw-data FileDataset, creating and registering it on first use.
try:
    test_fds = Dataset.get_by_name(workspace=ws, name='test_fds')
except UserErrorException:
    print('Create a FileDataset and register in the Workspace')
    raw_files = Dataset.File.from_files(path=(blob_datastore, '/data/training1/'), validate=True)
    # register() returns the registered dataset; keep that object so `test_fds`
    # is a registered dataset on both branches (first run and subsequent runs).
    test_fds = raw_files.register(workspace=ws, name='test_fds', description='Sample synthetic raw data from Hydro-Quebec')

We create our environment for the cleaning step in our pipeline.

In [7]:
%%writefile ../pipelines/clean/src/clean-env.yml
# Conda specification for the environment of the cleaning pipeline step.
name: clean-env
# Channels are searched in the order listed.
channels:
    - conda-forge
    - defaults
dependencies:
    - python=3.8
    - numpy
    - pandas
    - holidays
    - scikit-learn
    - pip
    # Packages below are installed with pip inside the conda environment
    - pip:
        - azureml-sdk

Overwriting ../pipelines/clean/src/clean-env.yml


Create an Environment object from the Conda YAML file.

In [8]:
from azureml.core import Environment

# Build the Environment straight from the Conda YAML file.
# from_conda_specification is a static factory, so it is called on the class;
# instantiating a throwaway Environment first is unnecessary.
clean_env = Environment.from_conda_specification(
    name='clean-env',
    file_path='../pipelines/clean/src/clean-env.yml')

We select our compute target for the cleaning step in our pipeline.

In [9]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Name of the CPU cluster that runs the PRS step
# NOTE: Need to be 16 characters or less
cluster_name = "prs-cluster"

# Reuse the cluster when it already exists; otherwise provision it
try:
    prs_cluster = ComputeTarget(workspace=ws, name=cluster_name)
except ComputeTargetException:
    # Settings for a new cluster, spelled out explicitly
    provisioning_kwargs = dict(
        vm_size='STANDARD_D15_V2',
        min_nodes=0,
        max_nodes=5,
        vm_priority='dedicated',
        idle_seconds_before_scaledown=2400,
        admin_username=None,
        admin_user_password=None,
        admin_user_ssh_key=None,
        vnet_resourcegroup_name=None,
        vnet_name=None,
        subnet_name=None,
        description='PRS Cluster',
        remote_login_port_public_access='NotSpecified',
        identity_type=None,
        identity_id=None,
    )
    compute_config = AmlCompute.provisioning_configuration(**provisioning_kwargs)

    prs_cluster = ComputeTarget.create(
        workspace=ws,
        name=cluster_name,
        provisioning_configuration=compute_config,
    )

    # Block until provisioning has finished, echoing progress
    prs_cluster.wait_for_completion(show_output=True)
else:
    print('Found existing "{}" cluster. Use it.'.format(cluster_name))

# NOTE: the compute cluster can be deleted with:
# ws.compute_targets['prs-cluster'].delete()

Found existing "prs-cluster" cluster. Use it.


Configure the PRS step with the ParallelRunConfig object.

In [10]:
from azureml.pipeline.steps import ParallelRunConfig

# NOTE: Standard_D15_v2: 20 cores, 140 GB RAM, 1000 GB disk
# NOTE: One worker process per core
processes_per_node = 20

# Number of cluster nodes used by this step.
# NOTE: must not exceed the max_nodes of the compute target
node_count = 4

# Timeout, in seconds, for a single invocation of the entry script's run()
# method (i.e. per mini-batch), not for the pipeline run as a whole
timeout = 180

parallel_run_config = ParallelRunConfig(
    source_directory='../pipelines/clean/src',
    entry_script='clean-aml.py',
    # One file per mini-batch
    mini_batch_size='1',
    run_invocation_timeout=timeout,
    error_threshold=1,
    run_max_try=1,
    output_action="append_row",
    environment=clean_env,
    process_count_per_node=processes_per_node,
    # Use the cluster object resolved/created earlier instead of looking it up
    # again through a second hard-coded copy of its name
    compute_target=prs_cluster,
    node_count=node_count,
    logging_level='INFO',
    # Specify the filename for the PRS output
    append_row_file_name='prs_clean.txt')

Define the parallel run step.

In [11]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import ParallelRunStep

# How many files of the raw dataset to clean
n_files = 200

# PRS input: the first n_files files of the raw dataset, as a named input
raw_subset = test_fds.take(count=n_files)
input_dataset = raw_subset.as_named_input(name=f'test_{n_files}')

# PRS output: written to the default datastore
prs_output_dir = OutputFileDatasetConfig(
    name='clean_prs_output',
    destination=(default_dstore, '/data/hydroqc/prs/clean/output/'),
    source=None,
)

# The input is mounted on the compute nodes
mounted_input = input_dataset.as_mount()

# The PRS step itself
parallel_run_step = ParallelRunStep(
    name="clean_prs",
    parallel_run_config=parallel_run_config,
    inputs=[mounted_input],
    output=prs_output_dir,
    allow_reuse=False,
    arguments=None,
)

In [12]:
# Build the experiment and the pipeline, then submit and follow the run
from azureml.core import Experiment
from azureml.widgets import RunDetails
from azureml.pipeline.core import Pipeline

# Experiment named after the number of files processed
experiment_name = f'prs-cpu-clean-{n_files}'
experiment = Experiment(workspace=ws, name=experiment_name)

# Single-step pipeline wrapping the PRS cleaning step
pipeline = Pipeline(workspace=ws, steps=[parallel_run_step])

# Submit the pipeline
# NOTE: Return azureml.pipeline.core.run.PipelineRun
run_tags = {'Files': str(n_files)}
run = experiment.submit(pipeline, tags=run_tags)

# Interactive widget showing the run's progress
details_widget = RunDetails(run_instance=run)
details_widget.show()

# Stream the logs until the run completes (or the wait times out)
run.wait_for_completion(show_output=True, timeout_seconds=2400)

Created step clean_prs [568124d6][dfa66f5f-792b-42f4-b169-8bd0954138a6], (This step will run and generate new outputs)
Submitted PipelineRun cff7eb80-55d1-454f-acfd-669ff1571c73
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/cff7eb80-55d1-454f-acfd-669ff1571c73?wsid=/subscriptions/d71e4214-ad22-4df0-8289-acbc0d88408d/resourcegroups/mlops-RG/workspaces/mlops-AML-WS&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: cff7eb80-55d1-454f-acfd-669ff1571c73
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/cff7eb80-55d1-454f-acfd-669ff1571c73?wsid=/subscriptions/d71e4214-ad22-4df0-8289-acbc0d88408d/resourcegroups/mlops-RG/workspaces/mlops-AML-WS&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRun Status: Running


StepRunId: 13a723d1-6816-42e3-87d8-888442572513
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/13a723d1-6816-42e3-87d8-888442572513?wsid=/subscriptions/d71e4214-ad22-4df0-8289-acbc0d88408d/resourcegroups/mlops-RG/workspaces/mlops-AML-WS&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
StepRun( clean_prs ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_4125d27826e7714d018aa2c4cd30f825705411b5cf36f7cfcf075c87bf9aedf2_d.txt
2021-11-12T00:07:16Z Successfully mounted a/an Blobfuse File System at /mnt/batch/tasks/shared/LS_root/jobs/mlops-aml-ws/azureml/13a723d1-6816-42e3-87d8-888442572513/mounts/workspaceblobstore
2021-11-12T00:07:16Z Th

'Finished'

We download the PRS output data for further analysis.

In [13]:
# Display the first step run named 'clean_prs' found in the pipeline run
run.find_step_run(name='clean_prs')[0]

Experiment,Id,Type,Status,Details Page,Docs Page
prs-cpu-clean-200,13a723d1-6816-42e3-87d8-888442572513,azureml.StepRun,Completed,Link to Azure Machine Learning studio,Link to Documentation


In [14]:
# Local folder that receives the PRS output
local_output_dir = '../pipelines/clean/outputs'
os.makedirs(local_output_dir, exist_ok=True)

# Locate the cleaning step run and its named output, then download it
clean_step_run = run.find_step_run(name='clean_prs')[0]
prs_output_ref = clean_step_run.get_output_data(name='clean_prs_output')
prs_output_ref.download(local_path=local_output_dir, overwrite=True, show_progress=True)

Downloading data/hydroqc/prs/clean/output/prs_clean.txt
Downloaded data/hydroqc/prs/clean/output/prs_clean.txt, 1 files out of an estimated total of 1


1

In [15]:
import pandas as pd

# The downloaded PRS summary file: space-separated, one row per file, no header
prs_log_path = '../pipelines/clean/outputs/data/hydroqc/prs/clean/output/prs_clean.txt'
prs_log_columns = ['Filename', 'Filesize_Bytes', 'StartDate', 'EndDate', 'Duration', 'RunID', 'Status']

clean_prs_output_df = pd.read_csv(
    prs_log_path,
    sep=' ',
    header=None,
    names=prs_log_columns,
    parse_dates=['StartDate', 'EndDate'],
)
clean_prs_output_df.head()

Unnamed: 0,Filename,Filesize_Bytes,StartDate,EndDate,Duration,RunID,Status
0,consommationfichier_conso_LAX10.csv,2084847,2021-11-12 00:08:42.130595,2021-11-12 00:08:44.607893,0:00:02.477298,49c351f8-6d28-4d25-bd07-16b118e00827,Completed
1,consommationfichier_conso_LAX100.csv,2101804,2021-11-12 00:08:42.092063,2021-11-12 00:08:44.444474,0:00:02.352411,f29501df-cdf3-467a-b7ec-382fc3f2e89d,Completed
2,consommationfichier_conso_LAX1000.csv,2118761,2021-11-12 00:08:42.129247,2021-11-12 00:08:44.267348,0:00:02.138101,acb04bd0-7d68-411a-80ad-c4c1ac26c328,Completed
3,consommationfichier_conso_LAX10000.csv,2135718,2021-11-12 00:08:42.178584,2021-11-12 00:08:44.433847,0:00:02.255263,76bfc2cd-acb4-4195-a956-730303aab686,Completed
4,consommationfichier_conso_LAX100000.csv,2152675,2021-11-12 00:08:42.154818,2021-11-12 00:08:44.357718,0:00:02.202900,cc8d5af9-fe59-4a24-b500-4167f64e574b,Completed


In [16]:
# Display the last rows of the PRS summary table
clean_prs_output_df.tail()

Unnamed: 0,Filename,Filesize_Bytes,StartDate,EndDate,Duration,RunID,Status
195,consommationfichier_conso_LAX100173.csv,2152675,2021-11-12 00:08:47.472718,2021-11-12 00:08:50.723031,0:00:03.250313,d4e24754-d812-455d-9a69-b8c4325e98aa,Completed
196,consommationfichier_conso_LAX100174.csv,2152675,2021-11-12 00:08:48.857229,2021-11-12 00:08:51.144159,0:00:02.286930,4fcc31d2-ac79-47a6-bbb4-8aec099ee8a1,Completed
197,consommationfichier_conso_LAX100175.csv,2152675,2021-11-12 00:08:47.557488,2021-11-12 00:08:49.768622,0:00:02.211134,130185ae-6e5d-4d05-9a5f-e7efcb408cd7,Completed
198,consommationfichier_conso_LAX100176.csv,2152675,2021-11-12 00:08:48.637818,2021-11-12 00:08:50.588514,0:00:01.950696,8d095a52-9ff5-48a7-8eff-ce7cbe4927c1,Completed
199,consommationfichier_conso_LAX100177.csv,2152675,2021-11-12 00:08:48.635066,2021-11-12 00:08:50.672175,0:00:02.037109,2253a41b-caf7-4822-8822-97ed21f3c93f,Completed


Register the resulting clean data as a file dataset in the workspace for future use.

In [17]:
# Fetch the registered clean-data FileDataset, creating and registering it on first use.
# Relies on `Dataset` and `UserErrorException` imported earlier in the notebook.
try:
    clean_fds = Dataset.get_by_name(workspace=ws, name='clean_fds')
except UserErrorException:
    print('Create a FileDataset and register in the Workspace')
    clean_files = Dataset.File.from_files(path=(default_dstore, '/data/hydroqc/clean'), validate=True)
    # register() returns the registered dataset; keep that object so `clean_fds`
    # is a registered dataset on both branches (first run and subsequent runs).
    clean_fds = clean_files.register(workspace=ws, name='clean_fds', description='Sample synthetic cleaned data from Hydro-Quebec')

Create a FileDataset and register in the Workspace
