From 3def8c765c8109dca7c01f97be6d46cb5699f1f7 Mon Sep 17 00:00:00 2001 From: daniel-davidd Date: Tue, 15 Aug 2023 10:46:41 +0300 Subject: [PATCH 1/2] changed stream name to ping and added validation for the errors --- memphis/consumer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index d0a3875..5a93831 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -251,20 +251,21 @@ async def __ping_consumer(self, callback): await asyncio.sleep(self.ping_consumer_interval_ms / 1000) station_inner = get_internal_name(self.station_name) consumer_group = get_internal_name(self.consumer_group) - if self.inner_station_name not in self.connection.partition_consumers_updates_data: + if self.inner_station_name in self.connection.partition_consumers_updates_data: for p in self.connection.partition_consumers_updates_data[station_inner]["partitions_list"]: - stream_name = f"{station_inner}${str(p)}.final" + stream_name = f"{station_inner}${str(p)}" await self.connection.broker_connection.consumer_info( stream_name, consumer_group, timeout=30 ) else: - stream_name = f"{station_inner}.final" + stream_name = station_inner await self.connection.broker_connection.consumer_info( stream_name, consumer_group, timeout=30 ) except Exception as e: - callback(MemphisError(str(e))) + if "consumer not found" or "stream not found" in str(e): + callback(MemphisError(str(e))) async def destroy(self): """Destroy the consumer.""" From 56fa8f96817afbd07f71fea814c990c54ff04f89 Mon Sep 17 00:00:00 2001 From: daniel-davidd Date: Tue, 15 Aug 2023 11:00:46 +0300 Subject: [PATCH 2/2] resolved pylint --- memphis/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 5a93831..c6dbdc5 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -264,7 +264,7 @@ async def __ping_consumer(self, callback): ) except Exception as e: - if "consumer not found" or "stream not found" in str(e): + if "consumer not found" in str(e) or "stream not found" in str(e): callback(MemphisError(str(e))) async def destroy(self):