From 5be0c0e86e22ca068116c02315bdac837b3ebb4f Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sat, 20 Aug 2022 22:09:03 +0300 Subject: [PATCH 1/8] implement connect in sdk --- examples/full_example.py | 24 +++++++++++++ examples/producer.py | 22 ------------ memphis/memphis.py | 73 ++++++++++++++-------------------------- memphis/producer.py | 24 +++++++++++++ setup.py | 2 +- 5 files changed, 75 insertions(+), 70 deletions(-) create mode 100644 examples/full_example.py delete mode 100644 examples/producer.py create mode 100644 memphis/producer.py diff --git a/examples/full_example.py b/examples/full_example.py new file mode 100644 index 0000000..009c4de --- /dev/null +++ b/examples/full_example.py @@ -0,0 +1,24 @@ +import asyncio + +from memphis.memphis import Memphis +# from memphis import Memphis + + +async def main(): + try: + memphis = Memphis() + await memphis.connect(host="localhost", username="root", connection_token="memphis") + + # producer = memphis.producer( + # station_name="", producer_name="") + # for i in range(100): + # await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) + + except Exception as e: + print(e) + + finally: + await memphis.close() + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/producer.py b/examples/producer.py deleted file mode 100644 index 042e20d..0000000 --- a/examples/producer.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio -from memphis import Memphis - - -async def main(): - try: - memphis = Memphis() - await memphis.connect(host="", username="", connection_token="") - - producer = memphis.producer( - station_name="", producer_name="") - for i in range(100): - await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) - - except Exception as e: - print(e) - - finally: - await memphis.close() - -if __name__ == '__main__': - asyncio.run(main()) diff --git a/memphis/memphis.py b/memphis/memphis.py index 94e3c32..03975af 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -13,15 +13,23 @@ import socket import json +import time + +# import nats as broker + +import sys +sys.path.insert(0, "../memphis-nats.py/") import nats as broker from nats.errors import TimeoutError + import uuid -from memphis.http_request import http_request -from threading import Timer +from http_request import http_request +from threading import Thread, Timer import asyncio +# from memphis.http_request import http_request -import memphis.retention_types -import memphis.storage_types +import retention_types +import storage_types class set_interval(): @@ -44,7 +52,7 @@ def __init__(self): self.connected = False self.is_connection_active = False - async def connect(self, host, username, connection_token, management_port=5555, tcp_port=6666, data_port=7766, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): + async def connect(self, host, username, connection_token, management_port=5555, tcp_port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): """Creates connection with Memphis. Args: host (str): memphis host. @@ -52,7 +60,6 @@ async def connect(self, host, username, connection_token, management_port=5555, connection_token (str): broker token. management_port (int, optional): management port. Defaults to 5555. tcp_port (int, optional): tcp port. Defaults to 6666. - data_port (int, optional): data port. Defaults to 7766. reconnect (bool, optional): whether to do reconnect while connection is lost. Defaults to True. max_reconnect (int, optional): The reconnect attempt. Defaults to 3. reconnect_interval_ms (int, optional): Interval in miliseconds between reconnect attempts. Defaults to 200. @@ -63,52 +70,24 @@ async def connect(self, host, username, connection_token, management_port=5555, self.connection_token = connection_token self.management_port = management_port self.tcp_port = tcp_port - self.data_port = data_port self.reconnect = reconnect self.max_reconnect = 9 if max_reconnect > 9 else max_reconnect self.reconnect_interval_ms = reconnect_interval_ms self.timeout_ms = timeout_ms try: - self.client.connect((self.host, self.tcp_port)) - except OSError as msg: - self.client = None - self.__close() - raise Exception(msg) - - connection_details = {"username": self.username, - "broker_creds": self.connection_token, "connection_id": None} - self.client.send(json.dumps(connection_details).encode()) - data = self.client.recv(1024) - try: - data = json.loads(data) + self.broker_manager = await broker.connect(servers=self.host+":"+str(self.tcp_port), + allow_reconnect=self.reconnect, + reconnect_time_wait=self.reconnect_interval_ms/1000, + connect_timeout=self.timeout_ms/1000, + max_reconnect_attempts=self.max_reconnect, user=self.username, + token=self.connection_token) + + self.broker_connection = self.broker_manager.jetstream() + timeout = 3 + self.broker_manager.getConnectionId(timeout) except Exception as e: - raise Exception(data) - self.connection_id = data['connection_id'] - self.access_token_exp = data['access_token_exp'] - - self.is_connection_active = True - self.reconnect_attempts = 0 - - if data['access_token']: - self.access_token = data['access_token'] - self.t_keep_acess_token_fresh = set_interval( - self.__keep_acess_token_fresh, self.access_token_exp/1000) - - if data['ping_interval_ms']: - self.ping_interval_ms = data['ping_interval_ms'] - self.t_ping_server = set_interval( - self.__ping_server, self.ping_interval_ms/1000) - - if not self.connected: - try: - self.broker_manager = await broker.connect(servers=self.host+":"+str(self.data_port), allow_reconnect=self.reconnect, reconnect_time_wait=self.reconnect_interval_ms/1000, connect_timeout=self.timeout_ms/1000, max_reconnect_attempts=self.max_reconnect, token=self.connection_token) - self.broker_connection = self.broker_manager.jetstream() - self.connected = True - except Exception as e: - raise Exception(e) + raise Exception(e) - self.t_timeout = set_interval( - self.__handel_reconnect_timeout, self.timeout_ms/1000) def __handel_reconnect_timeout(self): if(not self.reconnect or self.reconnect_attempts == self.max_reconnect or not self.is_connection_active): @@ -138,7 +117,7 @@ def factory(self, name, description=""): else: raise Exception(e) - def station(self, name, factory_name, retention_type=memphis.retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=memphis.storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): + def station(self, name, factory_name, retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): """Creates a station. Args: name (str): station name. @@ -223,7 +202,7 @@ async def __close(self): while self.reconnect_attempts < self.max_reconnect: self.reconnect_attempts += 1 try: - await self.connect(host=self.host, management_port=self.management_port, tcp_port=self.tcp_port, data_port=self.data_port, username=self.username, connection_token=self.connection_token, reconnect=self.reconnect, max_reconnect=self.max_reconnect, reconnect_interval_ms=self.reconnect_interval_ms, timeout_ms=self.timeout_ms) + await self.connect(host=self.host, management_port=self.management_port, tcp_port=self.tcp_port, username=self.username, connection_token=self.connection_token, reconnect=self.reconnect, max_reconnect=self.max_reconnect, reconnect_interval_ms=self.reconnect_interval_ms, timeout_ms=self.timeout_ms) print("Reconnect to memphis has been succeeded") return except Exception as e: diff --git a/memphis/producer.py b/memphis/producer.py new file mode 100644 index 0000000..2705448 --- /dev/null +++ b/memphis/producer.py @@ -0,0 +1,24 @@ +import asyncio + +from memphis import Memphis + + +async def main(): + try: + memphis = Memphis() + await memphis.connect(host="localhost", username="root", connection_token="memphis") + + # producer = memphis.producer( + # station_name="sname", producer_name="pname") + # for i in range(100): + # await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) + + except Exception as e: + print("error" ,e) + + finally: + await memphis.close() + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/setup.py b/setup.py index bd85f84..26345b1 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,6 @@ download_url='https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.1.7.tar.gz', keywords=['message broker', 'devtool', 'streaming', 'data'], install_requires=[ - 'nats-py', 'asyncio', 'requests' ], @@ -30,3 +29,4 @@ 'Programming Language :: Python :: 3.12', ], ) + # 'nats-py', From 75d987365ba06e8262d90bacc92957edfe15bce2 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sun, 21 Aug 2022 00:03:33 +0300 Subject: [PATCH 2/8] add todo --- memphis/producer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/memphis/producer.py b/memphis/producer.py index 2705448..b936566 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -2,7 +2,7 @@ from memphis import Memphis - +#TODO need to move this example to examples directory - need to fix the problems with imports async def main(): try: memphis = Memphis() @@ -16,8 +16,8 @@ async def main(): except Exception as e: print("error" ,e) - finally: - await memphis.close() + # finally: + # await memphis.close() if __name__ == '__main__': From 683b7fe91a4591367bc3bbc788cd56f4da51fd15 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 24 Aug 2022 17:50:23 +0300 Subject: [PATCH 3/8] nats-integration-python-sdk --- memphis/memphis.py | 182 ++++++++++++++++++++++++++++++-------------- memphis/producer.py | 32 ++++++-- setup.py | 6 +- 3 files changed, 156 insertions(+), 64 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 03975af..f908671 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -11,22 +11,15 @@ # See the License for the specific language governing permissions and # limitations under the License +import random import socket import json -import time -# import nats as broker - -import sys -sys.path.insert(0, "../memphis-nats.py/") import nats as broker -from nats.errors import TimeoutError import uuid -from http_request import http_request -from threading import Thread, Timer +from threading import Timer import asyncio -# from memphis.http_request import http_request import retention_types import storage_types @@ -51,15 +44,15 @@ def __init__(self): self.client = socket.socket() self.connected = False self.is_connection_active = False - - async def connect(self, host, username, connection_token, management_port=5555, tcp_port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): + + async def connect(self, host, username, connection_token, management_port=5555, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): """Creates connection with Memphis. Args: host (str): memphis host. username (str): user of type root/application. connection_token (str): broker token. management_port (int, optional): management port. Defaults to 5555. - tcp_port (int, optional): tcp port. Defaults to 6666. + port (int, optional): port. Defaults to 6666. reconnect (bool, optional): whether to do reconnect while connection is lost. Defaults to True. max_reconnect (int, optional): The reconnect attempt. Defaults to 3. reconnect_interval_ms (int, optional): Interval in miliseconds between reconnect attempts. Defaults to 200. @@ -69,31 +62,27 @@ async def connect(self, host, username, connection_token, management_port=5555, self.username = username self.connection_token = connection_token self.management_port = management_port - self.tcp_port = tcp_port + self.port = port self.reconnect = reconnect self.max_reconnect = 9 if max_reconnect > 9 else max_reconnect self.reconnect_interval_ms = reconnect_interval_ms self.timeout_ms = timeout_ms + self.connection_id = self.__generateConnectionID() try: - self.broker_manager = await broker.connect(servers=self.host+":"+str(self.tcp_port), + self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port), allow_reconnect=self.reconnect, reconnect_time_wait=self.reconnect_interval_ms/1000, connect_timeout=self.timeout_ms/1000, - max_reconnect_attempts=self.max_reconnect, user=self.username, - token=self.connection_token) + max_reconnect_attempts=self.max_reconnect, + token=self.connection_token, + name=self.connection_id + "::" + self.username, max_outstanding_pings=1) self.broker_connection = self.broker_manager.jetstream() - timeout = 3 - self.broker_manager.getConnectionId(timeout) + self.is_connection_active = True except Exception as e: raise Exception(e) - - def __handel_reconnect_timeout(self): - if(not self.reconnect or self.reconnect_attempts == self.max_reconnect or not self.is_connection_active): - raise Exception("Connection timeout has reached") - - def factory(self, name, description=""): + async def factory(self, name, description=""): """Creates a factory. Args: name (str): factory name. @@ -107,17 +96,25 @@ def factory(self, name, description=""): try: if not self.is_connection_active: raise Exception("Connection is dead") + createFactoryReq = { + "factory_name": name, + "factory_description": description + } + create_factory_req_bytes = json.dumps(createFactoryReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_factory_creations", create_factory_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) + return Factory(self, name) - response = http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/factories/createFactory', - headers={"Authorization": "Bearer " + self.access_token}, body_params={"name": name, "description": description}) - return Factory(self, json.loads(response)['name']) except Exception as e: if str(e).find('already exist') != -1: return Factory(self, name.lower()) else: raise Exception(e) - def station(self, name, factory_name, retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): + async def station(self, name, factory_name, retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): """Creates a station. Args: name (str): station name. @@ -134,9 +131,25 @@ def station(self, name, factory_name, retention_type=retention_types.MAX_MESSAGE try: if not self.is_connection_active: raise Exception("Connection is dead") - response = http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/stations/createStation', headers={"Authorization": "Bearer " + self.access_token}, body_params={ - "name": name, "factory_name": factory_name, "retention_type": retention_type, "retention_value": retention_value, "storage_type": storage_type, "replicas": replicas, "dedup_enabled": dedup_enabled, "dedup_window_in_ms": dedup_window_ms}) - return Station(self, json.loads(response)['name']) + + createStationReq = { + "name": name, + "factory_name": factory_name, + "retention_type": retention_type, + "retention_value": retention_value, + "storage_type": storage_type, + "replicas": replicas, + "dedup_enabled": dedup_enabled, + "dedup_window_in_ms": dedup_window_ms + } + create_station_req_bytes = json.dumps(createStationReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_station_creations", create_station_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) + return Station(self, name) + except Exception as e: if str(e).find('already exist') != -1: return Station(self, name.lower()) @@ -165,6 +178,11 @@ async def close(self): except: return + def __generateConnectionID(self): + lst = [random.choice('0123456789abcdef') for n in range(24)] + s = "".join(lst) + return s + def __normalize_host(self, host): if (host.startswith("http://")): return host.split("http://")[1] @@ -223,7 +241,7 @@ async def __close(self): else: break - def producer(self, station_name, producer_name): + async def producer(self, station_name, producer_name): """Creates a producer. Args: station_name (str): station name to produce messages into. @@ -237,16 +255,31 @@ def producer(self, station_name, producer_name): try: if not self.is_connection_active: raise Exception("Connection is dead") - - http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/producers/createProducer', headers={"Authorization": "Bearer " + self.access_token}, body_params={ - "name": producer_name, "station_name": station_name, "connection_id": self.connection_id, "producer_type": "application"}) + + createProducerReq = { + "name": producer_name, + "station_name": station_name, + "connection_id": self.connection_id, + "producer_type": "application" + } + create_producer_req_bytes = json.dumps(createProducerReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_producer_creations", create_producer_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) return Producer(self, producer_name, station_name) + except Exception as e: - raise Exception(e) + if str(e).find('already exist') != -1: + return Producer(self, producer_name.lower(), station_name.lower()) + else: + raise Exception(e) + - def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10): + async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10): """Creates a consumer. - Args: + Args:. station_name (str): station name to consume messages from. consumer_name (str): name for the consumer. consumer_group (str, optional): consumer group name. Defaults to the consumer name. @@ -262,11 +295,30 @@ def consumer(self, station_name, consumer_name, consumer_group="", pull_interval if not self.is_connection_active: raise Exception("Connection is dead") cg = consumer_name if not consumer_group else consumer_group - http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/consumers/createConsumer', headers={"Authorization": "Bearer " + self.access_token}, body_params={ - "name": consumer_name, "station_name": station_name, "connection_id": self.connection_id, "consumer_type": "application", "consumers_group": cg, "max_ack_time_ms": max_ack_time_ms, "max_msg_deliveries": max_msg_deliveries}) + + createConsumerReq = { + 'name': consumer_name, + "station_name": station_name, + "connection_id": self.connection_id, + "consumer_type": 'application', + "consumers_group": consumer_group, + "max_ack_time_ms": max_ack_time_ms, + "max_msg_deliveries": max_msg_deliveries + } + + create_consumer_req_bytes = json.dumps(createConsumerReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_consumer_creations", create_consumer_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries) + except Exception as e: - raise Exception(e) + if str(e).find('already exist') != -1: + return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries) + else: + raise Exception(e) class Factory: @@ -274,14 +326,18 @@ def __init__(self, connection, name): self.connection = connection self.name = name.lower() - def destroy(self): + async def destroy(self): """Destroy the factory. """ try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/factories/removeFactory', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"factory_name": self.name}) + nameReq = { + "factory_name":self.name + } + factory_name = json.dumps(nameReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_factory_destructions', factory_name) + except Exception as e: - raise Exception(e) + return class Station: @@ -289,14 +345,18 @@ def __init__(self, connection, name): self.connection = connection self.name = name.lower() - def destroy(self): + async def destroy(self): """Destroy the station. """ try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/stations/removeStation', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"station_name": self.name}) + nameReq = { + "station_name":self.name + } + station_name = json.dumps(nameReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_station_destructions', station_name) + except Exception as e: - raise Exception(e) + return class Producer: @@ -324,13 +384,18 @@ async def produce(self, message, ack_wait_sec=15): else: raise Exception(e) - def destroy(self): + async def destroy(self): """Destroy the producer. """ try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/producers/destroyProducer', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"name": self.producer_name, "station_name": self.station_name}) - except: + destroyProducerReq = { + "name": self.producer_name, + "station_name":self.station_name + } + producer_name = json.dumps(destroyProducerReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_producer_destructions', producer_name) + + except Exception as e: return @@ -389,13 +454,18 @@ async def __consume_dlq(self, callback): async def __ping_consumer(self): x = await self.connection.broker_connection.consumer_info(self.station_name, durable=self.consumer_group) - def destroy(self): + async def destroy(self): """Destroy the consumer. """ self.pull_interval_ms = None try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/consumers/destroyConsumer', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"name": self.consumer_name, "station_name": self.station_name}) + destroyConsumerReq = { + "name": self.consumer_name, + "station_name":self.station_name + } + consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_consumer_destructions', consumer_name) + except Exception as e: return diff --git a/memphis/producer.py b/memphis/producer.py index b936566..f29200c 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -2,16 +2,38 @@ from memphis import Memphis -#TODO need to move this example to examples directory - need to fix the problems with imports async def main(): + + async def msg_handler(msgs, error): + try: + for msg in msgs: + print("message: ", msg.get_data()) + await msg.ack() + if error: + print(error) + except Exception as e: + print(e) + return + try: memphis = Memphis() await memphis.connect(host="localhost", username="root", connection_token="memphis") - # producer = memphis.producer( - # station_name="sname", producer_name="pname") - # for i in range(100): - # await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) + myfactory = await memphis.factory("shoham_shay_123567891012345") + myStation = await memphis.station("station_shoham12345678911", myfactory.name) + myProducer = await memphis.producer(myStation.name, "producername123") + for i in range(1): + await myProducer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) + myConsumer = await memphis.consumer( + station_name=myStation.name, consumer_name="consumer_name", consumer_group="") + myConsumer.consume(msg_handler) + # Keep your main thread alive so the consumer will keep receiving data + await asyncio.sleep(5) + + await myProducer.destroy() + await myConsumer.destroy() + await myStation.destroy() + await myfactory.destroy() except Exception as e: print("error" ,e) diff --git a/setup.py b/setup.py index 26345b1..1af633b 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,8 @@ keywords=['message broker', 'devtool', 'streaming', 'data'], install_requires=[ 'asyncio', - 'requests' + 'requests', + 'nats-py' ], classifiers=[ 'Development Status :: 4 - Beta', @@ -28,5 +29,4 @@ 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', ], -) - # 'nats-py', +) \ No newline at end of file From f015685a14b1c1e9a9d57ba0d7a68581b02cdb60 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 24 Aug 2022 17:58:26 +0300 Subject: [PATCH 4/8] update the producer example --- examples/{full_example.py => producer.py} | 9 ++--- memphis/producer.py | 46 ----------------------- 2 files changed, 4 insertions(+), 51 deletions(-) rename examples/{full_example.py => producer.py} (55%) delete mode 100644 memphis/producer.py diff --git a/examples/full_example.py b/examples/producer.py similarity index 55% rename from examples/full_example.py rename to examples/producer.py index 009c4de..0428d3f 100644 --- a/examples/full_example.py +++ b/examples/producer.py @@ -1,7 +1,6 @@ import asyncio from memphis.memphis import Memphis -# from memphis import Memphis async def main(): @@ -9,10 +8,10 @@ async def main(): memphis = Memphis() await memphis.connect(host="localhost", username="root", connection_token="memphis") - # producer = memphis.producer( - # station_name="", producer_name="") - # for i in range(100): - # await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) + producer = memphis.producer( + station_name="", producer_name="") + for i in range(100): + await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) except Exception as e: print(e) diff --git a/memphis/producer.py b/memphis/producer.py deleted file mode 100644 index f29200c..0000000 --- a/memphis/producer.py +++ /dev/null @@ -1,46 +0,0 @@ -import asyncio - -from memphis import Memphis - -async def main(): - - async def msg_handler(msgs, error): - try: - for msg in msgs: - print("message: ", msg.get_data()) - await msg.ack() - if error: - print(error) - except Exception as e: - print(e) - return - - try: - memphis = Memphis() - await memphis.connect(host="localhost", username="root", connection_token="memphis") - - myfactory = await memphis.factory("shoham_shay_123567891012345") - myStation = await memphis.station("station_shoham12345678911", myfactory.name) - myProducer = await memphis.producer(myStation.name, "producername123") - for i in range(1): - await myProducer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8')) - myConsumer = await memphis.consumer( - station_name=myStation.name, consumer_name="consumer_name", consumer_group="") - myConsumer.consume(msg_handler) - # Keep your main thread alive so the consumer will keep receiving data - await asyncio.sleep(5) - - await myProducer.destroy() - await myConsumer.destroy() - await myStation.destroy() - await myfactory.destroy() - - except Exception as e: - print("error" ,e) - - # finally: - # await memphis.close() - - -if __name__ == '__main__': - asyncio.run(main()) From 0cd5c2743895c8d759873e2ecfdc878512dd6c22 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 24 Aug 2022 18:26:22 +0300 Subject: [PATCH 5/8] revert producer example --- examples/producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/producer.py b/examples/producer.py index 0428d3f..b2a6dc0 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,6 +1,6 @@ import asyncio -from memphis.memphis import Memphis +from memphis import Memphis async def main(): From 08286a59cfaa115234c23aab36323f8cbc62d07b Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 24 Aug 2022 18:35:21 +0300 Subject: [PATCH 6/8] revert producer example --- examples/producer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/producer.py b/examples/producer.py index b2a6dc0..74a8a9e 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,12 +1,11 @@ import asyncio - from memphis import Memphis async def main(): try: memphis = Memphis() - await memphis.connect(host="localhost", username="root", connection_token="memphis") + await memphis.connect(host="", username="", connection_token="") producer = memphis.producer( station_name="", producer_name="") @@ -20,4 +19,4 @@ async def main(): await memphis.close() if __name__ == '__main__': - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file From 03c8d5b2d98d08618489198eade8e99e2245c474 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 24 Aug 2022 19:36:18 +0300 Subject: [PATCH 7/8] remove unnecessary fields and methods --- examples/producer.py | 2 +- memphis/memphis.py | 75 ++------------------------------------------ setup.py | 1 - 3 files changed, 4 insertions(+), 74 deletions(-) diff --git a/examples/producer.py b/examples/producer.py index 74a8a9e..042e20d 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -19,4 +19,4 @@ async def main(): await memphis.close() if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/memphis/memphis.py b/memphis/memphis.py index f908671..90f2ded 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -41,17 +41,14 @@ def cancel(self): class Memphis: def __init__(self): - self.client = socket.socket() - self.connected = False self.is_connection_active = False - async def connect(self, host, username, connection_token, management_port=5555, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): + async def connect(self, host, username, connection_token, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): """Creates connection with Memphis. Args: host (str): memphis host. username (str): user of type root/application. connection_token (str): broker token. - management_port (int, optional): management port. Defaults to 5555. port (int, optional): port. Defaults to 6666. reconnect (bool, optional): whether to do reconnect while connection is lost. Defaults to True. max_reconnect (int, optional): The reconnect attempt. Defaults to 3. @@ -61,7 +58,6 @@ async def connect(self, host, username, connection_token, management_port=5555, self.host = self.__normalize_host(host) self.username = username self.connection_token = connection_token - self.management_port = management_port self.port = port self.reconnect = reconnect self.max_reconnect = 9 if max_reconnect > 9 else max_reconnect @@ -163,15 +159,6 @@ async def close(self): if self.is_connection_active: await self.broker_manager.close() self.broker_manager = None - self.client.close() - self.client = None - self.t_keep_acess_token_fresh.cancel() - self.t_ping_server.cancel() - self.t_timeout.cancel() - self.access_token_timeout = None - self.ping_interval_ms = None - self.access_token_exp = None - self.access_token = None self.connection_id = None self.is_connection_active = False self.reconnect_attempts = 0 @@ -191,56 +178,6 @@ def __normalize_host(self, host): else: return host - def __keep_acess_token_fresh(self): - if self.is_connection_active: - self.client.send(json.dumps( - {"resend_access_token": True}).encode()) - data = self.client.recv(1024) - try: - data = json.loads(data) - if data['access_token']: - self.access_token = data['access_token'] - except: - raise Exception(data) - - def __ping_server(self): - if self.is_connection_active: - self.client.send(json.dumps( - {"ping": True}).encode()) - data = self.client.recv(1024) - try: - data = json.loads(data) - if data['ping_interval_ms']: - self.ping_interval_ms = data['ping_interval_ms'] - except: - raise Exception(data) - - async def __close(self): - if self.reconnect is True: - while self.reconnect_attempts < self.max_reconnect: - self.reconnect_attempts += 1 - try: - await self.connect(host=self.host, management_port=self.management_port, tcp_port=self.tcp_port, username=self.username, connection_token=self.connection_token, reconnect=self.reconnect, max_reconnect=self.max_reconnect, reconnect_interval_ms=self.reconnect_interval_ms, timeout_ms=self.timeout_ms) - print("Reconnect to memphis has been succeeded") - return - except Exception as e: - print("Failed reconnect to memphis") - await asyncio.sleep(self.reconnect_interval_ms/1000) - if self.reconnect is False or self.reconnect_attempts >= self.max_reconnect: - if self.is_connection_active is True: - self.client.destroy() - self.connected = False - self.t_keep_acess_token_fresh.cancel() - self.t_ping_server.cancel() - self.t_timeout.cancel() - self.is_connection_active = False - while True: - if self.broker_manager: - self.broker_manager.close() - await asyncio.sleep(0.5) - else: - break - async def producer(self, station_name, producer_name): """Creates a producer. Args: @@ -271,10 +208,7 @@ async def producer(self, station_name, producer_name): return Producer(self, producer_name, station_name) except Exception as e: - if str(e).find('already exist') != -1: - return Producer(self, producer_name.lower(), station_name.lower()) - else: - raise Exception(e) + raise Exception(e) async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10): @@ -315,10 +249,7 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries) except Exception as e: - if str(e).find('already exist') != -1: - return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries) - else: - raise Exception(e) + raise Exception(e) class Factory: diff --git a/setup.py b/setup.py index 1af633b..b3d5b3b 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,6 @@ keywords=['message broker', 'devtool', 'streaming', 'data'], install_requires=[ 'asyncio', - 'requests', 'nats-py' ], classifiers=[ From f3ebbd5eff9347cf1b145f23c8b8fa2b1345906d Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 24 Aug 2022 19:55:06 +0300 Subject: [PATCH 8/8] add an exception in destroy methods --- memphis/memphis.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 90f2ded..0e82d4b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -161,7 +161,6 @@ async def close(self): self.broker_manager = None self.connection_id = None self.is_connection_active = False - self.reconnect_attempts = 0 except: return @@ -268,7 +267,8 @@ async def destroy(self): await self.connection.broker_connection.publish('$memphis_factory_destructions', factory_name) except Exception as e: - return + raise Exception(e) + class Station: @@ -287,7 +287,7 @@ async def destroy(self): await self.connection.broker_connection.publish('$memphis_station_destructions', station_name) except Exception as e: - return + raise Exception(e) class Producer: @@ -327,7 +327,7 @@ async def destroy(self): await self.connection.broker_connection.publish('$memphis_producer_destructions', producer_name) except Exception as e: - return + raise Exception(e) class Consumer: @@ -398,7 +398,7 @@ async def destroy(self): await self.connection.broker_connection.publish('$memphis_consumer_destructions', consumer_name) except Exception as e: - return + raise Exception(e) class Message: