Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
76 changes: 45 additions & 31 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,18 @@ async def connect(self, host, username, connection_token, management_port=5555,
self.reconnect = reconnect
self.max_reconnect = 9 if max_reconnect > 9 else max_reconnect
self.reconnect_interval_ms = reconnect_interval_ms
self.timeout_ms = timeout_ms # check interval
self.timeout_ms = timeout_ms
try:
self.client.connect((self.host, self.tcp_port))
connection_details = {"username": self.username,
"broker_creds": self.connection_token, "connection_id": None}

except OSError as msg:
self.client = None
self.__close()

if self.client is None:
raise Exception("could not open socket")
connection_details = {"username": self.username,
"broker_creds": self.connection_token, "connection_id": None}
self.client.send(json.dumps(connection_details).encode())
data = self.client.recv(1024)
try:
Expand All @@ -82,19 +83,21 @@ async def connect(self, host, username, connection_token, management_port=5555,
self.access_token = data['access_token']
t_keep_acess_token_fresh = Thread(
target=self.__keep_acess_token_fresh)
t_keep_acess_token_fresh.daemon = True
t_keep_acess_token_fresh.start()

if data['ping_interval_ms']:
self.ping_interval_ms = data['ping_interval_ms']
t_ping_interval_ms = Thread(
target=self.__ping_server)
t_ping_interval_ms.daemon = True
t_ping_interval_ms.start()

if not self.connected:
try:
self.broker_manager = await broker.connect(servers=self.host+":"+str(self.data_port), allow_reconnect=True, reconnect_time_wait=2, connect_timeout=2, max_reconnect_attempts=60, token="memphis")
self.broker_connection = self.broker_manager.jetstream()
self.connected = True
print(self.broker_connection)
except Exception as e:
raise Exception(e)

Expand Down Expand Up @@ -126,7 +129,7 @@ async def factory(self, name, description=""):
raise Exception(e)

async def station(self, name, factory_name, retention_type="message_age_sec", retention_value=604800, storage_type="file", replicas=1, dedup_enabled=False, dedup_window_ms=0):
"""Creates a station.
"""Creates a station.

Args:
name (str): station name.
Expand Down Expand Up @@ -154,19 +157,23 @@ async def station(self, name, factory_name, retention_type="message_age_sec", re
raise Exception(e)

async def close(self):
"""Close Memphis connection.
"""Close Memphis connection.
"""
if self.is_connection_active:
self.client.close()
self.access_token_timeout = None
self.ping_interval_ms = None
self.access_token_exp = None
self.access_token = None
self.connection_id = None
self.is_connection_active = False
self.reconnect_attempts = 0
await self.broker_manager.close()
self.broker_manager = None
try:
if self.is_connection_active:
await self.broker_manager.close()
self.broker_manager = None
self.client.close()
self.client = None
self.access_token_timeout = None
self.ping_interval_ms = None
self.access_token_exp = None
self.access_token = None
self.connection_id = None
self.is_connection_active = False
self.reconnect_attempts = 0
except:
return

def __normalize_host(self, host):
if (host.startswith("http://")):
Expand All @@ -179,21 +186,21 @@ def __normalize_host(self, host):
def __keep_acess_token_fresh(self):
starttime = time.time()
while True:
if not self.access_token_exp:
break
time.sleep(self.access_token_exp/1000 -
((time.time() - starttime) % self.access_token_exp/1000))
if not self.access_token_exp or not self.client:
break
if self.is_connection_active:
self.client.send(json.dumps(
{"resend_access_token": True}).encode())

def __ping_server(self):
starttime = time.time()
while True:
if not self.ping_interval_ms:
break
time.sleep(self.ping_interval_ms/1000 -
((time.time() - starttime) % self.ping_interval_ms/1000))
if not self.ping_interval_ms or self.client:
break
if self.is_connection_active:
self.client.send(json.dumps(
{"ping": True}).encode())
Expand All @@ -208,6 +215,7 @@ async def __close(self):
try:
await self.connect(host=self.host, management_port=self.management_port, tcp_port=self.tcp_port, data_port=self.data_port, username=self.username, connection_token=self.connection_token, reconnect=self.reconnect, max_reconnect=self.max_reconnect, reconnect_interval_ms=self.reconnect_interval_ms, timeout_ms=self.timeout_ms)
print("Reconnect to memphis has been succeeded")
break
except Exception as e:
print("Failed reconnect to memphis")
return
Expand All @@ -223,6 +231,8 @@ async def __close(self):
time.sleep(0.5 - ((time.time() - starttime) % 0.5))
if self.broker_manager is True:
self.broker_manager.close()
else:
break

async def producer(self, station_name, producer_name):
"""Creates a producer.
Expand Down Expand Up @@ -322,11 +332,11 @@ async def produce(self, message, ack_wait_sec=15):
Exception: _description_
"""
try:
await self.connection.broker_connection.publish(self.station_name + ".final", message.encode('ascii'), headers={
await self.connection.broker_connection.publish(self.station_name + ".final", message, headers={
"Nats-Msg-Id": str(uuid.uuid4())
})
except Exception as e:
if e.status_code == '503':
if hasattr(e, 'status_code') and e.status_code == '503':
raise Exception(
"Produce operation has failed, please check wheether Station/Producer are still exist")
else:
Expand Down Expand Up @@ -358,17 +368,21 @@ async def consume(self):
"""Consume events.
"""
try:
durable_name = self.consumer_group if self.consumer_group else self.consumer_name
self.psub = await self.connection.broker_connection.pull_subscribe(
self.station_name + ".final", "psub")
self.station_name + ".final", durable=durable_name)
starttime = time.time()
while True:
time.sleep(self.pull_interval_ms/1000 -
((time.time() - starttime) % self.pull_interval_ms/1000))
msgs = await self.psub.fetch(self.batch_size)
for msg in msgs:
self.event.emit('message', Message(msg))
try:
time.sleep(self.pull_interval_ms/1000 -
((time.time() - starttime) % self.pull_interval_ms/1000))
msgs = await self.psub.fetch(self.batch_size)
for msg in msgs:
self.event.emit('message', Message(msg))
except Exception as e:
self.event.emit('error', e)
except Exception as e:
self.event.emit('error', e)
raise Exception(e)

async def destroy(self):
"""Destroy the consumer.
Expand All @@ -392,4 +406,4 @@ def ack(self):
self.message.ack()

def get_data(self):
return self.message.data.decode()
return self.message.data