diff --git a/memphis/memphis.py b/memphis/memphis.py index 82c02eb..a14d37b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -426,11 +426,11 @@ async def producer( raise MemphisError(create_res["error"]) internal_station_name = get_internal_name(station_name) - - if create_res["partitions_update"]["partitions_list"] is not None: - self.partition_producers_updates_data[internal_station_name] = create_res[ - "partitions_update" - ] + if "partitions_update" in create_res: + if create_res["partitions_update"]["partitions_list"] is not None: + self.partition_producers_updates_data[internal_station_name] = create_res[ + "partitions_update" + ] self.station_schemaverse_to_dls[internal_station_name] = create_res[ "schemaverse_to_dls" @@ -615,17 +615,20 @@ async def consumer( "$memphis_consumer_creations", create_consumer_req_bytes, timeout=5 ) creation_res = creation_res.data.decode("utf-8") - creation_res = json.loads(creation_res) - + if creation_res != "": + try: + creation_res = json.loads(creation_res) - if creation_res["error"] != "": - raise MemphisError(creation_res["error"]) - internal_station_name = get_internal_name(station_name) - - if creation_res["partitions_update"]["partitions_list"] is not None: - self.partition_consumers_updates_data[internal_station_name] = creation_res["partitions_update"] + if creation_res["error"] != "": + raise MemphisError(creation_res["error"]) + internal_station_name = get_internal_name(station_name) + if creation_res["partitions_update"]["partitions_list"] is not None: + self.partition_consumers_updates_data[internal_station_name] = creation_res["partitions_update"] + except: + raise MemphisError(creation_res) + internal_station_name = get_internal_name(station_name) map_key = internal_station_name + "_" + real_name consumer = Consumer( self,