In [3]:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.parallel import parallel_run_function, RunFunction

In [4]:
# Pick a credential for the workspace. DefaultAzureCredential is probed with a
# real token request so a broken configuration is detected here, not on the
# first MLClient call. On any failure, fall back to an interactive browser
# login. This is a deliberate best-effort, so the reason is reported rather
# than raised.
try:
    credential = DefaultAzureCredential()
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:  # any failure of the probe means "use the browser"
    print(
        f"DefaultAzureCredential could not get a token ({type(ex).__name__}: {ex}); "
        "falling back to InteractiveBrowserCredential."
    )
    credential = InteractiveBrowserCredential()

In [5]:
# Workspace handle: subscription / resource group / workspace are read from the
# local config.json, authenticated with the credential selected above.
ml_client = MLClient.from_config(credential)

Found the config file in: /config.json


In [6]:


# Per-mini-batch work: run test.py from the current directory (the whole "./"
# directory is the code snapshot) in the registry's sklearn-1.5 environment.
scoring_task = RunFunction(
    code="./",
    entry_script="test.py",
    environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
)

# Parallel component that splits the folder bound to `job_data_path` into
# mini-batches (mini_batch_size="1") and feeds them to `scoring_task`.
# It declares no outputs.
file_batch_inference = parallel_run_function(
    name="file_batch_score",
    display_name="Batch Score with File Dataset",
    description="parallel component for batch score",
    inputs={
        "job_data_path": Input(
            type=AssetTypes.URI_FOLDER,
            description="The data to be split and scored in parallel",
        ),
    },
    input_data="${{inputs.job_data_path}}",
    # One node with concurrency 1.
    instance_count=1,
    max_concurrency_per_instance=1,
    mini_batch_size="1",
    mini_batch_error_threshold=1,
    retry_settings={"max_retries": 2, "timeout": 60},
    logging_level="DEBUG",
    is_deterministic=False,
    task=scoring_task,
)

In [7]:

