Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @idanasulinmemphis @shohamroditimemphis
* @idanasulinmemphis @rnowling-memphis
45 changes: 39 additions & 6 deletions memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,40 @@ def set_context(self, context):
self.context = context

def consume(self, callback):
"""Consume events."""
"""
This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received.

Parameters:
callback (function): A function that will be called with each batch of messages received. The function should have the following signature:
- `callback(messages: List[Message], error: Optional[MemphisError], context: Dict) -> Awaitable[None]`
- `messages`: A list of `Message` objects representing the batch of messages received.
- `error`: An optional `MemphisError` object if there was an error while consuming the messages.
- `context`: A dictionary representing the context that was set using the `set_context()` method.

Example:
import asyncio
from memphis import Memphis

async def message_handler(messages, error, context):
if error:
print(f"Error occurred: {error}")
return

for message in messages:
print(f"Received message: {message}")

async def main():
memphis = Memphis()
await memphis.connect(host='localhost', username='user', password='pass')
consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group')
consumer.set_context({'key': 'value'})
consumer.consume(message_handler)

# Keep the event loop running
while True:
await asyncio.sleep(1)
asyncio.run(main())
"""
self.dls_callback_func = callback
self.t_consume = asyncio.create_task(self.__consume(callback))

Expand Down Expand Up @@ -129,7 +162,7 @@ async def fetch(self, batch_size: int = 10):

from memphis import Memphis

async def main(/, host, username, password, station):
async def main(host, username, password, station):
memphis = Memphis()
await memphis.connect(host=host,
username=username,
Expand All @@ -149,10 +182,10 @@ async def main(/, host, username, password, station):
await memphis.close()

if __name__ == '__main__':
asyncio.run(main(host=host,
username=username,
password=password,
station=station))
asyncio.run(main(host,
username,
password,
station))

"""
messages = []
Expand Down
5 changes: 2 additions & 3 deletions memphis/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,12 @@ async def produce(
schemaverse_fail_alert_type,
)
raise e
except Exception as e:
except Exception as e: # pylint: disable-next=no-member
if hasattr(e, "status_code") and e.status_code == "503":
raise MemphisError(
"Produce operation has failed, please check whether Station/Producer still exist"
)
else:
raise MemphisError(str(e)) from e
raise MemphisError(str(e)) from e

async def destroy(self):
"""Destroy the producer."""
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
setup(
name="memphis-py",
packages=["memphis"],
version="1.0.5",
version="1.0.6",
license="Apache-2.0",
description="A powerful messaging platform for modern developers",
long_description=long_description,
Expand All @@ -17,7 +17,7 @@
author="Memphis.dev",
author_email="team@memphis.dev",
url="https://github.com/memphisdev/memphis.py",
download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.5.tar.gz",
download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.6.tar.gz",
keywords=["message broker", "devtool", "streaming", "data"],
install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"],
classifiers=[
Expand Down
2 changes: 1 addition & 1 deletion version-beta.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.7
1.0.8
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.5
1.0.6