diff --git a/memphis/memphis.py b/memphis/memphis.py index 94e3c32..0e82d4b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -11,17 +11,18 @@ # See the License for the specific language governing permissions and # limitations under the License +import random import socket import json + import nats as broker -from nats.errors import TimeoutError + import uuid -from memphis.http_request import http_request from threading import Timer import asyncio -import memphis.retention_types -import memphis.storage_types +import retention_types +import storage_types class set_interval(): @@ -40,19 +41,15 @@ def cancel(self): class Memphis: def __init__(self): - self.client = socket.socket() - self.connected = False self.is_connection_active = False - - async def connect(self, host, username, connection_token, management_port=5555, tcp_port=6666, data_port=7766, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): + + async def connect(self, host, username, connection_token, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): """Creates connection with Memphis. Args: host (str): memphis host. username (str): user of type root/application. connection_token (str): broker token. - management_port (int, optional): management port. Defaults to 5555. - tcp_port (int, optional): tcp port. Defaults to 6666. - data_port (int, optional): data port. Defaults to 7766. + port (int, optional): port. Defaults to 6666. reconnect (bool, optional): whether to do reconnect while connection is lost. Defaults to True. max_reconnect (int, optional): The reconnect attempt. Defaults to 3. reconnect_interval_ms (int, optional): Interval in miliseconds between reconnect attempts. Defaults to 200. @@ -61,60 +58,27 @@ async def connect(self, host, username, connection_token, management_port=5555, self.host = self.__normalize_host(host) self.username = username self.connection_token = connection_token - self.management_port = management_port - self.tcp_port = tcp_port - self.data_port = data_port + self.port = port self.reconnect = reconnect self.max_reconnect = 9 if max_reconnect > 9 else max_reconnect self.reconnect_interval_ms = reconnect_interval_ms self.timeout_ms = timeout_ms + self.connection_id = self.__generateConnectionID() try: - self.client.connect((self.host, self.tcp_port)) - except OSError as msg: - self.client = None - self.__close() - raise Exception(msg) - - connection_details = {"username": self.username, - "broker_creds": self.connection_token, "connection_id": None} - self.client.send(json.dumps(connection_details).encode()) - data = self.client.recv(1024) - try: - data = json.loads(data) + self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port), + allow_reconnect=self.reconnect, + reconnect_time_wait=self.reconnect_interval_ms/1000, + connect_timeout=self.timeout_ms/1000, + max_reconnect_attempts=self.max_reconnect, + token=self.connection_token, + name=self.connection_id + "::" + self.username, max_outstanding_pings=1) + + self.broker_connection = self.broker_manager.jetstream() + self.is_connection_active = True except Exception as e: - raise Exception(data) - self.connection_id = data['connection_id'] - self.access_token_exp = data['access_token_exp'] - - self.is_connection_active = True - self.reconnect_attempts = 0 - - if data['access_token']: - self.access_token = data['access_token'] - self.t_keep_acess_token_fresh = set_interval( - self.__keep_acess_token_fresh, self.access_token_exp/1000) - - if data['ping_interval_ms']: - self.ping_interval_ms = data['ping_interval_ms'] - self.t_ping_server = set_interval( - self.__ping_server, self.ping_interval_ms/1000) - - if not self.connected: - try: - self.broker_manager = await broker.connect(servers=self.host+":"+str(self.data_port), allow_reconnect=self.reconnect, reconnect_time_wait=self.reconnect_interval_ms/1000, connect_timeout=self.timeout_ms/1000, max_reconnect_attempts=self.max_reconnect, token=self.connection_token) - self.broker_connection = self.broker_manager.jetstream() - self.connected = True - except Exception as e: - raise Exception(e) - - self.t_timeout = set_interval( - self.__handel_reconnect_timeout, self.timeout_ms/1000) - - def __handel_reconnect_timeout(self): - if(not self.reconnect or self.reconnect_attempts == self.max_reconnect or not self.is_connection_active): - raise Exception("Connection timeout has reached") + raise Exception(e) - def factory(self, name, description=""): + async def factory(self, name, description=""): """Creates a factory. Args: name (str): factory name. @@ -128,17 +92,25 @@ def factory(self, name, description=""): try: if not self.is_connection_active: raise Exception("Connection is dead") + createFactoryReq = { + "factory_name": name, + "factory_description": description + } + create_factory_req_bytes = json.dumps(createFactoryReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_factory_creations", create_factory_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) + return Factory(self, name) - response = http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/factories/createFactory', - headers={"Authorization": "Bearer " + self.access_token}, body_params={"name": name, "description": description}) - return Factory(self, json.loads(response)['name']) except Exception as e: if str(e).find('already exist') != -1: return Factory(self, name.lower()) else: raise Exception(e) - def station(self, name, factory_name, retention_type=memphis.retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=memphis.storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): + async def station(self, name, factory_name, retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): """Creates a station. Args: name (str): station name. @@ -155,9 +127,25 @@ def station(self, name, factory_name, retention_type=memphis.retention_types.MAX try: if not self.is_connection_active: raise Exception("Connection is dead") - response = http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/stations/createStation', headers={"Authorization": "Bearer " + self.access_token}, body_params={ - "name": name, "factory_name": factory_name, "retention_type": retention_type, "retention_value": retention_value, "storage_type": storage_type, "replicas": replicas, "dedup_enabled": dedup_enabled, "dedup_window_in_ms": dedup_window_ms}) - return Station(self, json.loads(response)['name']) + + createStationReq = { + "name": name, + "factory_name": factory_name, + "retention_type": retention_type, + "retention_value": retention_value, + "storage_type": storage_type, + "replicas": replicas, + "dedup_enabled": dedup_enabled, + "dedup_window_in_ms": dedup_window_ms + } + create_station_req_bytes = json.dumps(createStationReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_station_creations", create_station_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) + return Station(self, name) + except Exception as e: if str(e).find('already exist') != -1: return Station(self, name.lower()) @@ -171,21 +159,16 @@ async def close(self): if self.is_connection_active: await self.broker_manager.close() self.broker_manager = None - self.client.close() - self.client = None - self.t_keep_acess_token_fresh.cancel() - self.t_ping_server.cancel() - self.t_timeout.cancel() - self.access_token_timeout = None - self.ping_interval_ms = None - self.access_token_exp = None - self.access_token = None self.connection_id = None self.is_connection_active = False - self.reconnect_attempts = 0 except: return + def __generateConnectionID(self): + lst = [random.choice('0123456789abcdef') for n in range(24)] + s = "".join(lst) + return s + def __normalize_host(self, host): if (host.startswith("http://")): return host.split("http://")[1] @@ -194,57 +177,7 @@ def __normalize_host(self, host): else: return host - def __keep_acess_token_fresh(self): - if self.is_connection_active: - self.client.send(json.dumps( - {"resend_access_token": True}).encode()) - data = self.client.recv(1024) - try: - data = json.loads(data) - if data['access_token']: - self.access_token = data['access_token'] - except: - raise Exception(data) - - def __ping_server(self): - if self.is_connection_active: - self.client.send(json.dumps( - {"ping": True}).encode()) - data = self.client.recv(1024) - try: - data = json.loads(data) - if data['ping_interval_ms']: - self.ping_interval_ms = data['ping_interval_ms'] - except: - raise Exception(data) - - async def __close(self): - if self.reconnect is True: - while self.reconnect_attempts < self.max_reconnect: - self.reconnect_attempts += 1 - try: - await self.connect(host=self.host, management_port=self.management_port, tcp_port=self.tcp_port, data_port=self.data_port, username=self.username, connection_token=self.connection_token, reconnect=self.reconnect, max_reconnect=self.max_reconnect, reconnect_interval_ms=self.reconnect_interval_ms, timeout_ms=self.timeout_ms) - print("Reconnect to memphis has been succeeded") - return - except Exception as e: - print("Failed reconnect to memphis") - await asyncio.sleep(self.reconnect_interval_ms/1000) - if self.reconnect is False or self.reconnect_attempts >= self.max_reconnect: - if self.is_connection_active is True: - self.client.destroy() - self.connected = False - self.t_keep_acess_token_fresh.cancel() - self.t_ping_server.cancel() - self.t_timeout.cancel() - self.is_connection_active = False - while True: - if self.broker_manager: - self.broker_manager.close() - await asyncio.sleep(0.5) - else: - break - - def producer(self, station_name, producer_name): + async def producer(self, station_name, producer_name): """Creates a producer. Args: station_name (str): station name to produce messages into. @@ -258,16 +191,28 @@ def producer(self, station_name, producer_name): try: if not self.is_connection_active: raise Exception("Connection is dead") - - http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/producers/createProducer', headers={"Authorization": "Bearer " + self.access_token}, body_params={ - "name": producer_name, "station_name": station_name, "connection_id": self.connection_id, "producer_type": "application"}) + + createProducerReq = { + "name": producer_name, + "station_name": station_name, + "connection_id": self.connection_id, + "producer_type": "application" + } + create_producer_req_bytes = json.dumps(createProducerReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_producer_creations", create_producer_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) return Producer(self, producer_name, station_name) + except Exception as e: raise Exception(e) - def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10): + + async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10): """Creates a consumer. - Args: + Args:. station_name (str): station name to consume messages from. consumer_name (str): name for the consumer. consumer_group (str, optional): consumer group name. Defaults to the consumer name. @@ -283,9 +228,25 @@ def consumer(self, station_name, consumer_name, consumer_group="", pull_interval if not self.is_connection_active: raise Exception("Connection is dead") cg = consumer_name if not consumer_group else consumer_group - http_request("POST", 'http://'+self.host+':' + str(self.management_port)+'/api/consumers/createConsumer', headers={"Authorization": "Bearer " + self.access_token}, body_params={ - "name": consumer_name, "station_name": station_name, "connection_id": self.connection_id, "consumer_type": "application", "consumers_group": cg, "max_ack_time_ms": max_ack_time_ms, "max_msg_deliveries": max_msg_deliveries}) + + createConsumerReq = { + 'name': consumer_name, + "station_name": station_name, + "connection_id": self.connection_id, + "consumer_type": 'application', + "consumers_group": consumer_group, + "max_ack_time_ms": max_ack_time_ms, + "max_msg_deliveries": max_msg_deliveries + } + + create_consumer_req_bytes = json.dumps(createConsumerReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_consumer_creations", create_consumer_req_bytes) + err_msg = err_msg.data.decode("utf-8") + + if err_msg != "": + raise Exception(err_msg) return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries) + except Exception as e: raise Exception(e) @@ -295,14 +256,19 @@ def __init__(self, connection, name): self.connection = connection self.name = name.lower() - def destroy(self): + async def destroy(self): """Destroy the factory. """ try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/factories/removeFactory', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"factory_name": self.name}) + nameReq = { + "factory_name":self.name + } + factory_name = json.dumps(nameReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_factory_destructions', factory_name) + except Exception as e: raise Exception(e) + class Station: @@ -310,12 +276,16 @@ def __init__(self, connection, name): self.connection = connection self.name = name.lower() - def destroy(self): + async def destroy(self): """Destroy the station. """ try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/stations/removeStation', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"station_name": self.name}) + nameReq = { + "station_name":self.name + } + station_name = json.dumps(nameReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_station_destructions', station_name) + except Exception as e: raise Exception(e) @@ -345,14 +315,19 @@ async def produce(self, message, ack_wait_sec=15): else: raise Exception(e) - def destroy(self): + async def destroy(self): """Destroy the producer. """ try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/producers/destroyProducer', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"name": self.producer_name, "station_name": self.station_name}) - except: - return + destroyProducerReq = { + "name": self.producer_name, + "station_name":self.station_name + } + producer_name = json.dumps(destroyProducerReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_producer_destructions', producer_name) + + except Exception as e: + raise Exception(e) class Consumer: @@ -410,15 +385,20 @@ async def __consume_dlq(self, callback): async def __ping_consumer(self): x = await self.connection.broker_connection.consumer_info(self.station_name, durable=self.consumer_group) - def destroy(self): + async def destroy(self): """Destroy the consumer. """ self.pull_interval_ms = None try: - http_request("DELETE", 'http://'+self.connection.host+':'+str(self.connection.management_port)+'/api/consumers/destroyConsumer', headers={ - "Authorization": "Bearer " + self.connection.access_token}, body_params={"name": self.consumer_name, "station_name": self.station_name}) + destroyConsumerReq = { + "name": self.consumer_name, + "station_name":self.station_name + } + consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8') + await self.connection.broker_connection.publish('$memphis_consumer_destructions', consumer_name) + except Exception as e: - return + raise Exception(e) class Message: diff --git a/setup.py b/setup.py index bd85f84..b3d5b3b 100644 --- a/setup.py +++ b/setup.py @@ -13,9 +13,8 @@ download_url='https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.1.7.tar.gz', keywords=['message broker', 'devtool', 'streaming', 'data'], install_requires=[ - 'nats-py', 'asyncio', - 'requests' + 'nats-py' ], classifiers=[ 'Development Status :: 4 - Beta', @@ -29,4 +28,4 @@ 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', ], -) +) \ No newline at end of file