Certain stream topologies, such as ones calling an HTTP client or using mapAsyncUnordered, allow stream elements to go out of order. The bounded ordering stage puts such streams back in order when needed. Ordering is best effort and uses a sliding window algorithm. If elements are out of order only within the given sliding window size, total ordering is restored. If elements are out of order beyond the sliding window size, the stream cannot and will not wait for them. An out-of-order element is then either skipped or, if it arrives late beyond the window, passed downstream out of order. The sliding window is bounded to prevent stream starvation and memory leaks from buffers building up.
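The sliding window behavior described above can be modeled outside of any stream. The following is a minimal sketch in plain Java, not the squbs implementation: the class BoundedReorderSketch and its reorder method are hypothetical names, ids are assumed to be consecutive long values, and the window is assumed to count the number of out-of-order ids held back while waiting for a gap to fill.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustrative model only; names and exact window semantics are assumptions.
class BoundedReorderSketch {

    // Reorders ids that are expected to run initialId, initialId + 1, ...
    // At most maxBounded out-of-order ids are held back at any time.
    static List<Long> reorder(List<Long> input, int maxBounded, long initialId) {
        List<Long> output = new ArrayList<>();
        TreeSet<Long> pending = new TreeSet<>(); // held ids, smallest first
        long expected = initialId;

        for (long id : input) {
            if (id < expected) {
                // Arrived after the window moved on: pass it through out of order.
                output.add(id);
                continue;
            }
            pending.add(id);
            if (pending.size() > maxBounded) {
                // Window exhausted: stop waiting and skip ahead to the oldest held id.
                expected = pending.first();
            }
            // Emit every held id that is now next in line.
            while (!pending.isEmpty() && pending.first() == expected) {
                output.add(pending.pollFirst());
                expected++;
            }
        }
        // End of input: flush whatever is still held, in id order.
        output.addAll(pending);
        return output;
    }
}
```

With a window of 2 and an initial id of 1, the input 2, 1, 3 comes out as 1, 2, 3 because the gap is filled within the window. The input 2, 3, 4, 5, 1 comes out unchanged: the sketch gives up waiting for id 1 once three ids are held, and the late id 1 is passed through out of order.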
Add the following dependency to your build.sbt or Scala build file:
"org.squbs" %% "squbs-ext" % squbsVersion
The bounded ordering functionality is provided by the BoundedOrdering component, exposed as a Flow component that can be connected to a stream Source or another Flow component using the via operator. The elements passing through BoundedOrdering are expected to have a predictable, orderable, and continuous id that can be derived from the element. This id is commonly a Long but could be any other type, provided its order is predictable.
Creating a BoundedOrdering takes a few inputs:
- The maxBounded parameter defines the sliding window size for which BoundedOrdering will wait for an out-of-order element.
- The initialId parameter is the expected id of the first element. This way the stream knows if the first element went missing or arrived out of order.
- The nextId parameter specifies the function to derive the subsequent element's id from the current element's id.
- The getId parameter specifies the function used to extract the id from the element.
- Optionally, a Comparator may be passed if the system does not know how to compare and order the id type.
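To make the roles of initialId, nextId, getId, and the Comparator more concrete, here is a minimal sketch for an id type that is not a Long. Everything in it is hypothetical: the Message type, the "seq-N" id format, and the SeqIdFunctions holder are invented for illustration. Since the exact overload that accepts a Comparator is not shown in this document, the sketch stops short of calling BoundedOrdering itself and only defines the functions that would be handed to it.

```java
import java.util.Comparator;
import java.util.function.Function;

// Illustrative only: hypothetical id functions for ids such as "seq-9", "seq-10".
class SeqIdFunctions {
    static final String PREFIX = "seq-";

    // Hypothetical element type carrying a string id.
    static final class Message {
        final String id;
        final String content;

        Message(String id, String content) {
            this.id = id;
            this.content = content;
        }
    }

    // Helper: numeric part of an id, e.g. 9 for "seq-9".
    static long number(String id) {
        return Long.parseLong(id.substring(PREFIX.length()));
    }

    // initialId: the id expected on the very first element.
    static final String INITIAL_ID = "seq-1";

    // getId: extract the id from an element.
    static final Function<Message, String> GET_ID = m -> m.id;

    // nextId: derive the id expected after the given one.
    static final Function<String, String> NEXT_ID = id -> PREFIX + (number(id) + 1);

    // Comparator: order ids by their numeric part.
    // Plain string order would wrongly place "seq-10" before "seq-9".
    static final Comparator<String> ID_ORDER = Comparator.comparingLong(SeqIdFunctions::number);
}
```

The comparator matters here because the natural ordering of String does not match the intended sequence, which is the kind of case the optional Comparator parameter is described as covering.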
Initialization and usage can be seen in the following examples, first in Scala and then in Java:
// This is the sample message type.
case class Element(id: Long, content: String)
// Create the BoundedOrdering component.
val boundedOrdering = BoundedOrdering[Element, Long](maxBounded = 5, 1L, _ + 1L, _.id)
// Then just use it in the stream.
Source(input).via(boundedOrdering).to(Sink.ignore).run()
// This is the sample message type.
static class Element {
    final Long id;
    final String content;
    // Omitting constructor, hashCode, equals, and toString for brevity.
    // Do not forget to implement them.
}
// Create the BoundedOrdering component.
Flow<Element, Element, NotUsed> boundedOrdering = BoundedOrdering.create(5, 1L, i -> i + 1L, e -> e.id);
// Then just use it in the stream.
Source.from(input).via(boundedOrdering).to(Sink.ignore()).run(mat);
This stage may temporarily detach downstream and upstream demand to some extent. This happens when elements arrive out of order. Out-of-order elements are buffered until the elements ordered before them have arrived or the bounds have been reached. While this is the case, the stage generates demand upstream without emitting elements downstream until one of these conditions is met.