Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ async def __ping_consumer(self, callback):
if "consumer not found" in str(e) or "stream not found" in str(e):
callback(MemphisError(str(e)))

async def destroy(self):
async def destroy(self, timeout_retries=5):
"""Destroy the consumer."""
if self.t_consume is not None:
self.t_consume.cancel()
Expand All @@ -308,8 +308,8 @@ async def destroy(self):
}
consumer_name = json.dumps(
destroy_consumer_req, indent=2).encode("utf-8")
res = await self.connection.broker_manager.request(
"$memphis_consumer_destructions", consumer_name, timeout=5
res = await self.connection._request(
"$memphis_consumer_destructions", consumer_name, 5, timeout_retries
)
error = res.data.decode("utf-8")
if error != "" and not "not exist" in error:
Expand Down
42 changes: 27 additions & 15 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ async def station(
tiered_storage_enabled: bool = False,
partitions_number: int = 1,
dls_station: str = "",
timeout_retries=5,
):
"""Creates a station.
Args:
Expand Down Expand Up @@ -291,8 +292,8 @@ async def station(
create_station_req_bytes = json.dumps(create_station_req, indent=2).encode(
"utf-8"
)
err_msg = await self.broker_manager.request(
"$memphis_station_creations", create_station_req_bytes, timeout=20
err_msg = await self._request(
"$memphis_station_creations", create_station_req_bytes, 20, timeout_retries
)
err_msg = err_msg.data.decode("utf-8")

Expand All @@ -315,7 +316,7 @@ async def attach_schema(self, name, station_name):
"""
await self.enforce_schema(name, station_name)

async def enforce_schema(self, name, station_name):
async def enforce_schema(self, name, station_name, timeout_retries=5):
"""Enforce a schema on an existing station.
Args:
name (str): schema name.
Expand All @@ -329,8 +330,8 @@ async def enforce_schema(self, name, station_name):
msg = {"name": name, "station_name": station_name,
"username": self.username}
msg_to_send = json.dumps(msg).encode("utf-8")
err_msg = await self.broker_manager.request(
"$memphis_schema_attachments", msg_to_send, timeout=20
err_msg = await self._request(
"$memphis_schema_attachments", msg_to_send, 20, timeout_retries
)
err_msg = err_msg.data.decode("utf-8")

Expand All @@ -339,7 +340,7 @@ async def enforce_schema(self, name, station_name):
except Exception as e:
raise MemphisError(str(e)) from e

async def detach_schema(self, station_name):
async def detach_schema(self, station_name, timeout_retries=5):
"""Detaches a schema from station.
Args:
station_name (str): station name.
Expand All @@ -351,8 +352,8 @@ async def detach_schema(self, station_name):
raise MemphisError("station name is missing")
msg = {"station_name": station_name, "username": self.username}
msg_to_send = json.dumps(msg).encode("utf-8")
err_msg = await self.broker_manager.request(
"$memphis_schema_detachments", msg_to_send, timeout=20
err_msg = await self._request(
"$memphis_schema_detachments", msg_to_send, 20, timeout_retries
)
err_msg = err_msg.data.decode("utf-8")

Expand Down Expand Up @@ -404,11 +405,21 @@ def __normalize_host(self, host):
return host.split("https://")[1]
return host

async def _request(self, subject, payload, timeout, timeout_retries=5):
try:
res = await self.broker_manager.request(subject, payload, timeout=timeout)
return res
except Exception as e:
if 'timeout' not in str(e).lower() or timeout_retries <= 0:
raise MemphisError(str(e)) from e
return await self._request(subject, payload, timeout=timeout, timeout_retries=timeout_retries-1)

async def producer(
self,
station_name: str,
producer_name: str,
generate_random_suffix: bool = False,
timeout_retries=5,
):
"""Creates a producer.
Args:
Expand Down Expand Up @@ -446,8 +457,8 @@ async def producer(
create_producer_req_bytes = json.dumps(create_producer_req, indent=2).encode(
"utf-8"
)
create_res = await self.broker_manager.request(
"$memphis_producer_creations", create_producer_req_bytes, timeout=20
create_res = await self._request(
"$memphis_producer_creations", create_producer_req_bytes, 20, timeout_retries
)
create_res = create_res.data.decode("utf-8")
create_res = json.loads(create_res)
Expand Down Expand Up @@ -625,6 +636,7 @@ async def consumer(
generate_random_suffix: bool = False,
start_consume_from_sequence: int = 1,
last_messages: int = -1,
timeout_retries=5,
):
"""Creates a consumer.
Args:.
Expand Down Expand Up @@ -684,8 +696,8 @@ async def consumer(
create_consumer_req_bytes = json.dumps(create_consumer_req, indent=2).encode(
"utf-8"
)
creation_res = await self.broker_manager.request(
"$memphis_consumer_creations", create_consumer_req_bytes, timeout=20
creation_res = await self._request(
"$memphis_consumer_creations", create_consumer_req_bytes, 20, timeout_retries
)
creation_res = creation_res.data.decode("utf-8")
if creation_res != "":
Expand Down Expand Up @@ -867,7 +879,7 @@ async def fetch_messages(
except Exception as e:
raise MemphisError(str(e)) from e

async def create_schema(self, schema_name, schema_type, schema_path):
async def create_schema(self, schema_name, schema_type, schema_path, timeout_retries=5):

"""Creates a new schema.
Args:.
Expand Down Expand Up @@ -899,8 +911,8 @@ async def create_schema(self, schema_name, schema_type, schema_path):

create_schema_req_bytes = json.dumps(create_schema_req, indent=2).encode("utf-8")

create_res = await self.broker_manager.request(
"$memphis_schema_creations", create_schema_req_bytes, timeout=20)
create_res = await self._request(
"$memphis_schema_creations", create_schema_req_bytes, 20, timeout_retries)

create_res = create_res.data.decode("utf-8")
create_res = json.loads(create_res)
Expand Down
6 changes: 3 additions & 3 deletions memphis/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def produce(
)
raise MemphisError(str(e)) from e

async def destroy(self):
async def destroy(self, timeout_retries=5):
"""Destroy the producer."""
try:
# drain buffered async messages
Expand All @@ -229,8 +229,8 @@ async def destroy(self):
}

producer_name = json.dumps(destroy_producer_req).encode("utf-8")
res = await self.connection.broker_manager.request(
"$memphis_producer_destructions", producer_name, timeout=5
res = await self.connection._request(
"$memphis_producer_destructions", producer_name, 5, timeout_retries
)
error = res.data.decode("utf-8")
if error != "" and not "not exist" in error:
Expand Down
6 changes: 3 additions & 3 deletions memphis/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ def validate_avro_schema(self, message):
except fastavro.validation.ValidationError as e:
raise MemphisSchemaError("Schema validation has failed: " + str(e))

async def destroy(self):
async def destroy(self, timeout_retries=5):
"""Destroy the station."""
try:
name_req = {"station_name": self.name, "username": self.connection.username}
station_name = json.dumps(name_req, indent=2).encode("utf-8")
res = await self.connection.broker_manager.request(
"$memphis_station_destructions", station_name, timeout=5
res = await self.connection._request(
"$memphis_station_destructions", station_name, 5, timeout_retries
)
error = res.data.decode("utf-8")
if error != "" and not "not exist" in error:
Expand Down