Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ consumer = await memphis.consumer(
max_ack_time_ms=30000, # defaults to 30000
max_msg_deliveries=10, # defaults to 10
generate_random_suffix=False
start_consume_from_sequence=1 # start consuming from a specific sequence. defaults to 1
last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)
)
```

Expand Down Expand Up @@ -283,6 +285,13 @@ Get headers per message
headers = message.get_headers()
```

### Get message sequence number
Get message sequence number

```python
sequence_number = msg.get_sequence_number()
```

### Destroying a Consumer

```python
Expand Down
29 changes: 25 additions & 4 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ async def start_listen_for_schema_updates(self, station_name, schema_update_data
station_name, self.schema_updates_subs[station_name].messages))
self.schema_tasks[station_name] = task

async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10, generate_random_suffix=False):
async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10, generate_random_suffix=False, start_consume_from_sequence=1, last_messages=-1):
"""Creates a consumer.
Args:.
station_name (str): station name to consume messages from.
Expand All @@ -417,7 +417,8 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in
max_ack_time_ms (int, optional): max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it. Defaults to 30000.
max_msg_deliveries (int, optional): max number of message deliveries, by default is 10.
generate_random_suffix (bool): false by default, if true concatenate a random suffix to consumer's name

start_consume_from_sequence(int, optional): start consuming from a specific sequence. defaults to 1.
last_messages: consume the last N messages, defaults to -1 (all messages in the station).
Returns:
object: consumer
"""
Expand All @@ -429,6 +430,14 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in
consumer_name = self.__generateRandomSuffix(consumer_name)
cg = consumer_name if not consumer_group else consumer_group

if start_consume_from_sequence <= 0:
raise MemphisError("start_consume_from_sequence has to be a positive number")

if last_messages < -1:
raise MemphisError("min value for last_messages is -1")

if start_consume_from_sequence > 1 and last_messages > -1 :
raise MemphisError("Consumer creation options can't contain both start_consume_from_sequence and last_messages")
createConsumerReq = {
'name': consumer_name,
"station_name": station_name,
Expand All @@ -437,6 +446,9 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in
"consumers_group": consumer_group,
"max_ack_time_ms": max_ack_time_ms,
"max_msg_deliveries": max_msg_deliveries,
"start_consume_from_sequence": start_consume_from_sequence,
"last_messages": last_messages,
"req_version": 1,
"username": self.username
}

Expand All @@ -448,7 +460,7 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in
if err_msg != "":
raise MemphisError(err_msg)

return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries)
return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages)

except Exception as e:
raise MemphisError(str(e)) from e
Expand Down Expand Up @@ -745,7 +757,7 @@ async def default_error_handler(e):


class Consumer:
def __init__(self, connection, station_name, consumer_name, consumer_group, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries=10, error_callback=None):
def __init__(self, connection, station_name, consumer_name, consumer_group, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries=10, error_callback=None, start_consume_from_sequence=1, last_messages=-1):
self.connection = connection
self.station_name = station_name.lower()
self.consumer_name = consumer_name.lower()
Expand All @@ -759,6 +771,8 @@ def __init__(self, connection, station_name, consumer_name, consumer_group, pull
if error_callback is None:
error_callback = default_error_handler
self.t_ping = asyncio.create_task(self.__ping_consumer(error_callback))
self.start_consume_from_sequence = start_consume_from_sequence
self.last_messages= last_messages

def consume(self, callback):
"""Consume events.
Expand Down Expand Up @@ -882,6 +896,13 @@ def get_headers(self):
except:
return

def get_sequence_number(self):
"""Get message sequence number.
"""
try:
return self.message.metadata.sequence.stream
except:
return

def random_bytes(amount: int) -> str:
lst = [random.choice('0123456789abcdef') for n in range(amount)]
Expand Down