In [1]:
import mlflow
import requests
import json
import time
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, AzureCliCredential, InteractiveBrowserCredential
from datetime import datetime
from pathlib import Path

from mlflow.tracking import MlflowClient
from mlflow.utils.rest_utils import http_request
import uuid

import os

# Target workspace. Environment variables take precedence so the notebook can be
# pointed at another workspace without editing code; the defaults are the eastus
# workspace this notebook was written against.
subscription_id = os.environ.get("AZUREML_SUBSCRIPTION_ID", "96aede12-2f73-41cb-b983-6d11a904839b")
resource_group = os.environ.get("AZUREML_RESOURCE_GROUP", "promptflow")
workspace_name = os.environ.get("AZUREML_WORKSPACE_NAME", "promptflow-eastus")

# Alternative workspace (hod-pflow) -- set the environment variables above instead:
#   AZUREML_SUBSCRIPTION_ID=96aede12-2f73-41cb-b983-6d11a904839b
#   AZUREML_RESOURCE_GROUP=hod-rg
#   AZUREML_WORKSPACE_NAME=hod-pflow

ml_client = MLClient(credential=AzureCliCredential(),
                     subscription_id=subscription_id,
                     resource_group_name=resource_group,
                     workspace_name=workspace_name)

# Strip the "discovery" path segment from the workspace discovery URL to get the
# regional API root (e.g. https://eastus.api.azureml.ms/), then append the
# run-history route scoped to this workspace.
base_endpoint = ml_client.workspaces.get(ml_client.workspace_name).discovery_url.replace("discovery", "")
url = (
    f"history/v1.0"
    f"/subscriptions/{ml_client.subscription_id}"
    f"/resourceGroups/{ml_client.resource_group_name}"
    f"/providers/Microsoft.MachineLearningServices"
    f"/workspaces/{ml_client.workspace_name}"
)
endpoint_url = base_endpoint + url
print(endpoint_url)

Failed to integrate module: opencensus.ext.logging.trace
No module named 'opencensus.ext.logging'


https://eastus.api.azureml.ms/history/v1.0/subscriptions/96aede12-2f73-41cb-b983-6d11a904839b/resourceGroups/promptflow/providers/Microsoft.MachineLearningServices/workspaces/promptflow-eastus


In [2]:
# Bearer token for the ARM audience, obtained through an interactive browser sign-in.
# (AzureCliCredential(), which ml_client above uses, is the commented-out alternative.)
credential = InteractiveBrowserCredential()
access_token = credential.get_token("https://management.azure.com/.default")
token = access_token.token

# Headers shared by every REST call below. The token string is captured once
# here; nothing in this cell refreshes it.
auth_value = f"Bearer {token}"
headers = {"Authorization": auth_value, "Content-Type": "application/json"}

### Get run from run history

In [None]:

