# In [7]:
import requests
import json

class OpenMetadataApi:
    """Small client for a subset of the OpenMetadata REST API.

    Endpoints are looked up by logical name in ``API_URLS``; each entry gives
    the HTTP method and a path template relative to ``base_url``. Path
    templates may contain ``{placeholder}`` segments that are filled from the
    ``url_params`` argument of :meth:`get_response`.

    Args:
        services: Service name used as the first segment of table
            fully-qualified names built by this client.
        base_url: API root, e.g. ``http://host:8585/api/v1``. A trailing
            slash is tolerated.
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error. Defaults to ``DEFAULT_TIMEOUT``.
    """

    # Without an explicit timeout ``requests`` waits indefinitely.
    DEFAULT_TIMEOUT = 30

    # Methods whose ``params`` travel in the query string vs. the JSON body.
    _QUERY_METHODS = ("get", "delete")
    _BODY_METHODS = ("post", "put", "patch")

    API_URLS = {
        "list_test_case": {
            "method": "get",
            "url": "/dataQuality/testCases",
        },
        "create_test_case": {
            "method": "post",
            "url": "/dataQuality/testCases",
        },
        "delete_test_case_by_id": {
            "method": "delete",
            "url": "/dataQuality/testCases/{id}",
        },
        "add_test_cases_to_test_suites": {
            "method": "put",
            "url": "/dataQuality/testCases/logicalTestCases",
        },
        "get_test_suites_by_name": {
            "method": "get",
            "url": "/dataQuality/testSuites/name/{name}",
        },
        "list_test_suites": {
            "method": "get",
            "url": "/dataQuality/testSuites",
        },
        "create_test_suites": {
            "method": "post",
            "url": "/dataQuality/testSuites",
        },
        "create_executable_test_suites": {
            "method": "post",
            "url": "/dataQuality/testSuites/basic",
        },
        "create_test_definition": {
            "method": "post",
            "url": "/dataQuality/testDefinition",
        },
        "list_domains": {
            "method": "get",
            "url": "/domains",
        },
        "create_domains": {
            "method": "post",
            "url": "/domains",
        },
        "list_schema": {
            "method": "get",
            "url": "/databaseSchemas",
        },
        "update_schema": {
            "method": "put",
            "url": "/databaseSchemas",
        },
        "bulk_add_assets": {
            "method": "put",
            "url": "/domains/{name}/assets/add",
        },
        "list_ingestion_pipelines": {
            "method": "get",
            "url": "/services/ingestionPipelines",
        },
        "delete_ingestion_pipelines": {
            "method": "delete",
            "url": "/services/ingestionPipelines/{id}",
        },
        "create_executable_test_suites_pipelines": {
            "method": "post",
            "url": "/services/ingestionPipelines",
        },
        "deploy_test_suites_pipelines": {
            "method": "post",
            "url": "/services/ingestionPipelines/deploy/{id}",
        },
        "list_users": {
            "method": "get",
            "url": "/users",
        },
        "list_tables": {
            "method": "get",
            "url": "/tables",
        },
        "update_table": {
            "method": "put",
            "url": "/tables",
        },
        "update_table_by_id": {
            "method": "patch",
            "url": "/tables/{id}",
        },
        "create_subscriptions": {
            "method": "post",
            "url": "/events/subscriptions",
        },
        "list_subscriptions": {
            "method": "get",
            "url": "/events/subscriptions",
        },
        "delete_subscription_by_id": {
            "method": "delete",
            "url": "/events/subscriptions/{id}",
        },
    }

    def __init__(self, services, base_url, token, timeout=DEFAULT_TIMEOUT):
        self.SERVICES = services
        self.BASE_URL = base_url
        self.TOKEN = token
        self.TIMEOUT = timeout

    def headers(self):
        """Return the default request headers (JSON body, bearer auth)."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.TOKEN}"
        }

    def _build_url(self, template, url_params):
        """Fill ``{placeholder}`` segments of *template* and prefix the base URL.

        Raises:
            ValueError: If the template has a placeholder with no matching
                key in *url_params*. Sending the literal ``{id}`` to the
                server would only produce a confusing remote error.
        """
        # Every "{" in a template opens a placeholder name that ends at "}".
        placeholders = [
            part.split("}", 1)[0] for part in template.split("{")[1:]
        ]
        missing = [name for name in placeholders if name not in url_params]
        if missing:
            raise ValueError(
                f"Missing URL parameter(s) for {template}: {', '.join(missing)}"
            )

        path = template
        for key, value in url_params.items():
            path = path.replace(f"{{{key}}}", str(value))

        # Templates start with "/", so drop a trailing slash on the base URL
        # to avoid "//" in the final address.
        return self.BASE_URL.rstrip("/") + path

    def get_response(self, api, params=None, url_params=None):
        """Call the endpoint registered under *api* and return the decoded body.

        Args:
            api: Key into ``API_URLS``.
            params: Query parameters for GET/DELETE endpoints, or the JSON
                body for POST/PUT/PATCH endpoints. Defaults to ``{}``.
            url_params: Values for the ``{placeholder}`` segments of the
                endpoint path.

        Returns:
            The JSON-decoded response body, or ``{}`` when the server sent an
            empty body. HTTP error statuses are not raised; the decoded error
            body is returned like any other response.

        Raises:
            ValueError: If *api* is unknown, a path placeholder is left
                unfilled, or the registered method is unsupported.
        """
        if params is None:
            params = {}

        api_info = self.API_URLS.get(api)
        if not api_info:
            raise ValueError(f"API {api} not found")

        method = api_info["method"]
        full_url = self._build_url(api_info["url"], url_params or {})
        headers = self.headers()
        request_kwargs = {"headers": headers, "timeout": self.TIMEOUT}

        if method in self._QUERY_METHODS:
            request_kwargs["params"] = params
        elif method in self._BODY_METHODS:
            if method == "patch":
                # PATCH endpoints take a JSON Patch document.
                headers["Content-Type"] = "application/json-patch+json"
            request_kwargs["json"] = params
        else:
            raise ValueError(f"Unsupported method: {method}")

        response = requests.request(method, full_url, **request_kwargs)

        # An empty body (e.g. 204 No Content) cannot be JSON-decoded.
        if not response.content:
            return {}
        return response.json()

    def create_test_case_by_sql_params(self, name, displayName, description, schema, table, sql, strategy, threshold):
        """Build the request body for a ``tableCustomSQLQuery`` test case.

        The target table is addressed as ``<SERVICES>.hive.<schema>.<table>``,
        both in the entity link and in the name of the test suite the case is
        attached to.

        Returns:
            A dict suitable as ``params`` for the ``create_test_case`` API.
        """
        table_fqn = f"{self.SERVICES}.hive.{schema}.{table}"
        return {
            "computePassedFailedRowCount": True,
            "description": description,
            "displayName": displayName,
            "entityLink": f"<#E::table::{table_fqn}>",
            "name": name,
            "parameterValues": [
                {"name": "sqlExpression", "value": sql},
                {"name": "strategy", "value": strategy},
                {"name": "threshold", "value": threshold}
            ],
            "testDefinition": "tableCustomSQLQuery",
            "testSuite": f"{table_fqn}.testSuite",
        }

# Output: {'code': 404, 'message': 'testSuite instance for gbsp.hive.my_schema.my_table.testSuite not found'}


# In [None]:
if __name__ == "__main__":
    import os

    # Connection settings. The service name and base URL fall back to the
    # values this script was written against; the bearer token has no
    # fallback because credentials must not live in source control.
    # NOTE(review): the token previously hard-coded here should be treated as
    # leaked and revoked on the server.
    SERVICES = os.environ.get("OPENMETADATA_SERVICE", "gbsp")
    BASE_URL = os.environ.get(
        "OPENMETADATA_BASE_URL", "http://123.207.39.39:8585/api/v1"
    )
    TOKEN = os.environ.get("OPENMETADATA_TOKEN")
    if not TOKEN:
        raise SystemExit(
            "Set the OPENMETADATA_TOKEN environment variable to a valid bearer token."
        )

    # Initialise the API client.
    api = OpenMetadataApi(services=SERVICES, base_url=BASE_URL, token=TOKEN)

    # Example: build the payload for a custom-SQL test case.
    test_case_params = api.create_test_case_by_sql_params(
        name="test_case_1",
        displayName="Test Case 1",
        description="This is a test case",
        schema="my_schema",
        table="my_table",
        sql="SELECT * FROM my_table WHERE id > 100",
        strategy="count",
        threshold="10"
    )

    # Send the request and show the decoded response.
    response = api.get_response("create_test_case", params=test_case_params)
    print(response)