diff --git a/memphis/memphis.py b/memphis/memphis.py index 59d5f71..6cb0edc 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -404,8 +404,15 @@ async def producer( if not self.is_connection_active: raise MemphisError("Connection is dead") real_name = producer_name.lower() + internal_station_name = get_internal_name(station_name) if generate_random_suffix: producer_name = self.__generate_random_suffix(producer_name) + else: + map_key = internal_station_name + "_" + producer_name.lower() + producer = None + if map_key in self.producers_map: + return self.producers_map[map_key] + create_producer_req = { "name": producer_name, "station_name": station_name, @@ -425,7 +432,6 @@ async def producer( if create_res["error"] != "": raise MemphisError(create_res["error"]) - internal_station_name = get_internal_name(station_name) if "partitions_update" in create_res: if create_res["partitions_update"]["partitions_list"] is not None: self.partition_producers_updates_data[internal_station_name] = create_res[ diff --git a/memphis/producer.py b/memphis/producer.py index 282684e..6310793 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -237,6 +237,8 @@ async def produce( if self.internal_station_name not in self.connection.partition_producers_updates_data: partition_name = self.internal_station_name + elif len(self.connection.partition_producers_updates_data[self.internal_station_name]['partitions_list']) == 1: + partition_name = f"{self.internal_station_name}${self.connection.partition_producers_updates_data[self.internal_station_name]['partitions_list'][0]}" else: partition_name = f"{self.internal_station_name}${str(next(self.partition_generator))}"