Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 39 additions & 35 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Memphis:

def __init__(self):
self.is_connection_active = False

async def connect(self, host, username, connection_token, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000):
"""Creates connection with Memphis.
Args:
Expand All @@ -72,20 +72,19 @@ async def connect(self, host, username, connection_token, port=6666, reconnect=T
self.timeout_ms = timeout_ms
self.connection_id = self.__generateConnectionID()
try:
self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port),
allow_reconnect=self.reconnect,
reconnect_time_wait=self.reconnect_interval_ms/1000,
connect_timeout=self.timeout_ms/1000,
max_reconnect_attempts=self.max_reconnect,
token=self.connection_token,
name=self.connection_id + "::" + self.username, max_outstanding_pings=1)
self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port),
allow_reconnect=self.reconnect,
reconnect_time_wait=self.reconnect_interval_ms/1000,
connect_timeout=self.timeout_ms/1000,
max_reconnect_attempts=self.max_reconnect,
token=self.connection_token,
name=self.connection_id + "::" + self.username, max_outstanding_pings=1)

self.broker_connection = self.broker_manager.jetstream()
self.is_connection_active = True
except Exception as e:
raise Exception(e)


async def station(self, name, retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0):
"""Creates a station.
Args:
Expand All @@ -102,7 +101,7 @@ async def station(self, name, retention_type=retention_types.MAX_MESSAGE_AGE_SEC
try:
if not self.is_connection_active:
raise Exception("Connection is dead")

createStationReq = {
"name": name,
"retention_type": retention_type,
Expand All @@ -111,13 +110,14 @@ async def station(self, name, retention_type=retention_types.MAX_MESSAGE_AGE_SEC
"replicas": replicas,
"dedup_enabled": dedup_enabled,
"dedup_window_in_ms": dedup_window_ms
}
create_station_req_bytes = json.dumps(createStationReq, indent=2).encode('utf-8')
err_msg = await self.broker_manager.request("$memphis_station_creations", create_station_req_bytes)
err_msg = err_msg.data.decode("utf-8")
}
create_station_req_bytes = json.dumps(
createStationReq, indent=2).encode('utf-8')
err_msg = await self.broker_manager.request("$memphis_station_creations", create_station_req_bytes)
err_msg = err_msg.data.decode("utf-8")

if err_msg != "":
raise Exception(err_msg)
raise Exception(err_msg)
return Station(self, name)

except Exception as e:
Expand Down Expand Up @@ -165,25 +165,25 @@ async def producer(self, station_name, producer_name):
try:
if not self.is_connection_active:
raise Exception("Connection is dead")

createProducerReq = {
"name": producer_name,
"station_name": station_name,
"connection_id": self.connection_id,
"producer_type": "application"
}
create_producer_req_bytes = json.dumps(createProducerReq, indent=2).encode('utf-8')
err_msg = await self.broker_manager.request("$memphis_producer_creations", create_producer_req_bytes)
err_msg = err_msg.data.decode("utf-8")
}
create_producer_req_bytes = json.dumps(
createProducerReq, indent=2).encode('utf-8')
err_msg = await self.broker_manager.request("$memphis_producer_creations", create_producer_req_bytes)
err_msg = err_msg.data.decode("utf-8")

if err_msg != "":
raise Exception(err_msg)
raise Exception(err_msg)
return Producer(self, producer_name, station_name)

except Exception as e:
raise Exception(e)


async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10):
"""Creates a consumer.
Args:.
Expand All @@ -202,7 +202,7 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in
if not self.is_connection_active:
raise Exception("Connection is dead")
cg = consumer_name if not consumer_group else consumer_group

createConsumerReq = {
'name': consumer_name,
"station_name": station_name,
Expand All @@ -211,18 +211,19 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in
"consumers_group": consumer_group,
"max_ack_time_ms": max_ack_time_ms,
"max_msg_deliveries": max_msg_deliveries
}
}

create_consumer_req_bytes = json.dumps(createConsumerReq, indent=2).encode('utf-8')
err_msg = await self.broker_manager.request("$memphis_consumer_creations", create_consumer_req_bytes)
err_msg = err_msg.data.decode("utf-8")
create_consumer_req_bytes = json.dumps(
createConsumerReq, indent=2).encode('utf-8')
err_msg = await self.broker_manager.request("$memphis_consumer_creations", create_consumer_req_bytes)
err_msg = err_msg.data.decode("utf-8")

if err_msg != "":
raise Exception(err_msg)
raise Exception(err_msg)
return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries)

except Exception as e:
raise Exception(e)
raise Exception(e)


class Station:
Expand All @@ -235,7 +236,7 @@ async def destroy(self):
"""
try:
nameReq = {
"station_name":self.name
"station_name": self.name
}
station_name = json.dumps(nameReq, indent=2).encode('utf-8')
res = await self.connection.broker_manager.request('$memphis_station_destructions', station_name)
Expand Down Expand Up @@ -279,7 +280,7 @@ async def destroy(self):
"name": self.producer_name,
"station_name": self.station_name
}

producer_name = json.dumps(destroyProducerReq).encode('utf-8')
res = await self.connection.broker_manager.request('$memphis_producer_destructions', producer_name)
error = res.data.decode('utf-8')
Expand Down Expand Up @@ -347,13 +348,16 @@ async def __ping_consumer(self):
async def destroy(self):
"""Destroy the consumer.
"""
self.t_consume.cancel()
self.t_dlq.cancel()
self.pull_interval_ms = None
try:
destroyConsumerReq = {
"name": self.consumer_name,
"station_name":self.station_name
"station_name": self.station_name
}
consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8')
consumer_name = json.dumps(
destroyConsumerReq, indent=2).encode('utf-8')
res = await self.connection.broker_manager.request('$memphis_consumer_destructions', consumer_name)
error = res.data.decode('utf-8')
if error != "" and not "not exist" in error:
Expand Down
2 changes: 0 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
[metadata]
description-file = README.md
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
setup(
name='memphis-py',
packages=['memphis'],
version='0.1.9',
version='0.2.0',
license='GPL',
description='A powerful messaging platform for modern developers',
long_description = 'file: README.md',
long_description_content_type = 'text/markdown; charset=UTF-8; variant=GF',
long_description='file: README.md',
long_description_content_type='text/markdown; charset=UTF-8; variant=GF',
readme="README.md",
author='Memphis.dev',
author_email='team@memphis.dev',
url='https://github.com/memphisdev/memphis.py',
download_url='https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.1.8.tar.gz',
download_url='https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.2.0.tar.gz',
keywords=['message broker', 'devtool', 'streaming', 'data'],
install_requires=[
'asyncio',
Expand Down