# Fetch a single run record from the run history service and save it locally.
run_id = "c619f648-c809-4545-9f94-f67b0a680706"    # eastus
# run_id = "web_classification_default_20230809_163800_975156"    # hod-pflow
url = f"{endpoint_url}/runs/{run_id}"

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.get(url, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully get run from run history")
    output_path = Path("./download/rh_batch_run.json")
    # The download folder may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### Modify run in run history

In [None]:
class MyClass:
    """Scratch class holding a single class attribute; no other visible cell uses it."""

    a = 5

# Patch display name / description / tags / hidden flag of an existing run.
# /experimentids/42caf8b8-562b-47f6-b838-0ae2aabb1dc1
run_id = "web_classification_default_20230815_103013_448837"    # eastus
url = f"{endpoint_url}/runs/{run_id}/modify"

payload = {
    # "runId": run_id,
    "hidden": False,
    "tags": {
        # NOTE(review): this sends a dict as a tag value; confirm the service
        # accepts non-string tag values rather than rejecting the request.
        "test_tag": {"a": 123}
    },
    "description": "",
    "displayName": "hod-run",
}
# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.patch(url, headers=headers, json=payload, timeout=60)

if response.status_code == 200:
    print("Successfully updated run.")
    output_path = Path("./update/rh_batch_run_resp.json")
    # The update folder may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### Get run data from run history

In [None]:

# Fetch run data (metadata, run definition and job specification) for one run.
run_id = "eval_classification_accuracy_variant_0_20231205_121502_971898"    # eastus
# run_id = "web_classification_default_20230809_163800_975156"    # hod-pflow
url = f"{endpoint_url}/rundata"

payload = {
    "runId": run_id,
    "selectRunMetadata": True,
    "selectRunDefinition": True,
    "selectJobSpecification": True,
}

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.post(url, headers=headers, json=payload, timeout=60)

if response.status_code == 200:
    print("Successfully got run data from run history")
    output_path = Path("./download/rh_eval_run_data.json")
    # The download folder may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### Get run from PFS

In [3]:
# Fetch a bulk run from the prompt flow service (PFS), which lives under
# /flow/api instead of /history/v1.0 on the same workspace route.
run_id = "6ce7ef3a-44ed-47c6-abe8-f4e760f67a78"
pfs_endpoint_url = endpoint_url.replace("history/v1.0", "flow/api")
url = f"{pfs_endpoint_url}/BulkRuns/{run_id}"

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.get(url, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully got run from PFS")
    with open("./pfs_run_info.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

Successfully get child runs from PFS


### Get run from index service

In [None]:
# Look up one non-archived run by its runId through the index service.
run_id = "c619f648-c809-4545-9f94-f67b0a680706"
index_endpoint_url = endpoint_url.replace("/history", "/index")
url = f"{index_endpoint_url}/entities"

payload = {
    "filters": [
        {
            "field": "type",
            "operator": "eq",
            "values": ["runs"]
        },
        {
            "field": "annotations/archived",
            "operator": "eq",
            "values": ["false"]
        },
        {
            "field": "properties/runId",
            "operator": "eq",
            "values": [run_id]
        }
    ],
    "order": [
        {
            "direction": "Desc",
            "field": "properties/startTime"
        }
    ],
    "pageSize": 50,
}

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.post(url, json=payload, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully get run info from index service")
    output_path = Path("./download/index_batch_run.json")
    # The download folder may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### List runs from index service

In [None]:
# List non-archived prompt flow runs (flow runs and evaluation runs), newest first.
index_endpoint_url = endpoint_url.replace("/history", "/index")
url = f"{index_endpoint_url}/entities"

payload = {
    "filters": [
        {
            "field": "type",
            "operator": "eq",
            "values": ["runs"]
        },
        {
            "field": "annotations/archived",
            "operator": "eq",
            "values": ["false"]
        },
        {
            "field": "properties/runType",
            "operator": "contains",
            "values": [
                "azureml.promptflow.FlowRun",
                "azureml.promptflow.EvaluationRun",
            ]
        }
    ],
    "freeTextSearch": "",
    "order": [
        {
            "direction": "Desc",
            "field": "properties/creationContext/createdTime"
        }
    ],
    # index service can return 100 results at most
    "pageSize": 50,
    "skip": 0,
    "includeTotalResultCount": True,
    "searchBuilder": "AppendPrefix"
}

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.post(url, json=payload, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully listed runs from index service")
    with open("./list_entities_50.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### Get metrics from metric service

In [None]:
# Fetch the last logged value of every metric for a run from the metric service.
run_id = "groundedness_eval_default_20230821_191944_818788"
# The metric service shares the workspace route but lives under /metric/v2.0.
metric_endpoint_url = endpoint_url.replace("/history/v1.0", "/metric/v2.0")
url = f"{metric_endpoint_url}/runs/{run_id}/lastvalues"

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.post(url, json={}, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully got metrics from metric service")
    with open("./metrics_cle.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### List flows from index service

In [None]:
# List flows created by one specific user (tenant + object id), newest first.
index_endpoint_url = endpoint_url.replace("/history", "/index")
url = f"{index_endpoint_url}/entities"

payload = {
    "filters": [
        {
            "field": "type",
            "operator": "eq",
            "values": ["flows"]
        },
        # {
        #     "field": "annotations/archived",
        #     "operator": "eq",
        #     "values": ["true"]
        # },
        # NOTE(review): the two ids below pin the query to a single user; change
        # them (or drop these filters) when running as someone else.
        {
            "field": "properties/creationContext/createdBy/userTenantId",
            "operator": "eq",
            "values": ["72f988bf-86f1-41af-91ab-2d7cd011db47"]
        },
        {
            "field": "properties/creationContext/createdBy/userObjectId",
            "operator": "eq",
            "values": ["c05e0746-e125-4cb3-9213-a8b535eacd79"]
        }
    ],
    "freeTextSearch": "",
    "order": [
        {
            "direction": "Desc",
            "field": "properties/creationContext/createdTime"
        }
    ],
    # index service can return 100 results at most
    "pageSize": 10,
    "skip": 0,
    "includeTotalResultCount": True,
    "searchBuilder": "AppendPrefix"
}

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.post(url, json=payload, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully listed flows from index service")
    with open("./flow_list_archived.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### Get snapshot SAS token from content service

In [None]:
# Request a SAS token for a snapshot (or asset) from the content service.
snapshot_id = "194d5c58-3176-4311-88f0-e01a31a5f706"
# The content service shares the workspace route but lives under /content/v2.0.
content_endpoint_url = endpoint_url.replace("/history/v1.0", "/content/v2.0")
url = f"{content_endpoint_url}/snapshots/sas"

payload = {
    "snapshotOrAssetId": snapshot_id,
    # "path": "string"
}

# Without a timeout, requests can block indefinitely on a stalled connection.
response = requests.post(url, json=payload, headers=headers, timeout=60)

if response.status_code == 200:
    print("Successfully got snapshot SAS token from content service")
    output_path = Path("./download/snapshot_sas_resp.json")
    # The download folder may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

In [None]:
import re

# Matches data-asset URIs of the form azureml://.../data/<name>/versions/<version>.
OUTPUT_PORTAL_PATTERN = re.compile(r"azureml://.*?/data/(?P<name>.*?)/versions/(?P<version>.*?)$")
target = 'azureml://locations/eastus/workspaces/3e123da1-f9a5-4c91-9234-8d9ffbb39ff5/data/azureml_web_classification_default_20230809_120434_491077_output_data_flow_outputs/versions/1'


def parse_output_portal_uri(uri):
    """Split an azureml data-asset URI into its name and version.

    :param uri: URI such as ``azureml://.../data/<name>/versions/<version>``.
    :return: ``(name, version)`` tuple of strings, or ``None`` when ``uri``
        does not match ``OUTPUT_PORTAL_PATTERN``.
    """
    found = OUTPUT_PORTAL_PATTERN.match(uri)
    if found is None:
        return None
    return found.group("name"), found.group("version")


# The original printed match.groups() unconditionally, which raised
# AttributeError when the URI did not match; handle the no-match case explicitly.
parsed = parse_output_portal_uri(target)
if parsed is None:
    print(f"No match for: {target}")
else:
    asset_name, asset_version = parsed
    print(asset_name)
    print(asset_version)
    print(parsed)

In [None]:
from pathlib import Path
# Symlink-resolved absolute path of the working directory (resolve() already
# yields an absolute path), shown in native and POSIX forward-slash form.
a = Path(".").resolve()
for rendered in (str(a), a.as_posix()):
    print(rendered)

## Test promptflow schema

In [None]:
import json
from marshmallow_jsonschema import JSONSchema  
from promptflow._sdk.schemas._flow_dag import FlowDagSchema

# Serialize the FlowDagSchema marshmallow schema through marshmallow_jsonschema's
# JSONSchema and write the result to flow_dag_schema.json.
flow_dag_schema = FlowDagSchema(context={"base_path": "."})
json_schema = JSONSchema().dump(flow_dag_schema)
with open("./flow_dag_schema.json", "w") as schema_file:
    json.dump(json_schema, schema_file, indent=4)

# Temporary test

In [34]:

from os import PathLike
from pathlib import Path
from typing import IO, Any, AnyStr, Dict, List, Optional, Set, Tuple, Union
from ruamel.yaml import YAML, YAMLError

def load_yaml(source: Optional[Union[AnyStr, PathLike, IO]]) -> Dict:
    """Load a local YAML file, or parse an already-open readable stream.

    A ``None`` source returns an empty dict. Certain CLI commands rely on this
    behavior to produce a resource via CLI, which is then populated through CLArgs.

    :param source: The relative or absolute path to the local file, or an
        already-open stream that exposes ``readable()``.
    :type source: str, PathLike or IO
    :return: A dictionary representation of the contents, loaded with ruamel's
        ``YAML()`` with ``preserve_quotes`` enabled. An empty document yields ``{}``.
    :rtype: Dict
    :raises Exception: if the stream is not readable, the file cannot be opened,
        or the content is not valid YAML. The underlying error is chained as
        ``__cause__``.
    """
    if source is None:
        return {}

    # pylint: disable=redefined-builtin
    input = None
    must_open_file = False
    try:  # check source type by duck-typing it as an IOBase
        readable = source.readable()
        if not readable:  # source is misformatted stream or file
            msg = "File Permissions Error: The already-open \n\n inputted file is not readable."
            raise Exception(msg)
        # source is an already-open stream or file, we can read() from it directly.
        input = source
    except AttributeError:
        # source has no readable() method, assume it's a string or file path.
        must_open_file = True

    if must_open_file:  # If supplied a file path, open it.
        try:
            input = open(source, "r", encoding="utf-8")
        except OSError as e:
            msg = "No such file or directory: {}"
            # Chain the OSError so the real reason (e.g. a permission error) is not lost.
            raise Exception(msg.format(source)) from e
    # input should now be a readable file or stream. Parse it.
    cfg = {}
    try:
        yaml = YAML()
        yaml.preserve_quotes = True
        cfg = yaml.load(input)
    except YAMLError as e:
        msg = f"Error while parsing yaml file: {source} \n\n {str(e)}"
        raise Exception(msg) from e
    finally:
        # Only close handles opened here; caller-supplied streams stay open.
        if must_open_file:
            input.close()
    # An empty document loads as None; keep the documented Dict return type.
    return {} if cfg is None else cfg


In [35]:
import yaml
from ruamel.yaml import YAML

# Shared ruamel.yaml handler used by the following cells. preserve_quotes asks
# ruamel to retain the source file's scalar quoting when documents are re-dumped.
ryaml = YAML()
ryaml.preserve_quotes = True

In [36]:
yaml_file = "./flows/web_classification/flow.dag.yaml"

# Read the flow definition once, then parse the same text with PyYAML's
# safe_load and with the shared ruamel round-trip handler for comparison.
with open(yaml_file, encoding="utf-8") as f:
    flow_text = f.read()
yaml1 = yaml.safe_load(flow_text)
yaml2 = ryaml.load(flow_text)

In [15]:
# Check that the PyYAML and ruamel results are equal, then show the ruamel one.
loaders_agree = yaml1 == yaml2
print(loaders_agree)
print(yaml2)

True
{'a': [1, 2, 3, 4, True], 'inputs': {'url': {'type': 'string', 'default': 'https://www.microsoft.com/en-us/d/xbox-wireless-controller-stellar-shift-special-edition/94fbjc7h0h6h'}}, 'outputs': {'category': {'type': 'string', 'reference': '${convert_to_dict.output.category}'}, 'evidence': {'type': 'string', 'reference': '${convert_to_dict.output.evidence}'}}, 'nodes': [{'name': 'fetch_text_content_from_url', 'type': 'python', 'source': {'type': 'code', 'path': 'fetch_text_content_from_url.py'}, 'inputs': {'url': '${inputs.url}'}}, {'name': 'summarize_text_content', 'use_variants': True}, {'name': 'prepare_examples', 'type': 'python', 'source': {'type': 'code', 'path': 'prepare_examples.py'}, 'inputs': {}}, {'name': 'classify_with_llm', 'type': 'llm', 'source': {'type': 'code', 'path': 'classify_with_llm.jinja2'}, 'inputs': {'deployment_name': 'text-davinci-003', 'suffix': '', 'max_tokens': '128', 'temperature': '0.2', 'top_p': '1.0', 'logprobs': '', 'echo': 'False', 'stop': '', 'pre

In [17]:
# Write the PyYAML result with 4-space indentation.
with open("./yaml1.yaml", "w", encoding="utf-8") as f:
    yaml.safe_dump(yaml1, f, indent=4)

# Dump the ruamel result in flow style for this file only. The previous setting
# is restored afterwards so the shared `ryaml` handler used by later cells is not
# silently reconfigured (which made their output depend on execution order).
previous_flow_style = ryaml.default_flow_style
ryaml.default_flow_style = True
try:
    with open("./yaml2.yaml", "w", encoding="utf-8") as f:
        ryaml.dump(yaml2, f)
finally:
    ryaml.default_flow_style = previous_flow_style

In [9]:
from pathlib import Path

# Parse the flow file from an in-memory string and compare with the
# stream-based ruamel load. Decode explicitly as UTF-8, like the other cells,
# instead of relying on the platform's locale encoding.
a = Path(yaml_file).read_text(encoding="utf-8")
b = ryaml.load(a)
print(b)
print(b == yaml2)

{'a': [1, 2, 3, 4, True], 'inputs': {'url': {'type': 'string', 'default': 'https://www.microsoft.com/en-us/d/xbox-wireless-controller-stellar-shift-special-edition/94fbjc7h0h6h'}}, 'outputs': {'category': {'type': 'string', 'reference': '${convert_to_dict.output.category}'}, 'evidence': {'type': 'string', 'reference': '${convert_to_dict.output.evidence}'}}, 'nodes': [{'name': 'fetch_text_content_from_url', 'type': 'python', 'source': {'type': 'code', 'path': 'fetch_text_content_from_url.py'}, 'inputs': {'url': '${inputs.url}'}}, {'name': 'summarize_text_content', 'use_variants': True}, {'name': 'prepare_examples', 'type': 'python', 'source': {'type': 'code', 'path': 'prepare_examples.py'}, 'inputs': {}}, {'name': 'classify_with_llm', 'type': 'llm', 'source': {'type': 'code', 'path': 'classify_with_llm.jinja2'}, 'inputs': {'deployment_name': 'text-davinci-003', 'suffix': '', 'max_tokens': '128', 'temperature': '0.2', 'top_p': '1.0', 'logprobs': '', 'echo': 'False', 'stop': '', 'presence

In [40]:
from collections import OrderedDict  
  
# Create an ordered dictionary  
# Sample structure mixing an OrderedDict, a list and a scalar, used to compare
# how ruamel and PyYAML serialize ordered mappings.
my_dict = OrderedDict([("apple", 3), ("banana", 5), ("orange", 2)])
a = {"a": my_dict, "b": [1, 2, 3, 4], "c": "hello"}

# Disabled dumps (these produce ./ordered_dict.yaml and ./ordered_dict_pyyaml.yaml):
# with open("./ordered_dict.yaml", "w", encoding="utf-8") as f:
#     ryaml.dump(a, f)
#
# with open("./ordered_dict_pyyaml.yaml", "w", encoding="utf-8") as f:
#     yaml.safe_dump(a, f)


In [12]:
# NOTE(review): ./ordered_dict.yaml is only written by the commented-out dump in
# the OrderedDict cell above, so this cell needs that file to already exist.
ordered_dict_path = "./ordered_dict.yaml"
with open(ordered_dict_path, encoding="utf-8") as stream:
    yaml1 = ryaml.load(stream)
print(yaml1)

{'a': {'apple': 3, 'banana': 5, 'orange': 2}, 'b': [1, 2, 3, 4], 'c': 'hello'}


In [13]:
# Dump the mixed dict/OrderedDict sample with the shared ruamel handler; the
# layout follows whatever options are currently set on `ryaml`.
ordered_dict_out = "./ordered_dict_1.yaml"
with open(ordered_dict_out, "w", encoding="utf-8") as out_stream:
    ryaml.dump(a, out_stream)

In [37]:
# Load the flow file three ways: via load_yaml with a path, via load_yaml with
# an open stream, and via ryaml from the raw text; then compare the results.
a = load_yaml(yaml_file)
with open(yaml_file, encoding="utf-8") as f:
    b = load_yaml(f)

with open(yaml_file, "r", encoding="utf-8") as f:
    content = f.read()
c = ryaml.load(content)
print(type(content), type(c))

for line in (type(a), a == b, a == c):
    print(line)

<class 'str'> <class 'ruamel.yaml.comments.CommentedMap'>
<class 'ruamel.yaml.comments.CommentedMap'>
True
True