# Pipeline with a single parallel batch-scoring step over the local ./input/
# folder, mounted read-only. The local path is uploaded on submission (see the
# "Uploading input" lines in the submission outputs).
# NOTE: deliberately no docstring on the function: the DSL may use it as the
# pipeline description.
@pipeline()
def parallel_in_pipeline():
   
    # NOTE(review): keep this assignment and its name. The DSL may derive node
    # names from local variable names, so renaming/removing it could change
    # the job graph.
    batch_inference_with_file_data = file_batch_inference(
        job_data_path=Input(path="./input/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.RO_MOUNT)
    )
    
    return {
        # No pipeline outputs: the parallel component declares none.
    }

# Build the pipeline job (nothing is submitted yet).
pipeline_job = parallel_in_pipeline(
    # The pipeline takes no parameters.
)

# Pipeline-level default compute. "zhzhen1" is a workspace-specific cluster
# name; change it to a compute target that exists in your workspace.
pipeline_job.settings.default_compute = "zhzhen1"

In [7]:
# Submit one run of the pipeline under the "pipeline_samples" experiment.
# The submitted run goes into its own variable instead of overwriting
# `pipeline_job`, for two reasons:
# - re-running this cell submits a fresh run rather than re-posting an
#   already-named job;
# - later cells that reference `pipeline_job` still get the job definition,
#   not a run object carrying runtime fields such as status.
pipeline_run = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_run

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
[32mUploading input (0.0 MBs): 100%|█

Experiment,Name,Type,Status,Details Page
pipeline_samples,green_chain_c843q7bn3z,pipeline,NotStarted,Link to Azure Machine Learning studio


In [8]:
from datetime import datetime, timedelta
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient, Input, load_component
from azure.ai.ml.constants import TimeZone
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import (
    JobSchedule,
    CronTrigger,
    RecurrenceTrigger,
    RecurrencePattern,
)
from datetime import datetime


from datetime import timezone

# Start time = "now" as a naive wall-clock value in UTC+8. The trigger's
# time_zone is China Standard Time, so the value carries no tzinfo of its own.
# Same value as the former datetime.utcnow() + 8h, but utcnow() is deprecated
# since Python 3.12, so it is taken from an aware now() and made naive.
schedule_start_time = datetime.now(timezone(timedelta(hours=8))).replace(tzinfo=None)
schedule_name = "test_schedule1"

# Fire every 5 minutes, starting at schedule_start_time.
recurrence_trigger = RecurrenceTrigger(
    frequency="minute",
    interval=5,
    start_time=schedule_start_time,
    time_zone=TimeZone("China Standard Time"),
)

# Bind the trigger to the pipeline job defined earlier in the notebook.
job_schedule = JobSchedule(
    name=schedule_name,
    trigger=recurrence_trigger,
    create_job=pipeline_job,
)


In [9]:
# Create (or update) the schedule in the workspace and block on the
# long-running operation; the result replaces the local definition.
schedule_poller = ml_client.schedules.begin_create_or_update(schedule=job_schedule)
job_schedule = schedule_poller.result()
print(job_schedule)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
[32mUploading input (0.0 MBs): 100%|█

..name: test_schedule1
create_job:
  display_name: parallel_in_pipeline
  status: NotStarted
  experiment_name: Default
  type: pipeline
  settings:
    default_compute: azureml:/subscriptions/3158be26-2033-4ef0-98c4-2c77cfb66a18/resourceGroups/amlrg/providers/Microsoft.MachineLearningServices/workspaces/mljpe/computes/zhzhen1
  jobs:
    file_batch_score:
      logging_level: DEBUG
      task:
        type: run_function
        code: azureml:/subscriptions/3158be26-2033-4ef0-98c4-2c77cfb66a18/resourceGroups/amlrg/providers/Microsoft.MachineLearningServices/workspaces/mljpe/codes/2c86fb69-e49c-4003-84d3-bfeecb7ee3e6/versions/1
        entry_script: test.py
        environment: azureml://registries/azureml/environments/sklearn-1.5/labels/latest
      mini_batch_size: '1'
      input_data: ${{inputs.job_data_path}}
      resources:
        instance_count: 1
      retry_settings:
        timeout: 60
        max_retries: 2
      max_concurrency_per_instance: 1
      mini_batch_error_thresh

In [19]:
import time

# Wall-clock start as whole Unix seconds (sub-second part discarded).
startts = time.time_ns() // 1_000_000_000
print("startts: ", startts)

startts:  1727600410


In [48]:

from azure.ai.ml import MLClient, command, Input
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())

# NOTE(review): this asset name appears to be the one auto-registered for the
# `job_data_path` input of run green_chain_c843q7bn3z; it is run-specific.
data_asset = ml_client.data.get(
    "azureml_green_chain_c843q7bn3z_input_data_file_batch_score_job_data_path",
    version="1",
)

# Mount the asset folder read-only and list its contents. No compute is set
# here; customize environment/compute for your workspace.
listing_inputs = {
    "data": Input(
        type=AssetTypes.URI_FOLDER,
        path=data_asset.id,
        mode=InputOutputModes.RO_MOUNT,
    ),
}
job = command(
    command='ls "${{inputs.data}}"',
    inputs=listing_inputs,
    environment="azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
)
returned_job = ml_client.jobs.create_or_update(job)

Found the config file in: /config.json


In [47]:

import mltable
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
data_asset = ml_client.data.get(
    "azureml_green_chain_c843q7bn3z_input_data_file_batch_score_job_data_path",
    version="1",
)

# Read every file under the asset folder as delimited text.
# NOTE(review): the output shows the folder's .amlignore being parsed as rows
# (and its first line taken as the header). Consider filtering the files or
# reading them as plain text instead.
path = dict(folder=data_asset.path)
tbl = mltable.from_delimited_files(paths=[path])
df = tbl.to_pandas_dataframe()
df

Found the config file in: /config.json
First partition columns (ordered): ['## This file was auto generated by the Azure Machine Learning Studio. Please do not remove.']
Found Partition has columns (ordered): ['## This file was auto generated by the Azure Machine Learning Studio. Please do not remove.', 'Column2']


Unnamed: 0,## This file was auto generated by the Azure Machine Learning Studio. Please do not remove.,Column2
0,## Read more about the .amlignore file here: h...,
1,,
2,.ipynb_aml_checkpoints/,
3,*.amltmp,
4,*.amltemp,
5,8. 问：什么是意外伤害保障？,
6,答：意外伤害保障为对员工因意外伤害造成的身故、伤残，由中意人寿提供一定的经济补,
7,偿。因此，此项保障可以弥补遭受意外伤害时个人和家庭遭受的经济损失。该保障为全球,
8,二十四小时的保障责任。,
9,9. 问：什么样的情况算意外伤害？,
