
PDP 47 (Pravega Consumption Based Retention)

Derek Moore edited this page Jul 9, 2021 · 1 revision

Introduction

Message Queues vs Event logs

Messaging systems are at the heart of event driven and micro-services based architectures.

Messaging systems can be classified as queue-based vs log-based.

Message Queues

Message queues enable asynchronous application-to-application communication.

A queue holds a "sequence of messages" generated by one application, waiting to be processed by one or more downstream applications.

A message typically contains information about a finished task and would be considered by the downstream systems as a trigger to start processing the next task.

Messages are transient: they need to be stored on the queue only until they are processed, after which they can be deleted.

In most cases, the very act of reading from a queue removes the message.

The sole purpose of these messages is to facilitate application to application communication and hence there is no need for historical storage or processing.

Message Queues help decouple heavyweight processing by buffering or batching work, and thus help smooth spiky workloads.

Message Queues can be point to point or publish-subscribe (one-to-many, many-to-many).

Example Queuing Systems: RabbitMQ, ActiveMQ, MSMQ, AWS SQS, JMQ and many more.

Event Logs

Event logs store streaming data that is persistent in nature. Reading from a log does not remove the data.

Multiple consumers can read from the same log at the same time from different positions. Data needs to be stored over the longer term for historical processing.

Examples of event log data include data emitted by an IoT sensor or component logs aggregated from multiple hosts.

Event logs typically have a publish-subscribe (many-to-many) architecture.

Example Event-Log Systems: Apache Kafka, Apache Pulsar, AWS Kinesis, Azure Event Hubs and many more.

Objective

A Pravega Stream is inherently designed to be an event log for real-time and historical processing of data.

Though it can be used as a "queue", its efficacy in this case is lower.

The objective of this feature is to provide enhanced support for the use of Pravega Streams as Message Queues, including in low-footprint environments (IoT Edge), without adversely impacting their use as an "event-log". The idea is not to build a full-featured MOM (Message Oriented Middleware) or to compete with existing MOMs (RabbitMQ, SQS-SNS, etc.).

Push vs Pull based Message Queues

Message queuing systems can be push based (brokers 'push' messages from the queue to the consumer) or pull based (consumers 'pull' messages from the queue).

Most traditional message queuing systems are "push" based. Examples: RabbitMQ, ActiveMQ.

Kafka is a pull based system, whereas some newer systems such as Pulsar and Google Pub-Sub offer both "pull" and "push" based message queues.

Requirements

Message Queue features provided by Pravega Streams today:

  • Ability to asynchronously send/receive messages between a publisher and subscriber.
  • Fan-out/fan-in options: 1-to-1, 1-to-many, many-to-1 and many-to-many.
  • Tracking read positions in the queue (State Synchronizer).
  • Durable writes.
  • Durable subscriptions: if the consumer is down, it will still receive messages after it comes back up.
  • Message delivery guarantee: at least once.
  • Message delivery mechanism: pull based.
  • Real-time message delivery with high availability and consistent performance at scale.
  • Batching support: messages can be published and consumed in batches.
  • Message ordering: ordered by routing key.

Most Message Brokers do not store messages for long-term storage (archival) and messages are deleted after all subscribers have consumed them.

This is a feature Pravega Streams lack today. 

In Scope

Address two major gaps in the use of Pravega Streams as an MQ: 

Consumption Based Retention

Pravega Streams provide support for time and space-based retention policies. With these, we do not track if the messages being truncated have been consumed by Readers (Subscribers). However, in case of MQs, messages need to be retained at least till all Durable-Subscribers have consumed them.

Enabling consumption-based retention on Pravega Streams ensures:

  • The stream carries only unconsumed data and is space efficient.
  • Messages are not deleted before they are consumed by all Subscribers.

Optional Long-Term Storage

LTS is necessary for Pravega to operate. MQs do not need Long Term Storage and so mandating it works against the use of Pravega Streams as MQs in low-resource environments (IoT Edge Deployments).

Out of Scope

Here are some advanced features offered by Message Queuing systems that are currently out of scope for Pravega MQ but could be considered in the future:

  • Publisher Rate Limiting
  • AMQP Support - https://www.amqp.org/product/solve 
  • Seek and replay: Rewind the backlog to any point in time or to a snapshot in order to reprocess messages, or fast-forward to discard outdated data.
  • Delay Queues - Ability to delay the delivery of messages for a certain time period after they were published. (Provided by SQS and Pulsar)
  • Tagging support, e.g. cost allocation tags in case of SQS. It should be possible to implement this with KVTables.
  • Support for large messages - In the works - PDP-43-(Large-Events)

Design

Currently Pravega is a pull based system, and moving to a push based architecture does not seem to provide any advantage for Message Queuing use-cases.

For now we stick to the "pull" based model, for the reasons mentioned in section 1.3.

At a later point, it should also be possible to implement a "push" based system using existing building blocks, if we feel the need for it.

Terminology

Reader Group (RG)

A Pravega Reader Group that reads from one or more Pravega Streams.

A Reader Group can have multiple Readers, each reading from one or more Segments in the Stream.

Retention Policy (RP)

A Stream Configuration that specifies the basis on which the Stream will be truncated. Currently supported policies are Time and Space based policies. These truncate data in a Stream if it is older than a configured time interval or if the Stream size is beyond a configured size limit.

Consumption Based Retention Policy (CBR)

A Retention Policy on a Pravega Stream that requires data to be truncated from the Stream only after all Subscribers have read it.

A Stream can have both "durable" and "non-durable" Subscribers and read positions of only "durable" subscribers will be tracked by CBR policy for truncation.

Stream-Cut

A set of (segment, offset) pairs for a single Stream that represents a consistent position in the Stream.

A Stream-Cut covers the complete key-space at a given point in time. Each offset always points to an event boundary, so no offset points into an incomplete event.

Checkpoint

A special event that causes all Readers in a Reader Group to persist their current read positions to the State Synchronizer.

For a Reader Group, checkpoints can be generated automatically (periodically) or explicitly on user request.

Publisher

A Pravega writer application that writes messages to a Pravega Stream, when it is used as a Message Queue.

Durable Subscriber (or Subscriber)

In "push" based messaging systems, a "durable subscriber" is a message consumer that is guaranteed to receive all messages published to a queue including messages published while it was inactive. The Message Broker stores messages (subject to space/time constraints) till the Subscriber is back and then delivers those messages to the Subscriber.

For Pravega MQ, a "durable subscriber" (referred to as "Subscriber" in the rest of this document) is a Reader Group whose read positions are considered for computing the truncation point for a Stream with Consumption Based Retention. When such a Subscribing Reader Group goes down, the system will truncate the Stream based on the last published checkpoint of this Reader Group and will not truncate any data not read by this Reader Group. However, if the Reader Group does not publish a new checkpoint within a configured timeout, its last updated checkpoint will be ignored and truncation will happen based on the read positions of the rest of the Subscribers.

Non-Durable Subscriber (or Non-Subscriber)

In "push" based messaging systems, a "non-durable" subscriber is a message consumer that receives only those messages that are published to the queue while it is online. Messages published while the subscriber was inactive are lost.

For Pravega MQ, a non-durable subscriber is a Reader Group that can pull messages as long as it is up. However, the read position of this Reader Group won't be considered when computing the truncation point for a Stream with Consumption based retention policy. When this Reader Group goes down and comes back up, it may have lost some messages in the queue because those were deleted while it was away. In a way, all current Reader Groups are non-durable subscribers of Pravega Streams.

Consumption Based Retention

A Consumption Based Retention Policy on Streams guarantees that a message will not be deleted until all subscribed Reader Groups have read it.

Currently, Pravega Streams do not have a concept of “Subscription”.

A Stream can be read by one or more Reader Groups and they can start/stop reading from a Stream at any point during its lifetime.

The Reader Groups' state, including read positions, is maintained on the Client using the State Synchronizer.

Stream truncation, on the other hand, is handled by the Controller but it is not aware of Reader Groups or their read positions.

If we want a Stream to be truncated based on read/acknowledged positions of its Reader Group(s), it is necessary that these positions be published to the Controller at regular intervals.

Subscriber vs. Non-Subscriber Reader Groups

A Reader Group or set of Reader Groups may be the primary consumers of a Stream, such that once data is read by them, it can be discarded from the Stream. 

In Consumption based retention, the read/processed positions of such a Reader Group should determine the Stream's truncation.

This intent is expressed with "Subscription".

A Stream can be read by both "Subscriber" and "Non-Subscriber" Reader Groups, but only the reads of Subscriber Reader Groups would impact the Stream's truncation.

There may be transient Reader Groups that read a Stream temporarily (between Stream Cuts) but we would not want these reads to impact the Stream's truncation policy and hence these should be configured as "Non-Subscriber" Reader Groups.

Reader Group Subscription

Creating New Subscriber Reader Groups

When creating a new Reader Group, the ReaderGroupConfiguration currently specifies:

  • Streams & StreamCuts from which the RG would read messages/events.
  • A flag indicating if automatic-checkpointing is enabled.

Two new boolean fields would be added to Reader Group Configuration:

| Sl No | Field Name | Data Type | Description | Default Value |
|---|---|---|---|---|
| 1 | isSubscriber | boolean | A flag to indicate if the RG should be a "Subscriber" for all CBR Streams. | false |
| 2 | truncateAtLatestCheckpoint | boolean | A flag that indicates if the Client should auto-publish all checkpoints (automated & manual) as truncation Stream-Cuts to the Controller. This means that as soon as data is read by an RG it will be eligible for truncation. Most applications may want to retain data till each read event is also processed and only then make it available for truncation. Such applications should instead do manual check-pointing and choose a certain manual checkpoint as the truncation checkpoint. | false |

When a new RG is created with the "isSubscriber" flag set, the Client would invoke the addSubscriber API on the Controller for each Stream in the RG Configuration:

void addSubscriber(String RGName, String scopeName, String streamName)

On the Controller, the RG name would be stored as a "Subscriber" in the Stream's metadata tables.

Note that this would be stored irrespective of the Stream's current Retention Policy, as the policy could change to CBR in the future and adding RGs as Subscribers at that point is not possible.
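The subscriber bookkeeping described above can be sketched with a small in-memory model. This is only an illustration: the class name `SubscriberRegistry`, the `subscribers` accessor and the map-based storage are hypothetical, and only the `addSubscriber`/`removeSubscriber` signatures are taken from this proposal. A real Controller would persist this in the Stream's metadata tables.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the Stream metadata tables on the Controller.
final class SubscriberRegistry {
    // Key: "scope/stream"; value: names of the subscribing Reader Groups.
    private final Map<String, Set<String>> subscribersByStream = new HashMap<>();

    // Registers the RG regardless of the Stream's current retention policy.
    void addSubscriber(String rgName, String scopeName, String streamName) {
        subscribersByStream
                .computeIfAbsent(scopeName + "/" + streamName, k -> new TreeSet<>())
                .add(rgName);
    }

    // Removing an unknown subscriber is a no-op, so client retries are harmless.
    void removeSubscriber(String rgName, String scopeName, String streamName) {
        Set<String> subscribers = subscribersByStream.get(scopeName + "/" + streamName);
        if (subscribers != null) {
            subscribers.remove(rgName);
        }
    }

    // Read-only view of the subscribers currently registered on a Stream.
    Set<String> subscribers(String scopeName, String streamName) {
        return Collections.unmodifiableSet(subscribersByStream.getOrDefault(
                scopeName + "/" + streamName, Collections.emptySet()));
    }
}
```

Using a set keyed by RG name means a repeated `addSubscriber` call for the same RG leaves the stored state unchanged.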

When a Reader Group is deleted, its entry should be removed from the Subscribers' list in Stream metadata for each Stream in RG configuration.

This can be done by invoking this Controller API from Client:

void removeSubscriber(String RGName, String scopeName, String streamName)

Updating Reader Group Configuration

ReaderGroupConfiguration can be updated in the following ways:

  1. The set of Streams to read from can be updated. If a Stream is removed from the RG Configuration, the RG should correspondingly be removed from that Stream's list of Subscribers by invoking the removeSubscriber API on the Controller. If a Stream is added to the RG Configuration, the RG should be added to the Subscriber list for that Stream by invoking the addSubscriber API on the Controller for the new Stream.
  2. If a "Subscriber" RG becomes "Non-Subscriber", the Client should invoke the removeSubscriber API on all Streams in the configuration for this RG.
  3. If a "Non-Subscriber" RG becomes "Subscriber", the Client should invoke the addSubscriber API on all Streams in the RG configuration.
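The update rules above reduce to comparing the set of Streams the RG is subscribed to before and after the change. The following is a minimal sketch of that comparison; the class `SubscriptionDiff`, the method `plan` and the string encoding of the resulting calls are hypothetical and only serve to make the rules concrete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class SubscriptionDiff {
    // Returns the Controller calls (as readable strings) needed to move from the old
    // RG configuration to the new one. A non-subscriber RG is registered on no Stream.
    static List<String> plan(Set<String> oldStreams, boolean wasSubscriber,
                             Set<String> newStreams, boolean isSubscriber) {
        Set<String> before = wasSubscriber ? new TreeSet<>(oldStreams) : new TreeSet<>();
        Set<String> after = isSubscriber ? new TreeSet<>(newStreams) : new TreeSet<>();
        List<String> calls = new ArrayList<>();
        for (String stream : before) {
            if (!after.contains(stream)) {
                calls.add("removeSubscriber:" + stream);
            }
        }
        for (String stream : after) {
            if (!before.contains(stream)) {
                calls.add("addSubscriber:" + stream);
            }
        }
        return calls;
    }
}
```

Treating "Non-Subscriber" as "subscribed to the empty set of Streams" lets one comparison cover all three cases: a changed Stream set, a Subscriber becoming a Non-Subscriber, and the reverse.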

Updating Truncation Stream-Cut on Controller

The Controller should know the truncation Stream-Cut for each "Subscriber" on the Stream, to be able to arrive at a common truncation Stream-Cut.

RGs regularly update their "State" (including read positions) in the State Synchronizer. The set of "read positions" for each Reader in the RG is written as a Checkpoint in the RG State.

Checkpoints can be updated periodically using automatic check-pointing or generated explicitly using the initiateCheckpoint() API (manual checkpoints) by the user.

Automated Check-pointing

Checkpoints are events that help flush the "read" positions of a Reader Group to the State Synchronizer. So truncating a Stream based on the Stream-Cut generated from the most recent checkpoint would indicate we are fine with truncating all data that has been "read".

Note that choosing to truncate the Stream at every checkpoint could cause events to be deleted after they have been read but before they have been processed.

With the RG Configuration truncateAtLatestCheckpoint=true set, the StreamCut corresponding to each new checkpoint would be published as "Truncation StreamCut" to the Controller and this Stream-Cut will be used by the Controller in truncating the Stream.

Manual Check-pointing

For many MQ use-cases, users may want to make an event eligible for truncation only after it has been completely processed, not just read. These applications should set truncateAtLatestCheckpoint=false and regularly generate manual checkpoints on the Stream.

Once an RG has completed processing messages beyond a certain manual checkpoint (the user application needs to track this), that checkpoint can be explicitly updated as the truncation checkpoint using the Client API below:

updateTruncationCheckpoint(String RGName, String checkpointName)

Currently, only the last checkpoint is persisted in the State Synchronizer.

This needs to change. When initiating a manual checkpoint, the user can indicate if the checkpoint should be persisted. Only "persisted" checkpoints can be candidates for truncation.

At every checkpoint update, manual checkpoints that have expired (lie before current Stream head) would need to be deleted from the State Synchronizer, to ensure this list does not grow forever.

Internally updateTruncationCheckpoint() API would update the stream-cut corresponding to this Checkpoint as "truncation stream-cut" on Controller.
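A minimal sketch of the persisted-checkpoint bookkeeping described in this section, for one RG reading one Stream. Everything here is an assumption made for illustration: the class `PersistedCheckpoints`, its method names other than `updateTruncationCheckpoint`, and the modelling of a Stream-Cut as a map from segment number to offset over a fixed set of segments (no scaling).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical client-side store of persisted manual checkpoints (one RG, one Stream).
// A stream-cut is modelled as segment number -> offset, assuming a fixed set of segments.
final class PersistedCheckpoints {
    private final Map<String, Map<Long, Long>> cutsByCheckpoint = new LinkedHashMap<>();

    // Called when a manual checkpoint that the user asked to persist completes.
    void persist(String checkpointName, Map<Long, Long> streamCut) {
        cutsByCheckpoint.put(checkpointName, Map.copyOf(streamCut));
    }

    // Returns the stream-cut of a persisted checkpoint; the Client would then publish
    // it to the Controller as the truncation stream-cut.
    Map<Long, Long> updateTruncationCheckpoint(String checkpointName) {
        Map<Long, Long> cut = cutsByCheckpoint.get(checkpointName);
        if (cut == null) {
            throw new IllegalArgumentException("Not a persisted checkpoint: " + checkpointName);
        }
        return cut;
    }

    // Drops checkpoints that lie at or before the current Stream head in every segment,
    // so the list of persisted checkpoints does not grow forever.
    void pruneExpired(Map<Long, Long> streamHead) {
        cutsByCheckpoint.values().removeIf(cut -> cut.entrySet().stream()
                .allMatch(e -> e.getValue() <= streamHead.getOrDefault(e.getKey(), 0L)));
    }

    int size() {
        return cutsByCheckpoint.size();
    }
}
```

In this sketch only checkpoints that were explicitly persisted can be chosen as the truncation checkpoint; asking for any other name fails fast rather than silently truncating at an unknown position.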

Controller API for updating truncation Stream-Cut

Both user-generated and automated checkpoints can be published to the Controller using this Controller API:

boolean updateTruncationStreamCut(String RGName, Map<ScopedStreamName, StreamCut> streamCutPerStream)

This API would be idempotent. The Controller would store the timestamp of this Checkpoint along with the above information in its metadata tables.

For "non-subscriber" RGs, the Client would skip this API call, as their read positions do not need to be known to the Controller.
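One way to picture the Controller-side state behind this API is a map holding the latest Stream-Cut and its timestamp per (RG, Stream) pair. The sketch below is hypothetical: the class `TruncationStreamCutStore`, the nested `PublishedCut` type, the extra `checkpointTimeMillis` parameter and the use of plain strings and segment-to-offset maps in place of `ScopedStreamName` and `StreamCut` are all assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the per-subscriber state kept by the Controller.
final class TruncationStreamCutStore {
    static final class PublishedCut {
        final Map<Long, Long> streamCut;   // segment number -> offset
        final long checkpointTimeMillis;   // timestamp stored with the checkpoint

        PublishedCut(Map<Long, Long> streamCut, long checkpointTimeMillis) {
            this.streamCut = streamCut;
            this.checkpointTimeMillis = checkpointTimeMillis;
        }
    }

    // Key: "rgName|scope/stream".
    private final Map<String, PublishedCut> published = new HashMap<>();

    // Overwrites the stored cut for each (RG, Stream) pair. Repeating a call with the
    // same arguments leaves the stored state unchanged, so retries are safe.
    boolean updateTruncationStreamCut(String rgName,
                                      Map<String, Map<Long, Long>> streamCutPerStream,
                                      long checkpointTimeMillis) {
        for (Map.Entry<String, Map<Long, Long>> e : streamCutPerStream.entrySet()) {
            published.put(rgName + "|" + e.getKey(),
                    new PublishedCut(Map.copyOf(e.getValue()), checkpointTimeMillis));
        }
        return true;
    }

    // Latest published cut for the given RG and Stream, or null if none was published.
    PublishedCut get(String rgName, String scopedStreamName) {
        return published.get(rgName + "|" + scopedStreamName);
    }
}
```

Storing the timestamp next to the Stream-Cut is what later allows the truncation process to decide whether a Subscriber is still active.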

Stream Truncation

The truncation process on Controller runs at regular intervals.

For every Stream that has a Retention Policy set, the truncation process does the following:

  1. Creates a new Stream-Cut that points to the tail of the stream.
  2. Looks at existing Stream Cuts for the Stream and finds the one that satisfies the retention policy.
  3. Truncates the Stream at the matching Stream Cut.

For CBR, Step 2 would be replaced with 'Generating a new truncation Stream-Cut based on Stream-Cuts of all valid Subscribing RGs for the Stream' and the Stream would be truncated at this Stream-Cut.
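As an illustration of the CBR variant of Step 2, a common truncation Stream-Cut can be taken as the lowest offset per segment across the Subscribers' Stream-Cuts, so that nothing unread by any Subscriber is truncated. This is a simplified sketch, not the proposed implementation: the class `CommonTruncationCut` and method `lowerBound` are hypothetical, a Stream-Cut is modelled as a map from segment number to offset, and all Subscribers are assumed to report the same fixed set of segments (Stream scaling is ignored).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class CommonTruncationCut {
    // Lowest offset per segment across all subscriber stream-cuts: everything before
    // the returned cut has been read by every subscriber that was passed in.
    // Assumes every subscriber reports the same fixed set of segments.
    static Map<Long, Long> lowerBound(Collection<Map<Long, Long>> subscriberCuts) {
        Map<Long, Long> result = new HashMap<>();
        for (Map<Long, Long> cut : subscriberCuts) {
            cut.forEach((segment, offset) -> result.merge(segment, offset, Long::min));
        }
        return result;
    }
}
```

For example, with Subscriber cuts `{0=100, 1=50}` and `{0=80, 1=70}` the common cut is `{0=80, 1=50}`. An empty input yields an empty map, so a caller has to treat "no valid Subscribers" as a separate case rather than truncating.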

Handling failing Subscribers

If an RG does not update any checkpoints to the Controller for more than 'x' minutes, where 'x' is specified in the CBR Policy for the Stream, the last checkpoint for that RG will not be considered in computing the truncation Stream-Cut.

The RG however will not be removed from the list of Subscribers until an explicit "removeSubscriber()" API call is made on the Controller.

If such a failing RG comes back up in the future, it will be able to start consuming again from the Stream and once it starts updating its new checkpoints, these will be considered for arriving at the truncation Stream-Cut from that point on-wards.
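The timeout rule above amounts to filtering Subscribers by the age of their last published checkpoint before each truncation cycle. A minimal sketch, assuming the Controller knows the last update time per Subscriber; the class `ActiveSubscribers` and method `select` are hypothetical names.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class ActiveSubscribers {
    // Returns the subscribers whose last published checkpoint is no older than the
    // timeout ('x'). Timed-out subscribers stay registered; they are only skipped
    // when computing the truncation stream-cut for this cycle.
    static Set<String> select(Map<String, Long> lastUpdateMillisBySubscriber,
                              Duration subscriberTimeout, long nowMillis) {
        Set<String> active = new TreeSet<>();
        lastUpdateMillisBySubscriber.forEach((rg, lastUpdate) -> {
            if (nowMillis - lastUpdate <= subscriberTimeout.toMillis()) {
                active.add(rg);
            }
        });
        return active;
    }
}
```

Because the filter is re-evaluated on every cycle, an RG that comes back and publishes a fresh checkpoint is automatically considered again without any explicit re-registration.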

When all Subscribers fail

In case of a Stream with CBR, when the truncation cycle runs, if there are no active subscribers (based on the configured timeout), the Stream will not be truncated in that cycle.

If this continues for multiple truncation cycles, the Stream Size could grow beyond the provisioned space limits.

There is clearly a trade-off between retaining messages for future consumption vs. space reclamation and different use-cases may choose one over the other.

Fall-back policy

When space reclamation is more important than retaining unconsumed messages, a Time/Space based retention policy can be configured as a "fall-back" policy to CBR.

When no Subscribers are active for over last 'y' minutes, where 'y' is configured in the CBR policy, a "fall-back" retention policy would kick-in and the Stream would get truncated as per this policy.
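The per-cycle decision for a CBR Stream can be summarised as a small function. The sketch below is an interpretation of the rules in this section rather than a specified algorithm; `RetentionDecision`, `Action` and the parameter names are hypothetical, and a `null` fall-back timeout stands for "no fall-back policy configured".

```java
import java.time.Duration;

final class RetentionDecision {
    enum Action { TRUNCATE_BY_CONSUMPTION, TRUNCATE_BY_FALLBACK, SKIP }

    // Decides what one truncation cycle does for one CBR Stream.
    // millisSinceLastActiveSubscriber: time since any subscriber was last active.
    // fallbackTimeout: 'y' from the CBR policy, or null when no fall-back is configured.
    static Action decide(int activeSubscribers, long millisSinceLastActiveSubscriber,
                         Duration fallbackTimeout) {
        if (activeSubscribers > 0) {
            return Action.TRUNCATE_BY_CONSUMPTION;
        }
        if (fallbackTimeout != null
                && millisSinceLastActiveSubscriber > fallbackTimeout.toMillis()) {
            return Action.TRUNCATE_BY_FALLBACK;
        }
        // No active subscribers and no applicable fall-back: keep all data this cycle.
        return Action.SKIP;
    }
}
```

The `SKIP` outcome is the case where the Stream can keep growing, which is why a fall-back policy or monitoring is needed for space-constrained deployments.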

Note that Time/Space based retention policies require Stream-cuts to be present on the Stream.

Hence, at every truncation cycle, the Controller would generate Stream-Cuts for each Stream that has retention policy configured, including CBR Streams.

This is necessary because when a CBR Stream falls back on Time/Space based retention, that policy should have Stream-Cuts to truncate at.

The fall-back policy configuration can be skipped in cases where users never want to lose unconsumed data.

Here users can rely on metrics to understand if messages are piling up in the Stream and based on alerts the admin could choose to do the following:

  • Stop publishing messages to the Stream. This is just a manual way of doing "publisher rate limiting", which is an out of scope feature at this point.
  • Check why RGs are down and take necessary steps to bring them back up.

Policy Configuration

A Consumption Based Retention Policy on a Stream would have the following parameters:

  1. Retention Type = "CONSUMPTION"
  2. Subscriber Timeout - 'x' minutes: If a Reader Group has not updated its checkpoint to the Controller in the last 'x' minutes, its Stream-Cut will not be considered when generating the truncation Stream-Cut.
  3. Fallback Policy timeout - 'y' minutes: If this Stream does not have any "active" readers for the last 'y' minutes, we disable CBR and fall back to a "Time/Space" based retention policy. Users can also skip configuring a fall-back policy.
  4. If a fall-back policy is opted for, the fallback policy configuration, which is essentially the configuration for a Space/Time based Retention policy.
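The parameters above could be carried by a small immutable configuration object. The following is a hedged sketch only: `ConsumptionRetentionConfig`, its factory methods and the `FallbackType` enum are hypothetical and not an existing Pravega class, and the meaning of `fallbackLimit` (milliseconds for a time-based fall-back, bytes for a size-based one) is an assumption.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical holder for the CBR parameters listed above; not an existing Pravega class.
final class ConsumptionRetentionConfig {
    enum FallbackType { TIME, SIZE }

    final Duration subscriberTimeout;           // 'x'
    final Optional<Duration> fallbackTimeout;   // 'y'; empty when no fall-back is configured
    final Optional<FallbackType> fallbackType;  // kind of fall-back policy, if any
    final long fallbackLimit;                   // millis for TIME, bytes for SIZE; 0 if unused

    private ConsumptionRetentionConfig(Duration subscriberTimeout,
                                       Optional<Duration> fallbackTimeout,
                                       Optional<FallbackType> fallbackType,
                                       long fallbackLimit) {
        if (subscriberTimeout.isZero() || subscriberTimeout.isNegative()) {
            throw new IllegalArgumentException("subscriberTimeout must be positive");
        }
        this.subscriberTimeout = subscriberTimeout;
        this.fallbackTimeout = fallbackTimeout;
        this.fallbackType = fallbackType;
        this.fallbackLimit = fallbackLimit;
    }

    // CBR only: unconsumed data is never truncated by a fall-back policy.
    static ConsumptionRetentionConfig withoutFallback(Duration subscriberTimeout) {
        return new ConsumptionRetentionConfig(
                subscriberTimeout, Optional.empty(), Optional.empty(), 0L);
    }

    // CBR with a Time/Space based fall-back that applies after 'y' without subscribers.
    static ConsumptionRetentionConfig withFallback(Duration subscriberTimeout,
                                                   Duration fallbackTimeout,
                                                   FallbackType type, long limit) {
        if (fallbackTimeout.isZero() || fallbackTimeout.isNegative() || limit <= 0) {
            throw new IllegalArgumentException("fall-back timeout and limit must be positive");
        }
        return new ConsumptionRetentionConfig(
                subscriberTimeout, Optional.of(fallbackTimeout), Optional.of(type), limit);
    }

    boolean hasFallback() {
        return fallbackTimeout.isPresent();
    }
}
```

Two factory methods make the optional nature of the fall-back explicit: a caller either supplies all fall-back parameters or none of them.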

Summary

For a CBR policy to work all of the following must be true:

a. Retention Policy on the Stream must be set to CBR.

b. At least 1 RG reading from the Stream must be a "Subscriber".

c. The Subscriber RG configuration should

(i) Have automatic check-pointing enabled with RG Configuration allowing automatic publishing of generated checkpoints

OR

(ii) Generate manual checkpoints, with the RG Configuration disabling automated publishing of checkpoints. In this case the user updates the Truncation Checkpoint using the updateTruncationCheckpoint() API on the Client.

Internally this API extracts the Stream-Cut from the said checkpoint and invokes updateTruncationStreamCut() on the Controller for this Stream-Cut.

Changing the Retention Policy for a Stream

Moving from Time/Space based Retention to CBR

If the Retention Policy for a Stream is changed from Time/Space based to CBR, but the RG Configurations are not updated to make the RGs "Subscribers", the Stream would not get truncated as per CBR, since the Stream configuration does not show any Subscribers with their checkpoints.

Here it is necessary to update both the RetentionPolicy on the Stream (Controller) as well as the RG configuration on the Client.

Moving from CBR to Time/Space based Retention

When updating the RetentionPolicy from CBR to Time/Space based Retention, it may be good to update the RG configuration and switch off the "Subscriber" flag.

This can help reduce unnecessary API calls from Client to Controller for checkpoint updates. It does not, however, affect the working of the Time/Space based retention policy.

Metrics for CBR

The following metrics would be useful for a Stream with CBR Policy:

  • Number of unconsumed messages in a CBR Stream.
  • Number of CBR Streams with 0 Subscribers.
  • Per Subscriber read rate from the Stream (to identify a slow Subscriber).
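As a rough illustration of how such metrics could be derived from data the Controller already has, the sketch below computes a backlog per Subscriber from the Stream tail and the Subscriber's truncation Stream-Cut. It is an approximation under stated assumptions: backlog is measured in bytes rather than messages (offsets do not reveal event counts), Stream-Cuts are modelled as segment-to-offset maps over a fixed set of segments, and `SubscriberLag` and its methods are hypothetical names.

```java
import java.util.Map;

final class SubscriberLag {
    // Bytes between a subscriber's truncation stream-cut and the Stream tail, summed
    // over segments. A segment missing from the subscriber's cut counts as unread.
    static long unconsumedBytes(Map<Long, Long> tail, Map<Long, Long> subscriberCut) {
        long total = 0;
        for (Map.Entry<Long, Long> e : tail.entrySet()) {
            long consumed = subscriberCut.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - consumed);
        }
        return total;
    }

    // Name of the subscriber with the largest backlog, or null when there are none.
    static String slowest(Map<Long, Long> tail, Map<String, Map<Long, Long>> cutsBySubscriber) {
        String slowest = null;
        long worst = -1;
        for (Map.Entry<String, Map<Long, Long>> e : cutsBySubscriber.entrySet()) {
            long lag = unconsumedBytes(tail, e.getValue());
            if (lag > worst) {
                worst = lag;
                slowest = e.getKey();
            }
        }
        return slowest;
    }
}
```

Sampling the backlog at two points in time would also give an approximate per-Subscriber read rate, which is one way to spot the slow Subscriber mentioned above.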

Manual Stream Truncation

As with Streams that have Time/Space based Retention policies, a user can manually truncate a Stream with CBR.

The truncation would work, but the policy-related guarantees would not hold in this case.

Users are not expected to arbitrarily truncate a Stream manually if it is being consumed by multiple Reader Groups and has a Retention Policy set.

To enforce this, we could have a security policy (rights for the "truncate Stream" operation) that only an "admin" user can have.

FAQ

Q. Do we support acknowledgement and deletion at the per-event level?

No, we only support cumulative acknowledgement with the updateTruncationCheckpoint() call.

This would be a lot more performant and scalable than doing it at the per-event level.

"Push" based systems need an "ack" for each event because that is how they maintain the queue head pointer.

In our case, the Stream head and the "truncation" Stream-Cut for an RG can be different.

Q. If a new Subscriber RG joins the Stream, can it read from anywhere in the Stream, including events published prior to its joining the Stream?

Yes, currently a new Subscriber can read all/any subset of messages starting from the head of the Stream and not just the ones that were published after it subscribed to the Stream.

Some MQ systems have this behavior and it may be required in certain use-cases.

This feature is about "Consumption Based Retention" and we don't want to get into the complexities of building a full-fledged MQ system as part of this.

Q. Can a slow subscriber or large subscriber timeout cause a very small portion of the Stream to be truncated and the Stream to grow to a large size?

Yes. This can happen. A slow subscriber will slow down event truncation in any system.

Setting a large subscriber timeout is needed by applications where data loss cannot be tolerated even if it means wasting some extra storage space on retaining messages.

Q. Do we also have a TTL for messages with a Consumption based retention policy? (Pulsar has this)

No. As long as the retention policy is set to CBR and at least 1 Subscriber is up, truncation will be based solely on the stream-cuts published by the active Subscribers.

Q. Can the positions persisted by Readers to State Synchronizer (on a checkpoint) be considered acknowledgement for events consumed by a Reader Group?

No. "Readers" in a reader group are Pravega constructs and publish only read positions.

There is no way for a user application to change/update the read positions of a Reader based on their processing of events.

So these are just plain "read" positions and cannot be considered "user acknowledgements" for having consumed and processed the event.

Limitations

  1. We support only cumulative acknowledgements and truncation. There is no support for acknowledgement or deletion at the per-event level. (I don't see why this should be necessary, but with the current design this is not supported.)
  2. No way to truncate a Stream based on a user-defined Stream-Cut. There is currently no way for a user to create a custom Stream-Cut to deterministically mark a position in a Stream.
  3. Batch Clients do not have a concept of "Stream-Cuts" and read events in any order. Byte Streams also do not have an "event" concept, so checkpoints can't be created on them. Consumption Based Retention cannot work with either of these.
  4. Native Client needs to implement "Checkpoints" since these are necessary for Consumption Based Retention to work.

Open Questions

  1. Can aggressive check-pointing and publishing of checkpoints overload the system? Auto-publishing automatic checkpoints every 30 secs can easily overwhelm the system if we have a large number of Streams and RGs.
  2. Most applications do not want to delete on reads, which is why all push based systems need an explicit "ack" from consumers before they can delete messages. We should make "manual check-pointing" mandatory for CBR and not publish automatic checkpoints.
  3. Should the subscriber timeout be configurable per Stream or per RG? Right now it is per Stream.

JIRA

Optional LTS

Milestones in the Pravega MQ Story

| Number | MQ Feature | Description | Release |
|---|---|---|---|
| M1 | Consumption based Retention | Pravega Streams should have the ability to retain only unconsumed data, when retention policy is configured to be "consumption based" | Pravega 0.9 |
| M2 | Optional LTS - Remove Pravega dependency on LTS | Pravega should be able to operate as a Stream Store and an MQ with and without LTS. | |
| M3 | AMQP Support, Publisher/Subscriber Rate limit | | TBD |
| M4 | Support for Large Messages, Seek and replay support, Tagging Support | | TBD |

References

Why Message Queues?

https://stackify.com/message-queues-12-reasons/

MQ features:

https://aws.amazon.com/message-queue/features/

AMQP

https://www.cloudamqp.com/blog/2019-11-21-what-is-amqp-and-why-is-it-used-in-rabbitmq.html 

Pulsar Message Queues:

https://pulsar.apache.org/docs/en/cookbooks-message-queue/

Google Pub-Sub:

https://cloud.google.com/pubsub/architecture

https://cloud.google.com/pubsub/docs/subscriber

https://cloud.google.com/pubsub/docs/subscriber#delivery
