diff --git a/instill/clients/base.py b/instill/clients/base.py index 7c3ea62..1d4bd87 100644 --- a/instill/clients/base.py +++ b/instill/clients/base.py @@ -1,4 +1,8 @@ from abc import ABC, abstractmethod +from typing import Union + +import google.protobuf.message +import grpc class Client(ABC): @@ -49,3 +53,27 @@ def readiness(self): @abstractmethod def is_serving(self): raise NotImplementedError + + +class RequestFactory: + def __init__( + self, + method: Union[grpc.UnaryUnaryMultiCallable, grpc.StreamUnaryMultiCallable], + request: google.protobuf.message.Message, + metadata, + ) -> None: + self.method = method + self.request = request + self.metadata = metadata + + def send_sync(self): + return self.method(request=self.request, metadata=self.metadata) + + def send_stream(self): + return self.method( + request_iterator=iter([self.request]), + metadata=self.metadata, + ) + + async def send_async(self): + return await self.method(request=self.request, metadata=self.metadata) diff --git a/instill/clients/client.py b/instill/clients/client.py index 54ca1f9..c90f2ef 100644 --- a/instill/clients/client.py +++ b/instill/clients/client.py @@ -5,49 +5,23 @@ from instill.utils.error_handler import NotServingException from instill.utils.logger import Logger -_mgmt_client = None -_pipeline_client = None -_model_client = None -_client = None - - -def _get_mgmt_client() -> MgmtClient: - global _mgmt_client - - if _mgmt_client is None: - _mgmt_client = MgmtClient() - - return _mgmt_client - - -def _get_pipeline_client() -> PipelineClient: - global _pipeline_client - - if _pipeline_client is None: - _pipeline_client = PipelineClient(namespace=_get_mgmt_client().get_user().name) - - return _pipeline_client - - -def _get_model_client() -> ModelClient: - global _model_client - - if _model_client is None: - _model_client = ModelClient(namespace=_get_mgmt_client().get_user().name) - - return _model_client - class InstillClient: - def __init__(self) -> None: - self.mgmt_service = _get_mgmt_client() + def __init__(self, async_enabled: bool = False) -> None: + self.mgmt_service = MgmtClient(async_enabled=async_enabled) if not self.mgmt_service.is_serving(): Logger.w("Instill Core is required") raise NotServingException - self.pipeline_service = _get_pipeline_client() + self.pipeline_service = PipelineClient( + namespace=self.mgmt_service.get_user().user.name, + async_enabled=async_enabled, + ) if not self.pipeline_service.is_serving(): Logger.w("Instill VDP is not serving, VDP functionalities will not work") - self.model_service = _get_model_client() + self.model_service = ModelClient( + namespace=self.mgmt_service.get_user().user.name, + async_enabled=async_enabled, + ) if not self.model_service.is_serving(): Logger.w( "Instill Model is not serving, Model functionalities will not work" @@ -61,19 +35,25 @@ def set_instance(self, instance: str): def close(self): if self.mgmt_service.is_serving(): for host in self.mgmt_service.hosts.values(): - host["channel"].close() + host.channel.close() if self.pipeline_service.is_serving(): for host in self.pipeline_service.hosts.values(): - host["channel"].close() + host.channel.close() if self.model_service.is_serving(): for host in self.model_service.hosts.values(): - host["channel"].close() + host.channel.close() + async def async_close(self): + if self.mgmt_service.is_serving(): + for host in self.mgmt_service.hosts.values(): + await host.async_channel.close() + if self.pipeline_service.is_serving(): + for host in self.pipeline_service.hosts.values(): + await host.async_channel.close() + if self.model_service.is_serving(): + for host in self.model_service.hosts.values(): + await host.async_channel.close() -def get_client() -> InstillClient: - global _client - - if _client is None: - _client = InstillClient() - return _client +def get_client(async_enabled: bool = False) -> InstillClient: + return InstillClient(async_enabled=async_enabled) diff --git a/instill/clients/instance.py b/instill/clients/instance.py new file mode 100644 index 0000000..e4fa072 --- /dev/null +++ b/instill/clients/instance.py @@ -0,0 +1,45 @@ +from typing import Union + +import grpc + +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service +import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service +import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service + + +class InstillInstance: + def __init__(self, stub, url: str, token: str, secure: bool, async_enabled: bool): + self.url: str = url + self.token: str = token + self.async_enabled: bool = async_enabled + self.metadata: Union[str, tuple] = "" + if not secure: + channel = grpc.insecure_channel(url) + self.metadata = ( + ( + "authorization", + f"Bearer {token}", + ), + ) + if async_enabled: + async_channel = grpc.aio.insecure_channel(url) + else: + ssl_creds = grpc.ssl_channel_credentials() + call_creds = grpc.access_token_call_credentials(token) + creds = grpc.composite_channel_credentials(ssl_creds, call_creds) + channel = grpc.secure_channel(target=url, credentials=creds) + if async_enabled: + async_channel = grpc.aio.secure_channel(target=url, credentials=creds) + self.channel: grpc.Channel = channel + self.client: Union[ + model_service.ModelPublicServiceStub, + pipeline_service.PipelinePublicServiceStub, + mgmt_service.MgmtPublicServiceStub, + ] = stub(channel) + if async_enabled: + self.async_channel: grpc.Channel = async_channel + self.async_client: Union[ + model_service.ModelPublicServiceStub, + pipeline_service.PipelinePublicServiceStub, + mgmt_service.MgmtPublicServiceStub, + ] = stub(async_channel) diff --git a/instill/clients/mgmt.py b/instill/clients/mgmt.py index 30a24fa..70e235a 100644 --- a/instill/clients/mgmt.py +++ b/instill/clients/mgmt.py @@ -1,7 +1,5 @@ # pylint: disable=no-member,wrong-import-position -from collections import defaultdict - -import grpc +from typing import Dict import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck @@ -9,10 +7,11 @@ import instill.protogen.core.mgmt.v1alpha.metric_pb2 as metric_interface import instill.protogen.core.mgmt.v1alpha.mgmt_pb2 as mgmt_interface import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service -from instill.clients import constant # common -from instill.clients.base import Client +from instill.clients.base import Client, RequestFactory +from instill.clients.constant import DEFAULT_INSTANCE +from instill.clients.instance import InstillInstance from instill.configuration import global_config from instill.utils.error_handler import grpc_handler @@ -20,10 +19,10 @@ class MgmtClient(Client): - def __init__(self) -> None: - self.hosts: defaultdict = defaultdict(dict) - if constant.DEFAULT_INSTANCE in global_config.hosts: - self.instance = constant.DEFAULT_INSTANCE + def __init__(self, async_enabled: bool) -> None: + self.hosts: Dict[str, InstillInstance] = {} + if DEFAULT_INSTANCE in global_config.hosts: + self.instance = DEFAULT_INSTANCE elif len(global_config.hosts) == 0: self.instance = "" else: @@ -31,27 +30,12 @@ def __init__(self) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): - if not config.secure: - channel = grpc.insecure_channel(config.url) - self.hosts[instance]["metadata"] = ( - ( - "authorization", - f"Bearer {config.token}", - ), - ) - else: - ssl_creds = grpc.ssl_channel_credentials() - call_creds = grpc.access_token_call_credentials(config.token) - creds = grpc.composite_channel_credentials(ssl_creds, call_creds) - channel = grpc.secure_channel( - target=config.url, - credentials=creds, - ) - self.hosts[instance]["metadata"] = "" - self.hosts[instance]["token"] = config.token - self.hosts[instance]["channel"] = channel - self.hosts[instance]["client"] = mgmt_service.MgmtPublicServiceStub( - channel + self.hosts[instance] = InstillInstance( + mgmt_service.MgmtPublicServiceStub, + url=config.url, + token=config.token, + secure=config.secure, + async_enabled=async_enabled, ) @property @@ -78,108 +62,222 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: mgmt_interface.LivenessResponse = self.hosts[self.instance][ - "client" - ].Liveness(request=mgmt_interface.LivenessRequest()) - return resp.health_check_response.status + def liveness(self, async_enabled: bool = False) -> mgmt_interface.LivenessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=mgmt_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=mgmt_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() - def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: mgmt_interface.ReadinessResponse = self.hosts[self.instance][ - "client" - ].Readiness(request=mgmt_interface.ReadinessRequest()) - return resp.health_check_response.status + def readiness( + self, async_enabled: bool = False + ) -> mgmt_interface.ReadinessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Readiness, + request=mgmt_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Readiness, + request=mgmt_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() def is_serving(self) -> bool: try: return ( - self.readiness() + self.readiness().health_check_response.status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: return False @grpc_handler - def login(self, username="admin", password="password") -> str: - resp: mgmt_interface.AuthLoginResponse = self.hosts[self.instance][ - "client" - ].AuthLogin( + def login( + self, + username="admin", + password="password", + async_enabled: bool = False, + ) -> mgmt_interface.AuthLoginResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.AuthLogin, + request=mgmt_interface.AuthLoginRequest( + username=username, password=password + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.AuthLogin, request=mgmt_interface.AuthLoginRequest( username=username, password=password - ) - ) - return resp.access_token + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_token(self, name: str) -> mgmt_interface.ApiToken: - resp: mgmt_interface.GetTokenResponse = self.hosts[self.instance][ - "client" - ].GetToken( + def get_token( + self, + name: str, + async_enabled: bool = False, + ) -> mgmt_interface.GetTokenResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetToken, + request=mgmt_interface.GetTokenRequest(name=name), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetToken, request=mgmt_interface.GetTokenRequest(name=name), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.token + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_user(self) -> mgmt_interface.User: - resp: mgmt_interface.GetUserResponse = self.hosts[self.instance][ - "client" - ].GetUser( + def get_user( + self, + async_enabled: bool = False, + ) -> mgmt_interface.GetUserResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUser, + request=mgmt_interface.GetUserRequest(name="users/me"), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUser, request=mgmt_interface.GetUserRequest(name="users/me"), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.user + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_pipeline_trigger_records( self, + async_enabled: bool = False, ) -> metric_interface.ListPipelineTriggerRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListPipelineTriggerRecords, + request=metric_interface.ListPipelineTriggerChartRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListPipelineTriggerRecords, request=metric_interface.ListPipelineTriggerChartRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_pipeline_trigger_table_records( self, + async_enabled: bool = False, ) -> metric_interface.ListPipelineTriggerTableRecordsRequest: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListPipelineTriggerTableRecords, + request=metric_interface.ListPipelineTriggerTableRecordsResponse(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListPipelineTriggerTableRecords, request=metric_interface.ListPipelineTriggerTableRecordsResponse(), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_pipeline_trigger_chart_records( self, + async_enabled: bool = False, ) -> metric_interface.ListPipelineTriggerChartRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListPipelineTriggerChartRecords, + request=metric_interface.ListPipelineTriggerChartRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListPipelineTriggerChartRecords, request=metric_interface.ListPipelineTriggerChartRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_connector_execute_records( self, + async_enabled: bool = False, ) -> metric_interface.ListConnectorExecuteRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListConnectorExecuteRecords, + request=metric_interface.ListConnectorExecuteRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListConnectorExecuteRecords, request=metric_interface.ListConnectorExecuteRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_connector_execute_table_records( self, + async_enabled: bool = False, ) -> metric_interface.ListConnectorExecuteTableRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListConnectorExecuteTableRecords, + request=metric_interface.ListConnectorExecuteTableRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListConnectorExecuteTableRecords, request=metric_interface.ListConnectorExecuteTableRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_connector_execute_chart_records( self, + async_enabled: bool = False, ) -> metric_interface.ListConnectorExecuteChartRecordsResponse: - return self.hosts[self.instance]["client"].ListPipelineTriggerRecords( + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.ListConnectorExecuteChartRecords, + request=metric_interface.ListConnectorExecuteChartRecordsRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListConnectorExecuteChartRecords, request=metric_interface.ListConnectorExecuteChartRecordsRequest(), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() diff --git a/instill/clients/model.py b/instill/clients/model.py index 23ec2e8..bd27c24 100644 --- a/instill/clients/model.py +++ b/instill/clients/model.py @@ -1,32 +1,28 @@ # pylint: disable=no-member,wrong-import-position -import time -from collections import defaultdict -from typing import Iterable, Tuple, Union +from typing import Dict -import grpc -from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 +# common import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface # model import instill.protogen.model.model.v1alpha.model_pb2 as model_interface import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service -from instill.clients import constant -from instill.clients.base import Client - -# common +from instill.clients.base import Client, RequestFactory +from instill.clients.constant import DEFAULT_INSTANCE +from instill.clients.instance import InstillInstance from instill.configuration import global_config from instill.utils.error_handler import grpc_handler class ModelClient(Client): - def __init__(self, namespace: str) -> None: - self.hosts: defaultdict = defaultdict(dict) + def __init__(self, namespace: str, async_enabled: bool) -> None: + self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace - if constant.DEFAULT_INSTANCE in global_config.hosts: - self.instance = constant.DEFAULT_INSTANCE + if DEFAULT_INSTANCE in global_config.hosts: + self.instance = DEFAULT_INSTANCE elif len(global_config.hosts) == 0: self.instance = "" else: @@ -34,27 +30,12 @@ def __init__(self, namespace: str) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): - if not config.secure: - channel = grpc.insecure_channel(config.url) - self.hosts[instance]["metadata"] = ( - ( - "authorization", - f"Bearer {config.token}", - ), - ) - else: - ssl_creds = grpc.ssl_channel_credentials() - call_creds = grpc.access_token_call_credentials(config.token) - creds = grpc.composite_channel_credentials(ssl_creds, call_creds) - channel = grpc.secure_channel( - target=config.url, - credentials=creds, - ) - self.hosts[instance]["metadata"] = "" - self.hosts[instance]["token"] = config.token - self.hosts[instance]["channel"] = channel - self.hosts[instance]["client"] = model_service.ModelPublicServiceStub( - channel + self.hosts[instance] = InstillInstance( + model_service.ModelPublicServiceStub, + url=config.url, + token=config.token, + secure=config.secure, + async_enabled=async_enabled, ) @property @@ -81,38 +62,67 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: model_interface.LivenessResponse = self.hosts[self.instance][ - "client" - ].Liveness(request=model_interface.LivenessRequest()) - return resp.health_check_response.status - - def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: model_interface.ReadinessResponse = self.hosts[self.instance][ - "client" - ].Readiness(request=model_interface.ReadinessRequest()) - return resp.health_check_response.status + def liveness(self, async_enabled: bool = False) -> model_interface.LivenessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=model_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=model_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + + def readiness( + self, async_enabled: bool = False + ) -> model_interface.ReadinessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Readiness, + request=model_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Readiness, + request=model_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() def is_serving(self) -> bool: try: return ( - self.readiness() + self.readiness().health_check_response.status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: return False @grpc_handler - def watch_model(self, model_name: str) -> model_interface.Model.State.ValueType: - resp: model_interface.WatchUserModelResponse = self.hosts[self.instance][ - "client" - ].WatchUserModel( + def watch_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.WatchUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.WatchUserModel, + request=model_interface.WatchUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.WatchUserModel, request=model_interface.WatchUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.state + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def create_model_local( @@ -120,7 +130,7 @@ def create_model_local( model_name: str, model_description: str, model_path: str, - ) -> model_interface.Model: + ) -> model_interface.CreateUserModelBinaryFileUploadResponse: model = model_interface.Model() model.id = model_name model.description = model_description @@ -131,25 +141,11 @@ def create_model_local( req = model_interface.CreateUserModelBinaryFileUploadRequest( parent=self.namespace, model=model, content=data ) - create_resp: model_interface.CreateUserModelBinaryFileUploadResponse = ( - self.hosts[self.instance]["client"].CreateUserModelBinaryFileUpload( - request_iterator=iter([req]), - metadata=self.hosts[self.instance]["metadata"], - ) - ) - - while self.get_operation(name=create_resp.operation.name).done is not True: - time.sleep(1) - - state = self.watch_model(model_name=model_name) - while state == 0: - time.sleep(1) - state = self.watch_model(model_name=model_name) - - if state == 1: - return self.get_model(model_name=model_name) - - raise SystemError("model creation failed") + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserModelBinaryFileUpload, + request=req, + metadata=self.hosts[self.instance].metadata, + ).send_stream() @grpc_handler def create_model( @@ -157,166 +153,296 @@ def create_model( name: str, definition: str, configuration: dict, - ) -> model_interface.Model: + async_enabled: bool = False, + ) -> model_interface.CreateUserModelResponse: model = model_interface.Model() model.id = name model.model_definition = definition model.configuration.update(configuration) - create_resp: model_interface.CreateUserModelResponse = self.hosts[ - self.instance - ]["client"].CreateUserModel( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateUserModel, + request=model_interface.CreateUserModelRequest( + model=model, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserModel, request=model_interface.CreateUserModelRequest( model=model, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], - ) - - while self.get_operation(name=create_resp.operation.name).done is not True: - time.sleep(1) - - # TODO: due to state update delay of controller - # TODO: should optimize this in model-backend - time.sleep(3) - - state = self.watch_model(model_name=name) - while state == 0: - time.sleep(1) - state = self.watch_model(model_name=name) - - if state == 1: - return self.get_model(model_name=name) - - raise SystemError("model creation failed") + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def deploy_model(self, model_name: str) -> model_interface.Model.State: - self.hosts[self.instance]["client"].DeployUserModel( + def deploy_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.DeployUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeployUserModel, + request=model_interface.DeployUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeployUserModel, request=model_interface.DeployUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - - state = self.watch_model(model_name=model_name) - while state not in (2, 3): - time.sleep(1) - state = self.watch_model(model_name=model_name) - - return state + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def undeploy_model(self, model_name: str) -> model_interface.Model.State: - self.hosts[self.instance]["client"].UndeployUserModel( + def undeploy_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.UndeployUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UndeployUserModel, + request=model_interface.UndeployUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UndeployUserModel, request=model_interface.UndeployUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - - state = self.watch_model(model_name=model_name) - while state not in (1, 3): - time.sleep(1) - state = self.watch_model(model_name=model_name) - - return state + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def trigger_model(self, model_name: str, task_inputs: list) -> Iterable: - resp: model_interface.TriggerUserModelResponse = self.hosts[self.instance][ - "client" - ].TriggerUserModel( + def trigger_model( + self, + model_name: str, + task_inputs: list, + async_enabled: bool = False, + ) -> model_interface.TriggerUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerUserModel, + request=model_interface.TriggerUserModelRequest( + name=f"{self.namespace}/models/{model_name}", + task_inputs=task_inputs, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerUserModel, request=model_interface.TriggerUserModelRequest( name=f"{self.namespace}/models/{model_name}", task_inputs=task_inputs ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.task_outputs + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def delete_model(self, model_name: str): - self.hosts[self.instance]["client"].DeleteUserModel( + def delete_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.DeleteUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserModel, + request=model_interface.DeleteUserModelRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserModel, request=model_interface.DeleteUserModelRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_model(self, model_name: str) -> model_interface.Model: - resp: model_interface.GetUserModelResponse = self.hosts[self.instance][ - "client" - ].GetUserModel( + def get_model( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.GetUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserModel, + request=model_interface.GetUserModelRequest( + name=f"{self.namespace}/models/{model_name}", + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserModel, request=model_interface.GetUserModelRequest( name=f"{self.namespace}/models/{model_name}", view=model_definition_interface.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.model + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def update_model( - self, model: model_interface.Model, mask: field_mask_pb2.FieldMask - ) -> model_interface.Model: - resp: model_interface.UpdateUserModelResponse = self.hosts[self.instance][ - "client" - ].UpdateUserModel( + self, + model: model_interface.Model, + mask: field_mask_pb2.FieldMask, + async_enabled: bool = False, + ) -> model_interface.UpdateUserModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UpdateUserModel, + request=model_interface.UpdateUserModelRequest( + model=model, + update_mask=mask, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UpdateUserModel, request=model_interface.UpdateUserModelRequest( model=model, update_mask=mask, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.model + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def lookup_model(self, model_uid: str) -> model_interface.Model: - resp: model_interface.LookUpModelResponse = self.hosts[self.instance][ - "client" - ].LookUpModel( + def lookup_model( + self, + model_uid: str, + async_enabled: bool = False, + ) -> model_interface.LookUpModelResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.LookUpModel, + request=model_interface.LookUpModelRequest( + permalink=f"models/{model_uid}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.LookUpModel, request=model_interface.LookUpModelRequest(permalink=f"models/{model_uid}"), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.model + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_model_card(self, model_name: str) -> model_interface.ModelCard: - resp: model_interface.GetUserModelCardResponse = self.hosts[self.instance][ - "client" - ].GetUserModel( + def get_model_card( + self, + model_name: str, + async_enabled: bool = False, + ) -> model_interface.GetUserModelCardResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserModelCard, + request=model_interface.GetUserModelCardRequest( + name=f"{self.namespace}/models/{model_name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserModelCard, request=model_interface.GetUserModelCardRequest( name=f"{self.namespace}/models/{model_name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.readme + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def list_models(self, public=False) -> Tuple[Iterable, str, int]: - resp: Union[ - model_interface.ListModelsResponse, model_interface.ListUserModelsResponse - ] - if not public: - resp = self.hosts[self.instance]["client"].ListUserModels( - request=model_interface.ListUserModelsRequest(parent=self.namespace), - metadata=self.hosts[self.instance]["metadata"], - ) - else: - resp = self.hosts[self.instance]["client"].ListModels( - request=model_interface.ListModelsRequest(), - metadata=self.hosts[self.instance]["metadata"], - ) - - return resp.models, resp.next_page_token, resp.total_size + def list_models( + self, + next_page_token: str = "", + total_size: int = 100, + show_deleted: bool = False, + public=False, + async_enabled: bool = False, + ) -> model_interface.ListUserModelsResponse: + if async_enabled: + if public: + method = self.hosts[self.instance].async_client.ListModels + return RequestFactory( + method=method, + request=model_interface.ListModelsRequest( + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + method = self.hosts[self.instance].async_client.ListUserModels + return RequestFactory( + method=method, + request=model_interface.ListUserModelsRequest( + parent=self.namespace, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + if public: + method = self.hosts[self.instance].client.ListModels + return RequestFactory( + method=method, + request=model_interface.ListModelsRequest( + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + method = self.hosts[self.instance].client.ListUserModels + return RequestFactory( + method=method, + request=model_interface.ListUserModelsRequest( + parent=self.namespace, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=model_definition_interface.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_operation(self, name: str) -> operations_pb2.Operation: - resp: model_interface.GetModelOperationResponse = self.hosts[self.instance][ - "client" - ].GetModelOperation( + def get_operation( + self, + name: str, + async_enabled: bool = False, + ) -> model_interface.GetModelOperationResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetModelOperation, + request=model_interface.GetModelOperationRequest( + name=name, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetModelOperation, request=model_interface.GetModelOperationRequest( name=name, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.operation + metadata=self.hosts[self.instance].metadata, + ).send_sync() diff --git a/instill/clients/pipeline.py b/instill/clients/pipeline.py index e4f7501..4a55c33 100644 --- a/instill/clients/pipeline.py +++ b/instill/clients/pipeline.py @@ -1,11 +1,9 @@ # pylint: disable=no-member,wrong-import-position -from collections import defaultdict -from typing import Iterable, Tuple, Union +from typing import Dict, Union -import grpc -from google.longrunning import operations_pb2 from google.protobuf import field_mask_pb2 +# common import instill.protogen.common.healthcheck.v1alpha.healthcheck_pb2 as healthcheck # pipeline @@ -13,10 +11,9 @@ import instill.protogen.vdp.pipeline.v1alpha.operator_definition_pb2 as operator_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_pb2 as pipeline_interface import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service -from instill.clients import constant - -# common -from instill.clients.base import Client +from instill.clients.base import Client, RequestFactory +from instill.clients.constant import DEFAULT_INSTANCE +from instill.clients.instance import InstillInstance from instill.configuration import global_config from instill.utils.error_handler import grpc_handler @@ -24,11 +21,11 @@ class PipelineClient(Client): - def __init__(self, namespace: str) -> None: - self.hosts: defaultdict = defaultdict(dict) + def __init__(self, namespace: str, async_enabled: bool) -> None: + self.hosts: Dict[str, InstillInstance] = {} self.namespace: str = namespace - if constant.DEFAULT_INSTANCE in global_config.hosts: - self.instance = constant.DEFAULT_INSTANCE + if DEFAULT_INSTANCE in global_config.hosts: + self.instance = DEFAULT_INSTANCE elif len(global_config.hosts) == 0: self.instance = "" else: @@ -36,35 +33,20 @@ def __init__(self, namespace: str) -> None: if global_config.hosts is not None: for instance, config in global_config.hosts.items(): - if not config.secure: - channel = grpc.insecure_channel(config.url) - self.hosts[instance]["metadata"] = ( - ( - "authorization", - f"Bearer {config.token}", - ), - ) - else: - ssl_creds = grpc.ssl_channel_credentials() - call_creds = grpc.access_token_call_credentials(config.token) - creds = grpc.composite_channel_credentials(ssl_creds, call_creds) - channel = grpc.secure_channel( - target=config.url, - credentials=creds, - ) - self.hosts[instance]["metadata"] = "" - self.hosts[instance]["token"] = config.token - self.hosts[instance]["channel"] = channel - self.hosts[instance][ - "client" - ] = pipeline_service.PipelinePublicServiceStub(channel) + self.hosts[instance] = InstillInstance( + pipeline_service.PipelinePublicServiceStub, + url=config.url, + token=config.token, + secure=config.secure, + async_enabled=async_enabled, + ) @property def hosts(self): return self._hosts @hosts.setter - def hosts(self, hosts: str): + def hosts(self, hosts: Dict[str, InstillInstance]): self._hosts = hosts @property @@ -83,22 +65,42 @@ def metadata(self): def metadata(self, metadata: str): self._metadata = metadata - def liveness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: pipeline_interface.LivenessResponse = self.hosts[self.instance][ - "client" - ].Liveness(request=pipeline_interface.LivenessRequest()) - return resp.health_check_response.status - - def readiness(self) -> healthcheck.HealthCheckResponse.ServingStatus: - resp: pipeline_interface.ReadinessResponse = self.hosts[self.instance][ - "client" - ].Readiness(request=pipeline_interface.ReadinessRequest()) - return resp.health_check_response.status + def liveness( + self, async_enabled: bool = False + ) -> pipeline_interface.LivenessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Liveness, + request=pipeline_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Liveness, + request=pipeline_interface.LivenessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + + def readiness( + self, async_enabled: bool = False + ) -> pipeline_interface.ReadinessResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.Readiness, + request=pipeline_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.Readiness, + request=pipeline_interface.ReadinessRequest(), + metadata=self.hosts[self.instance].metadata, + ).send_sync() def is_serving(self) -> bool: try: return ( - self.readiness() + self.readiness().health_check_response.status == healthcheck.HealthCheckResponse.SERVING_STATUS_SERVING ) except Exception: @@ -107,160 +109,278 @@ def is_serving(self) -> bool: @grpc_handler def list_operator_definitions( self, - filer_str: str = "", + filter_str: str = "", next_page_token: str = "", total_size: int = 100, - ) -> Tuple[Iterable, str, int]: - resp: operator_interface.ListOperatorDefinitionsResponse = self.hosts[ - self.instance - ]["client"].ListUserPipelines( + async_enabled: bool = False, + ) -> operator_interface.ListOperatorDefinitionsResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ListOperatorDefinitions, + request=operator_interface.ListOperatorDefinitionsRequest( + filter=filter_str, + page_size=total_size, + page_token=next_page_token, + view=operator_interface.ListOperatorDefinitionsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListOperatorDefinitions, request=operator_interface.ListOperatorDefinitionsRequest( - filter=filer_str, + filter=filter_str, page_size=total_size, page_token=next_page_token, view=operator_interface.ListOperatorDefinitionsRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - - return resp.operator_definitions, resp.next_page_token, resp.total_size + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def get_operator_definition( - self, name: str - ) -> operator_interface.OperatorDefinition: - resp: operator_interface.GetOperatorDefinitionResponse = self.hosts[ - self.instance - ]["client"].GetOperatorDefinition( + self, name: str, async_enabled: bool = False + ) -> operator_interface.GetOperatorDefinitionResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetOperatorDefinition, + request=operator_interface.GetOperatorDefinitionRequest( + name=f"operator-definitions//{name}", + view=operator_interface.GetOperatorDefinitionRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetOperatorDefinition, request=operator_interface.GetOperatorDefinitionRequest( name=f"operator-definitions//{name}", view=operator_interface.GetOperatorDefinitionRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.operator_definition + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def create_pipeline( self, name: str, recipe: pipeline_interface.Recipe, - ) -> pipeline_interface.Pipeline: + async_enabled: bool = False, + ) -> pipeline_interface.CreateUserPipelineResponse: pipeline = pipeline_interface.Pipeline( id=name, recipe=recipe, ) - resp: pipeline_interface.CreateUserPipelineResponse = self.hosts[self.instance][ - "client" - ].CreateUserPipeline( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateuserPipeline, + request=pipeline_interface.CreateUserPipelineRequest( + pipeline=pipeline, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserPipeline, request=pipeline_interface.CreateUserPipelineRequest( pipeline=pipeline, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.pipeline + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_pipeline(self, name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.GetUserPipelineResponse = self.hosts[self.instance][ - "client" - ].GetUserPipeline( + def get_pipeline( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.GetUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserPipeline, + request=pipeline_interface.GetUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserPipeline, request=pipeline_interface.GetUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.pipeline + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def lookup_pipeline(self, pipeline_uid: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.LookUpPipelineResponse = self.hosts[self.instance][ - "client" - ].LookUpPipeline( + def lookup_pipeline( + self, + pipeline_uid: str, + async_enabled: bool = False, + ) -> pipeline_interface.LookUpPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.LookUpPipeline, + request=pipeline_interface.LookUpPipelineRequest( + permalink=f"pipelines/{pipeline_uid}", + view=pipeline_interface.LookUpPipelineRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.LookUpPipeline, request=pipeline_interface.LookUpPipelineRequest( permalink=f"pipelines/{pipeline_uid}", view=pipeline_interface.LookUpPipelineRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.pipeline + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def rename_pipeline(self, name: str, new_name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.RenameUserPipelineResponse = self.hosts[self.instance][ - "client" - ].RenameUserPipeline( + def rename_pipeline( + self, + name: str, + new_name: str, + async_enabled: bool = False, + ) -> pipeline_interface.RenameUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.RenameUserPipeline, + request=pipeline_interface.RenameUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", + new_pipeline_id=new_name, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.RenameUserPipeline, request=pipeline_interface.RenameUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", new_pipeline_id=new_name, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.pipeline + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def update_pipeline( - self, pipeline: pipeline_interface.Pipeline, mask: field_mask_pb2.FieldMask - ) -> pipeline_interface.Pipeline: - resp: pipeline_interface.UpdateUserPipelineResponse = self.hosts[self.instance][ - "client" - ].UpdateUserPipeline( + self, + pipeline: pipeline_interface.Pipeline, + mask: field_mask_pb2.FieldMask, + async_enabled: bool = False, + ) -> pipeline_interface.UpdateUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UpdateUserPipeline, + request=pipeline_interface.UpdateUserPipelineRequest( + pipeline=pipeline, + update_mask=mask, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UpdateUserPipeline, request=pipeline_interface.UpdateUserPipelineRequest( pipeline=pipeline, update_mask=mask, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.pipeline + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def validate_pipeline(self, name: str) -> pipeline_interface.Pipeline: - resp: pipeline_interface.ValidateUserPipelineResponse = self.hosts[ - self.instance - ]["client"].ValidateUserPipeline( + def validate_pipeline( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.ValidateUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ValidateUserPipeline, + request=pipeline_interface.ValidateUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ValidateUserPipeline, request=pipeline_interface.ValidateUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.pipeline + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def trigger_pipeline( - self, name: str, inputs: list - ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: - resp: pipeline_interface.TriggerUserPipelineResponse = self.hosts[ - self.instance - ]["client"].TriggerUserPipeline( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerUserPipeline, + request=pipeline_interface.TriggerUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerUserPipeline, request=pipeline_interface.TriggerUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.outputs, resp.metadata + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def trigger_async_pipeline( - self, name: str, inputs: list - ) -> operations_pb2.Operation: - resp: pipeline_interface.TriggerAsyncUserPipelineResponse = self.hosts[ - self.instance - ]["client"].TriggerAsyncUserPipeline( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerAsyncUserPipelineResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerAsyncUserPipeline, + request=pipeline_interface.TriggerAsyncUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerAsyncUserPipeline, request=pipeline_interface.TriggerAsyncUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.operation + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def delete_pipeline(self, name: str): - self.hosts[self.instance]["client"].DeleteUserPipeline( + def delete_pipeline( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.DeleteUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserPipeline, + request=pipeline_interface.DeleteUserPipelineRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserPipeline, request=pipeline_interface.DeleteUserPipelineRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_pipelines( @@ -270,13 +390,28 @@ def list_pipelines( total_size: int = 100, show_deleted: bool = False, public=False, - ) -> Tuple[Iterable, str, int]: - resp: Union[ - pipeline_interface.ListPipelinesResponse, - pipeline_interface.ListUserPipelinesResponse, - ] - if not public: - resp = self.hosts[self.instance]["client"].ListUserPipelines( + async_enabled: bool = False, + ) -> Union[ + pipeline_interface.ListPipelinesResponse, + pipeline_interface.ListUserPipelinesResponse, + ]: + if async_enabled: + if public: + method = self.hosts[self.instance].async_client.ListPipelines + return RequestFactory( + method=method, + request=pipeline_interface.ListPipelinesRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListPipelinesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + method = self.hosts[self.instance].async_client.ListUserPipelines + return RequestFactory( + method=method, request=pipeline_interface.ListUserPipelinesRequest( parent=self.namespace, filter=filer_str, @@ -285,10 +420,12 @@ def list_pipelines( show_deleted=show_deleted, view=pipeline_interface.ListUserPipelinesRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - else: - resp = self.hosts[self.instance]["client"].ListPipelines( + metadata=self.hosts[self.instance].metadata, + ).send_async() + if public: + method = self.hosts[self.instance].client.ListPipelines + return RequestFactory( + method=method, request=pipeline_interface.ListPipelinesRequest( filter=filer_str, page_size=total_size, @@ -296,28 +433,51 @@ def list_pipelines( show_deleted=show_deleted, view=pipeline_interface.ListPipelinesRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - - return resp.pipelines, resp.next_page_token, resp.total_size + metadata=self.hosts[self.instance].metadata, + ).send_sync() + method = self.hosts[self.instance].client.ListUserPipelines + return RequestFactory( + method=method, + request=pipeline_interface.ListUserPipelinesRequest( + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListUserPipelinesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_operation(self, name: str) -> operations_pb2.Operation: - resp: pipeline_interface.GetOperationResponse = self.hosts[self.instance][ - "client" - ].GetOperation( + def get_operation( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.GetOperationResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetOperation, + request=pipeline_interface.GetOperationRequest( + name=name, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetOperation, request=pipeline_interface.GetOperationRequest( name=name, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.operation + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def create_pipeline_release( self, version: str, - ) -> pipeline_interface.PipelineRelease: + async_enabled: bool = False, + ) -> pipeline_interface.CreateUserPipelineReleaseResponse: """Create a release version of a pipeline Args: @@ -329,18 +489,29 @@ def create_pipeline_release( pipeline_release = pipeline_interface.PipelineRelease( id=version, ) - resp: pipeline_interface.CreateUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].CreateUserPipelineRelease( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateUserPipelineRelease, + request=pipeline_interface.CreateUserPipelineReleaseRequest( + release=pipeline_release, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserPipelineRelease, request=pipeline_interface.CreateUserPipelineReleaseRequest( release=pipeline_release, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.release + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: + def get_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.GetUserPipelineReleaseResponse: """Get a released pipeline Args: @@ -349,21 +520,32 @@ def get_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: Returns: pipeline_interface.Pipeline: Released pipeline """ - resp: pipeline_interface.GetUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].GetUserPipelineRelease( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserPipelineRelease, + request=pipeline_interface.GetUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + view=pipeline_interface.GetUserPipelineReleaseRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserPipelineRelease, request=pipeline_interface.GetUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", view=pipeline_interface.GetUserPipelineReleaseRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.release + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def rename_pipeline_release( - self, name: str, new_version: str - ) -> pipeline_interface.PipelineRelease: + self, + name: str, + new_version: str, + async_enabled: bool = False, + ) -> pipeline_interface.RenameUserPipelineReleaseResponse: """Rename a released pipeline Args: @@ -373,33 +555,50 @@ def rename_pipeline_release( Returns: pipeline_interface.PipelineRelease: released pipeline """ - resp: pipeline_interface.RenameUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].RenameUserPipelineRelease( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.RenameUserPipelineRelease, + request=pipeline_interface.RenameUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + new_pipeline_release_id=new_version, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.RenameUserPipelineRelease, request=pipeline_interface.RenameUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", new_pipeline_release_id=new_version, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.release + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def update_pipeline_release( self, pipeline_release: pipeline_interface.PipelineRelease, mask: field_mask_pb2.FieldMask, - ) -> pipeline_interface.PipelineRelease: - resp: pipeline_interface.UpdateUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].UpdateUserPipelineRelease( + async_enabled: bool = False, + ) -> pipeline_interface.UpdateUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.UpdateUserPipelineRelease, + request=pipeline_interface.UpdateUserPipelineReleaseRequest( + release=pipeline_release, + update_mask=mask, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.UpdateUserPipelineRelease, request=pipeline_interface.UpdateUserPipelineReleaseRequest( release=pipeline_release, update_mask=mask, ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.release + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def list_pipeline_releases( @@ -408,10 +607,24 @@ def list_pipeline_releases( next_page_token: str = "", total_size: int = 100, show_deleted: bool = False, - ) -> Tuple[Iterable, str, int]: - resp: pipeline_interface.ListUserPipelineReleasesResponse = self.hosts[ - self.instance - ]["client"].ListUserPipelineReleases( + async_enabled: bool = False, + ) -> pipeline_interface.ListUserPipelineReleasesResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ListUserPipelineReleases, + request=pipeline_interface.ListUserPipelineReleasesRequest( + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=pipeline_interface.ListUserPipelineReleasesRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ListUserPipelineReleases, request=pipeline_interface.ListUserPipelineReleasesRequest( parent=self.namespace, filter=filer_str, @@ -420,85 +633,154 @@ def list_pipeline_releases( show_deleted=show_deleted, view=pipeline_interface.ListUserPipelineReleasesRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - - return resp.releases, resp.next_page_token, resp.total_size + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def delete_pipeline_release(self, name: str): - self.hosts[self.instance]["client"].DeleteUserPipelineRelease( + def delete_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.DeleteUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserPipelineRelease, + request=pipeline_interface.DeleteUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserPipelineRelease, request=pipeline_interface.DeleteUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def set_default_pipeline_release( - self, name: str - ) -> pipeline_interface.PipelineRelease: - resp: pipeline_interface.SetDefaultUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].SetDefaultUserPipelineRelease( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.SetDefaultUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.SetDefaultUserPipelineRelease, + request=pipeline_interface.SetDefaultUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.SetDefaultUserPipelineRelease, request=pipeline_interface.SetDefaultUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.release + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def restore_pipeline_release(self, name: str) -> pipeline_interface.PipelineRelease: - resp: pipeline_interface.RestoreUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].RestoreUserPipelineRelease( + def restore_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.RestoreUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.RestoreUserPipelineRelease, + request=pipeline_interface.RestoreUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.RestoreUserPipelineRelease, request=pipeline_interface.RestoreUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.release + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def watch_pipeline_release(self, name: str) -> pipeline_interface.State.ValueType: - resp: pipeline_interface.WatchUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].WatchUserPipelineRelease( + def watch_pipeline_release( + self, + name: str, + async_enabled: bool = False, + ) -> pipeline_interface.WatchUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.WatchUserPipelineRelease, + request=pipeline_interface.WatchUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.WatchUserPipelineRelease, request=pipeline_interface.WatchUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.state + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def trigger_pipeline_release( - self, name: str, inputs: list - ) -> Tuple[Iterable, pipeline_interface.TriggerMetadata]: - resp: pipeline_interface.TriggerUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].TriggerUserPipelineReleas( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TriggerUserPipelineReleas, + request=pipeline_interface.TriggerUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerUserPipelineReleas, request=pipeline_interface.TriggerUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.outputs, resp.metadata + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def trigger_async_pipeline_release( - self, name: str, inputs: list - ) -> operations_pb2.Operation: - resp: pipeline_interface.TriggerAsyncUserPipelineReleaseResponse = self.hosts[ - self.instance - ]["client"].TriggerAsyncUserPipelineRelease( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> pipeline_interface.TriggerAsyncUserPipelineReleaseResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[ + self.instance + ].async_client.TriggerAsyncUserPipelineRelease, + request=pipeline_interface.TriggerAsyncUserPipelineReleaseRequest( + name=f"{self.namespace}/pipelines/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TriggerAsyncUserPipelineRelease, request=pipeline_interface.TriggerAsyncUserPipelineReleaseRequest( name=f"{self.namespace}/pipelines/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], - ) - return resp.operation + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler def create_connector( @@ -506,95 +788,207 @@ def create_connector( name: str, definition: str, configuration: dict, - ) -> connector_interface.Connector: + async_enabled: bool = False, + ) -> connector_interface.CreateUserConnectorResponse: connector = connector_interface.Connector() connector.id = name connector.connector_definition_name = definition connector.configuration.update(configuration) - resp = self.hosts[self.instance]["client"].CreateUserConnector( + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.CreateUserConnector, + request=connector_interface.CreateUserConnectorRequest( + connector=connector, parent=self.namespace + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.CreateUserConnector, request=connector_interface.CreateUserConnectorRequest( connector=connector, parent=self.namespace ), - metadata=self.hosts[self.instance]["metadata"], - ) - - return resp.connector + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def get_connector(self, name: str) -> connector_interface.Connector: - return ( - self.hosts[self.instance]["client"] - .GetUserConnector( + def get_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.GetUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.GetUserConnector, request=connector_interface.GetUserConnectorRequest( name=f"{self.namespace}/connectors/{name}", view=connector_interface.GetUserConnectorRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - .connector - ) + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.GetUserConnector, + request=connector_interface.GetUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}", + view=connector_interface.GetUserConnectorRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def test_connector(self, name: str) -> connector_interface.Connector.State: - return ( - self.hosts[self.instance]["client"] - .TestUserConnector( + def test_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.TestUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.TestUserConnector, request=connector_interface.TestUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - .state - ) + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.TestUserConnector, + request=connector_interface.TestUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def execute_connector(self, name: str, inputs: list) -> list: - return ( - self.hosts[self.instance]["client"] - .ExecuteUserConnector( + def execute_connector( + self, + name: str, + inputs: list, + async_enabled: bool = False, + ) -> connector_interface.ExecuteUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.ExecuteUserConnector, request=connector_interface.ExecuteUserConnectorRequest( name=f"{self.namespace}/connectors/{name}", inputs=inputs ), - metadata=self.hosts[self.instance]["metadata"], - ) - .outputs - ) + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.ExecuteUserConnector, + request=connector_interface.ExecuteUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}", inputs=inputs + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def watch_connector(self, name: str) -> connector_interface.Connector.State: - return ( - self.hosts[self.instance]["client"] - .WatchUserConnector( + def watch_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.WatchUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.WatchUserConnector, request=connector_interface.WatchUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) - .state - ) + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.WatchUserConnector, + request=connector_interface.WatchUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def delete_connector(self, name: str): - self.hosts[self.instance]["client"].DeleteUserConnector( + def delete_connector( + self, + name: str, + async_enabled: bool = False, + ) -> connector_interface.DeleteUserConnectorResponse: + if async_enabled: + return RequestFactory( + method=self.hosts[self.instance].async_client.DeleteUserConnector, + request=connector_interface.DeleteUserConnectorRequest( + name=f"{self.namespace}/connectors/{name}" + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + + return RequestFactory( + method=self.hosts[self.instance].client.DeleteUserConnector, request=connector_interface.DeleteUserConnectorRequest( name=f"{self.namespace}/connectors/{name}" ), - metadata=self.hosts[self.instance]["metadata"], - ) + metadata=self.hosts[self.instance].metadata, + ).send_sync() @grpc_handler - def list_connectors(self, public=False) -> Tuple[list, str, int]: - if not public: - resp = self.hosts[self.instance]["client"].ListUserConnectors( + def list_connectors( + self, + filer_str: str = "", + next_page_token: str = "", + total_size: int = 100, + show_deleted: bool = False, + public=False, + async_enabled: bool = False, + ) -> connector_interface.ListUserConnectorsResponse: + if async_enabled: + if public: + method = self.hosts[self.instance].async_client.ListConnectors + return RequestFactory( + method=method, + request=connector_interface.ListConnectorsRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListConnectorsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_async() + method = self.hosts[self.instance].async_client.ListUserConnectors + return RequestFactory( + method=method, request=connector_interface.ListUserConnectorsRequest( - parent=self.namespace + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListUserConnectorsRequest.VIEW_FULL, ), - metadata=self.hosts[self.instance]["metadata"], - ) - else: - resp = self.hosts[self.instance]["client"].ListConnectors( - request=connector_interface.ListConnectorsRequest(), - metadata=(self.hosts[self.instance]["metadata"],), - ) - - return resp.connectors, resp.next_page_token, resp.total_size + metadata=self.hosts[self.instance].metadata, + ).send_async() + if public: + method = self.hosts[self.instance].client.ListConnectors + return RequestFactory( + method=method, + request=connector_interface.ListConnectorsRequest( + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListConnectorsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() + method = self.hosts[self.instance].client.ListUserConnectors + return RequestFactory( + method=method, + request=connector_interface.ListUserConnectorsRequest( + parent=self.namespace, + filter=filer_str, + page_size=total_size, + page_token=next_page_token, + show_deleted=show_deleted, + view=connector_interface.ListUserConnectorsRequest.VIEW_FULL, + ), + metadata=self.hosts[self.instance].metadata, + ).send_sync() diff --git a/instill/resources/connector.py b/instill/resources/connector.py index ba3baba..c377a37 100644 --- a/instill/resources/connector.py +++ b/instill/resources/connector.py @@ -16,26 +16,29 @@ def __init__( ) -> None: super().__init__() self.client = client - connector = client.pipeline_service.get_connector(name=name, silent=True) + connector = client.pipeline_service.get_connector( + name=name, silent=True + ).connector if connector is None: connector = client.pipeline_service.create_connector( name=name, definition=definition, configuration=configuration, - ) + ).connector if connector is None: raise BaseException("connector creation failed") self.resource = connector - def __call__(self, task_inputs: list, mode="execute") -> list: + def __call__(self, task_inputs: list, mode="execute"): if mode == "execute": - return self.client.pipeline_service.execute_connector( + resp = self.client.pipeline_service.execute_connector( self.resource.id, task_inputs ) + return resp.outputs return self.client.pipeline_service.test_connector( self.resource.id, task_inputs - ) + ).state @property def client(self): @@ -65,10 +68,10 @@ def get_definition(self) -> connector_definition_interface.ConnectorDefinition: return self.resource.connector_definition def get_state(self) -> connector_interface.Connector.State: - return self.client.pipeline_service.watch_connector(self.resource.id) + return self.client.pipeline_service.watch_connector(self.resource.id).state def test(self) -> connector_interface.Connector.State: - return self.client.pipeline_service.test_connector(self.resource.id) + return self.client.pipeline_service.test_connector(self.resource.id).state def delete(self): if self.resource is not None: diff --git a/instill/resources/model.py b/instill/resources/model.py index 1f4e3d9..8b1f6b5 100644 --- a/instill/resources/model.py +++ b/instill/resources/model.py @@ -1,4 +1,6 @@ # pylint: disable=no-member,wrong-import-position,no-name-in-module +import time + import instill.protogen.model.model.v1alpha.model_definition_pb2 as model_definition_interface import instill.protogen.model.model.v1alpha.model_pb2 as model_interface from instill.clients import InstillClient @@ -17,18 +19,36 @@ def __init__( self.client = client model = client.model_service.get_model(model_name=name, silent=True) if model is None: - model = client.model_service.create_model( + operation = client.model_service.create_model( name=name, definition=definition, configuration=configuration, - ) - if model is None: + ).operation + while ( + client.model_service.get_operation(name=operation.name).operation.done + is not True + ): + time.sleep(1) + # TODO: due to state update delay of controller + # TODO: should optimize this in model-backend + time.sleep(3) + + state = client.model_service.watch_model(model_name=name).state + while state == 0: + time.sleep(1) + state = client.model_service.watch_model(model_name=name).state + + if state == 1: + model = client.model_service.get_model(model_name=name).model + else: raise BaseException("model creation failed") self.resource = model def __call__(self, task_inputs: list) -> list: - return self.client.model_service.trigger_model(self.resource.id, task_inputs) + return self.client.model_service.trigger_model( + self.resource.id, task_inputs + ).task_outputs @property def client(self): @@ -47,24 +67,38 @@ def resource(self, resource: model_interface.Model): self._resource = resource def _update(self): - self.resource = self.client.model_service.get_model(model_name=self.resource.id) + self.resource = self.client.model_service.get_model( + model_name=self.resource.id + ).model def get_definition(self) -> model_definition_interface.ModelDefinition: return self.resource.model_definition def get_readme(self) -> str: - return self.client.model_service.get_model_card(self.resource.id) + return self.client.model_service.get_model_card(self.resource.id).readme def get_state(self) -> model_interface.Model.State: - return self.client.model_service.watch_model(self.resource.id) + return self.client.model_service.watch_model(self.resource.id).state def deploy(self) -> model_interface.Model: self.client.model_service.deploy_model(self.resource.id) + state = self.client.model_service.watch_model(model_name=self.resource.id).state + while state not in (2, 3): + time.sleep(1) + state = self.client.model_service.watch_model( + model_name=self.resource.id + ).state self._update() return self._resource def undeploy(self) -> model_interface.Model: self.client.model_service.undeploy_model(self.resource.id) + state = self.client.model_service.watch_model(model_name=self.resource.id).state + while state not in (1, 3): + time.sleep(1) + state = self.client.model_service.watch_model( + model_name=self.resource.id + ).state self._update() return self._resource diff --git a/instill/resources/pipeline.py b/instill/resources/pipeline.py index 34b7084..35b9a1a 100644 --- a/instill/resources/pipeline.py +++ b/instill/resources/pipeline.py @@ -17,9 +17,11 @@ def __init__( ) -> None: super().__init__() self.client = client - pipeline = client.pipeline_service.get_pipeline(name=name, silent=True) + pipeline = client.pipeline_service.get_pipeline(name=name, silent=True).pipeline if pipeline is None: - pipeline = client.pipeline_service.create_pipeline(name=name, recipe=recipe) + pipeline = client.pipeline_service.create_pipeline( + name=name, recipe=recipe + ).pipeline if pipeline is None: raise BaseException("pipeline creation failed") @@ -28,9 +30,10 @@ def __init__( def __call__( self, task_inputs: list ) -> Tuple[list, pipeline_interface.TriggerMetadata]: - return self.client.pipeline_service.trigger_pipeline( + resp = self.client.pipeline_service.trigger_pipeline( self.resource.id, task_inputs ) + return resp.outputs, resp.metadata @property def client(self): @@ -52,18 +55,20 @@ def _update(self): self.resource = self.client.pipeline_service.get_pipeline(name=self.resource.id) def get_operation(self, operation: operations_pb2.Operation): - return self.client.pipeline_service.get_operation(operation.name) + return self.client.pipeline_service.get_operation(operation.name).operation def trigger_async(self, task_inputs: list) -> operations_pb2.Operation: return self.client.pipeline_service.trigger_async_pipeline( self.resource.id, task_inputs - ) + ).operation def get_recipe(self) -> str: return self.resource.recipe def validate_pipeline(self) -> pipeline_interface.Pipeline: - return self.client.pipeline_service.validate_pipeline(name=self.resource.id) + return self.client.pipeline_service.validate_pipeline( + name=self.resource.id + ).pipeline def delete(self): if self.resource is not None: diff --git a/instill/tests/test_client.py b/instill/tests/test_client.py index 8d6d00b..b780d46 100644 --- a/instill/tests/test_client.py +++ b/instill/tests/test_client.py @@ -1,49 +1,118 @@ # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,no-name-in-module -from collections import defaultdict - +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service +import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service +import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service from instill.clients import MgmtClient, ModelClient, PipelineClient +from instill.clients.instance import InstillInstance def describe_client(): def describe_instance(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.instance) == "" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.instance) == "" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.instance) == "" def when_set_correct_type(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) mgmt_client.instance = "staging" expect(mgmt_client.instance) == "staging" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", async_enabled=False) model_client.instance = "staging" expect(model_client.instance) == "staging" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.instance = "staging" expect(pipeline_client.instance) == "staging" def describe_host(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.hosts) is None - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.hosts) is None - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.hosts) is None - def when_set_correct_type(expect): - mgmt_client = MgmtClient() - d = defaultdict(dict) # type: ignore - d["test_instance"] = dict({"url": "test_url"}) - mgmt_client.hosts = d - expect(mgmt_client.hosts["test_instance"]["url"]) == "test_url" - model_client = ModelClient(namespace="") - model_client.hosts = d - expect(model_client.hosts["test_instance"]["url"]) == "test_url" - pipeline_client = PipelineClient(namespace="") - pipeline_client.hosts = d - expect(pipeline_client.hosts["test_instance"]["url"]) == "test_url" + def when_set_correct_type_url(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].url) == "test_url" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", async_enabled=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].url) == "test_url" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", async_enabled=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].url) == "test_url" + + def when_set_correct_type_token(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].token) == "token" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", async_enabled=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].token) == "token" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", async_enabled=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].token) == "token" diff --git a/tests/test_client.py b/tests/test_client.py index 8d6d00b..b780d46 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,49 +1,118 @@ # pylint: disable=redefined-outer-name,unused-variable,expression-not-assigned,no-name-in-module -from collections import defaultdict - +import instill.protogen.core.mgmt.v1alpha.mgmt_public_service_pb2_grpc as mgmt_service +import instill.protogen.model.model.v1alpha.model_public_service_pb2_grpc as model_service +import instill.protogen.vdp.pipeline.v1alpha.pipeline_public_service_pb2_grpc as pipeline_service from instill.clients import MgmtClient, ModelClient, PipelineClient +from instill.clients.instance import InstillInstance def describe_client(): def describe_instance(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.instance) == "" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.instance) == "" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.instance) == "" def when_set_correct_type(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) mgmt_client.instance = "staging" expect(mgmt_client.instance) == "staging" - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", async_enabled=False) model_client.instance = "staging" expect(model_client.instance) == "staging" - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", async_enabled=False) pipeline_client.instance = "staging" expect(pipeline_client.instance) == "staging" def describe_host(): def when_not_set(expect): - mgmt_client = MgmtClient() + mgmt_client = MgmtClient(False) expect(mgmt_client.hosts) is None - model_client = ModelClient(namespace="") + model_client = ModelClient(namespace="", async_enabled=False) expect(model_client.hosts) is None - pipeline_client = PipelineClient(namespace="") + pipeline_client = PipelineClient(namespace="", async_enabled=False) expect(pipeline_client.hosts) is None - def when_set_correct_type(expect): - mgmt_client = MgmtClient() - d = defaultdict(dict) # type: ignore - d["test_instance"] = dict({"url": "test_url"}) - mgmt_client.hosts = d - expect(mgmt_client.hosts["test_instance"]["url"]) == "test_url" - model_client = ModelClient(namespace="") - model_client.hosts = d - expect(model_client.hosts["test_instance"]["url"]) == "test_url" - pipeline_client = PipelineClient(namespace="") - pipeline_client.hosts = d - expect(pipeline_client.hosts["test_instance"]["url"]) == "test_url" + def when_set_correct_type_url(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].url) == "test_url" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", async_enabled=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].url) == "test_url" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", async_enabled=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].url) == "test_url" + + def when_set_correct_type_token(expect): + mgmt_instance = { + "test_instance": InstillInstance( + mgmt_service.MgmtPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + mgmt_client = MgmtClient(False) + mgmt_client.hosts = mgmt_instance + expect(mgmt_client.hosts["test_instance"].token) == "token" + + model_instance = { + "test_instance": InstillInstance( + model_service.ModelPublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + model_client = ModelClient(namespace="", async_enabled=False) + model_client.hosts = model_instance + expect(model_client.hosts["test_instance"].token) == "token" + + pipeline_instance = { + "test_instance": InstillInstance( + pipeline_service.PipelinePublicServiceStub, + "test_url", + "token", + False, + False, + ) + } + pipeline_client = PipelineClient(namespace="", async_enabled=False) + pipeline_client.hosts = pipeline_instance + expect(pipeline_client.hosts["test_instance"].token) == "token"