# Create Pipeline
## Connect to your workspace

In [1]:
import azureml.core
from azureml.core import Workspace

# Manualy define workspace config
# ws = Workspace.get(
#            name="webparsing",
#            subscription_id= "1e70b6a9-7079-4703-a282-0c52141550cc",
#            resource_group= "webparsing"
# )

# Load the workspace from the saved config file
ws = Workspace.from_config()
kv = ws.get_default_keyvault()

In [2]:
from azureml.core import Workspace
ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

Workspace name: webparsing_centralus
Azure region: centralus
Subscription id: 439243ce-aaee-42b7-82d8-c88d5a342df3
Resource group: webparsing


## Prepare a compute environment for the pipeline steps

In [10]:
%ls

[0m[34;42mOlxParser_pipeline[0m/        [01;32mtest_pipeline.ipynb.amltmp[0m*
[34;42mOlxParser_pipeline_r05[0m/    [01;32mtest_pipeline_r1.0.ipynb.amltmp[0m*
[01;32mTest_pipeline.ipynb[0m*       [01;32muntitled.ipynb.amltmp[0m*
[01;32mTest_pipeline_r1.0.ipynb[0m*


In [9]:
%cd code/Users/vadym.pakholchuk.sa60vadym.pakholchuk.sa60

/mnt/batch/tasks/shared/LS_root/mounts/clusters/olxcompute/code/Users/vadym.pakholchuk.sa60


In [11]:
import os

# Create a folder for the pipeline step files
experiment_folder = 'OlxParser_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

OlxParser_pipeline


In [12]:
# get a secret from the keyvault
secret_name= kv.get_secret(name="BlobName")
secret_key = kv.get_secret(name="BlobKey")

In [13]:
# This block is needed to define Compute Target or create it, if we haven't it
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "OLXCompute-v2"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
    print(f'Cluster name: {cluster_name}')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.
Cluster name: OLXCompute-v2


In [14]:
from azureml.core import Environment
pipeline_env = Environment("OLX_Parsing")
pipeline_env.environment_variables['BLOB_ACCOUNTNAME'] = secret_name
pipeline_env.environment_variables['BLOB_ACCOUNT_KEY'] = secret_key
pipeline_env.environment_variables['BLOB_CONTAINER'] = 'olx'

In [8]:
# Set the container registry information.
# pipeline_env.docker.base_image_registry.address = "webparsingcontainer.azurecr.io"
# pipeline_env.docker.base_image_registry.username = "WebParsing"
# pipeline_env.docker.base_image_registry.password = "OvrOqNkqD5V8/EmeJcgBLbhMpYXvIX5S"

In [15]:
# When you're using your custom Docker image, you might already have your Python environment properly set up. 
# In that case, set the user_managed_dependencies flag to True to use your custom image's built-in Python environment.


# pipeline_env.docker.base_image = "webparsingcontainer.azurecr.io/web_parsing:v2"
pipeline_env.docker.base_image = "strateg17/web_parsing:v2"
pipeline_env.python.user_managed_dependencies = True

In [16]:
from azureml.core.runconfig import RunConfiguration

# Register the environment 
pipeline_env.register(workspace=ws)

registered_env = Environment.get(ws, 'OLX_Parsing')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Create and manage datastores for inputs and outputs

In [None]:
# from azureml.core.datastore import Datastore

# pages_datastore = Datastore.get(
#     workspace=ws, 
#     datastore_name="olx_pages_datastore")

## Create and run a pipeline

In [17]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

pages_param_1 = PipelineData('pages',  is_directory=True)
images_param_1 = PipelineData('images', is_directory=True)

pages_param_2 = PipelineData('pages',  is_directory=True)
images_param_2 = PipelineData('images', is_directory=True)

# Find and save all first 25 pages step
write_output_step_1 = PythonScriptStep(
    name='The Page Saver for 25 pages',
    script_name='Step_1_OlxParser_azure.py',
    arguments = ["--pages-dir", pages_param_1],
    outputs=[pages_param_1],
    compute_target=pipeline_cluster,
    source_directory='OlxParser_pipeline',
    runconfig = pipeline_run_config,
    allow_reuse=False
)

# Parsing all available pages from the search result
read_output_step_1 = PythonScriptStep(
     name='The Page Parser of 25 pages',
    script_name='Step_2_OlxParser_azure.py',
    arguments = ["--pages-dir", pages_param_1, "--images-dir", images_param_1],
    inputs = [pages_param_1],
    outputs=[images_param_1],
    compute_target=pipeline_cluster,
    source_directory='OlxParser_pipeline',
    runconfig = pipeline_run_config,
    allow_reuse=False
)

# Validation of the status of all not processed today pages
status_validation_step = PythonScriptStep(
     name='The offer validation',
    script_name='Step_3_OlxParser_azure.py',
    compute_target=pipeline_cluster,
    source_directory='OlxParser_pipeline',
    runconfig = pipeline_run_config,
    allow_reuse=False
)

# Saving all not processed today pages
write_output_step_2 = PythonScriptStep(
    name='Active pages additional saving',
    script_name='Step_4_OlxParser_azure.py',
    arguments = ["--pages-dir", pages_param_2],
    outputs=[pages_param_2],
    compute_target=pipeline_cluster,
    source_directory='OlxParser_pipeline',
    runconfig = pipeline_run_config,
    allow_reuse=False
)

# Parsing all not processed today pages
read_output_step_2 = PythonScriptStep(
     name='Active pages additional parsing',
    script_name='Step_5_OlxParser_azure.py',
    arguments = ["--pages-dir", pages_param_2, "--images-dir", images_param_2],
    inputs = [pages_param_2],
    outputs=[images_param_2],
    compute_target=pipeline_cluster,
    source_directory='OlxParser_pipeline',
    runconfig = pipeline_run_config,
    allow_reuse=False
)

In [18]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline, StepSequence

# Construct the pipeline
pipeline_steps = StepSequence(steps=[write_output_step_1, read_output_step_1, status_validation_step, write_output_step_2, read_output_step_2])
				  
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

Pipeline is built.


In [19]:
# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'OLX-parsing-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
pipeline_run.wait_for_completion(show_output=True)

Created step The Page Saver for 25 pages [000e8bff][c1ac0a2c-583e-4f1e-bc1f-aefbe500ef05], (This step will run and generate new outputs)Created step The Page Parser of 25 pages [79a6ae9e][362c8640-89fd-4a08-9755-2dac31ec2445], (This step will run and generate new outputs)

Created step The offer validation [fc7c21c1][6d196a85-9638-4ee4-bf0c-8f21a3944640], (This step will run and generate new outputs)Created step Active pages additional saving [253cae7f][c4a7a013-c698-4739-b71a-f565ced87125], (This step will run and generate new outputs)
Created step Active pages additional parsing [f4a38287][57deb1db-effa-4c23-9b0c-250fee8a83c6], (This step will run and generate new outputs)

Submitted PipelineRun 9128eff2-3daf-45ad-8b9c-7c2d8ff930d3
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/9128eff2-3daf-45ad-8b9c-7c2d8ff930d3?wsid=/subscriptions/439243ce-aaee-42b7-82d8-c88d5a342df3/resourcegroups/webparsing/workspaces/webparsing_centralus&tid=d41e8406-8169-4076-92b7-5301ec927b9

'Finished'

## Publish the pipeline

In [20]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="olx-parsing-pipeline-v1", description="Retrive info from OLX", version="1.0")

published_pipeline

Name,Id,Status,Endpoint
olx-parsing-pipeline-v1,f75da127-f000-45f1-93aa-d5529f90adf2,Active,REST Endpoint


## Schedule the Pipeline

In [22]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule, TimeZone

# Submit the Pipeline every Day of the week at 01:00 UTC
recurrence = ScheduleRecurrence(frequency="Day", 
                                interval=1, 
                                time_of_day="00:15",
                                # time_zone=TimeZone.UTC02,
                                # start_time="2021-09-01T00:00:00"
                                )

daily_schedule = Schedule.create(ws, name="daily-olx-parser", 
                                  description="Based on time",
                                  pipeline_id='f75da127-f000-45f1-93aa-d5529f90adf2	', 
                                  experiment_name='OLX-parsing-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Pipeline scheduled.


In [23]:
import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import ScheduleRecurrence, Schedule

ws = Workspace.from_config()

ss = Schedule.list(ws)
for s in ss:
    print(s,'\n')

Pipeline(Name: daily-olx-parser,
Id: 7a20a376-c30d-4853-968c-12e795e81ef4,
Status: Active,
Pipeline Id: f75da127-f000-45f1-93aa-d5529f90adf2	,
Pipeline Endpoint Id: None,
Recurrence Details: Runs at 0:15 every Day) 



In [4]:
Schedule.get_schedules_for_pipeline_id(ws, '85845f92-757f-4b79-8458-3cd4c2643f29')

[Pipeline(Name: daily-olx-parser,
 Id: 4a430931-e20c-4b49-8e93-097fa90bf5e2,
 Status: Active,
 Pipeline Id: 85845f92-757f-4b79-8458-3cd4c2643f29,
 Pipeline Endpoint Id: None,
 Recurrence Details: Runs at 0:5 every Day)]

In [None]:
sch = Schedule.get(ws, 'dc37f5a3-1b32-4b73-9d51-85fc0da6dd6a', _workflow_provider=None, _service_endpoint=None)

In [None]:
 Schedule.disable(sch)