
PDP 47 (Pravega Streams Consumption Based Retention)

Prajakta Belgundi edited this page Feb 21, 2022 · 3 revisions

Motivation

Message Deletion in Pravega

  • No event level deletes possible.
  • Stream truncation happens at periodic intervals along Stream-Cuts.
  • At the time of this writing, Stream truncation is agnostic of read positions of Reader Groups and can only be done based on space/time limits.

A Consumption based retention mechanism for a Pravega Stream would ensure:

  1. The stream carries only unconsumed data and is space efficient.

  2. Messages are not deleted before they are consumed by all consumers (subscribers).

This is particularly useful when using a Pravega Stream as a message queue and in use-cases where it may not be necessary to keep data in the Stream after it has been consumed.

Terminology

Reader Group (RG) - A Pravega Reader Group that reads from one or more Pravega Streams. A Reader Group can have multiple Readers each reading from 1 or more Segments in the Stream.

Retention Policy (RP) - A Stream Configuration that specifies the basis on which the Stream is truncated. Currently supported policies are Time and Space based policies. These truncate data in a Stream if it is older than a configured time interval or if the Stream size exceeds a configured size limit.

Consumption Based Retention (CBR) - A Retention Policy on a Pravega Stream that requires data to be truncated from the Stream only after all Subscribers have read and acknowledged it. A Stream can have both "subscribing" and "non-subscribing" Reader-Groups and only subscribing Reader Groups publish truncation Stream-Cuts to Controller.

Position - A 'Position' object representing the position of a Reader in a Pravega Stream. It is a map of segments owned by the Reader and their read offsets. A Stream-Cut can be generated from a set of "Position" objects for all Readers in the Reader Group.

Stream-Cut - A set of (segment, offset) pairs for a single stream that represent a consistent position in the Stream. A Stream-Cut covers the complete key-space at a given point in time. Each offset always points to an event boundary, so no offset points into an incomplete event.

Checkpoint - A system event that causes all Readers in a Reader Group to persist their current read positions to State Synchronizer. A Checkpoint is necessary for a Reader Group to be able to resume reading from the Stream after it goes down. With ESR, Check-pointing is also a prerequisite for assignment of unassigned segments in the Stream. For a Reader Group, checkpoints can be generated automatically or explicitly on user request. Automated check-pointing is enabled by default. The lastCheckpoint is stored in ReaderGroupState.

Automated Checkpoint - An automated checkpoint is a checkpoint generated by the Pravega Client at periodic intervals to ensure that Readers persist their read positions. This process is done without any user interaction. Auto-generated checkpoints are not returned to users.

Manual Checkpoint - A manual checkpoint is generated on user request and it is returned to the User when it completes.

Subscriber Reader Group - A "Subscriber RG" is a Reader Group whose read positions are considered for computing the truncation point for a Stream with Consumption Based Retention. When such a Subscribing Reader Group goes down, the system will truncate the Stream based on the last published position of this Reader Group and will not truncate any data not read by this Reader Group.

Non-Subscriber Reader Group - A Non-Subscriber Reader Group (default) is the one that wants to read from a Stream, but does not want the consumption to impact Stream truncation. When this Reader Group goes down and comes back up, it may have lost some messages in the queue because those were deleted while it was away.

Design

New Concepts

A Stream can be read by one or more Reader Groups and they can start/stop reading from a Stream at any point during its lifetime. The Reader Groups' state, including read positions, is maintained on the Client using the State Synchronizer. Stream truncation, on the other hand, is handled by the Controller but it is not aware of Reader Groups or their read positions.

Stream Subscription

Currently, Pravega Streams do not have a concept of "Subscription". A Reader Group or set of Reader Groups may be the primary consumers of a Stream, such that once data is read by them, it can be discarded from the Stream. With Consumption based retention, the read/processed positions of such a Reader Group should determine where the Stream is truncated. This intent is expressed with "Subscription".

A Stream can be read by both "Subscriber" and "Non-Subscriber" reader groups, but the reads of only Subscriber Reader Groups would impact Streams' truncation.

There may be transient Reader Groups that read a Stream temporarily (between Stream Cuts) but we would not want these reads to impact the Stream's truncation policy and hence these should be configured as "Non-Subscriber" Reader Groups.

Acknowledgement using Truncation Stream-Cuts

Users acknowledge the position in the Stream up to which they have "processed" data, so that all data before that point can be deleted. With EventStreamReader, since everything up to the last checkpoint is considered "processed", the last checkpoint can be treated as the processed boundary on the Stream once all Readers have read past it.

Reference: http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/EventRead.html

How CBR would work

  • The Streams' Retention Policy can be set to SPACE/TIME based Retention. It specifies a min/max limit for Retention.
  • Consumption Based Retention gets triggered when at least one Subscriber Reader Group publishes a Stream-Cut, indicating read positions in the Stream, to Controller.
  • If no Reader Groups reading from a Stream are "Subscribers" and there are no pre-existing Subscriber Stream-Cuts in Stream metadata, the Stream would be truncated based on min/max limits specified in the policy.
  • Each Subscriber Reader Group would periodically publish a truncation Stream-Cut to Controller based on its read/processed position in the Stream. This truncation Stream-Cut could be published automatically (Stream-Cut corresponding to last Checkpoint) or as/when specified by the User.
  • On Controller, one truncation Stream-Cut would be stored per Subscriber in Stream metadata. When the truncation cycle runs, a common truncation Stream-Cut would be computed based on truncation Stream-Cuts of all Subscribers and the Stream would be truncated at this Stream-Cut.

Note that publishing a Stream-Cut from Client to Controller does not cause the Stream to be truncated at that Stream-Cut. Truncation would happen "asynchronously" and "eventually" based on the common truncation Stream-Cut as well as min/max limits for the CBR Retention Policy.


Client Changes

A Stream with "Consumption" based retention policy can be read by Subscriber as well as non-Subscriber Reader Groups.

Subscriber Reader Groups

A Subscriber Reader Group would periodically publish truncation Stream-Cuts to Controller. Non-Subscriber would not publish these, but would just read from the Stream. An existing Reader Group would be a non-subscriber RG by default.

The following 2 new boolean fields should be added to ReaderGroupConfiguration:

  1. isSubscriber: A reader group can be made a "Subscriber" by setting this flag to true. The default is false. Setting this flag to true should invoke the Controller "addSubscriber" API to add this ReaderGroup name as a "subscriber" for each Stream in the RGConfiguration. Similarly, when a ReaderGroup is deleted, or the isSubscriber flag is flipped to "false", a "removeSubscriber" API call should be made to remove the ReaderGroup from the list of Stream Subscribers.

Note: Reader Group Subscription is a "none" vs "all" concept. A Subscriber RG needs to publish truncation StreamCuts for every Stream in its Configuration, regardless of the Stream's truncation policy on Controller.

  2. autoTruncateAtLastCheckpoint: When this flag is set, we publish the StreamCut corresponding to the lastCheckpoint from Client to Controller as "truncation StreamCut" for the Reader Group.

Updating Reader Group Configuration

ReaderGroupConfiguration can be updated in the following ways:

  1. The set of Streams to read from can be updated in RGConfiguration:
  • If a Stream is removed from RG Configuration, the RG should correspondingly be removed from its list of Subscribers by invoking the removeSubscriber API on Controller.
  • If a Stream is added to RG Configuration, the RG should be added to the Subscriber list for the Stream by invoking the addSubscriber API on Controller for the new Stream.
  2. If a "Subscriber" RG becomes "Non-Subscriber", the Client should invoke removeSubscriber API on all Streams in the configuration for this RG.

  3. If a "Non-Subscriber" RG becomes "Subscriber", the Client should invoke addSubscriber API on all Streams in the RG configuration.

  4. When a "Subscriber" ReaderGroup is deleted, Client should invoke deleteSubscriber API on Controller to remove the corresponding subscriber from Stream metadata for all Streams in Subscriber Configuration.
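The update rules above amount to a set difference between the Streams the Reader Group was subscribed to before the change and the Streams it should be subscribed to afterwards. The following is a minimal sketch of that bookkeeping only; `SubscriptionDiff` and its fields are hypothetical names introduced for illustration and are not part of the Pravega client. Deleting a Reader Group can be modeled as an update to an empty, non-subscribing configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a Pravega class): works out which Streams need an
// add-subscriber call and which need a remove-subscriber call on a config update.
final class SubscriptionDiff {
    final List<String> toAdd = new ArrayList<>();
    final List<String> toRemove = new ArrayList<>();

    static SubscriptionDiff compute(Set<String> oldStreams, boolean wasSubscriber,
                                    Set<String> newStreams, boolean isSubscriber) {
        SubscriptionDiff diff = new SubscriptionDiff();
        // A non-subscriber RG is registered on no Stream at all ("none" vs "all").
        Set<String> before = wasSubscriber ? new TreeSet<>(oldStreams) : new TreeSet<String>();
        Set<String> after = isSubscriber ? new TreeSet<>(newStreams) : new TreeSet<String>();
        for (String stream : after) {
            if (!before.contains(stream)) {
                diff.toAdd.add(stream);      // newly subscribed Stream
            }
        }
        for (String stream : before) {
            if (!after.contains(stream)) {
                diff.toRemove.add(stream);   // Stream no longer subscribed
            }
        }
        return diff;
    }
}
```

Each entry in `toAdd` would translate into one addSubscriber call and each entry in `toRemove` into one remove call on Controller.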

Pushing Truncation Stream-Cut to Controller

The Controller should know truncation Stream-cut for each "Subscriber" on the Stream, to be able to arrive at a common truncation Stream-Cut. This Truncation StreamCut needs to be published by each Subscriber ReaderGroup on Client periodically.

If auto-checkpointing is enabled or manual checkpointing is happening, and the flag autoTruncateAtLastCheckpoint = true, then the Subscriber RG should publish the StreamCut of the last generated checkpoint as its truncation StreamCut to Controller using the Controller API updateSubscriberStreamCut.

If checkpointing is not enabled and there is no manual check-pointing being done, users need to identify Stream-Cuts up to which processing has completed in the Stream and publish this StreamCut to Controller. The Client side API updateTruncationStreamCut can be invoked for this. It would internally invoke the Controller API updateSubscriberStreamCut.
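The choice of which Stream-Cut a Reader Group publishes can be summarized in a few lines. This is only a sketch of the decision described above: `TruncationCutSelector` is a hypothetical name, Stream-Cuts are modeled as plain segment-to-offset maps, and a `null` argument stands for "no such Stream-Cut available yet".

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not a Pravega class): picks the Stream-Cut, if any,
// that a Reader Group should publish to Controller as its truncation Stream-Cut.
final class TruncationCutSelector {
    static Optional<Map<Long, Long>> select(boolean isSubscriber,
                                            boolean autoTruncateAtLastCheckpoint,
                                            Map<Long, Long> lastCheckpointCut,
                                            Map<Long, Long> userAcknowledgedCut) {
        if (!isSubscriber) {
            return Optional.empty();   // non-subscribers never publish
        }
        // Automatic mode publishes the last checkpoint; otherwise the user supplies the cut.
        Map<Long, Long> chosen = autoTruncateAtLastCheckpoint ? lastCheckpointCut : userAcknowledgedCut;
        return Optional.ofNullable(chosen);
    }
}
```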

Controller Changes

Controller should support these 3 new APIs:

New APIs

Boolean addSubscriber (String RGName, String scopeName, String streamName, long generation)

Boolean deleteSubscriber (String RGName, String scopeName, String streamName, long generation)

Boolean updateSubscriberStreamCut(String RGName, String scopeName, String streamName, Map<Long, Long> streamCut)

Add/Delete Subscriber API generations

The 'generation' parameter is added to make sure old API calls from Client do not overwrite the newer requests for adding/removing a Subscriber. When adding or deleting a Subscriber, the Controller would process the operation only if the incoming generation value is greater than the stored value for that Subscriber, else the operation would be a no-op.
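A minimal in-memory sketch of this generation check follows. It is not the Controller implementation: `SubscriberGenerations` is a hypothetical class, scope and stream names are omitted for brevity, and two details are assumptions of the sketch rather than statements from this proposal: the Boolean result is taken to mean "the request was applied", and a deleted Subscriber's generation is kept as a tombstone so that a stale add cannot resurrect it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Controller code): applies add/delete requests only
// when they carry a generation greater than the one stored for the Subscriber.
final class SubscriberGenerations {
    private static final class Entry {
        final long generation;
        final boolean subscribed;
        Entry(long generation, boolean subscribed) {
            this.generation = generation;
            this.subscribed = subscribed;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    boolean addSubscriber(String rgName, long generation) {
        return apply(rgName, generation, true);
    }

    boolean deleteSubscriber(String rgName, long generation) {
        return apply(rgName, generation, false);
    }

    boolean isSubscriber(String rgName) {
        Entry entry = entries.get(rgName);
        return entry != null && entry.subscribed;
    }

    private boolean apply(String rgName, long generation, boolean subscribed) {
        Entry current = entries.get(rgName);
        if (current != null && generation <= current.generation) {
            return false;   // stale or duplicate request: no-op
        }
        entries.put(rgName, new Entry(generation, subscribed));
        return true;
    }
}
```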

Consumption Based Retention

Controller should support Consumption Based Retention Policy on Streams. For this, it would need to maintain a list of Subscribers and their last published truncation Stream-Cuts. Every time the truncation cycle runs, if a Stream has Consumption based retention configured, Controller should fetch the Stream-Cuts corresponding to all Subscribers, compute a common truncation Stream-Cut and then truncate the Stream at this point.

Note: Additionally, for Streams with Consumption based retention, the truncation process should also generate a new StreamCut and add it to the Retention Set at every truncation cycle. Though this is not useful for CBR, it would be necessary to allow truncations if the policy changes from CBR to a Time/Space based retention policy later.
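One way to picture the common truncation Stream-Cut is as the per-segment minimum of the Subscribers' offsets, so that nothing unread by any Subscriber is removed. The sketch below makes a simplifying assumption that is not part of this proposal: every Subscriber Stream-Cut covers exactly the same set of segments (no scaling between them). Stream-Cuts that span different segments because of scale events need successor/predecessor handling that is not shown here. `CommonStreamCut` is a hypothetical name, and Stream-Cuts are modeled as segment-to-offset maps.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch (not Controller code): lowest common position of all
// Subscribers, assuming their Stream-Cuts cover the same segments.
final class CommonStreamCut {
    static Optional<Map<Long, Long>> lowerBound(Collection<Map<Long, Long>> subscriberCuts) {
        Map<Long, Long> result = null;
        for (Map<Long, Long> cut : subscriberCuts) {
            if (result == null) {
                result = new TreeMap<>(cut);   // first Subscriber seeds the result
                continue;
            }
            if (!result.keySet().equals(cut.keySet())) {
                throw new IllegalArgumentException("sketch only handles identical segment sets");
            }
            for (Map.Entry<Long, Long> entry : cut.entrySet()) {
                // Keep the smallest offset seen for each segment.
                result.merge(entry.getKey(), entry.getValue(), Long::min);
            }
        }
        return Optional.ofNullable(result);    // empty when there are no Subscribers
    }
}
```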


CBR Fallback Policy

Handling Subscriber Failures

  • If no Subscribers publish truncation Stream-Cuts for a very long time, the Stream size can grow unbounded.

  • A space or time based maximum limit [MAX_LIMIT] can be configured in the CBR policy.

  • For example, if the space limit is 50 GB and the Stream size is still greater than 50 GB after consumption based truncation, the Stream will be truncated further so that it retains no more than 50 GB.

  • Note: This can cause unconsumed messages to be deleted, and impact delivery guarantees, so specifying this limit is optional.

Handling late Subscribers

  • If a Stream has multiple subscribing RGs, and some subscribers join late, the early subscribers may truncate the Stream before the other Subscribers have had a chance to consume.

  • A space or time based minimum limit [MIN_LIMIT] can be configured in the CBR policy.

  • For example, if time limit = 30 min, prior to consumption based truncation, if the head of Stream is less than 30 mins old, consumption based truncation would not be attempted.

  • Note: While this can cause more data to be retained than necessary, specifying this limit is optional.
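For space based limits, the combined effect of MIN_LIMIT and MAX_LIMIT can be viewed as clamping the consumption based truncation point. The sketch below uses a deliberately simplified model that is an assumption of this example: the Stream is treated as a single linear byte range of `streamSize` bytes, and `consumedByAll` is the number of bytes from the head that every Subscriber has acknowledged. `FallbackLimits` is a hypothetical name. Time based limits, like the 30 minute example above, would need timestamps of Stream-Cuts and are not covered.

```java
// Hypothetical sketch (not Controller code): clamps the consumption based
// truncation point so the retained size stays within [minLimit, maxLimit].
final class FallbackLimits {
    static long truncationOffset(long streamSize, long consumedByAll, long minLimit, long maxLimit) {
        if (streamSize < 0 || minLimit < 0 || minLimit > maxLimit) {
            throw new IllegalArgumentException("require streamSize >= 0 and 0 <= minLimit <= maxLimit");
        }
        // Truncate at least this much so that no more than maxLimit bytes remain.
        long lowest = Math.max(0L, streamSize - maxLimit);
        // Truncate at most this much so that at least minLimit bytes remain.
        long highest = Math.max(0L, streamSize - minLimit);
        return Math.min(Math.max(consumedByAll, lowest), highest);
    }
}
```

With a 100 byte Stream, 30 bytes consumed by all Subscribers and MAX_LIMIT = 50, the sketch truncates 50 bytes, which deletes 20 unconsumed bytes. This is the delivery guarantee trade-off noted above.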

Consumption Based Retention Policy

The Consumption based Retention Policy should have the following fields:

Retention Policy Type: CONSUMPTION

Fallback_Limit_Type - NONE/SPACE/TIME Based.

MAX_LIMIT - value (long)

MIN_LIMIT - value (long)

Note that if both Min and Max limits are specified, they must be of the same type: either both SPACE based or both TIME based.

Specifying a Fallback Policy with a Time based Min limit and a Space based Max limit or vice versa is currently not supported.

If the MAX or MIN limit is left unspecified, it takes a default value: 0 for MIN_LIMIT and LONG_MAX for MAX_LIMIT.
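The fields and defaults above could be captured in a small value class. This is a sketch under stated assumptions, not the Pravega RetentionPolicy API: `ConsumptionRetentionPolicy`, `LimitType` and the factory method `of` are hypothetical names, `null` stands for "unspecified", and rejecting limits when the type is NONE is a choice made for this example. Using a single limit type for both bounds rules out mixed Time/Space limits by construction.

```java
// Hypothetical value class (not the Pravega API) for a CONSUMPTION policy
// with an optional fallback limit type and min/max bounds.
final class ConsumptionRetentionPolicy {
    enum LimitType { NONE, SPACE, TIME }

    final LimitType limitType;
    final long minLimit;
    final long maxLimit;

    private ConsumptionRetentionPolicy(LimitType limitType, long minLimit, long maxLimit) {
        this.limitType = limitType;
        this.minLimit = minLimit;
        this.maxLimit = maxLimit;
    }

    // Pass null for a limit that is left unspecified.
    static ConsumptionRetentionPolicy of(LimitType limitType, Long minLimit, Long maxLimit) {
        if (limitType == LimitType.NONE && (minLimit != null || maxLimit != null)) {
            throw new IllegalArgumentException("limits require a SPACE or TIME limit type");
        }
        long min = minLimit == null ? 0L : minLimit;               // default MIN_LIMIT
        long max = maxLimit == null ? Long.MAX_VALUE : maxLimit;   // default MAX_LIMIT
        if (min < 0 || min > max) {
            throw new IllegalArgumentException("require 0 <= MIN_LIMIT <= MAX_LIMIT");
        }
        return new ConsumptionRetentionPolicy(limitType, min, max);
    }
}
```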

Changing the Retention policy for a Stream

Moving from NONE/Time/Space based Retention Policy to CBR

If the Retention Policy for a Stream is changed from None/Time/Space based to Consumption based, but RG Configuration is not changed to make it a "Subscriber", the Stream would not get truncated, since the Stream-Cuts would not get published to Controller unless a Reader Group is a Subscriber.

Here it is necessary to update both the RetentionPolicy on the Stream (Controller) and the RG configuration on Client for CBR to work. For Streams that were created using older versions of Pravega (prior to this feature), it is necessary to first change the Retention Policy of the Stream on Controller to "Consumption" based and then add Subscribers to the Stream on Client.

Steps for moving an existing Stream to Consumption Based Retention Policy.

  • On Controller, change the Stream Retention Policy to "CONSUMPTION" based (updateStream API). Specify min/max limits if required.
  • On Client, change the configuration for at least one RG reading from the Stream to be a "Subscriber" and specify autoTruncateAtLastCheckpoint=true/false depending on whether StreamCuts need to be auto published or provided by Users.

Moving from CBR to Time/Space based Retention

When changing the Retention Policy from CBR to Time/Space based, it may be good to update the RG configuration and switch off the "Subscriber" flag.

This can help reduce unnecessary API calls from Client to Controller for updating the truncation Stream Cut.

Leaving the flag on does not, however, affect the working of the Time/Space based retention policy.

Since Consumption based retention continues to generate Stream-Cuts at every truncation cycle, after moving to a Time/Space based policy we should have Stream-Cuts in the Retention-Set to truncate at.

Manual Stream Truncation

As in case of Time/Space based Retention, a user can manually truncate a Stream with Consumption based policy.

The truncation would work, but policy related guarantees would not hold in this case.

Users are expected to not arbitrarily truncate a Stream manually, if it is being consumed by multiple Reader Groups and has a Retention Policy set.

To enforce this requirement we could have a security policy that allows only "admin" users to have rights to truncate a Stream manually.

FAQ

Can aggressive check-pointing overwhelm the system?

Aggressive checkpointing does not help truncate faster, as Controller truncates only when the truncation cycle runs (every truncation interval). Automatic checkpointing runs every 30 secs by default for each RG. The checkpointing interval is configurable and can be lower bounded to a certain value to prevent overly aggressive checkpointing.

Do we support acknowledgements and deletes at the per-event level?

No, we only support cumulative acknowledgements.

Can a new Subscriber read messages published prior to its joining?

Yes, a new Subscriber can read from anywhere in the Stream. This could be changed in the future if needed, to limit it to reading only messages published after the Subscriber has joined.

Can a slow subscriber slow down Stream Truncation?

Yes. This can happen. A space/time based upper limit would ensure the Stream does not grow unbounded. If it is important to never lose messages, users can skip specifying the upper bound.

Do we also have a TTL per message with Consumption based retention policy?

The time based upper bound works as a TTL.

Github Issue

https://github.com/pravega/pravega/issues/5108
