From 4cdaa13d30f4b9742209016f3d55a36b8f63078c Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Mon, 4 Dec 2023 23:58:49 +0800 Subject: [PATCH 01/10] refactor(client): support asyncio and better client type hint --- instill/clients/__init__.py | 2 +- instill/clients/client.py | 56 ++------- instill/clients/instance.py | 40 ++++++ instill/clients/mgmt.py | 96 ++++++--------- instill/clients/model.py | 141 ++++++++++----------- instill/clients/pipeline.py | 236 ++++++++++++++++++------------------ 6 files changed, 269 insertions(+), 302 deletions(-) create mode 100644 instill/clients/instance.py diff --git a/instill/clients/__init__.py b/instill/clients/__init__.py index acd20ff..9dde8e5 100644 --- a/instill/clients/__init__.py +++ b/instill/clients/__init__.py @@ -1,4 +1,4 @@ -from instill.clients.client import InstillClient, get_client +from instill.clients.client import InstillClient from instill.clients.mgmt import MgmtClient from instill.clients.model import ModelClient from instill.clients.pipeline import PipelineClient diff --git a/instill/clients/client.py b/instill/clients/client.py index 54ca1f9..069f9dc 100644 --- a/instill/clients/client.py +++ b/instill/clients/client.py @@ -5,49 +5,19 @@ from instill.utils.error_handler import NotServingException from instill.utils.logger import Logger -_mgmt_client = None -_pipeline_client = None -_model_client = None -_client = None - - -def _get_mgmt_client() -> MgmtClient: - global _mgmt_client - - if _mgmt_client is None: - _mgmt_client = MgmtClient() - - return _mgmt_client - - -def _get_pipeline_client() -> PipelineClient: - global _pipeline_client - - if _pipeline_client is None: - _pipeline_client = PipelineClient(namespace=_get_mgmt_client().get_user().name) - - return _pipeline_client - - -def _get_model_client() -> ModelClient: - global _model_client - - if _model_client is None: - _model_client = ModelClient(namespace=_get_mgmt_client().get_user().name) - - return _model_client - class InstillClient: def __init__(self) -> None: - self.mgmt_service = _get_mgmt_client() + self.mgmt_service = MgmtClient() if not self.mgmt_service.is_serving(): Logger.w("Instill Core is required") raise NotServingException - self.pipeline_service = _get_pipeline_client() + self.pipeline_service = PipelineClient( + namespace=self.mgmt_service.get_user().name + ) if not self.pipeline_service.is_serving(): Logger.w("Instill VDP is not serving, VDP functionalities will not work") - self.model_service = _get_model_client() + self.model_service = ModelClient(namespace=self.mgmt_service.get_user().name) if not self.model_service.is_serving(): Logger.w( "Instill Model is not serving, Model functionalities will not work" @@ -61,19 +31,17 @@ def set_instance(self, instance: str): def close(self): if self.mgmt_service.is_serving(): for host in self.mgmt_service.hosts.values(): - host["channel"].close() + host.channel.close() + host.async_channel.close() if self.pipeline_service.is_serving(): for host in self.pipeline_service.hosts.values(): - host["channel"].close() + host.channel.close() + host.async_channel.close() if self.model_service.is_serving(): for host in self.model_service.hosts.values(): - host["channel"].close() + host.channel.close() + host.async_channel.close() def get_client() -> InstillClient: - global _client - - if _client is None: - _client = InstillClient() - - return _client + return InstillClient() diff --git a/instill/clients/instance.py b/instill/clients/instance.py new file mode 100644 index 0000000..81847c7 --- /dev/null +++ b/instill/clients/instance.py @@ -0,0 +1,40 @@ +from typing import Union +import grpc + + +import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service +import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service + + +class InstillInstance: + def __init__(self, url: str, token: str, secure: bool, stub): + self.token: str = token + if not secure: + channel = grpc.insecure_channel(url) + async_channel = grpc.aio.insecure_channel(url) + self.metadata = ( + ( + "authorization", + f"Bearer {token}", + ), + ) + else: + ssl_creds = grpc.ssl_channel_credentials() + call_creds = grpc.access_token_call_credentials(token) + creds = grpc.composite_channel_credentials(ssl_creds, call_creds) + channel = grpc.secure_channel(target=url, credentials=creds) + async_channel = grpc.aio.secure_channel(target=url, credentials=creds) + self.metadata = "" + self.channel: grpc.Channel = channel + self.async_channel: grpc.Channel = async_channel + self.client: Union[ + model_service.ModelPublicServiceStub, + pipeline_service.PipelinePublicServiceStub, + mgmt_service.MgmtPublicServiceStub, + ] = stub(channel) + self.async_client: Union[ + model_service.ModelPublicServiceStub, + pipeline_service.PipelinePublicServiceStub, + mgmt_service.MgmtPublicServiceStub, + ] = stub(async_channel) diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index 30a24fa..c235d6a 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -1,17 +1,15 @@ # pylint: disable=no-member,wrong-import-position -from collections import defaultdict - -import grpc - +from typing import Dict import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck # mgmt import instill.protogen.core.mgmt.v1alpha.metric_pb2 as metric_interface import instill.protogen.core.mgmt.v1alpha.mgmt_pb2 as mgmt_interface import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service -from instill.clients import constant # common +from instill.clients.constant import DEFAULT_INSTANCE +from instill.clients.instance import InstillInstance from instill.clients.base import Client from instill.configuration import global_config from instill.utils.error_handler import grpc_handler @@ -21,9 +19,9 @@ class MgmtClient(Client): def __init__(self) -> None: - self.hosts: defaultdict = defaultdict(dict) - if constant.DEFAULT_INSTANCE in global_config.hosts: - self.instance = constant.DEFAULT_INSTANCE + self.hosts: Dict[str, InstillInstance] = {} + if DEFAULT_INSTANCE in global_config.hosts: + self.instance = DEFAULT_INSTANCE elif len(global_config.hosts) == 0: self.instance = "" else: @@ -31,27 +29,11 @@ def __init__(self) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): - if not config.secure: - channel = grpc.insecure_channel(config.url) - self.hosts[instance]["metadata"] = ( - ( - "authorization", - f"Bearer {config.token}", - ), - ) - else: - ssl_creds = grpc.ssl_channel_credentials() - call_creds = grpc.access_token_call_credentials(config.token) - creds = grpc.composite_channel_credentials(ssl_creds, call_creds) - channel = grpc.secure_channel( - target=config.url, - credentials=creds, - ) - self.hosts[instance]["metadata"] = "" - self.hosts[instance]["token"] = config.token - self.hosts[instance]["channel"] = channel - self.hosts[instance]["client"] = mgmt_service.MgmtPublicServiceStub( - channel + self.hosts[instance] = InstillInstance( + config.url, + config.token, + config.secure, + mgmt_service.MgmtPublicServiceStub, ) @property @@ -79,15 +61,15 @@ def metadata(self, metadata: str): self._metadata = metadata def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: mgmt_interface.LivenessResponse = self.hosts[self.instance][ - "client" - ].Liveness(request=mgmt_interface.LivenessRequest()) + resp: mgmt_interface.LivenessResponse = self.hosts[ + self.instance + ].client.Liveness(request=mgmt_interface.LivenessRequest()) return resp.health_check_response.status def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: mgmt_interface.ReadinessResponse = self.hosts[self.instance][ - "client" - ].Readiness(request=mgmt_interface.ReadinessRequest()) + resp: mgmt_interface.ReadinessResponse = self.hosts[ + self.instance + ].client.Readiness(request=mgmt_interface.ReadinessRequest()) return resp.health_check_response.status def is_serving(self) -> bool: @@ -101,9 +83,9 @@ def is_serving(self) -> bool: @grpc_handler def login(self, username="admin", password="password") -> str: - resp: mgmt_interface.AuthLoginResponse = self.hosts[self.instance][ - "client" - ].AuthLogin( + resp: mgmt_interface.AuthLoginResponse = self.hosts[ + self.instance + ].client.AuthLogin( request=mgmt_interface.AuthLoginRequest( username=username, password=password ) @@ -112,21 +94,19 @@ def login(self, username="admin", password="password") -> str: @grpc_handler def get_token(self, name: str) -> mgmt_interface.ApiToken: - resp: mgmt_interface.GetTokenResponse = self.hosts[self.instance][ - "client" - ].GetToken( + resp: mgmt_interface.GetTokenResponse = self.hosts[ + self.instance + ].client.GetToken( request=mgmt_interface.GetTokenRequest(name=name), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.token @grpc_handler def get_user(self) -> mgmt_interface.User: - resp: mgmt_interface.GetUserResponse = self.hosts[self.instance][ - "client" - ].GetUser( + resp: mgmt_interface.GetUserResponse = self.hosts[self.instance].client.GetUser( request=mgmt_interface.GetUserRequest(name="users/me"), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.user @@ -134,52 +114,52 @@ def get_user(self) -> mgmt_interface.User: def list_pipeline_trigger_records( self, ) -> metric_interface.ListPipelineTriggerRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + return self.hosts[self.instance].client.ListPipelineTriggerRecords( request=metric_interface.ListPipelineTriggerChartRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def list_pipeline_trigger_table_records( self, ) -> metric_interface.ListPipelineTriggerTableRecordsRequest: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + return self.hosts[self.instance].client.ListPipelineTriggerRecords( request=metric_interface.ListPipelineTriggerTableRecordsResponse(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def list_pipeline_trigger_chart_records( self, ) -> metric_interface.ListPipelineTriggerChartRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + return self.hosts[self.instance].client.ListPipelineTriggerRecords( request=metric_interface.ListPipelineTriggerChartRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def list_connector_execute_records( self, ) -> metric_interface.ListConnectorExecuteRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + return self.hosts[self.instance].client.ListPipelineTriggerRecords( request=metric_interface.ListConnectorExecuteRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def list_connector_execute_table_records( self, ) -> metric_interface.ListConnectorExecuteTableRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + return self.hosts[self.instance].client.ListPipelineTriggerRecords( request=metric_interface.ListConnectorExecuteTableRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def list_connector_execute_chart_records( self, ) -> metric_interface.ListConnectorExecuteChartRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + return self.hosts[self.instance].client.ListPipelineTriggerRecords( request=metric_interface.ListConnectorExecuteChartRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) diff --git a/instill/clients/model.py b/instill/clients/model.py index 23ec2e8..12551df 100644 --- a/instill/clients/model.py +++ b/instill/clients/model.py @@ -1,32 +1,31 @@ # pylint: disable=no-member,wrong-import-position import time -from collections import defaultdict -from typing import Iterable, Tuple, Union +from typing import Iterable, Tuple, Union, Dict -import grpc from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 -import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck -import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface # model import instill.protogen.model.model.v1alpha.model_pb2 as model_interface import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service -from instill.clients import constant -from instill.clients.base import Client +import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface # common +import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck +from instill.clients.constant import DEFAULT_INSTANCE +from instill.clients.instance import InstillInstance +from instill.clients.base import Client from instill.configuration import global_config from instill.utils.error_handler import grpc_handler class ModelClient(Client): def __init__(self, namespace: str) -> None: - self.hosts: defaultdict = defaultdict(dict) + self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace - if constant.DEFAULT_INSTANCE in global_config.hosts: - self.instance = constant.DEFAULT_INSTANCE + if DEFAULT_INSTANCE in global_config.hosts: + self.instance = DEFAULT_INSTANCE elif len(global_config.hosts) == 0: self.instance = "" else: @@ -34,27 +33,11 @@ def __init__(self, namespace: str) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): - if not config.secure: - channel = grpc.insecure_channel(config.url) - self.hosts[instance]["metadata"] = ( - ( - "authorization", - f"Bearer {config.token}", - ), - ) - else: - ssl_creds = grpc.ssl_channel_credentials() - call_creds = grpc.access_token_call_credentials(config.token) - creds = grpc.composite_channel_credentials(ssl_creds, call_creds) - channel = grpc.secure_channel( - target=config.url, - credentials=creds, - ) - self.hosts[instance]["metadata"] = "" - self.hosts[instance]["token"] = config.token - self.hosts[instance]["channel"] = channel - self.hosts[instance]["client"] = model_service.ModelPublicServiceStub( - channel + self.hosts[instance] = InstillInstance( + config.url, + config.token, + config.secure, + model_service.ModelPublicServiceStub, ) @property @@ -82,15 +65,15 @@ def metadata(self, metadata: str): self._metadata = metadata def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: model_interface.LivenessResponse = self.hosts[self.instance][ - "client" - ].Liveness(request=model_interface.LivenessRequest()) + resp: model_interface.LivenessResponse = self.hosts[ + self.instance + ].client.Liveness(request=model_interface.LivenessRequest()) return resp.health_check_response.status def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: model_interface.ReadinessResponse = self.hosts[self.instance][ - "client" - ].Readiness(request=model_interface.ReadinessRequest()) + resp: model_interface.ReadinessResponse = self.hosts[ + self.instance + ].client.Readiness(request=model_interface.ReadinessRequest()) return resp.health_check_response.status def is_serving(self) -> bool: @@ -104,13 +87,13 @@ def is_serving(self) -> bool: @grpc_handler def watch_model(self, model_name: str) -> model_interface.Model.State.ValueType: - resp: model_interface.WatchUserModelResponse = self.hosts[self.instance][ - "client" - ].WatchUserModel( + resp: model_interface.WatchUserModelResponse = self.hosts[ + self.instance + ].client.WatchUserModel( request=model_interface.WatchUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.state @@ -132,9 +115,9 @@ def create_model_local( parent=self.namespace, model=model, content=data ) create_resp: model_interface.CreateUserModelBinaryFileUploadResponse = ( - self.hosts[self.instance]["client"].CreateUserModelBinaryFileUpload( + self.hosts[self.instance].client.CreateUserModelBinaryFileUpload( request_iterator=iter([req]), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) ) @@ -164,11 +147,11 @@ def create_model( model.configuration.update(configuration) create_resp: model_interface.CreateUserModelResponse = self.hosts[ self.instance - ]["client"].CreateUserModel( + ].client.CreateUserModel( request=model_interface.CreateUserModelRequest( model=model, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) while self.get_operation(name=create_resp.operation.name).done is not True: @@ -190,11 +173,11 @@ def create_model( @grpc_handler def deploy_model(self, model_name: str) -> model_interface.Model.State: - self.hosts[self.instance]["client"].DeployUserModel( + self.hosts[self.instance].client.DeployUserModel( request=model_interface.DeployUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) state = self.watch_model(model_name=model_name) @@ -206,11 +189,11 @@ def deploy_model(self, model_name: str) -> model_interface.Model.State: @grpc_handler def undeploy_model(self, model_name: str) -> model_interface.Model.State: - self.hosts[self.instance]["client"].UndeployUserModel( + self.hosts[self.instance].client.UndeployUserModel( request=model_interface.UndeployUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) state = self.watch_model(model_name=model_name) @@ -222,35 +205,35 @@ def undeploy_model(self, model_name: str) -> model_interface.Model.State: @grpc_handler def trigger_model(self, model_name: str, task_inputs: list) -> Iterable: - resp: model_interface.TriggerUserModelResponse = self.hosts[self.instance][ - "client" - ].TriggerUserModel( + resp: model_interface.TriggerUserModelResponse = self.hosts[ + self.instance + ].client.TriggerUserModel( request=model_interface.TriggerUserModelRequest( name=f"{self.namespace}/models/{model_name}", task_inputs=task_inputs ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.task_outputs @grpc_handler def delete_model(self, model_name: str): - self.hosts[self.instance]["client"].DeleteUserModel( + self.hosts[self.instance].client.DeleteUserModel( request=model_interface.DeleteUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def get_model(self, model_name: str) -> model_interface.Model: - resp: model_interface.GetUserModelResponse = self.hosts[self.instance][ - "client" - ].GetUserModel( + resp: model_interface.GetUserModelResponse = self.hosts[ + self.instance + ].client.GetUserModel( request=model_interface.GetUserModelRequest( name=f"{self.namespace}/models/{model_name}", view=model_definition_interface.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.model @@ -258,36 +241,36 @@ def get_model(self, model_name: str) -> model_interface.Model: def update_model( self, model: model_interface.Model, mask: field_mask_pb2.FieldMask ) -> model_interface.Model: - resp: model_interface.UpdateUserModelResponse = self.hosts[self.instance][ - "client" - ].UpdateUserModel( + resp: model_interface.UpdateUserModelResponse = self.hosts[ + self.instance + ].client.UpdateUserModel( request=model_interface.UpdateUserModelRequest( model=model, update_mask=mask, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.model @grpc_handler def lookup_model(self, model_uid: str) -> model_interface.Model: - resp: model_interface.LookUpModelResponse = self.hosts[self.instance][ - "client" - ].LookUpModel( + resp: model_interface.LookUpModelResponse = self.hosts[ + self.instance + ].client.LookUpModel( request=model_interface.LookUpModelRequest(permalink=f"models/{model_uid}"), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.model @grpc_handler def get_model_card(self, model_name: str) -> model_interface.ModelCard: - resp: model_interface.GetUserModelCardResponse = self.hosts[self.instance][ - "client" - ].GetUserModel( + resp: model_interface.GetUserModelCardResponse = self.hosts[ + self.instance + ].client.GetUserModel( request=model_interface.GetUserModelCardRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.readme @@ -297,26 +280,26 @@ def list_models(self, public=False) -> Tuple[Iterable, str, int]: model_interface.ListModelsResponse, model_interface.ListUserModelsResponse ] if not public: - resp = self.hosts[self.instance]["client"].ListUserModels( + resp = self.hosts[self.instance].client.ListUserModels( request=model_interface.ListUserModelsRequest(parent=self.namespace), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) else: - resp = self.hosts[self.instance]["client"].ListModels( + resp = self.hosts[self.instance].client.ListModels( request=model_interface.ListModelsRequest(), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.models, resp.next_page_token, resp.total_size @grpc_handler def get_operation(self, name: str) -> operations_pb2.Operation: - resp: model_interface.GetModelOperationResponse = self.hosts[self.instance][ - "client" - ].GetModelOperation( + resp: model_interface.GetModelOperationResponse = self.hosts[ + self.instance + ].client.GetModelOperation( request=model_interface.GetModelOperationRequest( name=name, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.operation diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index e4f7501..21423f0 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -1,21 +1,19 @@ # pylint: disable=no-member,wrong-import-position -from collections import defaultdict -from typing import Iterable, Tuple, Union +from typing import Iterable, Tuple, Union, Dict -import grpc from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 -import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck - # pipeline import instill.protogen.vdp.pipeline.v1alpha.connector_pb2 as connector_interface import instill.protogen.vdp.pipeline.v1alpha.operator_definition_pb2 as operator_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_pb2 as pipeline_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service -from instill.clients import constant # common +import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck +from instill.clients.constant import DEFAULT_INSTANCE +from instill.clients.instance import InstillInstance from instill.clients.base import Client from instill.configuration import global_config from instill.utils.error_handler import grpc_handler @@ -25,10 +23,10 @@ class PipelineClient(Client): def __init__(self, namespace: str) -> None: - self.hosts: defaultdict = defaultdict(dict) + self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace - if constant.DEFAULT_INSTANCE in global_config.hosts: - self.instance = constant.DEFAULT_INSTANCE + if DEFAULT_INSTANCE in global_config.hosts: + self.instance = DEFAULT_INSTANCE elif len(global_config.hosts) == 0: self.instance = "" else: @@ -36,35 +34,19 @@ def __init__(self, namespace: str) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): - if not config.secure: - channel = grpc.insecure_channel(config.url) - self.hosts[instance]["metadata"] = ( - ( - "authorization", - f"Bearer {config.token}", - ), - ) - else: - ssl_creds = grpc.ssl_channel_credentials() - call_creds = grpc.access_token_call_credentials(config.token) - creds = grpc.composite_channel_credentials(ssl_creds, call_creds) - channel = grpc.secure_channel( - target=config.url, - credentials=creds, - ) - self.hosts[instance]["metadata"] = "" - self.hosts[instance]["token"] = config.token - self.hosts[instance]["channel"] = channel - self.hosts[instance][ - "client" - ] = pipeline_service.PipelinePublicServiceStub(channel) + self.hosts[instance] = InstillInstance( + config.url, + config.token, + config.secure, + pipeline_service.PipelinePublicServiceStub, + ) @property def hosts(self): return self._hosts @hosts.setter - def hosts(self, hosts: str): + def hosts(self, hosts: Dict[str, InstillInstance]): self._hosts = hosts @property @@ -84,15 +66,15 @@ def metadata(self, metadata: str): self._metadata = metadata def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: pipeline_interface.LivenessResponse = self.hosts[self.instance][ - "client" - ].Liveness(request=pipeline_interface.LivenessRequest()) + resp: pipeline_interface.LivenessResponse = self.hosts[ + self.instance + ].client.Liveness(request=pipeline_interface.LivenessRequest()) return resp.health_check_response.status def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: pipeline_interface.ReadinessResponse = self.hosts[self.instance][ - "client" - ].Readiness(request=pipeline_interface.ReadinessRequest()) + resp: pipeline_interface.ReadinessResponse = self.hosts[ + self.instance + ].client.Readiness(request=pipeline_interface.ReadinessRequest()) return resp.health_check_response.status def is_serving(self) -> bool: @@ -113,14 +95,14 @@ def list_operator_definitions( ) -> Tuple[Iterable, str, int]: resp: operator_interface.ListOperatorDefinitionsResponse = self.hosts[ self.instance - ]["client"].ListUserPipelines( + ].client.ListUserPipelines( request=operator_interface.ListOperatorDefinitionsRequest( filter=filer_str, page_size=total_size, page_token=next_page_token, view=operator_interface.ListOperatorDefinitionsRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.operator_definitions, resp.next_page_token, resp.total_size @@ -131,12 +113,12 @@ def get_operator_definition( ) -> operator_interface.OperatorDefinition: resp: operator_interface.GetOperatorDefinitionResponse = self.hosts[ self.instance - ]["client"].GetOperatorDefinition( + ].client.GetOperatorDefinition( request=operator_interface.GetOperatorDefinitionRequest( name=f"operator-definitions//{name}", view=operator_interface.GetOperatorDefinitionRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.operator_definition @@ -150,51 +132,51 @@ def create_pipeline( id=name, recipe=recipe, ) - resp: pipeline_interface.CreateUserPipelineResponse = self.hosts[self.instance][ - "client" - ].CreateUserPipeline( + resp: pipeline_interface.CreateUserPipelineResponse = self.hosts[ + self.instance + ].client.CreateUserPipeline( request=pipeline_interface.CreateUserPipelineRequest( pipeline=pipeline, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipeline @grpc_handler def get_pipeline(self, name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.GetUserPipelineResponse = self.hosts[self.instance][ - "client" - ].GetUserPipeline( + resp: pipeline_interface.GetUserPipelineResponse = self.hosts[ + self.instance + ].client.GetUserPipeline( request=pipeline_interface.GetUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipeline @grpc_handler def lookup_pipeline(self, pipeline_uid: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.LookUpPipelineResponse = self.hosts[self.instance][ - "client" - ].LookUpPipeline( + resp: pipeline_interface.LookUpPipelineResponse = self.hosts[ + self.instance + ].client.LookUpPipeline( request=pipeline_interface.LookUpPipelineRequest( permalink=f"pipelines/{pipeline_uid}", view=pipeline_interface.LookUpPipelineRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipeline @grpc_handler def rename_pipeline(self, name: str, new_name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.RenameUserPipelineResponse = self.hosts[self.instance][ - "client" - ].RenameUserPipeline( + resp: pipeline_interface.RenameUserPipelineResponse = self.hosts[ + self.instance + ].client.RenameUserPipeline( request=pipeline_interface.RenameUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", new_pipeline_id=new_name, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipeline @@ -202,14 +184,14 @@ def rename_pipeline(self, name: str, new_name: str) -> pipeline_interface.Pipeli def update_pipeline( self, pipeline: pipeline_interface.Pipeline, mask: field_mask_pb2.FieldMask ) -> pipeline_interface.Pipeline: - resp: pipeline_interface.UpdateUserPipelineResponse = self.hosts[self.instance][ - "client" - ].UpdateUserPipeline( + resp: pipeline_interface.UpdateUserPipelineResponse = self.hosts[ + self.instance + ].client.UpdateUserPipeline( request=pipeline_interface.UpdateUserPipelineRequest( pipeline=pipeline, update_mask=mask, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipeline @@ -217,11 +199,11 @@ def update_pipeline( def validate_pipeline(self, name: str) -> pipeline_interface.Pipeline: resp: pipeline_interface.ValidateUserPipelineResponse = self.hosts[ self.instance - ]["client"].ValidateUserPipeline( + ].client.ValidateUserPipeline( request=pipeline_interface.ValidateUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipeline @@ -231,11 +213,25 @@ def trigger_pipeline( ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: resp: pipeline_interface.TriggerUserPipelineResponse = self.hosts[ self.instance - ]["client"].TriggerUserPipeline( + ].client.TriggerUserPipeline( request=pipeline_interface.TriggerUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, + ) + return resp.outputs, resp.metadata + + @grpc_handler + async def trigger_asyncio_pipeline( + self, name: str, inputs: list + ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: + resp: pipeline_interface.TriggerUserPipelineResponse = await self.hosts[ + self.instance + ].async_client.TriggerUserPipeline( + request=pipeline_interface.TriggerUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, ) return resp.outputs, resp.metadata @@ -245,21 +241,21 @@ def trigger_async_pipeline( ) -> operations_pb2.Operation: resp: pipeline_interface.TriggerAsyncUserPipelineResponse = self.hosts[ self.instance - ]["client"].TriggerAsyncUserPipeline( + ].client.TriggerAsyncUserPipeline( request=pipeline_interface.TriggerAsyncUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.operation @grpc_handler def delete_pipeline(self, name: str): - self.hosts[self.instance]["client"].DeleteUserPipeline( + self.hosts[self.instance].client.DeleteUserPipeline( request=pipeline_interface.DeleteUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler @@ -276,7 +272,7 @@ def list_pipelines( pipeline_interface.ListUserPipelinesResponse, ] if not public: - resp = self.hosts[self.instance]["client"].ListUserPipelines( + resp = self.hosts[self.instance].client.ListUserPipelines( request=pipeline_interface.ListUserPipelinesRequest( parent=self.namespace, filter=filer_str, @@ -285,10 +281,10 @@ def list_pipelines( show_deleted=show_deleted, view=pipeline_interface.ListUserPipelinesRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) else: - resp = self.hosts[self.instance]["client"].ListPipelines( + resp = self.hosts[self.instance].client.ListPipelines( request=pipeline_interface.ListPipelinesRequest( filter=filer_str, page_size=total_size, @@ -296,20 +292,20 @@ def list_pipelines( show_deleted=show_deleted, view=pipeline_interface.ListPipelinesRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.pipelines, resp.next_page_token, resp.total_size @grpc_handler def get_operation(self, name: str) -> operations_pb2.Operation: - resp: pipeline_interface.GetOperationResponse = self.hosts[self.instance][ - "client" - ].GetOperation( + resp: pipeline_interface.GetOperationResponse = self.hosts[ + self.instance + ].client.GetOperation( request=pipeline_interface.GetOperationRequest( name=name, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.operation @@ -331,11 +327,11 @@ def create_pipeline_release( ) resp: pipeline_interface.CreateUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].CreateUserPipelineRelease( + ].client.CreateUserPipelineRelease( request=pipeline_interface.CreateUserPipelineReleaseRequest( release=pipeline_release, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.release @@ -351,12 +347,12 @@ def get_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: """ resp: pipeline_interface.GetUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].GetUserPipelineRelease( + ].client.GetUserPipelineRelease( request=pipeline_interface.GetUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", view=pipeline_interface.GetUserPipelineReleaseRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.release @@ -375,12 +371,12 @@ def rename_pipeline_release( """ resp: pipeline_interface.RenameUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].RenameUserPipelineRelease( + ].client.RenameUserPipelineRelease( request=pipeline_interface.RenameUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", new_pipeline_release_id=new_version, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.release @@ -392,12 +388,12 @@ def update_pipeline_release( ) -> pipeline_interface.PipelineRelease: resp: pipeline_interface.UpdateUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].UpdateUserPipelineRelease( + ].client.UpdateUserPipelineRelease( request=pipeline_interface.UpdateUserPipelineReleaseRequest( release=pipeline_release, update_mask=mask, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.release @@ -411,7 +407,7 @@ def list_pipeline_releases( ) -> Tuple[Iterable, str, int]: resp: pipeline_interface.ListUserPipelineReleasesResponse = self.hosts[ self.instance - ]["client"].ListUserPipelineReleases( + ].client.ListUserPipelineReleases( request=pipeline_interface.ListUserPipelineReleasesRequest( parent=self.namespace, filter=filer_str, @@ -420,18 +416,18 @@ def list_pipeline_releases( show_deleted=show_deleted, view=pipeline_interface.ListUserPipelineReleasesRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.releases, resp.next_page_token, resp.total_size @grpc_handler def delete_pipeline_release(self, name: str): - self.hosts[self.instance]["client"].DeleteUserPipelineRelease( + self.hosts[self.instance].client.DeleteUserPipelineRelease( request=pipeline_interface.DeleteUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler @@ -440,11 +436,11 @@ def set_default_pipeline_release( ) -> pipeline_interface.PipelineRelease: resp: pipeline_interface.SetDefaultUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].SetDefaultUserPipelineRelease( + ].client.SetDefaultUserPipelineRelease( request=pipeline_interface.SetDefaultUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.release @@ -452,11 +448,11 @@ def set_default_pipeline_release( def restore_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: resp: pipeline_interface.RestoreUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].RestoreUserPipelineRelease( + ].client.RestoreUserPipelineRelease( request=pipeline_interface.RestoreUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.release @@ -464,11 +460,11 @@ def restore_pipeline_release(self, name: str) -> pipeline_interface.PipelineRele def watch_pipeline_release(self, name: str) -> pipeline_interface.State.ValueType: resp: pipeline_interface.WatchUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].WatchUserPipelineRelease( + ].client.WatchUserPipelineRelease( request=pipeline_interface.WatchUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.state @@ -478,11 +474,11 @@ def trigger_pipeline_release( ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: resp: pipeline_interface.TriggerUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].TriggerUserPipelineReleas( + ].client.TriggerUserPipelineReleas( request=pipeline_interface.TriggerUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.outputs, resp.metadata @@ -492,11 +488,11 @@ def trigger_async_pipeline_release( ) -> operations_pb2.Operation: resp: pipeline_interface.TriggerAsyncUserPipelineReleaseResponse = self.hosts[ self.instance - ]["client"].TriggerAsyncUserPipelineRelease( + ].client.TriggerAsyncUserPipelineRelease( request=pipeline_interface.TriggerAsyncUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.operation @@ -511,11 +507,11 @@ def create_connector( connector.id = name connector.connector_definition_name = definition connector.configuration.update(configuration) - resp = self.hosts[self.instance]["client"].CreateUserConnector( + resp = self.hosts[self.instance].client.CreateUserConnector( request=connector_interface.CreateUserConnectorRequest( connector=connector, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) return resp.connector @@ -523,13 +519,13 @@ def create_connector( @grpc_handler def get_connector(self, name: str) -> connector_interface.Connector: return ( - self.hosts[self.instance]["client"] - .GetUserConnector( + self.hosts[self.instance] + .client.GetUserConnector( request=connector_interface.GetUserConnectorRequest( name=f"{self.namespace}/connectors/{name}", view=connector_interface.GetUserConnectorRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) .connector ) @@ -537,12 +533,12 @@ def get_connector(self, name: str) -> connector_interface.Connector: @grpc_handler def test_connector(self, name: str) -> connector_interface.Connector.State: return ( - self.hosts[self.instance]["client"] - .TestUserConnector( + self.hosts[self.instance] + .client.TestUserConnector( request=connector_interface.TestUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) .state ) @@ -550,12 +546,12 @@ def test_connector(self, name: str) -> connector_interface.Connector.State: @grpc_handler def execute_connector(self, name: str, inputs: list) -> list: return ( - self.hosts[self.instance]["client"] - .ExecuteUserConnector( + self.hosts[self.instance] + .client.ExecuteUserConnector( request=connector_interface.ExecuteUserConnectorRequest( name=f"{self.namespace}/connectors/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) .outputs ) @@ -563,38 +559,38 @@ def execute_connector(self, name: str, inputs: list) -> list: @grpc_handler def watch_connector(self, name: str) -> connector_interface.Connector.State: return ( - self.hosts[self.instance]["client"] - .WatchUserConnector( + self.hosts[self.instance] + .client.WatchUserConnector( request=connector_interface.WatchUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) .state ) @grpc_handler def delete_connector(self, name: str): - self.hosts[self.instance]["client"].DeleteUserConnector( + self.hosts[self.instance].client.DeleteUserConnector( request=connector_interface.DeleteUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) @grpc_handler def list_connectors(self, public=False) -> Tuple[list, str, int]: if not public: - resp = self.hosts[self.instance]["client"].ListUserConnectors( + resp = self.hosts[self.instance].client.ListUserConnectors( request=connector_interface.ListUserConnectorsRequest( parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], + metadata=self.hosts[self.instance].metadata, ) else: - resp = self.hosts[self.instance]["client"].ListConnectors( + resp = self.hosts[self.instance].client.ListConnectors( request=connector_interface.ListConnectorsRequest(), - metadata=(self.hosts[self.instance]["metadata"],), + metadata=(self.hosts[self.instance].metadata,), ) return resp.connectors, resp.next_page_token, resp.total_size From 53fd882f15886b4777cbe569ee4e39c455769384 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 00:20:33 +0800 Subject: [PATCH 02/10] fix(client): fix asyncio channel close await --- instill/clients/client.py | 30 +++++++++++++++++++++--------- instill/clients/instance.py | 22 +++++++++++++--------- instill/clients/mgmt.py | 9 +++++---- instill/clients/model.py | 9 +++++---- instill/clients/pipeline.py | 9 +++++---- 5 files changed, 49 insertions(+), 30 deletions(-) diff --git a/instill/clients/client.py b/instill/clients/client.py index 069f9dc..efb725c 100644 --- a/instill/clients/client.py +++ b/instill/clients/client.py @@ -7,17 +7,21 @@ class InstillClient: - def __init__(self) -> None: - self.mgmt_service = MgmtClient() + def __init__(self, asyncio: bool = False) -> None: + self.mgmt_service = MgmtClient(asyncio=asyncio) if not self.mgmt_service.is_serving(): Logger.w("Instill Core is required") raise NotServingException self.pipeline_service = PipelineClient( - namespace=self.mgmt_service.get_user().name + namespace=self.mgmt_service.get_user().name, + asyncio=asyncio, ) if not self.pipeline_service.is_serving(): Logger.w("Instill VDP is not serving, VDP functionalities will not work") - self.model_service = ModelClient(namespace=self.mgmt_service.get_user().name) + self.model_service = ModelClient( + namespace=self.mgmt_service.get_user().name, + asyncio=asyncio, + ) if not self.model_service.is_serving(): Logger.w( "Instill Model is not serving, Model functionalities will not work" @@ -32,16 +36,24 @@ def close(self): if self.mgmt_service.is_serving(): for host in self.mgmt_service.hosts.values(): host.channel.close() - host.async_channel.close() if self.pipeline_service.is_serving(): for host in self.pipeline_service.hosts.values(): host.channel.close() - host.async_channel.close() if self.model_service.is_serving(): for host in self.model_service.hosts.values(): host.channel.close() - host.async_channel.close() + + async def async_close(self): + if self.mgmt_service.is_serving(): + for host in self.mgmt_service.hosts.values(): + await host.async_channel.close() + if self.pipeline_service.is_serving(): + for host in self.pipeline_service.hosts.values(): + await host.async_channel.close() + if self.model_service.is_serving(): + for host in self.model_service.hosts.values(): + await host.async_channel.close() -def get_client() -> InstillClient: - return InstillClient() +def get_client(asyncio: bool = False) -> InstillClient: + return InstillClient(asyncio=asyncio) diff --git a/instill/clients/instance.py b/instill/clients/instance.py index 81847c7..50d219b 100644 --- a/instill/clients/instance.py +++ b/instill/clients/instance.py @@ -8,33 +8,37 @@ class InstillInstance: - def __init__(self, url: str, token: str, secure: bool, stub): + def __init__(self, stub, url: str, token: str, secure: bool, asyncio: bool): self.token: str = token + self.asyncio: bool = asyncio if not secure: channel = grpc.insecure_channel(url) - async_channel = grpc.aio.insecure_channel(url) self.metadata = ( ( "authorization", f"Bearer {token}", ), ) + if asyncio: + async_channel = grpc.aio.insecure_channel(url) else: ssl_creds = grpc.ssl_channel_credentials() call_creds = grpc.access_token_call_credentials(token) creds = grpc.composite_channel_credentials(ssl_creds, call_creds) channel = grpc.secure_channel(target=url, credentials=creds) - async_channel = grpc.aio.secure_channel(target=url, credentials=creds) self.metadata = "" + if asyncio: + async_channel = grpc.aio.secure_channel(target=url, credentials=creds) self.channel: grpc.Channel = channel - self.async_channel: grpc.Channel = async_channel self.client: Union[ model_service.ModelPublicServiceStub, pipeline_service.PipelinePublicServiceStub, mgmt_service.MgmtPublicServiceStub, ] = stub(channel) - self.async_client: Union[ - model_service.ModelPublicServiceStub, - pipeline_service.PipelinePublicServiceStub, - mgmt_service.MgmtPublicServiceStub, - ] = stub(async_channel) + if asyncio: + self.async_channel: grpc.Channel = async_channel + self.async_client: Union[ + model_service.ModelPublicServiceStub, + pipeline_service.PipelinePublicServiceStub, + mgmt_service.MgmtPublicServiceStub, + ] = stub(async_channel) diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index c235d6a..a774fdb 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -18,7 +18,7 @@ class MgmtClient(Client): - def __init__(self) -> None: + def __init__(self, asyncio: bool) -> None: self.hosts: Dict[str, InstillInstance] = {} if DEFAULT_INSTANCE in global_config.hosts: self.instance = DEFAULT_INSTANCE @@ -30,10 +30,11 @@ def __init__(self) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): self.hosts[instance] = InstillInstance( - config.url, - config.token, - config.secure, mgmt_service.MgmtPublicServiceStub, + url=config.url, + token=config.token, + secure=config.secure, + asyncio=asyncio, ) @property diff --git a/instill/clients/model.py b/instill/clients/model.py index 12551df..6fb949c 100644 --- a/instill/clients/model.py +++ b/instill/clients/model.py @@ -21,7 +21,7 @@ class ModelClient(Client): - def __init__(self, namespace: str) -> None: + def __init__(self, namespace: str, asyncio: bool) -> None: self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace if DEFAULT_INSTANCE in global_config.hosts: @@ -34,10 +34,11 @@ def __init__(self, namespace: str) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): self.hosts[instance] = InstillInstance( - config.url, - config.token, - config.secure, model_service.ModelPublicServiceStub, + url=config.url, + token=config.token, + secure=config.secure, + asyncio=asyncio, ) @property diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index 21423f0..7377e57 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -22,7 +22,7 @@ class PipelineClient(Client): - def __init__(self, namespace: str) -> None: + def __init__(self, namespace: str, asyncio: bool) -> None: self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace if DEFAULT_INSTANCE in global_config.hosts: @@ -35,10 +35,11 @@ def __init__(self, namespace: str) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): self.hosts[instance] = InstillInstance( - config.url, - config.token, - config.secure, pipeline_service.PipelinePublicServiceStub, + url=config.url, + token=config.token, + secure=config.secure, + asyncio=asyncio, ) @property From acae811c8772b9fc5a5f55563955a0ec5a4e4376 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 00:38:06 +0800 Subject: [PATCH 03/10] test(client): adopt new client in tests --- instill/clients/instance.py | 7 ++- instill/clients/mgmt.py | 3 +- instill/clients/model.py | 11 ++-- instill/clients/pipeline.py | 10 +-- instill/tests/test_client.py | 115 ++++++++++++++++++++++++++++------- tests/test_client.py | 115 ++++++++++++++++++++++++++++------- 6 files changed, 200 insertions(+), 61 deletions(-) diff --git a/instill/clients/instance.py b/instill/clients/instance.py index 50d219b..d9b4679 100644 --- a/instill/clients/instance.py +++ b/instill/clients/instance.py @@ -1,14 +1,15 @@ from typing import Union -import grpc +import grpc +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service -import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service class InstillInstance: def __init__(self, stub, url: str, token: str, secure: bool, asyncio: bool): + self.url: str = url self.token: str = token self.asyncio: bool = asyncio if not secure: @@ -26,7 +27,7 @@ def __init__(self, stub, url: str, token: str, secure: bool, asyncio: bool): call_creds = grpc.access_token_call_credentials(token) creds = grpc.composite_channel_credentials(ssl_creds, call_creds) channel = grpc.secure_channel(target=url, credentials=creds) - self.metadata = "" + self.metadata = (("", ""),) if asyncio: async_channel = grpc.aio.secure_channel(target=url, credentials=creds) self.channel: grpc.Channel = channel diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index a774fdb..21a4dd7 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -1,16 +1,17 @@ # pylint: disable=no-member,wrong-import-position from typing import Dict + import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck # mgmt import instill.protogen.core.mgmt.v1alpha.metric_pb2 as metric_interface import instill.protogen.core.mgmt.v1alpha.mgmt_pb2 as mgmt_interface import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service +from instill.clients.base import Client # common from instill.clients.constant import DEFAULT_INSTANCE from instill.clients.instance import InstillInstance -from instill.clients.base import Client from instill.configuration import global_config from instill.utils.error_handler import grpc_handler diff --git a/instill/clients/model.py b/instill/clients/model.py index 6fb949c..1f77def 100644 --- a/instill/clients/model.py +++ b/instill/clients/model.py @@ -1,21 +1,20 @@ # pylint: disable=no-member,wrong-import-position import time -from typing import Iterable, Tuple, Union, Dict +from typing import Dict, Iterable, Tuple, Union from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 +# common +import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck +import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface # model import instill.protogen.model.model.v1alpha.model_pb2 as model_interface import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service -import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface - -# common -import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck +from instill.clients.base import Client from instill.clients.constant import DEFAULT_INSTANCE from instill.clients.instance import InstillInstance -from instill.clients.base import Client from instill.configuration import global_config from instill.utils.error_handler import grpc_handler diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index 7377e57..b021392 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -1,20 +1,20 @@ # pylint: disable=no-member,wrong-import-position -from typing import Iterable, Tuple, Union, Dict +from typing import Dict, Iterable, Tuple, Union from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 +# common +import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck + # pipeline import instill.protogen.vdp.pipeline.v1alpha.connector_pb2 as connector_interface import instill.protogen.vdp.pipeline.v1alpha.operator_definition_pb2 as operator_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_pb2 as pipeline_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service - -# common -import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck +from instill.clients.base import Client from instill.clients.constant import DEFAULT_INSTANCE from instill.clients.instance import InstillInstance -from instill.clients.base import Client from instill.configuration import global_config from instill.utils.error_handler import grpc_handler diff --git a/instill/tests/test_client.py b/instill/tests/test_client.py index 8d6d00b..f1ab196 100644 --- a/instill/tests/test_client.py +++ b/instill/tests/test_client.py @@ -1,49 +1,118 @@ # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,no-name-in-module -from collections import defaultdict - +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service +import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service +import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service from instill.clients import MgmtClient, ModelClient, PipelineClient +from instill.clients.instance import InstillInstance def describe_client(): def describe_instance(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.instance) == "" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", asyncio=False) expect(model_client.instance) == "" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", asyncio=False) expect(pipeline_client.instance) == "" def when_set_correct_type(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) mgmt_client.instance = "staging" expect(mgmt_client.instance) == "staging" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", asyncio=False) model_client.instance = "staging" expect(model_client.instance) == "staging" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", asyncio=False) pipeline_client.instance = "staging" expect(pipeline_client.instance) == "staging" def describe_host(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.hosts) is None - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", asyncio=False) expect(model_client.hosts) is None - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", asyncio=False) expect(pipeline_client.hosts) is None - def when_set_correct_type(expect): - mgmt_client = MgmtClient() - d = defaultdict(dict) # type: ignore - d["test_instance"] = dict({"url": "test_url"}) - mgmt_client.hosts = d - expect(mgmt_client.hosts["test_instance"]["url"]) == "test_url" - model_client = ModelClient(namespace="") - model_client.hosts = d - expect(model_client.hosts["test_instance"]["url"]) == "test_url" - pipeline_client = PipelineClient(namespace="") - pipeline_client.hosts = d - expect(pipeline_client.hosts["test_instance"]["url"]) == "test_url" + def when_set_correct_type_url(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].url) == "test_url" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", asyncio=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].url) == "test_url" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].url) == "test_url" + + def when_set_correct_type_token(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].token) == "token" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", asyncio=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].token) == "token" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].token) == "token" diff --git a/tests/test_client.py b/tests/test_client.py index 8d6d00b..f1ab196 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,49 +1,118 @@ # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,no-name-in-module -from collections import defaultdict - +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service +import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service +import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service from instill.clients import MgmtClient, ModelClient, PipelineClient +from instill.clients.instance import InstillInstance def describe_client(): def describe_instance(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.instance) == "" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", asyncio=False) expect(model_client.instance) == "" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", asyncio=False) expect(pipeline_client.instance) == "" def when_set_correct_type(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) mgmt_client.instance = "staging" expect(mgmt_client.instance) == "staging" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", asyncio=False) model_client.instance = "staging" expect(model_client.instance) == "staging" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", asyncio=False) pipeline_client.instance = "staging" expect(pipeline_client.instance) == "staging" def describe_host(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.hosts) is None - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", asyncio=False) expect(model_client.hosts) is None - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", asyncio=False) expect(pipeline_client.hosts) is None - def when_set_correct_type(expect): - mgmt_client = MgmtClient() - d = defaultdict(dict) # type: ignore - d["test_instance"] = dict({"url": "test_url"}) - mgmt_client.hosts = d - expect(mgmt_client.hosts["test_instance"]["url"]) == "test_url" - model_client = ModelClient(namespace="") - model_client.hosts = d - expect(model_client.hosts["test_instance"]["url"]) == "test_url" - pipeline_client = PipelineClient(namespace="") - pipeline_client.hosts = d - expect(pipeline_client.hosts["test_instance"]["url"]) == "test_url" + def when_set_correct_type_url(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].url) == "test_url" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", asyncio=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].url) == "test_url" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].url) == "test_url" + + def when_set_correct_type_token(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].token) == "token" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", asyncio=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].token) == "token" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].token) == "token" From 2a3279ae7084c93bff5d1bdd76521e3fea019916 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 03:02:26 +0800 Subject: [PATCH 04/10] chore(client): rename async flag --- instill/clients/client.py | 12 ++++++------ instill/clients/instance.py | 12 ++++++------ instill/clients/mgmt.py | 4 ++-- instill/clients/model.py | 4 ++-- instill/clients/pipeline.py | 4 ++-- instill/tests/test_client.py | 20 ++++++++++---------- tests/test_client.py | 20 ++++++++++---------- 7 files changed, 38 insertions(+), 38 deletions(-) diff --git a/instill/clients/client.py b/instill/clients/client.py index efb725c..ec4e8a5 100644 --- a/instill/clients/client.py +++ b/instill/clients/client.py @@ -7,20 +7,20 @@ class InstillClient: - def __init__(self, asyncio: bool = False) -> None: - self.mgmt_service = MgmtClient(asyncio=asyncio) + def __init__(self, async_enabled: bool = False) -> None: + self.mgmt_service = MgmtClient(async_enabled=async_enabled) if not self.mgmt_service.is_serving(): Logger.w("Instill Core is required") raise NotServingException self.pipeline_service = PipelineClient( namespace=self.mgmt_service.get_user().name, - asyncio=asyncio, + async_enabled=async_enabled, ) if not self.pipeline_service.is_serving(): Logger.w("Instill VDP is not serving, VDP functionalities will not work") self.model_service = ModelClient( namespace=self.mgmt_service.get_user().name, - asyncio=asyncio, + async_enabled=async_enabled, ) if not self.model_service.is_serving(): Logger.w( @@ -55,5 +55,5 @@ async def async_close(self): await host.async_channel.close() -def get_client(asyncio: bool = False) -> InstillClient: - return InstillClient(asyncio=asyncio) +def get_client(async_enabled: bool = False) -> InstillClient: + return InstillClient(async_enabled=async_enabled) diff --git a/instill/clients/instance.py b/instill/clients/instance.py index d9b4679..e4fa072 100644 --- a/instill/clients/instance.py +++ b/instill/clients/instance.py @@ -8,10 +8,11 @@ class InstillInstance: - def __init__(self, stub, url: str, token: str, secure: bool, asyncio: bool): + def __init__(self, stub, url: str, token: str, secure: bool, async_enabled: bool): self.url: str = url self.token: str = token - self.asyncio: bool = asyncio + self.async_enabled: bool = async_enabled + self.metadata: Union[str, tuple] = "" if not secure: channel = grpc.insecure_channel(url) self.metadata = ( @@ -20,15 +21,14 @@ def __init__(self, stub, url: str, token: str, secure: bool, asyncio: bool): f"Bearer {token}", ), ) - if asyncio: + if async_enabled: async_channel = grpc.aio.insecure_channel(url) else: ssl_creds = grpc.ssl_channel_credentials() call_creds = grpc.access_token_call_credentials(token) creds = grpc.composite_channel_credentials(ssl_creds, call_creds) channel = grpc.secure_channel(target=url, credentials=creds) - self.metadata = (("", ""),) - if asyncio: + if async_enabled: async_channel = grpc.aio.secure_channel(target=url, credentials=creds) self.channel: grpc.Channel = channel self.client: Union[ @@ -36,7 +36,7 @@ def __init__(self, stub, url: str, token: str, secure: bool, asyncio: bool): pipeline_service.PipelinePublicServiceStub, mgmt_service.MgmtPublicServiceStub, ] = stub(channel) - if asyncio: + if async_enabled: self.async_channel: grpc.Channel = async_channel self.async_client: Union[ model_service.ModelPublicServiceStub, diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index 21a4dd7..0c140d4 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -19,7 +19,7 @@ class MgmtClient(Client): - def __init__(self, asyncio: bool) -> None: + def __init__(self, async_enabled: bool) -> None: self.hosts: Dict[str, InstillInstance] = {} if DEFAULT_INSTANCE in global_config.hosts: self.instance = DEFAULT_INSTANCE @@ -35,7 +35,7 @@ def __init__(self, asyncio: bool) -> None: url=config.url, token=config.token, secure=config.secure, - asyncio=asyncio, + async_enabled=async_enabled, ) @property diff --git a/instill/clients/model.py b/instill/clients/model.py index 1f77def..96e711b 100644 --- a/instill/clients/model.py +++ b/instill/clients/model.py @@ -20,7 +20,7 @@ class ModelClient(Client): - def __init__(self, namespace: str, asyncio: bool) -> None: + def __init__(self, namespace: str, async_enabled: bool) -> None: self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace if DEFAULT_INSTANCE in global_config.hosts: @@ -37,7 +37,7 @@ def __init__(self, namespace: str, asyncio: bool) -> None: url=config.url, token=config.token, secure=config.secure, - asyncio=asyncio, + async_enabled=async_enabled, ) @property diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index b021392..0dc1471 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -22,7 +22,7 @@ class PipelineClient(Client): - def __init__(self, namespace: str, asyncio: bool) -> None: + def __init__(self, namespace: str, async_enabled: bool) -> None: self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace if DEFAULT_INSTANCE in global_config.hosts: @@ -39,7 +39,7 @@ def __init__(self, namespace: str, asyncio: bool) -> None: url=config.url, token=config.token, secure=config.secure, - asyncio=asyncio, + async_enabled=async_enabled, ) @property diff --git a/instill/tests/test_client.py b/instill/tests/test_client.py index f1ab196..b780d46 100644 --- a/instill/tests/test_client.py +++ b/instill/tests/test_client.py @@ -12,19 +12,19 @@ def describe_instance(): def when_not_set(expect): mgmt_client = MgmtClient(False) expect(mgmt_client.instance) == "" - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.instance) == "" - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.instance) == "" def when_set_correct_type(expect): mgmt_client = MgmtClient(False) mgmt_client.instance = "staging" expect(mgmt_client.instance) == "staging" - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) model_client.instance = "staging" expect(model_client.instance) == "staging" - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.instance = "staging" expect(pipeline_client.instance) == "staging" @@ -32,9 +32,9 @@ def describe_host(): def when_not_set(expect): mgmt_client = MgmtClient(False) expect(mgmt_client.hosts) is None - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.hosts) is None - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.hosts) is None def when_set_correct_type_url(expect): @@ -60,7 +60,7 @@ def when_set_correct_type_url(expect): False, ) } - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) model_client.hosts = model_instance expect(model_client.hosts["test_instance"].url) == "test_url" @@ -73,7 +73,7 @@ def when_set_correct_type_url(expect): False, ) } - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.hosts = pipeline_instance expect(pipeline_client.hosts["test_instance"].url) == "test_url" @@ -100,7 +100,7 @@ def when_set_correct_type_token(expect): False, ) } - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) model_client.hosts = model_instance expect(model_client.hosts["test_instance"].token) == "token" @@ -113,6 +113,6 @@ def when_set_correct_type_token(expect): False, ) } - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.hosts = pipeline_instance expect(pipeline_client.hosts["test_instance"].token) == "token" diff --git a/tests/test_client.py b/tests/test_client.py index f1ab196..b780d46 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -12,19 +12,19 @@ def describe_instance(): def when_not_set(expect): mgmt_client = MgmtClient(False) expect(mgmt_client.instance) == "" - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.instance) == "" - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.instance) == "" def when_set_correct_type(expect): mgmt_client = MgmtClient(False) mgmt_client.instance = "staging" expect(mgmt_client.instance) == "staging" - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) model_client.instance = "staging" expect(model_client.instance) == "staging" - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.instance = "staging" expect(pipeline_client.instance) == "staging" @@ -32,9 +32,9 @@ def describe_host(): def when_not_set(expect): mgmt_client = MgmtClient(False) expect(mgmt_client.hosts) is None - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.hosts) is None - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.hosts) is None def when_set_correct_type_url(expect): @@ -60,7 +60,7 @@ def when_set_correct_type_url(expect): False, ) } - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) model_client.hosts = model_instance expect(model_client.hosts["test_instance"].url) == "test_url" @@ -73,7 +73,7 @@ def when_set_correct_type_url(expect): False, ) } - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.hosts = pipeline_instance expect(pipeline_client.hosts["test_instance"].url) == "test_url" @@ -100,7 +100,7 @@ def when_set_correct_type_token(expect): False, ) } - model_client = ModelClient(namespace="", asyncio=False) + model_client = ModelClient(namespace="", async_enabled=False) model_client.hosts = model_instance expect(model_client.hosts["test_instance"].token) == "token" @@ -113,6 +113,6 @@ def when_set_correct_type_token(expect): False, ) } - pipeline_client = PipelineClient(namespace="", asyncio=False) + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.hosts = pipeline_instance expect(pipeline_client.hosts["test_instance"].token) == "token" From 5d36a85bebf8cc977fa278e924660d2ab4fbadbd Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 03:59:13 +0800 Subject: [PATCH 05/10] feat(pipeline): adopt async request factory --- instill/clients/base.py | 21 + instill/clients/pipeline.py | 827 ++++++++++++++++++++++++---------- instill/resources/pipeline.py | 17 +- 3 files changed, 622 insertions(+), 243 deletions(-) diff --git a/instill/clients/base.py b/instill/clients/base.py index 7c3ea62..9a244a1 100644 --- a/instill/clients/base.py +++ b/instill/clients/base.py @@ -1,5 +1,8 @@ from abc import ABC, abstractmethod +import grpc +import google.protobuf.message + class Client(ABC): """Base interface class for creating mgmt/pipeline/connector/model clients. @@ -49,3 +52,21 @@ def readiness(self): @abstractmethod def is_serving(self): raise NotImplementedError + + +class RequestFactory: + def __init__( + self, + method: grpc.UnaryUnaryMultiCallable, + request: google.protobuf.message.Message, + metadata, + ) -> None: + self.method = method + self.request = request + self.metadata = metadata + + def send_sync(self): + return self.method(request=self.request, metadata=self.metadata) + + async def send_async(self): + return await self.method(request=self.request, metadata=self.metadata) diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index 0dc1471..8f598d0 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -1,7 +1,6 @@ # pylint: disable=no-member,wrong-import-position -from typing import Dict, Iterable, Tuple, Union +from typing import Dict, Union -from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 # common @@ -12,7 +11,7 @@ import instill.protogen.vdp.pipeline.v1alpha.operator_definition_pb2 as operator_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_pb2 as pipeline_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service -from instill.clients.base import Client +from instill.clients.base import Client, RequestFactory from instill.clients.constant import DEFAULT_INSTANCE from instill.clients.instance import InstillInstance from instill.configuration import global_config @@ -66,22 +65,38 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: pipeline_interface.LivenessResponse = self.hosts[ - self.instance - ].client.Liveness(request=pipeline_interface.LivenessRequest()) - return resp.health_check_response.status + def liveness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=pipeline_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=pipeline_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() - def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: pipeline_interface.ReadinessResponse = self.hosts[ - self.instance - ].client.Readiness(request=pipeline_interface.ReadinessRequest()) - return resp.health_check_response.status + def readiness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Readiness, + request=pipeline_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Readiness, + request=pipeline_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() def is_serving(self) -> bool: try: return ( - self.readiness() + self.readiness().status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: @@ -90,174 +105,278 @@ def is_serving(self) -> bool: @grpc_handler def list_operator_definitions( self, - filer_str: str = "", + filter_str: str = "", next_page_token: str = "", total_size: int = 100, - ) -> Tuple[Iterable, str, int]: - resp: operator_interface.ListOperatorDefinitionsResponse = self.hosts[ - self.instance - ].client.ListUserPipelines( + async_enabled: bool = False, + ) -> operator_interface.ListOperatorDefinitionsResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ListOperatorDefinitions, + request=operator_interface.ListOperatorDefinitionsRequest( + filter=filter_str, + page_size=total_size, + page_token=next_page_token, + view=operator_interface.ListOperatorDefinitionsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListOperatorDefinitions, request=operator_interface.ListOperatorDefinitionsRequest( - filter=filer_str, + filter=filter_str, page_size=total_size, page_token=next_page_token, view=operator_interface.ListOperatorDefinitionsRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - - return resp.operator_definitions, resp.next_page_token, resp.total_size + ).send_sync() @grpc_handler def get_operator_definition( - self, name: str - ) -> operator_interface.OperatorDefinition: - resp: operator_interface.GetOperatorDefinitionResponse = self.hosts[ - self.instance - ].client.GetOperatorDefinition( + self, name: str, async_enabled: bool = False + ) -> operator_interface.GetOperatorDefinitionResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetOperatorDefinition, + request=operator_interface.GetOperatorDefinitionRequest( + name=f"operator-definitions//{name}", + view=operator_interface.GetOperatorDefinitionRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetOperatorDefinition, request=operator_interface.GetOperatorDefinitionRequest( name=f"operator-definitions//{name}", view=operator_interface.GetOperatorDefinitionRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.operator_definition + ).send_sync() @grpc_handler def create_pipeline( self, name: str, recipe: pipeline_interface.Recipe, - ) -> pipeline_interface.Pipeline: + async_enabled: bool = False, + ) -> pipeline_interface.CreateUserPipelineResponse: pipeline = pipeline_interface.Pipeline( id=name, recipe=recipe, ) - resp: pipeline_interface.CreateUserPipelineResponse = self.hosts[ - self.instance - ].client.CreateUserPipeline( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateuserPipeline, + request=pipeline_interface.CreateUserPipelineRequest( + pipeline=pipeline, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserPipeline, request=pipeline_interface.CreateUserPipelineRequest( pipeline=pipeline, parent=self.namespace ), metadata=self.hosts[self.instance].metadata, - ) - return resp.pipeline + ).send_sync() @grpc_handler - def get_pipeline(self, name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.GetUserPipelineResponse = self.hosts[ - self.instance - ].client.GetUserPipeline( + def get_pipeline( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.GetUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserPipeline, + request=pipeline_interface.GetUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserPipeline, request=pipeline_interface.GetUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), metadata=self.hosts[self.instance].metadata, - ) - return resp.pipeline + ).send_sync() @grpc_handler - def lookup_pipeline(self, pipeline_uid: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.LookUpPipelineResponse = self.hosts[ - self.instance - ].client.LookUpPipeline( + def lookup_pipeline( + self, + pipeline_uid: str, + async_enabled: bool = False, + ) -> pipeline_interface.LookUpPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.LookUpPipeline, + request=pipeline_interface.LookUpPipelineRequest( + permalink=f"pipelines/{pipeline_uid}", + view=pipeline_interface.LookUpPipelineRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.LookUpPipeline, request=pipeline_interface.LookUpPipelineRequest( permalink=f"pipelines/{pipeline_uid}", view=pipeline_interface.LookUpPipelineRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.pipeline + ).send_sync() @grpc_handler - def rename_pipeline(self, name: str, new_name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.RenameUserPipelineResponse = self.hosts[ - self.instance - ].client.RenameUserPipeline( + def rename_pipeline( + self, + name: str, + new_name: str, + async_enabled: bool = False, + ) -> pipeline_interface.RenameUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.RenameUserPipeline, + request=pipeline_interface.RenameUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", + new_pipeline_id=new_name, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.RenameUserPipeline, request=pipeline_interface.RenameUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", new_pipeline_id=new_name, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.pipeline + ).send_sync() @grpc_handler def update_pipeline( - self, pipeline: pipeline_interface.Pipeline, mask: field_mask_pb2.FieldMask - ) -> pipeline_interface.Pipeline: - resp: pipeline_interface.UpdateUserPipelineResponse = self.hosts[ - self.instance - ].client.UpdateUserPipeline( + self, + pipeline: pipeline_interface.Pipeline, + mask: field_mask_pb2.FieldMask, + async_enabled: bool = False, + ) -> pipeline_interface.UpdateUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UpdateUserPipeline, + request=pipeline_interface.UpdateUserPipelineRequest( + pipeline=pipeline, + update_mask=mask, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UpdateUserPipeline, request=pipeline_interface.UpdateUserPipelineRequest( pipeline=pipeline, update_mask=mask, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.pipeline + ).send_sync() @grpc_handler - def validate_pipeline(self, name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.ValidateUserPipelineResponse = self.hosts[ - self.instance - ].client.ValidateUserPipeline( + def validate_pipeline( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.ValidateUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ValidateUserPipeline, + request=pipeline_interface.ValidateUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ValidateUserPipeline, request=pipeline_interface.ValidateUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), metadata=self.hosts[self.instance].metadata, - ) - return resp.pipeline + ).send_sync() @grpc_handler def trigger_pipeline( - self, name: str, inputs: list - ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: - resp: pipeline_interface.TriggerUserPipelineResponse = self.hosts[ - self.instance - ].client.TriggerUserPipeline( - request=pipeline_interface.TriggerUserPipelineRequest( - name=f"{self.namespace}/pipelines/{name}", inputs=inputs - ), - metadata=self.hosts[self.instance].metadata, - ) - return resp.outputs, resp.metadata + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerUserPipeline, + request=pipeline_interface.TriggerUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() - @grpc_handler - async def trigger_asyncio_pipeline( - self, name: str, inputs: list - ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: - resp: pipeline_interface.TriggerUserPipelineResponse = await self.hosts[ - self.instance - ].async_client.TriggerUserPipeline( + return RequestFactory( + method=self.hosts[self.instance].client.TriggerUserPipeline, request=pipeline_interface.TriggerUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), metadata=self.hosts[self.instance].metadata, - ) - return resp.outputs, resp.metadata + ).send_sync() @grpc_handler def trigger_async_pipeline( - self, name: str, inputs: list - ) -> operations_pb2.Operation: - resp: pipeline_interface.TriggerAsyncUserPipelineResponse = self.hosts[ - self.instance - ].client.TriggerAsyncUserPipeline( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerAsyncUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerAsyncUserPipeline, + request=pipeline_interface.TriggerAsyncUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerAsyncUserPipeline, request=pipeline_interface.TriggerAsyncUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), metadata=self.hosts[self.instance].metadata, - ) - return resp.operation + ).send_sync() @grpc_handler - def delete_pipeline(self, name: str): - self.hosts[self.instance].client.DeleteUserPipeline( + def delete_pipeline( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.DeleteUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserPipeline, + request=pipeline_interface.DeleteUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserPipeline, request=pipeline_interface.DeleteUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def list_pipelines( @@ -267,13 +386,18 @@ def list_pipelines( total_size: int = 100, show_deleted: bool = False, public=False, - ) -> Tuple[Iterable, str, int]: - resp: Union[ - pipeline_interface.ListPipelinesResponse, - pipeline_interface.ListUserPipelinesResponse, - ] - if not public: - resp = self.hosts[self.instance].client.ListUserPipelines( + async_enabled: bool = False, + ) -> Union[ + pipeline_interface.ListPipelinesResponse, + pipeline_interface.ListUserPipelinesResponse, + ]: + if async_enabled: + if public: + method = self.hosts[self.instance].async_client.ListPipelines + else: + method = self.hosts[self.instance].async_client.ListUserPipelines + return RequestFactory( + method=method, request=pipeline_interface.ListUserPipelinesRequest( parent=self.namespace, filter=filer_str, @@ -283,38 +407,53 @@ def list_pipelines( view=pipeline_interface.ListUserPipelinesRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) + ).send_async() + if public: + method = self.hosts[self.instance].client.ListPipelines else: - resp = self.hosts[self.instance].client.ListPipelines( - request=pipeline_interface.ListPipelinesRequest( - filter=filer_str, - page_size=total_size, - page_token=next_page_token, - show_deleted=show_deleted, - view=pipeline_interface.ListPipelinesRequest.VIEW_FULL, + method = self.hosts[self.instance].client.ListUserPipelines + return RequestFactory( + method=method, + request=pipeline_interface.ListUserPipelinesRequest( + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListUserPipelinesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + + @grpc_handler + def get_operation( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.GetOperationResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetOperation, + request=pipeline_interface.GetOperationRequest( + name=name, ), metadata=self.hosts[self.instance].metadata, - ) - - return resp.pipelines, resp.next_page_token, resp.total_size + ).send_async() - @grpc_handler - def get_operation(self, name: str) -> operations_pb2.Operation: - resp: pipeline_interface.GetOperationResponse = self.hosts[ - self.instance - ].client.GetOperation( + return RequestFactory( + method=self.hosts[self.instance].client.GetOperation, request=pipeline_interface.GetOperationRequest( name=name, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.operation + ).send_sync() @grpc_handler def create_pipeline_release( self, version: str, - ) -> pipeline_interface.PipelineRelease: + async_enabled: bool = False, + ) -> pipeline_interface.CreateUserPipelineReleaseResponse: """Create a release version of a pipeline Args: @@ -326,18 +465,29 @@ def create_pipeline_release( pipeline_release = pipeline_interface.PipelineRelease( id=version, ) - resp: pipeline_interface.CreateUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.CreateUserPipelineRelease( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateUserPipelineRelease, + request=pipeline_interface.CreateUserPipelineReleaseRequest( + release=pipeline_release, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserPipelineRelease, request=pipeline_interface.CreateUserPipelineReleaseRequest( release=pipeline_release, parent=self.namespace ), metadata=self.hosts[self.instance].metadata, - ) - return resp.release + ).send_sync() @grpc_handler - def get_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: + def get_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.GetUserPipelineReleaseResponse: """Get a released pipeline Args: @@ -346,21 +496,32 @@ def get_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: Returns: pipeline_interface.Pipeline: Released pipeline """ - resp: pipeline_interface.GetUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.GetUserPipelineRelease( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserPipelineRelease, + request=pipeline_interface.GetUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + view=pipeline_interface.GetUserPipelineReleaseRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserPipelineRelease, request=pipeline_interface.GetUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", view=pipeline_interface.GetUserPipelineReleaseRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.release + ).send_sync() @grpc_handler def rename_pipeline_release( - self, name: str, new_version: str - ) -> pipeline_interface.PipelineRelease: + self, + name: str, + new_version: str, + async_enabled: bool = False, + ) -> pipeline_interface.RenameUserPipelineReleaseResponse: """Rename a released pipeline Args: @@ -370,33 +531,50 @@ def rename_pipeline_release( Returns: pipeline_interface.PipelineRelease: released pipeline """ - resp: pipeline_interface.RenameUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.RenameUserPipelineRelease( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.RenameUserPipelineRelease, + request=pipeline_interface.RenameUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + new_pipeline_release_id=new_version, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.RenameUserPipelineRelease, request=pipeline_interface.RenameUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", new_pipeline_release_id=new_version, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.release + ).send_sync() @grpc_handler def update_pipeline_release( self, pipeline_release: pipeline_interface.PipelineRelease, mask: field_mask_pb2.FieldMask, - ) -> pipeline_interface.PipelineRelease: - resp: pipeline_interface.UpdateUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.UpdateUserPipelineRelease( + async_enabled: bool = False, + ) -> pipeline_interface.UpdateUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UpdateUserPipelineRelease, + request=pipeline_interface.UpdateUserPipelineReleaseRequest( + release=pipeline_release, + update_mask=mask, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UpdateUserPipelineRelease, request=pipeline_interface.UpdateUserPipelineReleaseRequest( release=pipeline_release, update_mask=mask, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.release + ).send_sync() @grpc_handler def list_pipeline_releases( @@ -405,10 +583,24 @@ def list_pipeline_releases( next_page_token: str = "", total_size: int = 100, show_deleted: bool = False, - ) -> Tuple[Iterable, str, int]: - resp: pipeline_interface.ListUserPipelineReleasesResponse = self.hosts[ - self.instance - ].client.ListUserPipelineReleases( + async_enabled: bool = False, + ) -> pipeline_interface.ListUserPipelineReleasesResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ListUserPipelineReleases, + request=pipeline_interface.ListUserPipelineReleasesRequest( + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListUserPipelineReleasesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListUserPipelineReleases, request=pipeline_interface.ListUserPipelineReleasesRequest( parent=self.namespace, filter=filer_str, @@ -418,84 +610,153 @@ def list_pipeline_releases( view=pipeline_interface.ListUserPipelineReleasesRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - - return resp.releases, resp.next_page_token, resp.total_size + ).send_sync() @grpc_handler - def delete_pipeline_release(self, name: str): - self.hosts[self.instance].client.DeleteUserPipelineRelease( + def delete_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.DeleteUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserPipelineRelease, + request=pipeline_interface.DeleteUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserPipelineRelease, request=pipeline_interface.DeleteUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}" ), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def set_default_pipeline_release( - self, name: str - ) -> pipeline_interface.PipelineRelease: - resp: pipeline_interface.SetDefaultUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.SetDefaultUserPipelineRelease( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.SetDefaultUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.SetDefaultUserPipelineRelease, + request=pipeline_interface.SetDefaultUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.SetDefaultUserPipelineRelease, request=pipeline_interface.SetDefaultUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), metadata=self.hosts[self.instance].metadata, - ) - return resp.release + ).send_sync() @grpc_handler - def restore_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: - resp: pipeline_interface.RestoreUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.RestoreUserPipelineRelease( + def restore_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.RestoreUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.RestoreUserPipelineRelease, + request=pipeline_interface.RestoreUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.RestoreUserPipelineRelease, request=pipeline_interface.RestoreUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), metadata=self.hosts[self.instance].metadata, - ) - return resp.release + ).send_sync() @grpc_handler - def watch_pipeline_release(self, name: str) -> pipeline_interface.State.ValueType: - resp: pipeline_interface.WatchUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.WatchUserPipelineRelease( + def watch_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.WatchUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.WatchUserPipelineRelease, + request=pipeline_interface.WatchUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.WatchUserPipelineRelease, request=pipeline_interface.WatchUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), metadata=self.hosts[self.instance].metadata, - ) - return resp.state + ).send_sync() @grpc_handler def trigger_pipeline_release( - self, name: str, inputs: list - ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: - resp: pipeline_interface.TriggerUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.TriggerUserPipelineReleas( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerUserPipelineReleas, + request=pipeline_interface.TriggerUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerUserPipelineReleas, request=pipeline_interface.TriggerUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), metadata=self.hosts[self.instance].metadata, - ) - return resp.outputs, resp.metadata + ).send_sync() @grpc_handler def trigger_async_pipeline_release( - self, name: str, inputs: list - ) -> operations_pb2.Operation: - resp: pipeline_interface.TriggerAsyncUserPipelineReleaseResponse = self.hosts[ - self.instance - ].client.TriggerAsyncUserPipelineRelease( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerAsyncUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.TriggerAsyncUserPipelineRelease, + request=pipeline_interface.TriggerAsyncUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerAsyncUserPipelineRelease, request=pipeline_interface.TriggerAsyncUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), metadata=self.hosts[self.instance].metadata, - ) - return resp.operation + ).send_sync() @grpc_handler def create_connector( @@ -503,95 +764,187 @@ def create_connector( name: str, definition: str, configuration: dict, - ) -> connector_interface.Connector: + async_enabled: bool = False, + ) -> connector_interface.CreateUserConnectorResponse: connector = connector_interface.Connector() connector.id = name connector.connector_definition_name = definition connector.configuration.update(configuration) - resp = self.hosts[self.instance].client.CreateUserConnector( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateUserConnector, + request=connector_interface.CreateUserConnectorRequest( + connector=connector, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserConnector, request=connector_interface.CreateUserConnectorRequest( connector=connector, parent=self.namespace ), metadata=self.hosts[self.instance].metadata, - ) - - return resp.connector + ).send_sync() @grpc_handler - def get_connector(self, name: str) -> connector_interface.Connector: - return ( - self.hosts[self.instance] - .client.GetUserConnector( + def get_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.GetUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserConnector, request=connector_interface.GetUserConnectorRequest( name=f"{self.namespace}/connectors/{name}", view=connector_interface.GetUserConnectorRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - .connector - ) + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserConnector, + request=connector_interface.GetUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}", + view=connector_interface.GetUserConnectorRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def test_connector(self, name: str) -> connector_interface.Connector.State: - return ( - self.hosts[self.instance] - .client.TestUserConnector( + def test_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.TestUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TestUserConnector, request=connector_interface.TestUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), metadata=self.hosts[self.instance].metadata, - ) - .state - ) + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TestUserConnector, + request=connector_interface.TestUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def execute_connector(self, name: str, inputs: list) -> list: - return ( - self.hosts[self.instance] - .client.ExecuteUserConnector( + def execute_connector( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> connector_interface.ExecuteUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ExecuteUserConnector, request=connector_interface.ExecuteUserConnectorRequest( name=f"{self.namespace}/connectors/{name}", inputs=inputs ), metadata=self.hosts[self.instance].metadata, - ) - .outputs - ) + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ExecuteUserConnector, + request=connector_interface.ExecuteUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def watch_connector(self, name: str) -> connector_interface.Connector.State: - return ( - self.hosts[self.instance] - .client.WatchUserConnector( + def watch_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.WatchUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.WatchUserConnector, request=connector_interface.WatchUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), metadata=self.hosts[self.instance].metadata, - ) - .state - ) + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.WatchUserConnector, + request=connector_interface.WatchUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def delete_connector(self, name: str): - self.hosts[self.instance].client.DeleteUserConnector( + def delete_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.DeleteUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserConnector, + request=connector_interface.DeleteUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserConnector, request=connector_interface.DeleteUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler - def list_connectors(self, public=False) -> Tuple[list, str, int]: - if not public: - resp = self.hosts[self.instance].client.ListUserConnectors( + def list_connectors( + self, + filer_str: str = "", + next_page_token: str = "", + total_size: int = 100, + show_deleted: bool = False, + public=False, + async_enabled: bool = False, + ) -> connector_interface.ListUserConnectorsResponse: + if async_enabled: + if public: + method = self.hosts[self.instance].async_client.ListConnectors + else: + method = self.hosts[self.instance].async_client.ListUserConnectors + return RequestFactory( + method=method, request=connector_interface.ListUserConnectorsRequest( - parent=self.namespace + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListUserConnectorsRequest.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) + ).send_async() + if public: + method = self.hosts[self.instance].client.ListConnectors else: - resp = self.hosts[self.instance].client.ListConnectors( - request=connector_interface.ListConnectorsRequest(), - metadata=(self.hosts[self.instance].metadata,), - ) - - return resp.connectors, resp.next_page_token, resp.total_size + method = self.hosts[self.instance].client.ListUserConnectors + return RequestFactory( + method=method, + request=connector_interface.ListUserConnectorsRequest( + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListUserConnectorsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() diff --git a/instill/resources/pipeline.py b/instill/resources/pipeline.py index 34b7084..35b9a1a 100644 --- a/instill/resources/pipeline.py +++ b/instill/resources/pipeline.py @@ -17,9 +17,11 @@ def __init__( ) -> None: super().__init__() self.client = client - pipeline = client.pipeline_service.get_pipeline(name=name, silent=True) + pipeline = client.pipeline_service.get_pipeline(name=name, silent=True).pipeline if pipeline is None: - pipeline = client.pipeline_service.create_pipeline(name=name, recipe=recipe) + pipeline = client.pipeline_service.create_pipeline( + name=name, recipe=recipe + ).pipeline if pipeline is None: raise BaseException("pipeline creation failed") @@ -28,9 +30,10 @@ def __init__( def __call__( self, task_inputs: list ) -> Tuple[list, pipeline_interface.TriggerMetadata]: - return self.client.pipeline_service.trigger_pipeline( + resp = self.client.pipeline_service.trigger_pipeline( self.resource.id, task_inputs ) + return resp.outputs, resp.metadata @property def client(self): @@ -52,18 +55,20 @@ def _update(self): self.resource = self.client.pipeline_service.get_pipeline(name=self.resource.id) def get_operation(self, operation: operations_pb2.Operation): - return self.client.pipeline_service.get_operation(operation.name) + return self.client.pipeline_service.get_operation(operation.name).operation def trigger_async(self, task_inputs: list) -> operations_pb2.Operation: return self.client.pipeline_service.trigger_async_pipeline( self.resource.id, task_inputs - ) + ).operation def get_recipe(self) -> str: return self.resource.recipe def validate_pipeline(self) -> pipeline_interface.Pipeline: - return self.client.pipeline_service.validate_pipeline(name=self.resource.id) + return self.client.pipeline_service.validate_pipeline( + name=self.resource.id + ).pipeline def delete(self): if self.resource is not None: From 6c26a64a9882d3bd3fc737a86a2418f80f82717b Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 04:10:19 +0800 Subject: [PATCH 06/10] feat(mgmt): adopt async request factory --- instill/clients/mgmt.py | 196 +++++++++++++++++++++++++++++++--------- 1 file changed, 155 insertions(+), 41 deletions(-) diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index 0c140d4..ab8a606 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -7,9 +7,9 @@ import instill.protogen.core.mgmt.v1alpha.metric_pb2 as metric_interface import instill.protogen.core.mgmt.v1alpha.mgmt_pb2 as mgmt_interface import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service -from instill.clients.base import Client # common +from instill.clients.base import Client, RequestFactory from instill.clients.constant import DEFAULT_INSTANCE from instill.clients.instance import InstillInstance from instill.configuration import global_config @@ -62,106 +62,220 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: mgmt_interface.LivenessResponse = self.hosts[ - self.instance - ].client.Liveness(request=mgmt_interface.LivenessRequest()) - return resp.health_check_response.status + def liveness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=mgmt_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() - def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: mgmt_interface.ReadinessResponse = self.hosts[ - self.instance - ].client.Readiness(request=mgmt_interface.ReadinessRequest()) - return resp.health_check_response.status + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=mgmt_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + + def readiness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=mgmt_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=mgmt_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() def is_serving(self) -> bool: try: return ( - self.readiness() + self.readiness().status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: return False @grpc_handler - def login(self, username="admin", password="password") -> str: - resp: mgmt_interface.AuthLoginResponse = self.hosts[ - self.instance - ].client.AuthLogin( + def login( + self, + username="admin", + password="password", + async_enabled: bool = False, + ) -> mgmt_interface.AuthLoginResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.AuthLogin, + request=mgmt_interface.AuthLoginRequest( + username=username, password=password + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.AuthLogin, request=mgmt_interface.AuthLoginRequest( username=username, password=password - ) - ) - return resp.access_token + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_token(self, name: str) -> mgmt_interface.ApiToken: - resp: mgmt_interface.GetTokenResponse = self.hosts[ - self.instance - ].client.GetToken( + def get_token( + self, + name: str, + async_enabled: bool = False, + ) -> mgmt_interface.GetTokenResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetToken, + request=mgmt_interface.GetTokenRequest(name=name), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetToken, request=mgmt_interface.GetTokenRequest(name=name), metadata=self.hosts[self.instance].metadata, - ) - return resp.token + ).send_sync() @grpc_handler - def get_user(self) -> mgmt_interface.User: - resp: mgmt_interface.GetUserResponse = self.hosts[self.instance].client.GetUser( + def get_user( + self, + async_enabled: bool = False, + ) -> mgmt_interface.GetUserResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUser, + request=mgmt_interface.GetUserRequest(name="users/me"), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUser, request=mgmt_interface.GetUserRequest(name="users/me"), metadata=self.hosts[self.instance].metadata, - ) - return resp.user + ).send_sync() @grpc_handler def list_pipeline_trigger_records( self, + async_enabled: bool = False, ) -> metric_interface.ListPipelineTriggerRecordsResponse: - return self.hosts[self.instance].client.ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListPipelineTriggerRecords, + request=metric_interface.ListPipelineTriggerChartRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListPipelineTriggerRecords, request=metric_interface.ListPipelineTriggerChartRecordsRequest(), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def list_pipeline_trigger_table_records( self, + async_enabled: bool = False, ) -> metric_interface.ListPipelineTriggerTableRecordsRequest: - return self.hosts[self.instance].client.ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListPipelineTriggerTableRecords, + request=metric_interface.ListPipelineTriggerTableRecordsResponse(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListPipelineTriggerTableRecords, request=metric_interface.ListPipelineTriggerTableRecordsResponse(), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def list_pipeline_trigger_chart_records( self, + async_enabled: bool = False, ) -> metric_interface.ListPipelineTriggerChartRecordsResponse: - return self.hosts[self.instance].client.ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListPipelineTriggerChartRecords, + request=metric_interface.ListPipelineTriggerChartRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListPipelineTriggerChartRecords, request=metric_interface.ListPipelineTriggerChartRecordsRequest(), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def list_connector_execute_records( self, + async_enabled: bool = False, ) -> metric_interface.ListConnectorExecuteRecordsResponse: - return self.hosts[self.instance].client.ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListConnectorExecuteRecords, + request=metric_interface.ListConnectorExecuteRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListConnectorExecuteRecords, request=metric_interface.ListConnectorExecuteRecordsRequest(), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def list_connector_execute_table_records( self, + async_enabled: bool = False, ) -> metric_interface.ListConnectorExecuteTableRecordsResponse: - return self.hosts[self.instance].client.ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListConnectorExecuteTableRecords, + request=metric_interface.ListConnectorExecuteTableRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListConnectorExecuteTableRecords, request=metric_interface.ListConnectorExecuteTableRecordsRequest(), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler def list_connector_execute_chart_records( self, + async_enabled: bool = False, ) -> metric_interface.ListConnectorExecuteChartRecordsResponse: - return self.hosts[self.instance].client.ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListConnectorExecuteChartRecords, + request=metric_interface.ListConnectorExecuteChartRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListConnectorExecuteChartRecords, request=metric_interface.ListConnectorExecuteChartRecordsRequest(), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() From ef899de198b248adc64232071508ec542c185109 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 04:14:02 +0800 Subject: [PATCH 07/10] chore(connector): update response value --- instill/resources/connector.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/instill/resources/connector.py b/instill/resources/connector.py index ba3baba..c377a37 100644 --- a/instill/resources/connector.py +++ b/instill/resources/connector.py @@ -16,26 +16,29 @@ def __init__( ) -> None: super().__init__() self.client = client - connector = client.pipeline_service.get_connector(name=name, silent=True) + connector = client.pipeline_service.get_connector( + name=name, silent=True + ).connector if connector is None: connector = client.pipeline_service.create_connector( name=name, definition=definition, configuration=configuration, - ) + ).connector if connector is None: raise BaseException("connector creation failed") self.resource = connector - def __call__(self, task_inputs: list, mode="execute") -> list: + def __call__(self, task_inputs: list, mode="execute"): if mode == "execute": - return self.client.pipeline_service.execute_connector( + resp = self.client.pipeline_service.execute_connector( self.resource.id, task_inputs ) + return resp.outputs return self.client.pipeline_service.test_connector( self.resource.id, task_inputs - ) + ).state @property def client(self): @@ -65,10 +68,10 @@ def get_definition(self) -> connector_definition_interface.ConnectorDefinition: return self.resource.connector_definition def get_state(self) -> connector_interface.Connector.State: - return self.client.pipeline_service.watch_connector(self.resource.id) + return self.client.pipeline_service.watch_connector(self.resource.id).state def test(self) -> connector_interface.Connector.State: - return self.client.pipeline_service.test_connector(self.resource.id) + return self.client.pipeline_service.test_connector(self.resource.id).state def delete(self): if self.resource is not None: From 9fcbfb3cafc09aaa0a8a3c2a9ea8d0e1b2298e55 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 04:37:13 +0800 Subject: [PATCH 08/10] chore: fix is serving response --- instill/clients/base.py | 2 +- instill/clients/client.py | 4 ++-- instill/clients/mgmt.py | 10 +++++----- instill/clients/pipeline.py | 10 +++++++--- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/instill/clients/base.py b/instill/clients/base.py index 9a244a1..3c54dd6 100644 --- a/instill/clients/base.py +++ b/instill/clients/base.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod -import grpc import google.protobuf.message +import grpc class Client(ABC): diff --git a/instill/clients/client.py b/instill/clients/client.py index ec4e8a5..c90f2ef 100644 --- a/instill/clients/client.py +++ b/instill/clients/client.py @@ -13,13 +13,13 @@ def __init__(self, async_enabled: bool = False) -> None: Logger.w("Instill Core is required") raise NotServingException self.pipeline_service = PipelineClient( - namespace=self.mgmt_service.get_user().name, + namespace=self.mgmt_service.get_user().user.name, async_enabled=async_enabled, ) if not self.pipeline_service.is_serving(): Logger.w("Instill VDP is not serving, VDP functionalities will not work") self.model_service = ModelClient( - namespace=self.mgmt_service.get_user().name, + namespace=self.mgmt_service.get_user().user.name, async_enabled=async_enabled, ) if not self.model_service.is_serving(): diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index ab8a606..5097c3d 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -62,7 +62,7 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + def liveness(self, async_enabled: bool = False) -> mgmt_interface.LivenessResponse: if async_enabled: return RequestFactory( method=self.hosts[self.instance].async_client.Liveness, @@ -76,16 +76,16 @@ def liveness(self, async_enabled: bool = False) -> healthcheck.HealthCheckRespon metadata=self.hosts[self.instance].metadata, ).send_sync() - def readiness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + def readiness(self, async_enabled: bool = False) -> mgmt_interface.ReadinessResponse: if async_enabled: return RequestFactory( - method=self.hosts[self.instance].async_client.Liveness, + method=self.hosts[self.instance].async_client.Readiness, request=mgmt_interface.ReadinessRequest(), metadata=self.hosts[self.instance].metadata, ).send_async() return RequestFactory( - method=self.hosts[self.instance].client.Liveness, + method=self.hosts[self.instance].client.Readiness, request=mgmt_interface.ReadinessRequest(), metadata=self.hosts[self.instance].metadata, ).send_sync() @@ -93,7 +93,7 @@ def readiness(self, async_enabled: bool = False) -> healthcheck.HealthCheckRespo def is_serving(self) -> bool: try: return ( - self.readiness().status + self.readiness().health_check_response.status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index 8f598d0..127fa0a 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -65,7 +65,9 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + def liveness( + self, async_enabled: bool = False + ) -> pipeline_interface.LivenessResponse: if async_enabled: return RequestFactory( method=self.hosts[self.instance].async_client.Liveness, @@ -79,7 +81,9 @@ def liveness(self, async_enabled: bool = False) -> healthcheck.HealthCheckRespon metadata=self.hosts[self.instance].metadata, ).send_sync() - def readiness(self, async_enabled: bool = False) -> healthcheck.HealthCheckResponse: + def readiness( + self, async_enabled: bool = False + ) -> pipeline_interface.ReadinessResponse: if async_enabled: return RequestFactory( method=self.hosts[self.instance].async_client.Readiness, @@ -96,7 +100,7 @@ def readiness(self, async_enabled: bool = False) -> healthcheck.HealthCheckRespo def is_serving(self) -> bool: try: return ( - self.readiness().status + self.readiness().health_check_response.status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: From e2e87b9f34ac5cdb50c93729609af46ecb1f94e6 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 05:29:29 +0800 Subject: [PATCH 09/10] chore: fix checks --- instill/clients/base.py | 9 +- instill/clients/mgmt.py | 4 +- instill/clients/model.py | 417 ++++++++++++++++++++++++------------ instill/clients/pipeline.py | 56 ++++- instill/resources/model.py | 48 ++++- 5 files changed, 380 insertions(+), 154 deletions(-) diff --git a/instill/clients/base.py b/instill/clients/base.py index 3c54dd6..1d4bd87 100644 --- a/instill/clients/base.py +++ b/instill/clients/base.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from typing import Union import google.protobuf.message import grpc @@ -57,7 +58,7 @@ def is_serving(self): class RequestFactory: def __init__( self, - method: grpc.UnaryUnaryMultiCallable, + method: Union[grpc.UnaryUnaryMultiCallable, grpc.StreamUnaryMultiCallable], request: google.protobuf.message.Message, metadata, ) -> None: @@ -68,5 +69,11 @@ def __init__( def send_sync(self): return self.method(request=self.request, metadata=self.metadata) + def send_stream(self): + return self.method( + request_iterator=iter([self.request]), + metadata=self.metadata, + ) + async def send_async(self): return await self.method(request=self.request, metadata=self.metadata) diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index 5097c3d..70e235a 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -76,7 +76,9 @@ def liveness(self, async_enabled: bool = False) -> mgmt_interface.LivenessRespon metadata=self.hosts[self.instance].metadata, ).send_sync() - def readiness(self, async_enabled: bool = False) -> mgmt_interface.ReadinessResponse: + def readiness( + self, async_enabled: bool = False + ) -> mgmt_interface.ReadinessResponse: if async_enabled: return RequestFactory( method=self.hosts[self.instance].async_client.Readiness, diff --git a/instill/clients/model.py b/instill/clients/model.py index 96e711b..bd27c24 100644 --- a/instill/clients/model.py +++ b/instill/clients/model.py @@ -1,8 +1,6 @@ # pylint: disable=no-member,wrong-import-position -import time -from typing import Dict, Iterable, Tuple, Union +from typing import Dict -from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 # common @@ -12,7 +10,7 @@ # model import instill.protogen.model.model.v1alpha.model_pb2 as model_interface import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service -from instill.clients.base import Client +from instill.clients.base import Client, RequestFactory from instill.clients.constant import DEFAULT_INSTANCE from instill.clients.instance import InstillInstance from instill.configuration import global_config @@ -64,38 +62,67 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: model_interface.LivenessResponse = self.hosts[ - self.instance - ].client.Liveness(request=model_interface.LivenessRequest()) - return resp.health_check_response.status + def liveness(self, async_enabled: bool = False) -> model_interface.LivenessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=model_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() - def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: model_interface.ReadinessResponse = self.hosts[ - self.instance - ].client.Readiness(request=model_interface.ReadinessRequest()) - return resp.health_check_response.status + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=model_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + + def readiness( + self, async_enabled: bool = False + ) -> model_interface.ReadinessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Readiness, + request=model_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Readiness, + request=model_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() def is_serving(self) -> bool: try: return ( - self.readiness() + self.readiness().health_check_response.status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: return False @grpc_handler - def watch_model(self, model_name: str) -> model_interface.Model.State.ValueType: - resp: model_interface.WatchUserModelResponse = self.hosts[ - self.instance - ].client.WatchUserModel( + def watch_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.WatchUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.WatchUserModel, + request=model_interface.WatchUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.WatchUserModel, request=model_interface.WatchUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), metadata=self.hosts[self.instance].metadata, - ) - return resp.state + ).send_sync() @grpc_handler def create_model_local( @@ -103,7 +130,7 @@ def create_model_local( model_name: str, model_description: str, model_path: str, - ) -> model_interface.Model: + ) -> model_interface.CreateUserModelBinaryFileUploadResponse: model = model_interface.Model() model.id = model_name model.description = model_description @@ -114,25 +141,11 @@ def create_model_local( req = model_interface.CreateUserModelBinaryFileUploadRequest( parent=self.namespace, model=model, content=data ) - create_resp: model_interface.CreateUserModelBinaryFileUploadResponse = ( - self.hosts[self.instance].client.CreateUserModelBinaryFileUpload( - request_iterator=iter([req]), - metadata=self.hosts[self.instance].metadata, - ) - ) - - while self.get_operation(name=create_resp.operation.name).done is not True: - time.sleep(1) - - state = self.watch_model(model_name=model_name) - while state == 0: - time.sleep(1) - state = self.watch_model(model_name=model_name) - - if state == 1: - return self.get_model(model_name=model_name) - - raise SystemError("model creation failed") + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserModelBinaryFileUpload, + request=req, + metadata=self.hosts[self.instance].metadata, + ).send_stream() @grpc_handler def create_model( @@ -140,166 +153,296 @@ def create_model( name: str, definition: str, configuration: dict, - ) -> model_interface.Model: + async_enabled: bool = False, + ) -> model_interface.CreateUserModelResponse: model = model_interface.Model() model.id = name model.model_definition = definition model.configuration.update(configuration) - create_resp: model_interface.CreateUserModelResponse = self.hosts[ - self.instance - ].client.CreateUserModel( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateUserModel, + request=model_interface.CreateUserModelRequest( + model=model, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserModel, request=model_interface.CreateUserModelRequest( model=model, parent=self.namespace ), metadata=self.hosts[self.instance].metadata, - ) - - while self.get_operation(name=create_resp.operation.name).done is not True: - time.sleep(1) - - # TODO: due to state update delay of controller - # TODO: should optimize this in model-backend - time.sleep(3) - - state = self.watch_model(model_name=name) - while state == 0: - time.sleep(1) - state = self.watch_model(model_name=name) - - if state == 1: - return self.get_model(model_name=name) - - raise SystemError("model creation failed") + ).send_sync() @grpc_handler - def deploy_model(self, model_name: str) -> model_interface.Model.State: - self.hosts[self.instance].client.DeployUserModel( + def deploy_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.DeployUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeployUserModel, + request=model_interface.DeployUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeployUserModel, request=model_interface.DeployUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), metadata=self.hosts[self.instance].metadata, - ) - - state = self.watch_model(model_name=model_name) - while state not in (2, 3): - time.sleep(1) - state = self.watch_model(model_name=model_name) - - return state + ).send_sync() @grpc_handler - def undeploy_model(self, model_name: str) -> model_interface.Model.State: - self.hosts[self.instance].client.UndeployUserModel( + def undeploy_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.UndeployUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UndeployUserModel, + request=model_interface.UndeployUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UndeployUserModel, request=model_interface.UndeployUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), metadata=self.hosts[self.instance].metadata, - ) - - state = self.watch_model(model_name=model_name) - while state not in (1, 3): - time.sleep(1) - state = self.watch_model(model_name=model_name) - - return state + ).send_sync() @grpc_handler - def trigger_model(self, model_name: str, task_inputs: list) -> Iterable: - resp: model_interface.TriggerUserModelResponse = self.hosts[ - self.instance - ].client.TriggerUserModel( + def trigger_model( + self, + model_name: str, + task_inputs: list, + async_enabled: bool = False, + ) -> model_interface.TriggerUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerUserModel, + request=model_interface.TriggerUserModelRequest( + name=f"{self.namespace}/models/{model_name}", + task_inputs=task_inputs, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerUserModel, request=model_interface.TriggerUserModelRequest( name=f"{self.namespace}/models/{model_name}", task_inputs=task_inputs ), metadata=self.hosts[self.instance].metadata, - ) - return resp.task_outputs + ).send_sync() @grpc_handler - def delete_model(self, model_name: str): - self.hosts[self.instance].client.DeleteUserModel( + def delete_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.DeleteUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserModel, + request=model_interface.DeleteUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserModel, request=model_interface.DeleteUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), metadata=self.hosts[self.instance].metadata, - ) + ).send_sync() @grpc_handler - def get_model(self, model_name: str) -> model_interface.Model: - resp: model_interface.GetUserModelResponse = self.hosts[ - self.instance - ].client.GetUserModel( + def get_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.GetUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserModel, + request=model_interface.GetUserModelRequest( + name=f"{self.namespace}/models/{model_name}", + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserModel, request=model_interface.GetUserModelRequest( name=f"{self.namespace}/models/{model_name}", view=model_definition_interface.VIEW_FULL, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.model + ).send_sync() @grpc_handler def update_model( - self, model: model_interface.Model, mask: field_mask_pb2.FieldMask - ) -> model_interface.Model: - resp: model_interface.UpdateUserModelResponse = self.hosts[ - self.instance - ].client.UpdateUserModel( + self, + model: model_interface.Model, + mask: field_mask_pb2.FieldMask, + async_enabled: bool = False, + ) -> model_interface.UpdateUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UpdateUserModel, + request=model_interface.UpdateUserModelRequest( + model=model, + update_mask=mask, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UpdateUserModel, request=model_interface.UpdateUserModelRequest( model=model, update_mask=mask, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.model + ).send_sync() @grpc_handler - def lookup_model(self, model_uid: str) -> model_interface.Model: - resp: model_interface.LookUpModelResponse = self.hosts[ - self.instance - ].client.LookUpModel( + def lookup_model( + self, + model_uid: str, + async_enabled: bool = False, + ) -> model_interface.LookUpModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.LookUpModel, + request=model_interface.LookUpModelRequest( + permalink=f"models/{model_uid}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.LookUpModel, request=model_interface.LookUpModelRequest(permalink=f"models/{model_uid}"), metadata=self.hosts[self.instance].metadata, - ) - return resp.model + ).send_sync() @grpc_handler - def get_model_card(self, model_name: str) -> model_interface.ModelCard: - resp: model_interface.GetUserModelCardResponse = self.hosts[ - self.instance - ].client.GetUserModel( + def get_model_card( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.GetUserModelCardResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserModelCard, + request=model_interface.GetUserModelCardRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserModelCard, request=model_interface.GetUserModelCardRequest( name=f"{self.namespace}/models/{model_name}" ), metadata=self.hosts[self.instance].metadata, - ) - return resp.readme + ).send_sync() @grpc_handler - def list_models(self, public=False) -> Tuple[Iterable, str, int]: - resp: Union[ - model_interface.ListModelsResponse, model_interface.ListUserModelsResponse - ] - if not public: - resp = self.hosts[self.instance].client.ListUserModels( - request=model_interface.ListUserModelsRequest(parent=self.namespace), + def list_models( + self, + next_page_token: str = "", + total_size: int = 100, + show_deleted: bool = False, + public=False, + async_enabled: bool = False, + ) -> model_interface.ListUserModelsResponse: + if async_enabled: + if public: + method = self.hosts[self.instance].async_client.ListModels + return RequestFactory( + method=method, + request=model_interface.ListModelsRequest( + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + method = self.hosts[self.instance].async_client.ListUserModels + return RequestFactory( + method=method, + request=model_interface.ListUserModelsRequest( + parent=self.namespace, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), metadata=self.hosts[self.instance].metadata, - ) - else: - resp = self.hosts[self.instance].client.ListModels( - request=model_interface.ListModelsRequest(), + ).send_async() + if public: + method = self.hosts[self.instance].client.ListModels + return RequestFactory( + method=method, + request=model_interface.ListModelsRequest( + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), metadata=self.hosts[self.instance].metadata, - ) - - return resp.models, resp.next_page_token, resp.total_size + ).send_sync() + method = self.hosts[self.instance].client.ListUserModels + return RequestFactory( + method=method, + request=model_interface.ListUserModelsRequest( + parent=self.namespace, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_operation(self, name: str) -> operations_pb2.Operation: - resp: model_interface.GetModelOperationResponse = self.hosts[ - self.instance - ].client.GetModelOperation( + def get_operation( + self, + name: str, + async_enabled: bool = False, + ) -> model_interface.GetModelOperationResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetModelOperation, + request=model_interface.GetModelOperationRequest( + name=name, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetModelOperation, request=model_interface.GetModelOperationRequest( name=name, ), metadata=self.hosts[self.instance].metadata, - ) - return resp.operation + ).send_sync() diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index 127fa0a..4a55c33 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -398,8 +398,18 @@ def list_pipelines( if async_enabled: if public: method = self.hosts[self.instance].async_client.ListPipelines - else: - method = self.hosts[self.instance].async_client.ListUserPipelines + return RequestFactory( + method=method, + request=pipeline_interface.ListPipelinesRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListPipelinesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + method = self.hosts[self.instance].async_client.ListUserPipelines return RequestFactory( method=method, request=pipeline_interface.ListUserPipelinesRequest( @@ -414,8 +424,18 @@ def list_pipelines( ).send_async() if public: method = self.hosts[self.instance].client.ListPipelines - else: - method = self.hosts[self.instance].client.ListUserPipelines + return RequestFactory( + method=method, + request=pipeline_interface.ListPipelinesRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListPipelinesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + method = self.hosts[self.instance].client.ListUserPipelines return RequestFactory( method=method, request=pipeline_interface.ListUserPipelinesRequest( @@ -922,8 +942,18 @@ def list_connectors( if async_enabled: if public: method = self.hosts[self.instance].async_client.ListConnectors - else: - method = self.hosts[self.instance].async_client.ListUserConnectors + return RequestFactory( + method=method, + request=connector_interface.ListConnectorsRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListConnectorsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + method = self.hosts[self.instance].async_client.ListUserConnectors return RequestFactory( method=method, request=connector_interface.ListUserConnectorsRequest( @@ -938,8 +968,18 @@ def list_connectors( ).send_async() if public: method = self.hosts[self.instance].client.ListConnectors - else: - method = self.hosts[self.instance].client.ListUserConnectors + return RequestFactory( + method=method, + request=connector_interface.ListConnectorsRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListConnectorsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + method = self.hosts[self.instance].client.ListUserConnectors return RequestFactory( method=method, request=connector_interface.ListUserConnectorsRequest( diff --git a/instill/resources/model.py b/instill/resources/model.py index 1f4e3d9..8b1f6b5 100644 --- a/instill/resources/model.py +++ b/instill/resources/model.py @@ -1,4 +1,6 @@ # pylint: disable=no-member,wrong-import-position,no-name-in-module +import time + import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface import instill.protogen.model.model.v1alpha.model_pb2 as model_interface from instill.clients import InstillClient @@ -17,18 +19,36 @@ def __init__( self.client = client model = client.model_service.get_model(model_name=name, silent=True) if model is None: - model = client.model_service.create_model( + operation = client.model_service.create_model( name=name, definition=definition, configuration=configuration, - ) - if model is None: + ).operation + while ( + client.model_service.get_operation(name=operation.name).operation.done + is not True + ): + time.sleep(1) + # TODO: due to state update delay of controller + # TODO: should optimize this in model-backend + time.sleep(3) + + state = client.model_service.watch_model(model_name=name).state + while state == 0: + time.sleep(1) + state = client.model_service.watch_model(model_name=name).state + + if state == 1: + model = client.model_service.get_model(model_name=name).model + else: raise BaseException("model creation failed") self.resource = model def __call__(self, task_inputs: list) -> list: - return self.client.model_service.trigger_model(self.resource.id, task_inputs) + return self.client.model_service.trigger_model( + self.resource.id, task_inputs + ).task_outputs @property def client(self): @@ -47,24 +67,38 @@ def resource(self, resource: model_interface.Model): self._resource = resource def _update(self): - self.resource = self.client.model_service.get_model(model_name=self.resource.id) + self.resource = self.client.model_service.get_model( + model_name=self.resource.id + ).model def get_definition(self) -> model_definition_interface.ModelDefinition: return self.resource.model_definition def get_readme(self) -> str: - return self.client.model_service.get_model_card(self.resource.id) + return self.client.model_service.get_model_card(self.resource.id).readme def get_state(self) -> model_interface.Model.State: - return self.client.model_service.watch_model(self.resource.id) + return self.client.model_service.watch_model(self.resource.id).state def deploy(self) -> model_interface.Model: self.client.model_service.deploy_model(self.resource.id) + state = self.client.model_service.watch_model(model_name=self.resource.id).state + while state not in (2, 3): + time.sleep(1) + state = self.client.model_service.watch_model( + model_name=self.resource.id + ).state self._update() return self._resource def undeploy(self) -> model_interface.Model: self.client.model_service.undeploy_model(self.resource.id) + state = self.client.model_service.watch_model(model_name=self.resource.id).state + while state not in (1, 3): + time.sleep(1) + state = self.client.model_service.watch_model( + model_name=self.resource.id + ).state self._update() return self._resource From 54e05cecb2eed9e512b91890646b0f48ab8523cb Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 5 Dec 2023 05:34:11 +0800 Subject: [PATCH 10/10] chore: add back get_client --- instill/clients/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instill/clients/__init__.py b/instill/clients/__init__.py index 9dde8e5..acd20ff 100644 --- a/instill/clients/__init__.py +++ b/instill/clients/__init__.py @@ -1,4 +1,4 @@ -from instill.clients.client import InstillClient +from instill.clients.client import InstillClient, get_client from instill.clients.mgmt import MgmtClient from instill.clients.model import ModelClient from instill.clients.pipeline import PipelineClient