diff --git a/memphis/consumer.py b/memphis/consumer.py index 9edf861..824267b 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -289,7 +289,7 @@ async def __ping_consumer(self, callback): if "consumer not found" in str(e) or "stream not found" in str(e): callback(MemphisError(str(e))) - async def destroy(self): + async def destroy(self, timeout_retries=5): """Destroy the consumer.""" if self.t_consume is not None: self.t_consume.cancel() @@ -308,8 +308,8 @@ async def destroy(self): } consumer_name = json.dumps( destroy_consumer_req, indent=2).encode("utf-8") - res = await self.connection.broker_manager.request( - "$memphis_consumer_destructions", consumer_name, timeout=5 + res = await self.connection._request( + "$memphis_consumer_destructions", consumer_name, 5, timeout_retries ) error = res.data.decode("utf-8") if error != "" and not "not exist" in error: diff --git a/memphis/memphis.py b/memphis/memphis.py index 636e872..6b82e54 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -247,6 +247,7 @@ async def station( tiered_storage_enabled: bool = False, partitions_number: int = 1, dls_station: str = "", + timeout_retries=5, ): """Creates a station. Args: @@ -291,8 +292,8 @@ async def station( create_station_req_bytes = json.dumps(create_station_req, indent=2).encode( "utf-8" ) - err_msg = await self.broker_manager.request( - "$memphis_station_creations", create_station_req_bytes, timeout=20 + err_msg = await self._request( + "$memphis_station_creations", create_station_req_bytes, 20, timeout_retries ) err_msg = err_msg.data.decode("utf-8") @@ -315,7 +316,7 @@ async def attach_schema(self, name, station_name): """ await self.enforce_schema(name, station_name) - async def enforce_schema(self, name, station_name): + async def enforce_schema(self, name, station_name, timeout_retries=5): """Enforce a schema on an existing station. Args: name (str): schema name. @@ -329,8 +330,8 @@ async def enforce_schema(self, name, station_name): msg = {"name": name, "station_name": station_name, "username": self.username} msg_to_send = json.dumps(msg).encode("utf-8") - err_msg = await self.broker_manager.request( - "$memphis_schema_attachments", msg_to_send, timeout=20 + err_msg = await self._request( + "$memphis_schema_attachments", msg_to_send, 20, timeout_retries ) err_msg = err_msg.data.decode("utf-8") @@ -339,7 +340,7 @@ async def enforce_schema(self, name, station_name): except Exception as e: raise MemphisError(str(e)) from e - async def detach_schema(self, station_name): + async def detach_schema(self, station_name, timeout_retries=5): """Detaches a schema from station. Args: station_name (str): station name. @@ -351,8 +352,8 @@ async def detach_schema(self, station_name): raise MemphisError("station name is missing") msg = {"station_name": station_name, "username": self.username} msg_to_send = json.dumps(msg).encode("utf-8") - err_msg = await self.broker_manager.request( - "$memphis_schema_detachments", msg_to_send, timeout=20 + err_msg = await self._request( + "$memphis_schema_detachments", msg_to_send, 20, timeout_retries ) err_msg = err_msg.data.decode("utf-8") @@ -404,11 +405,21 @@ def __normalize_host(self, host): return host.split("https://")[1] return host + async def _request(self, subject, payload, timeout, timeout_retries=5): + try: + res = await self.broker_manager.request(subject, payload, timeout=timeout) + return res + except Exception as e: + if 'timeout' not in str(e).lower() or timeout_retries <= 0: + raise MemphisError(str(e)) from e + return await self._request(subject, payload, timeout=timeout, timeout_retries=timeout_retries-1) + async def producer( self, station_name: str, producer_name: str, generate_random_suffix: bool = False, + timeout_retries=5, ): """Creates a producer. Args: @@ -446,8 +457,8 @@ async def producer( create_producer_req_bytes = json.dumps(create_producer_req, indent=2).encode( "utf-8" ) - create_res = await self.broker_manager.request( - "$memphis_producer_creations", create_producer_req_bytes, timeout=20 + create_res = await self._request( + "$memphis_producer_creations", create_producer_req_bytes, 20, timeout_retries ) create_res = create_res.data.decode("utf-8") create_res = json.loads(create_res) @@ -625,6 +636,7 @@ async def consumer( generate_random_suffix: bool = False, start_consume_from_sequence: int = 1, last_messages: int = -1, + timeout_retries=5, ): """Creates a consumer. Args:. @@ -684,8 +696,8 @@ async def consumer( create_consumer_req_bytes = json.dumps(create_consumer_req, indent=2).encode( "utf-8" ) - creation_res = await self.broker_manager.request( - "$memphis_consumer_creations", create_consumer_req_bytes, timeout=20 + creation_res = await self._request( + "$memphis_consumer_creations", create_consumer_req_bytes, 20, timeout_retries ) creation_res = creation_res.data.decode("utf-8") if creation_res != "": @@ -867,7 +879,7 @@ async def fetch_messages( except Exception as e: raise MemphisError(str(e)) from e - async def create_schema(self, schema_name, schema_type, schema_path): + async def create_schema(self, schema_name, schema_type, schema_path, timeout_retries=5): """Creates a new schema. Args:. @@ -899,8 +911,8 @@ async def create_schema(self, schema_name, schema_type, schema_path): create_schema_req_bytes = json.dumps(create_schema_req, indent=2).encode("utf-8") - create_res = await self.broker_manager.request( - "$memphis_schema_creations", create_schema_req_bytes, timeout=20) + create_res = await self._request( + "$memphis_schema_creations", create_schema_req_bytes, 20, timeout_retries) create_res = create_res.data.decode("utf-8") create_res = json.loads(create_res) diff --git a/memphis/producer.py b/memphis/producer.py index a912c33..b662089 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -213,7 +213,7 @@ async def produce( ) raise MemphisError(str(e)) from e - async def destroy(self): + async def destroy(self, timeout_retries=5): """Destroy the producer.""" try: # drain buffered async messages @@ -229,8 +229,8 @@ async def destroy(self): } producer_name = json.dumps(destroy_producer_req).encode("utf-8") - res = await self.connection.broker_manager.request( - "$memphis_producer_destructions", producer_name, timeout=5 + res = await self.connection._request( + "$memphis_producer_destructions", producer_name, 5, timeout_retries ) error = res.data.decode("utf-8") if error != "" and not "not exist" in error: diff --git a/memphis/station.py b/memphis/station.py index b2bc572..06d10f2 100644 --- a/memphis/station.py +++ b/memphis/station.py @@ -163,13 +163,13 @@ def validate_avro_schema(self, message): except fastavro.validation.ValidationError as e: raise MemphisSchemaError("Schema validation has failed: " + str(e)) - async def destroy(self): + async def destroy(self, timeout_retries=5): """Destroy the station.""" try: name_req = {"station_name": self.name, "username": self.connection.username} station_name = json.dumps(name_req, indent=2).encode("utf-8") - res = await self.connection.broker_manager.request( - "$memphis_station_destructions", station_name, timeout=5 + res = await self.connection._request( + "$memphis_station_destructions", station_name, 5, timeout_retries ) error = res.data.decode("utf-8") if error != "" and not "not exist" in error: