
PDP 26 (Ingestion Watermarks)

Derek Moore edited this page Jul 9, 2021 · 1 revision

Introduction

This proposal introduces watermarks to Pravega in the ingestion time domain. A watermark is a coarse-grained indication of when event data was written to the stream, made available to readers to perform time-aware processing. A key goal is to support event time processing in Apache Flink, which works best with connector-provided watermarks.

Event time processing is considered an important, strategic feature to support. Existing solutions that defer watermark handling to the Flink application (ref) are inadequate, because:

  1. the reader group may process stream events out of order (with respect to the ingestion time of the event), complicating the watermark assigner.
  2. watermark assigners do not have access to the Flink state backend, which further limits how sophisticated they can be.
  3. the reader group may rebalance segment assignment, leading to major jumps (forward or backwards) in time; Flink requires that the watermark be strictly ascending for a given subtask.

Watermarks effectively encapsulate system internals, e.g. regarding the processing order of the segments that constitute a stream. Though Pravega guarantees the ordering of events for a given routing key, a total temporal ordering is not guaranteed. Events with older timestamps may arrive after those with newer timestamps, especially in historical stream processing scenarios. Thus an event time clock cannot be based solely on event timestamps; the clock is best regulated with a watermark.

For more information on the concept of event time, see Streaming 101 by Tyler Akidau and the Flink documentation.

Status: Under discussion


Concepts

Ingestion Time

Temporal Operations

Certain operations on a segment (e.g. creates, appends, merges) constitute temporal operations whose time of occurrence is significant. The timestamp of the temporal operation is drawn from the so-called ingestion time domain, defined as the real-world time on the segment server at the time of occurrence.

Watermarking

The significance of the time of occurrence of a temporal operation is that it serves as a marker for the progression of time as a stream is written. A reader may then use such time information to advance a watermark. Since streams are an append-only structure, a watermark with timestamp X indicates that no event shall follow with an ingestion time before or at time X. Watermarks are useful for stream processing applications where the temporal order of events is significant.

The phrase "(exclusive) minimum ingestion time" is sometimes used to describe the watermark. Implementation-wise, this implies that 1 ms (1L) should be subtracted from a recorded timestamp to form a watermark, e.g. to accommodate several operations occurring within the same millisecond.
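To make the off-by-one explicit, here is a minimal sketch in Java (the language of the Pravega client). The class and method names are hypothetical and not part of any Pravega API; timestamps are assumed to be epoch milliseconds.

```java
// Hypothetical helper illustrating the "(exclusive) minimum ingestion time" rule.
// Not a Pravega API; timestamps are assumed to be epoch milliseconds.
class WatermarkSemantics {

    // Forms a watermark from the timestamp of the most recent temporal operation.
    // Subtracting 1 ms leaves room for further operations stamped within the same millisecond.
    static long watermarkFromLastWriteTime(long lastWriteTimeMillis) {
        return lastWriteTimeMillis - 1L;
    }

    // A watermark X promises that no subsequent event has an ingestion time at or before X.
    static boolean mayFollow(long ingestionTimeMillis, long watermarkMillis) {
        return ingestionTimeMillis > watermarkMillis;
    }

    public static void main(String[] args) {
        long lastWrite = 1_000L;
        long w = watermarkFromLastWriteTime(lastWrite);
        System.out.println(w);                       // 999
        System.out.println(mayFollow(lastWrite, w)); // true: another op in the same millisecond is allowed
        System.out.println(mayFollow(999L, w));      // false
    }
}
```

Without the subtraction, a second append stamped in the same millisecond as the recorded one would violate the watermark that was just published.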

Watermark Coordination

A reader group coordinates its reads to process each stream event exactly once and to formulate a group-level watermark. The group-level watermark indicates the exclusive minimum ingestion time of any subsequent event across the entire group. Processors should use the group-level watermark, rather than the segment-level watermark, to advance the event time clock. This approach ensures that segment assignment may change dynamically without causing the watermark to jump back on any reader instance (a situation that Flink does not support).
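The difference between segment-level and group-level watermarks under rebalancing can be shown with a small sketch. It is hypothetical (not the Pravega reader group implementation) and assumes watermarks are held in a map from segment name to epoch milliseconds.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting segment-level and group-level watermarks.
class GroupWatermarkSketch {

    // Minimum watermark over the given segments (Long.MIN_VALUE if there are none).
    static long minWatermark(Map<String, Long> watermarks, Collection<String> segments) {
        return segments.stream()
                .mapToLong(s -> watermarks.get(s))
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = Map.of("segment-1", 1_200L, "segment-2", 1_260L);

        // Segment-level view: a reader that owns segment-2 and is then rebalanced
        // onto segment-1 sees its watermark jump back from 1260 to 1200.
        System.out.println(minWatermark(watermarks, List.of("segment-2"))); // 1260
        System.out.println(minWatermark(watermarks, List.of("segment-1"))); // 1200

        // Group-level view: every reader uses the minimum over all incomplete segments,
        // which is unaffected by how segments are assigned.
        System.out.println(minWatermark(watermarks, watermarks.keySet())); // 1200
    }
}
```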

Stream Idleness

It is important to advance the watermark even when no writes occur on a given segment, i.e. in tail reads. Otherwise, events may remain buffered indefinitely within the processor as it awaits the further passage of time.

Event Time

The concept of event time refers to the time when a given event occurred rather than when the event was committed to a Pravega stream. Stream processing frameworks now provide a sophisticated programming model based on the event time domain. For example, time-based aggregations may be produced based on the advancement of the event time clock (as opposed to the processing time clock or wall-clock on the processor). This capability is seen as the foundation for a unified programming model for historical and real-time stream processing with correct (rather than speculative) results.

The event time of a given event is typically stored as a component of the event data and is opaque to Pravega.

Relating Event Time to Ingestion Time

In principle, the ingestion time of an event is wholly unrelated to its event time, since event data is opaque to Pravega. In practice, the two are likely closely related, with the delta for a given event attributable to a combination of buffering and transmission time. For example, an IoT sensor might produce readings that are sent to Pravega within a few minutes.

In such a scenario, a stream processor may easily drive an event-time clock using the ingestion time watermarks encountered in the course of reading the stream. A concrete approach is to subtract an application-specific event-time lag from the ingestion timestamp, thus asserting an upper bound on the buffering and transmission time of the event. For example, a lag time of five minutes would set the event time clock to 11:55 when the ingestion time clock reaches 12:00, thus asserting that all events prior to 11:55 have been read. Late-record processing logic could compensate for erroneous assertions.
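The worked example above (12:00 minus a five-minute lag gives 11:55) can be expressed as a one-line conversion. This is a hypothetical helper, not part of Pravega or Flink; it uses `java.time` types purely for readability, whereas a real connector would more likely work with epoch milliseconds.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper: derive an event-time watermark from an ingestion-time watermark
// by subtracting an application-specific lag (an assumed upper bound on buffering and
// transmission delay).
class EventTimeLagSketch {

    static LocalTime eventTimeWatermark(LocalTime ingestionWatermark, Duration eventTimeLag) {
        return ingestionWatermark.minus(eventTimeLag);
    }

    public static void main(String[] args) {
        LocalTime ingestion = LocalTime.of(12, 0);
        Duration lag = Duration.ofMinutes(5);
        // Asserts that all events with an event time before 11:55 have been read.
        System.out.println(eventTimeWatermark(ingestion, lag)); // 11:55
    }
}
```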

Proposed Changes

Overview

The basic idea is to record the time of last occurrence of a segment operation (segment creation, append, merge) as a metadata attribute, to convey that information as a watermark upon segment read, to track it in the reader group state, and to emit a group-wide minimum watermark.

For example, in the diagram below, the stream consists of two segments. The reader has encountered a watermark of 12:00 on segment 1 and 12:01 on segment 2. We can therefore be certain that no subsequent event (in any segment) will have an ingestion time at or before 12:00.

            ║ create                                                                       
            ║                                                                              
            ║   ║ append                             │ W = 11:55                           
            ║   ║                                    │                                     
            ║   ║   ║  merge                         │        │ W = 12:00                  
            ║   ║   ║                                │        │                            
      ┌─────▼───▼───▼───┐                            │        │            │ W = 12:02     
      │                 │  updateAttribute           ▼┌────┐┌─▼──┐┃┌────┐┌─▼──┐            
      │                 │ (LAST_WRITE_TIME) Segment 1 │ A  ││ B  │┃│ C  ││ D  │            
      │  Operation Log  │  ══════════════►            └────┘└────┘┃└────┘└────┘  ════╗     
      │                 │                             ┌────┐┌────┐┌────┐┃┌────┐      ║     
      │                 │                   Segment 2 │ A  ││ B  ││ C  │┃│ .. │      ║     
      │                 │                             └────┘└────┘└─▲──┘┃└────┘      ║     
      └─────────────────┘                                           │                ║     
                                                                    │ W = 12:01      ║     
                                                                                     ║     
                          ╔════════════════════╗                                   chkpt   
                          ║                    ║          ┌───────────────────┐      ║     
                          ▼            updateGroupState   │   Reader Group    │      ║     
                ┌───────────────────┐          ║          │       State       │      ║     
                │   Event Reader    │          ║          │                   │      ║     
                │ W=12:00           │          ╚══════════│ S1(O=2,W=12:00)   │◄═════╝     
                └───────────────────┘                     │ S2(O=3,W=12:01)   │            
                          ║                               │                   │            
                          ╚══════════read═════════╗       └───────────────────┘            
                                                  ▼                                        
                                                  ┃                                        
┌────────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────────┐┃┌────┐┌────┐┌────────┐┌────┐            
│W(11:55)││ 1A ││ 2A ││ 1B ││ 2B ││ 2C ││W(12:00)│┃│ 1C ││ 1D ││W(12:01)││ .. │            
└────────┘└────┘└────┘└────┘└────┘└────┘└────────┘┃└────┘└────┘└────────┘└────┘            
                                                  ┃                                        

Specific changes are described in the next sections.

Protocol

SegmentRead

Enhance the SegmentRead response to include a watermark indicating the (exclusive) minimum ingestion time of any subsequent data. Subsequent means any data at an offset beyond the range covered by the SegmentRead or in a successive segment.

Note that SegmentRead may be returned with a zero length to communicate a watermark without associated data. This is useful for watermarking of tail reads.

StreamSegmentInfo

Enhance the StreamSegmentInfo response to include the segment creation time (in the ingestion time domain).

Note that StreamSegmentInfo contains a lastModified field that is unrelated to this proposal. Consider changing its semantics to be based on the ingestion clock.

Client

EventRead Interface

Return a watermark via new methods on the EventRead interface: isWatermark() and getWatermark().

Provide a new property isEvent() to improve the forward compatibility of client code.

Time Domain Configuration

For backwards compatibility reasons, the client should opt in to watermark reads. Otherwise a client that isn't watermark-aware may erroneously treat a watermark as an event (since isEvent was not previously available).

Provide a reader configuration option for this purpose, expressing the 'time domain' that the client wishes to operate in. Expose two domains:

  • ProcessingTime reflecting current behavior (default value).
  • IngestionTime reflecting new behavior (watermarks).
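The opt-in behavior and the proposed accessors can be sketched as follows. Everything here is hypothetical: `TimeDomain`, the `EventRead` shape, and the `deliver`/`consume` helpers mirror the names in this proposal but are not a released Pravega client API, and watermark values are assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkAwareReaderSketch {

    // Hypothetical option mirroring the proposal; not a released Pravega API.
    enum TimeDomain { ProcessingTime, IngestionTime }

    // Hypothetical shape of EventRead with the proposed accessors.
    interface EventRead<T> {
        boolean isEvent();
        boolean isWatermark();
        T getEvent();        // meaningful only when isEvent()
        long getWatermark(); // meaningful only when isWatermark()
    }

    static <T> EventRead<T> event(T value) {
        return new EventRead<T>() {
            public boolean isEvent() { return true; }
            public boolean isWatermark() { return false; }
            public T getEvent() { return value; }
            public long getWatermark() { throw new IllegalStateException("not a watermark"); }
        };
    }

    static <T> EventRead<T> watermark(long millis) {
        return new EventRead<T>() {
            public boolean isEvent() { return false; }
            public boolean isWatermark() { return true; }
            public T getEvent() { throw new IllegalStateException("not an event"); }
            public long getWatermark() { return millis; }
        };
    }

    // Watermarks reach the application only if it opted in to the ingestion time domain,
    // so legacy clients that never call isEvent() cannot mistake a watermark for an event.
    static <T> List<EventRead<T>> deliver(TimeDomain domain, List<EventRead<T>> raw) {
        List<EventRead<T>> out = new ArrayList<>();
        for (EventRead<T> r : raw) {
            if (r.isWatermark() && domain != TimeDomain.IngestionTime) {
                continue;
            }
            out.add(r);
        }
        return out;
    }

    // A watermark-aware consumer: collects events and returns the highest watermark seen.
    static <T> long consume(List<EventRead<T>> reads, List<T> sink) {
        long clock = Long.MIN_VALUE;
        for (EventRead<T> r : reads) {
            if (r.isWatermark()) {
                clock = Math.max(clock, r.getWatermark());
            } else if (r.isEvent()) {
                sink.add(r.getEvent());
            }
        }
        return clock;
    }

    public static void main(String[] args) {
        List<EventRead<String>> raw = List.of(
                event("a"), WatermarkAwareReaderSketch.<String>watermark(100L), event("b"));
        List<String> events = new ArrayList<>();
        long clock = consume(deliver(TimeDomain.IngestionTime, raw), events);
        System.out.println(events + " " + clock); // [a, b] 100
    }
}
```

Checking `isWatermark()` before `isEvent()` and ignoring anything else is what makes such client code tolerant of further read kinds being added later.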

SegmentInputStream

Enhance the SegmentInputStreamImpl to expose the current watermark, to be read after a call to readEventData. The semantics are that the watermark pertains to the most recent event (or EndOfSegmentException), and indicates the (exclusive) minimum ingestion time of any subsequent event.

Note that watermarks obtained via SegmentRead responses are understood to indicate the watermark after the read response is fully consumed. Since the response data is unaligned with respect to event data, the watermark must be internally buffered.
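One way to buffer watermarks against unaligned response boundaries is to queue each response's end offset with its watermark and publish the watermark only once the reader has consumed past that offset. The sketch below is hypothetical (not the real `SegmentInputStreamImpl`) and assumes exclusive end offsets.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of how a segment input stream could buffer watermarks that
// arrive with SegmentRead responses; not the real SegmentInputStreamImpl.
class BufferedWatermarkSketch {

    // Each entry: {exclusive end offset covered by a SegmentRead response, watermark it carried}.
    private final ArrayDeque<long[]> pending = new ArrayDeque<>();
    private long consumedOffset = 0L;
    private long currentWatermark;

    BufferedWatermarkSketch(long initialWatermark) {
        this.currentWatermark = initialWatermark;
    }

    // A response's watermark applies only once all of its bytes have been consumed.
    void onSegmentRead(long endOffset, long watermark) {
        pending.addLast(new long[] {endOffset, watermark});
        drain(); // a zero-length response (tail read) may apply immediately
    }

    // Called after an event ending at eventEndOffset has been parsed; returns the
    // watermark that pertains to that event.
    long onEventRead(long eventEndOffset) {
        consumedOffset = eventEndOffset;
        drain();
        return currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    private void drain() {
        while (!pending.isEmpty() && pending.peekFirst()[0] <= consumedOffset) {
            currentWatermark = Math.max(currentWatermark, pending.removeFirst()[1]);
        }
    }

    public static void main(String[] args) {
        BufferedWatermarkSketch s = new BufferedWatermarkSketch(Long.MIN_VALUE);
        s.onSegmentRead(100L, 500L);             // bytes [0, 100) arrive with watermark 500
        System.out.println(s.onEventRead(60L));  // first response not fully consumed: still Long.MIN_VALUE
        s.onSegmentRead(150L, 520L);             // bytes [100, 150) arrive with watermark 520
        System.out.println(s.onEventRead(120L)); // event [60, 120) spans both responses: 500
        System.out.println(s.onEventRead(150L)); // second response fully consumed: 520
    }
}
```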

Segment Metadata Client

Enhance the metadata client to provide the creation time for a given segment. This information shall be used by the reader group manager to ascertain the minimum watermark of a segment (see Reader Group Manager section).

Reader Group State

Track the overall reader group watermark in reader group state:

  1. Store the per-segment watermark for all incomplete segments (i.e. all of unassigned, assigned, future).
  2. Calculate the global watermark as the minimum of all per-segment watermarks.
  3. Initialize watermarks from ReaderGroupStateInit
  4. Update the watermarks from CheckpointReader updates (with updated watermarks for the given position data)
  5. Update the watermarks from SegmentCompleted updates (with the watermarks of successor segments)

Sort the unassigned segments by watermark (ascending) to prioritize the oldest stream data.
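The bookkeeping described above can be sketched as follows. The class and method names are hypothetical stand-ins for `ReaderGroupStateInit`, `CheckpointReader` and `SegmentCompleted` updates, and the sketch simplifies successor handling (it ignores successors that must wait for several predecessors to complete).

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of watermark bookkeeping in reader group state; the real
// ReaderGroupState and its update classes are structured differently.
class ReaderGroupWatermarksSketch {

    private final Map<String, Long> watermarks = new HashMap<>(); // all incomplete segments
    private final Set<String> unassigned = new HashSet<>();

    // Initialization, and successors of completed segments.
    void addUnassigned(String segment, long watermark) {
        watermarks.put(segment, watermark);
        unassigned.add(segment);
    }

    void assign(String segment) {
        unassigned.remove(segment);
    }

    // Checkpoint-style update with the watermark at the reader's position; never moves backwards.
    void updateWatermark(String segment, long watermark) {
        watermarks.merge(segment, watermark, Long::max);
    }

    // Segment-completed-style update: drop the segment and start tracking its successors.
    void segmentCompleted(String segment, Map<String, Long> successorWatermarks) {
        watermarks.remove(segment);
        unassigned.remove(segment);
        successorWatermarks.forEach(this::addUnassigned);
    }

    long globalWatermark() {
        return watermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    // Unassigned segments, oldest watermark first, so the oldest data is picked up first.
    List<String> unassignedByWatermark() {
        return unassigned.stream()
                .sorted(Comparator.comparingLong((String s) -> watermarks.get(s)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ReaderGroupWatermarksSketch state = new ReaderGroupWatermarksSketch();
        state.addUnassigned("segment-1", 1_000L);
        state.addUnassigned("segment-2", 900L);
        System.out.println(state.unassignedByWatermark()); // [segment-2, segment-1]
        state.assign("segment-2");
        state.updateWatermark("segment-2", 1_100L);
        System.out.println(state.globalWatermark());       // 1000
        state.segmentCompleted("segment-1", Map.of("segment-3", 1_050L));
        System.out.println(state.globalWatermark());       // 1050
    }
}
```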

Reader Group Manager

Use the stream metadata client to ascertain the watermark at a given position. Use the information to update the reader group state as mentioned above.

Use the creation time of the segment as a short-term alternative to obtain a pessimistic watermark. An improvement would be to use the ReadIndex to obtain a more accurate watermark.

EventReaderImpl

Track the watermarks obtained from segment input streams and checkpoint the watermarks.

Track the latest watermark and return the watermark at event read time whenever the value is increased due to a group state change.
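Because Flink requires strictly ascending watermarks per subtask, the reader should only surface a watermark when the group-level value has actually increased. A minimal sketch, assuming a hypothetical `WatermarkEmitterSketch` class that sits between group state changes and `readNextEvent`:

```java
import java.util.OptionalLong;

// Hypothetical sketch: surface a watermark to the application only when the
// group-level watermark has strictly increased since the last one returned.
class WatermarkEmitterSketch {

    private long lastEmitted = Long.MIN_VALUE;

    OptionalLong onGroupWatermark(long groupWatermark) {
        if (groupWatermark > lastEmitted) {
            lastEmitted = groupWatermark;
            return OptionalLong.of(groupWatermark);
        }
        return OptionalLong.empty(); // stale or repeated value: nothing to report
    }

    public static void main(String[] args) {
        WatermarkEmitterSketch emitter = new WatermarkEmitterSketch();
        long[] observed = {100L, 100L, 90L, 120L}; // group watermark after successive state changes
        for (long w : observed) {
            emitter.onGroupWatermark(w).ifPresent(v -> System.out.println("emit " + v));
        }
        // prints "emit 100" and "emit 120"
    }
}
```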

Mock Segment Streams

Enhance the mock classes (e.g. MockSegmentIoStreams) to support watermarks based on a synthetic clock. This is to facilitate unit testing of the reader classes (e.g. to verify watermark progression).

Contracts

Segment Metadata Attributes

Introduce a LAST_WRITE_TIME attribute with which to record the timestamp of the most recent temporal operation.

Consider renaming this attribute to LAST_OP_TIME.

ReadResultEntry

Introduce a watermark property with implementation-specific semantics. For FutureReadResultEntry, the watermark indicates the watermark of the data immediately preceding the future read range, for use with tail reads (see below). For EndOfStreamSegmentReadResultEntry, the watermark indicates the watermark of the data at the tail of the segment.

ReadResultEntryContents

Introduce a watermark property indicating the watermark of subsequent (potentially future) content.

Segment Store

Time handling is encapsulated in the segment store, details follow.

Ingestion Time Clock

Introduce a clock to authoritatively determine the current time in the ingestion time domain. Construct the clock in the service builder, and provide it to the operation log and other components that process temporal operations.

Operation Processor

The operation processor serializes updates to a given segment and shall be responsible for assigning timestamps to temporal operations (using the ingestion time clock). A good reason to perform this function in the processor is to ensure that the timestamp monotonically increases. Details follow.
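A minimal sketch of monotonic timestamp assignment, assuming the ingestion time clock is exposed as a `LongSupplier` of epoch milliseconds (an assumption; the class name is hypothetical). It guarantees non-decreasing rather than strictly increasing timestamps; equal timestamps are acceptable because watermarks subtract 1 ms.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: the operation processor stamps temporal operations using the
// ingestion time clock, but never lets an assigned timestamp go backwards.
class MonotonicIngestionClockSketch {

    private final LongSupplier wallClock; // e.g. System::currentTimeMillis in production
    private long lastAssigned = Long.MIN_VALUE;

    MonotonicIngestionClockSketch(LongSupplier wallClock) {
        this.wallClock = wallClock;
    }

    // Called by the operation processor, which serializes updates, so no locking is shown.
    long nextTimestamp() {
        lastAssigned = Math.max(lastAssigned, wallClock.getAsLong());
        return lastAssigned;
    }

    public static void main(String[] args) {
        long[] readings = {100L, 105L, 103L}; // the wall clock steps back by 2 ms
        int[] i = {0};
        MonotonicIngestionClockSketch clock = new MonotonicIngestionClockSketch(() -> readings[i[0]++]);
        System.out.println(clock.nextTimestamp()); // 100
        System.out.println(clock.nextTimestamp()); // 105
        System.out.println(clock.nextTimestamp()); // 105, not 103
    }
}
```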

Appends

An append shall cause an update to the LAST_WRITE_TIME attribute.

Transaction Merges

A transaction merge is semantically equivalent to a single, large append at the merge point. The timestamps of the operations that produced the transaction segment are irrelevant; record a new timestamp for LAST_WRITE_TIME when the merge is accepted.

Attribute Updates

An attribute update shall cause an update to the LAST_WRITE_TIME attribute. The watermark advancer (discussed below) shall use a trivial attribute update operation (discussed below).

Segment Container

The segment container is responsible for segment creation (including transaction segments). Segment creation is not handled by the operation log.

Initialize the CREATION_TIME and LAST_WRITE_TIME attributes for a new segment (using the ingestion time clock).

Watermark Advancer

The watermark must advance on any segment being read, even when no writes are forthcoming. We'll refer to such segments as 'idle', not to be confused with 'inactive' which relates to eviction (see below). A helper service shall be introduced to periodically drive this process.

Open Issue: How should the watermark of inactive segments be handled? Ideally the watermark would be advanced as soon as the segment is reactivated.

Configuration: Max Watermark Lag

Provide a container-level configuration option to control the maximum lag on the watermark of active segments. For example, a configured value of one minute indicates that the watermark should be advanced on any segment that has been idle for at least a minute. The choice of value will determine how much overhead is incurred for idle segments. Suggested default value: 10 seconds.

Configuration: Polling Period

Provide a container-level configuration option to control the rate at which active segments are evaluated for idleness. Suggested value: 1 second.

Identifying Candidates

The advancer shall examine the container's active segments to identify segments whose watermark should be advanced (i.e. 'idle' segments). The criteria shall be:

  1. non-transaction segments,
  2. that are not sealed, and
  3. whose LAST_WRITE_TIME timestamp is in the past (according to the ingestion time clock) by more than maxWatermarkLag.
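The three criteria amount to a simple filter over the active segments. The sketch below uses a hypothetical `SegmentView` type in place of the container's real segment metadata, and assumes LAST_WRITE_TIME is stored as epoch milliseconds.

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

class IdleSegmentCandidatesSketch {

    // Hypothetical, simplified view of a segment's metadata (not the real container metadata type).
    static final class SegmentView {
        final String name;
        final boolean isTransaction;
        final boolean isSealed;
        final long lastWriteTime; // LAST_WRITE_TIME attribute, epoch millis

        SegmentView(String name, boolean isTransaction, boolean isSealed, long lastWriteTime) {
            this.name = name;
            this.isTransaction = isTransaction;
            this.isSealed = isSealed;
            this.lastWriteTime = lastWriteTime;
        }
    }

    // Applies the three criteria: non-transaction, not sealed, and idle for longer than maxWatermarkLag.
    static List<String> idleCandidates(List<SegmentView> activeSegments, long nowMillis, Duration maxWatermarkLag) {
        long lagMillis = maxWatermarkLag.toMillis();
        return activeSegments.stream()
                .filter(s -> !s.isTransaction)
                .filter(s -> !s.isSealed)
                .filter(s -> nowMillis - s.lastWriteTime > lagMillis)
                .map(s -> s.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long now = 100_000L;
        List<SegmentView> active = List.of(
                new SegmentView("idle", false, false, 80_000L), // idle for 20 s
                new SegmentView("busy", false, false, 95_000L), // written 5 s ago
                new SegmentView("sealed", false, true, 0L),
                new SegmentView("txn", true, false, 0L));
        // With the suggested 10 s default lag, only "idle" qualifies.
        System.out.println(idleCandidates(active, now, Duration.ofSeconds(10))); // [idle]
    }
}
```

In a container, such a check could plausibly be run once per polling period (for example with `ScheduledExecutorService.scheduleAtFixedRate`), emitting a trivial attribute update for each candidate.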

Updating LAST_WRITE_TIME

To advance the watermark on a given idle segment, the container shall emit a trivial attribute update to the operation log. Since attribute updates are considered temporal operations, the LAST_WRITE_TIME shall be updated automatically.

Eviction of Inactive Segments

Segments that haven't been used (read or written) in a while are evicted from the container's in-memory state, to minimize resource usage. The advancement of watermarks on idle segments mustn't interfere with this.

Read Index

The read index integrates reads across cache and storage and tracks future reads. Future reads are typically tail reads to be completed when new data is appended, but are also used when the cache is backfilled from storage.

Watermark Index

The read index shall maintain a list of known watermarks, indexed by offset. The index entry at a given offset conveys the (exclusive) minimum ingestion time of any data existing after that offset. For example, an entry of (42L, 12:00) indicates that data at offset 43L or beyond has an ingestion time after 12:00. A special-case entry at offset -1 records the initial watermark of the segment.

Appends

The read index is updated as data is committed to the segment. Use the LAST_WRITE_TIME attribute (as updated by the operation log) to establish a new watermark at (lastOffset, LAST_WRITE_TIME - 1).
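A sorted map gives a compact picture of the watermark index and the append rule. This is a hypothetical sketch, not the real read index; it assumes the initial entry at offset -1 is derived from CREATION_TIME with the same 1 ms adjustment, which the proposal does not state explicitly.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the read index's watermark index; not the real ReadIndex code.
class WatermarkIndexSketch {

    // offset -> (exclusive) minimum ingestion time of any data after that offset
    private final TreeMap<Long, Long> index = new TreeMap<>();

    WatermarkIndexSketch(long creationTime) {
        // Special-case entry at offset -1: the initial watermark of the segment.
        // Assumption: derived from CREATION_TIME with the usual 1 ms adjustment.
        index.put(-1L, creationTime - 1);
    }

    // Called when data ending at lastOffset is committed; lastWriteTime is the
    // LAST_WRITE_TIME attribute as updated by the operation log.
    void onAppend(long lastOffset, long lastWriteTime) {
        index.put(lastOffset, lastWriteTime - 1);
    }

    // Watermark for a reader that has consumed all data up to and including
    // lastReadOffset (-1 if it has read nothing yet).
    long watermarkAt(long lastReadOffset) {
        Map.Entry<Long, Long> entry = index.floorEntry(lastReadOffset);
        if (entry == null) {
            throw new IllegalArgumentException("offset precedes the segment: " + lastReadOffset);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        WatermarkIndexSketch idx = new WatermarkIndexSketch(1_000L);
        idx.onAppend(42L, 1_200L);                // bytes 0..42 committed at t=1200
        System.out.println(idx.watermarkAt(-1L)); // 999: only the creation time is known
        System.out.println(idx.watermarkAt(41L)); // 999: byte 42 (written at 1200) is still ahead
        System.out.println(idx.watermarkAt(42L)); // 1199: everything after offset 42 is newer
    }
}
```

Using `floorEntry` picks the closest entry at or before the reader's position, which is the tightest watermark that still covers all unread data.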

Transaction Merges

The merge operation produces a new value for the LAST_WRITE_TIME attribute; update the watermark index accordingly.

Attribute Updates

Enhance the read index to be informed of attribute updates, so as to read the LAST_WRITE_TIME attribute and update the watermark index.

This is necessary only if AttributeUpdateOperation is used to advance the watermark on idle streams. Adapt accordingly.

Future Reads

Future reads should complete upon an update to the relevant watermark, to inform the reader which is waiting for more data. Introduce a watermark property on FutureReadResultEntry to track the watermark value that existed when the future read was initiated; compare this value to the updated watermark to determine whether the read should complete.

Keep in mind that not all future reads are tail reads.
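For tail reads, the completion rule can be sketched with `CompletableFuture`. The names are hypothetical (not the real `FutureReadResultEntry`), the sketch covers tail reads only, and it assumes single-threaded access for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: tail reads that complete when the watermark advances,
// even if no new data has been appended. Not the real FutureReadResultEntry.
class WatermarkFutureReadsSketch {

    static final class PendingRead {
        final long watermarkAtStart; // watermark when the future read was initiated
        final CompletableFuture<Long> result = new CompletableFuture<>();

        PendingRead(long watermarkAtStart) {
            this.watermarkAtStart = watermarkAtStart;
        }
    }

    private final List<PendingRead> pending = new ArrayList<>();
    private long tailWatermark;

    WatermarkFutureReadsSketch(long initialWatermark) {
        this.tailWatermark = initialWatermark;
    }

    PendingRead registerTailRead() {
        PendingRead read = new PendingRead(tailWatermark);
        pending.add(read);
        return read;
    }

    // Invoked when the watermark index gains a newer tail entry
    // (append, merge, or the watermark advancer's attribute update).
    void onWatermarkUpdated(long newWatermark) {
        tailWatermark = Math.max(tailWatermark, newWatermark);
        for (Iterator<PendingRead> it = pending.iterator(); it.hasNext(); ) {
            PendingRead read = it.next();
            if (tailWatermark > read.watermarkAtStart) {
                // Complete with the new watermark only; the reader receives a zero-length read.
                read.result.complete(tailWatermark);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        WatermarkFutureReadsSketch segment = new WatermarkFutureReadsSketch(999L);
        PendingRead read = segment.registerTailRead();
        segment.onWatermarkUpdated(999L);         // no progress: still pending
        System.out.println(read.result.isDone()); // false
        segment.onWatermarkUpdated(1_009L);       // e.g. idle segment advanced by the watermark advancer
        System.out.println(read.result.join());   // 1009
    }
}
```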

Redirected Reads

Redirected reads obtain their content from a child (e.g. transaction) segment. The watermark to be associated with the content must be obtained from the parent segment, since the transaction merge updates the LAST_WRITE_TIME after the child segment is sealed.

Request Processor

readSegment

The processor produces SegmentRead responses from the ReadResult and its collection of entries. The processor must return the watermark associated with the last entry's content (i.e. the cache, storage, or future result). The ReadIndex determines the watermark value. When aggregating cache entries, use the last entry's watermark.

getStreamSegmentInfo

Enhance to return the creation time of the segment (in the ingestion time domain, as recorded with the CREATION_TIME metadata attribute).

createTransaction

Consolidate the logic related to establishing the CREATION_TIME attribute to the segment container.

Flink Streaming Data Source

The connector shall be enhanced to use the ingestion watermarks as the basis for event time watermarks.

Maximum Event Time Lag

The data source shall expose a configurable property called maxEventTimeLag, with which to specify the expected delta between the ingestion time (as provided by Pravega) and the event time of a given event (as obtained via a Flink TimestampAssigner).

The data source shall subtract the configured value from any received watermark, and then emit the adjusted watermark to Flink.

Timestamp Assigner

The data source shall accept a TimestampAssigner with which to assign timestamps to incoming events following deserialization. This is a convenience to eliminate the need for a transformation operator solely dedicated to assigning timestamps.

Prototype

An end-to-end prototype was developed to demonstrate the viability of the proposal, using Apache Flink as the stream processor.

Source Code

The prototype is split into two pull requests:

Incomplete Features and Potential Improvements

Segment Eviction

The mechanism by which the watermark is advanced on idle segments is probably undermining segment eviction.

System Streams

The watermark is advanced on all streams including the system streams (e.g. _system/_commitStream). Should the system streams be avoided?

Reader Checkpoint State (External)

  • Include watermarks in reader checkpoint state. Current solution is to fetch the watermarks at restore time.

Sealed Segments

The AsyncSegmentInputStream synthesizes a SegmentRead response when a SegmentSealed error is received, with a watermark value of Long.MAX_VALUE. Improve the behavior to use the actual watermark.

Recovery

Issue: Validate the design for potential recovery issues.

Prototype Output

For reference, the following is output from the EventTimeITCase class (source code) within the Flink connector test suite. The selected output highlights the handling of three specific events, from an on-time (A), early (B), and late (C) sensor. Observe that the events have been effectively reordered into event-time order.

DEBUG i.p.c.flink.FlinkPravegaReader - Advancing the event time watermark: Watermark @ 1510626708681
...
DEBUG i.p.c.f.u.AbstractStreamBasedWriter - Writing: (1510626750230,B)
DEBUG i.p.c.f.u.AbstractStreamBasedWriter - Writing: (1510626719197,A)
DEBUG i.p.c.f.u.AbstractStreamBasedWriter - Writing: (1510626691235,C)
DEBUG i.p.c.f.EventTimeOrderingOperator - Discarded a late record: (1510626691235,C)
...
DEBUG i.p.c.flink.FlinkPravegaReader - Advancing the event time watermark: Watermark @ 1510626726273
(1510626719197,A)
...
DEBUG i.p.c.flink.FlinkPravegaReader - Advancing the event time watermark: Watermark @ 1510626754349
(1510626750230,B)

