4 changes: 2 additions & 2 deletions MAINTAINERS.md
@@ -5,5 +5,5 @@
- Yaniv Ben Hemo <yaniv@memphis.dev> [@yanivbh1](https://github.com/yanivbh1)
- Sveta Gimpelson <sveta@memphis.dev> [@SvetaMemphis](https://github.com/SvetaMemphis)
- Shay Bratslavsky <shay@memphis.dev> [@shay23b](https://github.com/shay23b)
- Or Grinberg <or@memphis.dev> [@orgrMmphs](https://github.com/orgrMmphs)
- Shoham Roditi <shoham@memphis.dev> [@shohamroditimemphis](https://github.com/shohamroditimemphis)
- Ido Naaman <ido@memphis.dev> [@idonaaman123](https://github.com/idonaaman123)
69 changes: 51 additions & 18 deletions README.md
@@ -38,12 +38,12 @@
<img src="https://img.shields.io/github/last-commit/memphisdev/memphis-broker?color=61dfc6&label=last%20commit">
</p>

**[Memphis](https://memphis.dev)** is a next-generation message broker.<br>
**[Memphis](https://memphis.dev)** is a next-generation alternative to traditional message brokers.<br><br>
A simple, robust, and durable cloud-native message broker wrapped with<br>
an entire ecosystem that enables fast and reliable development of next-generation event-driven use cases.<br><br>
Memphis enables building modern applications that require large volumes of streamed and enriched data,<br>
modern protocols, zero ops, rapid development, extreme cost reduction,<br>
and a significantly lower amount of dev time for data-oriented developers and data engineers.
an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases.<br><br>
Memphis enables the building of modern queue-based applications that require<br>
large volumes of streamed and enriched data, modern protocols, zero ops, rapid development,<br>
extreme cost reduction, and a significantly lower amount of dev time for data-oriented developers and data engineers.

## Installation

@@ -55,7 +55,7 @@ $ pip3 install memphis-py

```python
from memphis import Memphis, Headers
from memphis import retention_types, storage_types
from memphis.types import Retention, Storage
```

### Connecting to Memphis
@@ -69,7 +69,7 @@ async def main():
await memphis.connect(
host="<memphis-host>",
username="<application-type username>",
connection_token="<broker-token>",
connection_token="<broker-token>", # you will get it on application type user creation
port="<port>", # defaults to 6666
reconnect=True, # defaults to True
max_reconnect=3, # defaults to 3
@@ -108,9 +108,9 @@ _If a station already exists nothing happens, the new configuration will not be
station = memphis.station(
name="<station-name>",
schema_name="<schema-name>",
retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
retention_type=Retention.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
retention_value=604800, # defaults to 604800
storage_type=storage_types.DISK, # storage_types.DISK/storage_types.MEMORY. Defaults to DISK
storage_type=Storage.DISK, # Storage.DISK/Storage.MEMORY. Defaults to DISK
replicas=1, # defaults to 1
idempotency_window_ms=120000, # defaults to 2 minutes
send_poison_msg_to_dls=True, # defaults to true
@@ -124,35 +124,46 @@ station = memphis.station(
Memphis currently supports the following types of retention:

```python
memphis.retention_types.MAX_MESSAGE_AGE_SECONDS
memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS
```

Means that every message persists for the value set in the retention value field (in seconds)

```python
memphis.retention_types.MESSAGES
memphis.types.Retention.MESSAGES
```

Means that after the max number of saved messages (set in the retention value) is reached, the oldest messages will be deleted

```python
memphis.retention_types.BYTES
memphis.types.Retention.BYTES
```

Means that after the max number of saved bytes (set in the retention value) is reached, the oldest messages will be deleted


### Retention Values

The `retention value` works together with the `retention type` described above; its meaning varies according to the type of retention chosen.

All retention values are of type `int` but with different representations as follows:

`memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS` is represented **in seconds**, `memphis.types.Retention.MESSAGES` in a **number of messages** and finally `memphis.types.Retention.BYTES` in a **number of bytes**.

After these limits are reached, the oldest messages will be deleted.
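
For example, a station meant to keep only its most recent messages could combine the type and value like this. This is a minimal sketch, not part of the diff: the station name and limit are illustrative, and it assumes the imports and async connection from the earlier examples (and that `memphis.station()` is awaited inside that async context):

```python
# Hypothetical example: retain at most the 100,000 newest messages on disk.
station = await memphis.station(
  name="orders",                       # illustrative station name
  retention_type=Retention.MESSAGES,   # retention counted in number of messages
  retention_value=100000,              # once exceeded, the oldest messages are deleted
  storage_type=Storage.DISK,
)
```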

### Storage types

Memphis currently supports the following types of message storage:

```python
memphis.storage_types.DISK
memphis.types.Storage.DISK
```

Means that messages persist on disk

```python
memphis.storage_types.MEMORY
memphis.types.Storage.MEMORY
```

Means that messages persist on the main memory
@@ -213,9 +224,9 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py',
```


Creating a producer first
With creating a producer
```python
await prod.produce(
await producer.produce(
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
ack_wait_sec=15) # defaults to 15
```
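
Where message headers are needed, the `Headers` class imported at the top of the README can be attached to the produce call. A rough sketch: the header key/value are made up, and the `headers=` argument reflects the memphis-py API as commonly documented rather than anything shown in this diff:

```python
headers = Headers()
headers.add("trace-id", "abc-123")  # illustrative header key/value

await producer.produce(
  message={"id": 1, "status": "created"},  # a dict works for JSON-schema validated stations
  headers=headers,
  ack_wait_sec=15)
```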
@@ -295,6 +306,28 @@ async def msg_handler(msgs, error, context):
consumer.consume(msg_handler)
```
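
Putting the pieces together, a typical push-based flow looks roughly like the sketch below. It assumes the consumer was created with `memphis.consumer()` as in the full README, and that `msg.get_data()` behaves as in the memphis-py message API; neither is shown in this diff:

```python
consumer = await memphis.consumer(
  station_name="<station-name>",
  consumer_name="<consumer-name>",
  consumer_group="")  # defaults to the consumer name

async def msg_handler(msgs, error, context):
    if error:
        print(error)
        return
    for msg in msgs:
        print("message:", msg.get_data())
        await msg.ack()

consumer.set_context({"key": "value"})  # optional dict passed to every handler call
consumer.consume(msg_handler)
```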

### Fetch a single batch of messages
```python
msgs = await memphis.fetch_messages(
station_name="<station-name>",
consumer_name="<consumer-name>",
consumer_group="<group-name>", # defaults to the consumer name
batch_size=10, # defaults to 10
batch_max_time_to_wait_ms=5000, # defaults to 5000
max_ack_time_ms=30000, # defaults to 30000
max_msg_deliveries=10, # defaults to 10
  generate_random_suffix=False,
  start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
  last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)
)
```

### Fetch a single batch of messages after creating a consumer
```python
msgs = await consumer.fetch(batch_size=10) # defaults to 10
```
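
The returned batch is a list of message objects, so it can be processed and acknowledged the same way as in the handler-based flow. A short sketch (`get_data()` is assumed from the memphis-py message API, not shown in this diff):

```python
msgs = await consumer.fetch(batch_size=10)
for msg in msgs:
    print(msg.get_data())
    await msg.ack()
```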


### Acknowledge a message

Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer / consumer group
@@ -306,7 +339,7 @@ await message.ack()
### Get headers
Get headers per message

```python
```python
headers = message.get_headers()
```

@@ -328,4 +361,4 @@ consumer.destroy()

```python
memphis.is_connected()
```
1 change: 1 addition & 0 deletions examples/consumer.py
@@ -1,4 +1,5 @@
from __future__ import annotations

import asyncio

from memphis import Memphis, MemphisConnectError, MemphisError, MemphisHeaderError
1 change: 1 addition & 0 deletions examples/producer.py
@@ -1,4 +1,5 @@
from __future__ import annotations

import asyncio

from memphis import (
9 changes: 3 additions & 6 deletions memphis/__init__.py
@@ -11,14 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import memphis.retention_types
import memphis.storage_types
from memphis.memphis import (
Headers,
Memphis,
from memphis.exceptions import (
MemphisConnectError,
MemphisError,
MemphisHeaderError,
MemphisSchemaError,
)
from memphis.headers import Headers
from memphis.memphis import Memphis
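
With this reorganization, user-facing imports stay at the package level while the retention/storage enums move to `memphis.types`. A sketch based on the re-exports above and the updated README:

```python
# Names re-exported by memphis/__init__.py after this change:
from memphis import (
    Headers,
    Memphis,
    MemphisConnectError,
    MemphisError,
    MemphisHeaderError,
    MemphisSchemaError,
)

# The Retention/Storage enums now live in memphis.types (per the updated README):
from memphis.types import Retention, Storage
```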
190 changes: 190 additions & 0 deletions memphis/consumer.py
@@ -0,0 +1,190 @@
from __future__ import annotations

import asyncio
import json

from memphis.exceptions import MemphisError
from memphis.utils import default_error_handler, get_internal_name
from memphis.message import Message


class Consumer:
def __init__(
self,
connection,
station_name: str,
consumer_name,
consumer_group,
pull_interval_ms: int,
batch_size: int,
batch_max_time_to_wait_ms: int,
max_ack_time_ms: int,
max_msg_deliveries: int = 10,
error_callback=None,
start_consume_from_sequence: int = 1,
last_messages: int = -1,
):
self.connection = connection
self.station_name = station_name.lower()
self.consumer_name = consumer_name.lower()
self.consumer_group = consumer_group.lower()
self.pull_interval_ms = pull_interval_ms
self.batch_size = batch_size
self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms
self.max_ack_time_ms = max_ack_time_ms
self.max_msg_deliveries = max_msg_deliveries
        self.ping_consumer_interval_ms = 30000
if error_callback is None:
error_callback = default_error_handler
self.t_ping = asyncio.create_task(self.__ping_consumer(error_callback))
self.start_consume_from_sequence = start_consume_from_sequence
self.last_messages = last_messages
self.context = {}
self.dls_messages = []
self.dls_current_index = 0
        self.dls_callback_func = None
        self.t_consume = None
        self.t_dls = asyncio.create_task(self.__consume_dls())

def set_context(self, context):
"""Set a context (dict) that will be passed to each message handler call."""
self.context = context

def consume(self, callback):
"""Consume events."""
self.dls_callback_func = callback
self.t_consume = asyncio.create_task(self.__consume(callback))

async def __consume(self, callback):
subject = get_internal_name(self.station_name)
consumer_group = get_internal_name(self.consumer_group)
self.psub = await self.connection.broker_connection.pull_subscribe(
subject + ".final", durable=consumer_group
)
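        # Poll the station in a loop, handing each batch to the callback and
        # sleeping pull_interval_ms between pulls.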
while True:
if self.connection.is_connection_active and self.pull_interval_ms:
try:
memphis_messages = []
msgs = await self.psub.fetch(self.batch_size)
for msg in msgs:
memphis_messages.append(
Message(msg, self.connection, self.consumer_group)
)
await callback(memphis_messages, None, self.context)
await asyncio.sleep(self.pull_interval_ms / 1000)

except asyncio.TimeoutError:
await callback(
[], MemphisError("Memphis: TimeoutError"), self.context
)
continue
except Exception as e:
if self.connection.is_connection_active:
raise MemphisError(str(e)) from e
else:
return
else:
break

async def __consume_dls(self):
subject = get_internal_name(self.station_name)
consumer_group = get_internal_name(self.consumer_group)
try:
subscription_name = "$memphis_dls_" + subject + "_" + consumer_group
self.consumer_dls = await self.connection.broker_manager.subscribe(
subscription_name, subscription_name
)
async for msg in self.consumer_dls.messages:
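                # Buffer incoming DLS (dead-letter station) messages; the insert
                # index wraps at 10,000 so the buffer position stays bounded.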
index_to_insert = self.dls_current_index
if index_to_insert >= 10000:
index_to_insert %= 10000
self.dls_messages.insert(
index_to_insert, Message(msg, self.connection, self.consumer_group)
)
self.dls_current_index += 1
                if self.dls_callback_func is not None:
await self.dls_callback_func(
[Message(msg, self.connection, self.consumer_group)],
None,
self.context,
)
except Exception as e:
await self.dls_callback_func([], MemphisError(str(e)), self.context)
return

async def fetch(self, batch_size: int = 10):
"""Fetch a batch of messages."""
messages = []
if self.connection.is_connection_active:
try:
self.batch_size = batch_size
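                # Serve buffered DLS messages first; only pull from the broker
                # when the local buffer is empty.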
if len(self.dls_messages) > 0:
if len(self.dls_messages) <= batch_size:
messages = self.dls_messages
self.dls_messages = []
self.dls_current_index = 0
else:
messages = self.dls_messages[0:batch_size]
del self.dls_messages[0:batch_size]
self.dls_current_index -= len(messages)
return messages

durableName = ""
if self.consumer_group != "":
durableName = get_internal_name(self.consumer_group)
else:
durableName = get_internal_name(self.consumer_name)
subject = get_internal_name(self.station_name)
consumer_group = get_internal_name(self.consumer_group)
self.psub = await self.connection.broker_connection.pull_subscribe(
subject + ".final", durable=durableName
)
msgs = await self.psub.fetch(batch_size)
for msg in msgs:
messages.append(Message(msg, self.connection, self.consumer_group))
return messages
except Exception as e:
if not "timeout" in str(e):
raise MemphisError(str(e)) from e
else:
return messages

async def __ping_consumer(self, callback):
while True:
try:
                await asyncio.sleep(self.ping_consumer_interval_ms / 1000)
consumer_group = get_internal_name(self.consumer_group)
await self.connection.broker_connection.consumer_info(
self.station_name, consumer_group, timeout=30
)

except Exception as e:
callback(MemphisError(str(e)))

async def destroy(self):
"""Destroy the consumer."""
if self.t_consume is not None:
self.t_consume.cancel()
if self.t_dls is not None:
self.t_dls.cancel()
if self.t_ping is not None:
self.t_ping.cancel()
self.pull_interval_ms = None
try:
destroyConsumerReq = {
"name": self.consumer_name,
"station_name": self.station_name,
"username": self.connection.username,
}
consumer_name = json.dumps(destroyConsumerReq, indent=2).encode("utf-8")
res = await self.connection.broker_manager.request(
"$memphis_consumer_destructions", consumer_name, timeout=5
)
error = res.data.decode("utf-8")
if error != "" and not "not exist" in error:
raise MemphisError(error)
self.dls_messages.clear()
internal_station_name = get_internal_name(self.station_name)
map_key = internal_station_name + "_" + self.consumer_name.lower()
del self.connection.consumers_map[map_key]
except Exception as e:
raise MemphisError(str(e)) from e
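
For reference, a typical lifecycle against this class might look as follows. This is a sketch, not part of the diff: it assumes the consumer is obtained through the connection's `consumer()` factory and that `memphis` is an already-connected instance, as in the README examples:

```python
consumer = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
)

# fetch() drains any buffered DLS messages first, then pulls from the station.
msgs = await consumer.fetch(batch_size=10)
for msg in msgs:
    await msg.ack()

# destroy() cancels the consume/DLS/ping tasks and removes the consumer from the broker.
await consumer.destroy()
```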