This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Message Pipelines: from Indexed Chronicle to Vanilla Chronicle

Christian Kreutzfeldt edited this page Jun 2, 2015 · 6 revisions

The SPQR default message queue implementation used for transferring data between operators is based on the OpenHFT chronicle queue.

The first approach used the IndexedChronicle, which provides a single-writer / multiple-reader queue implementation optimized for speed and throughput. Although the pipelines showed great numbers for throughput and latency, a serious risk was lurking in the background: chronicles are append-only files that are never closed or removed unless explicitly told to.

The issue surfaced soon after we started experimenting with SPQR on web tracking data produced by the majority of our sites. Our implementation performed well until the hard disk ran out of space.

To solve this, two options were available:

  • switch to the vanilla chronicle implementation, which supports rolling files (and also solves some issues with multiple writers)
  • stick with the indexed chronicle implementation and add rolling-file handling on top
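Either option comes down to the same idea: split the log into time-based cycles so that old cycles can be deleted. The following is a minimal sketch of that idea in plain JDK code. The class RollingCycles, the daily yyyyMMdd naming and the keep parameter are assumptions chosen for illustration; they are not part of SPQR or of the Chronicle API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of SPQR or Chronicle: maps timestamps to
// daily "cycle" names and decides which old cycles may be deleted.
final class RollingCycles {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    private RollingCycles() {}

    // Name of the cycle (one file or directory per UTC day) for a timestamp.
    static String cycleFor(long epochMillis) {
        return DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    // Returns the cycles that exceed the retention limit, oldest first.
    // yyyyMMdd names sort chronologically, so a plain sort is sufficient.
    static List<String> expired(List<String> cycles, int keep) {
        List<String> sorted = new ArrayList<>(cycles);
        Collections.sort(sorted);
        int surplus = Math.max(0, sorted.size() - keep);
        return new ArrayList<>(sorted.subList(0, surplus));
    }
}
```

A writer would append to the file named by cycleFor(System.currentTimeMillis()), and a periodic cleanup task would delete whatever expired(...) returns. With an append-only log that never rolls, no such cleanup point exists, which is what filled the disk.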

One technical goal of SPQR is to provide high-throughput, low-latency message handling. Sticking with the existing implementation and adding the missing features would therefore seem to be the natural decision. However, another goal is to provide a running service soon and not lose any ground, so the decision was made to use the vanilla chronicle for now, keeping in mind that extending the indexed chronicle implementation remains an option for a later version.

Replacing Indexed Chronicle with Vanilla Chronicle

As the chronicle queue follows a strict interface-based architecture, replacing the existing indexed chronicle implementation with a vanilla chronicle one was as easy as this:

public class DefaultStreamingMessageQueue {
  ...
  public void initialize(Properties properties) throws RequiredInputMissingException {
    ...
    // indexed chronicle: single writer, append-only files that are never rolled
    this.chronicle = ChronicleQueueBuilder.indexed(pathToChronicle).build();
    ...
  }
  ...
}

replaced by

public class DefaultStreamingMessageQueue {
  ...
  public void initialize(Properties properties) throws RequiredInputMissingException {
    ...
    // vanilla chronicle: supports rolling files and multiple writers
    this.chronicle = ChronicleQueueBuilder.vanilla(pathToChronicle).build();
    ...
  }
  ...
}
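
Why a one-line change is enough can be illustrated with a small, self-contained sketch. All types below (MessageLog, AppendOnlyLog, RollingLog, MessagePump) are hypothetical stand-ins and do not belong to SPQR or Chronicle; they only show that a caller written against an interface is unaffected when the implementation behind it is swapped.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the queue interface the operators depend on.
interface MessageLog {
    void append(String message);
    int size();
}

// Append-only variant: keeps every message, like a log that is never rolled.
final class AppendOnlyLog implements MessageLog {
    private final Deque<String> messages = new ArrayDeque<>();
    public void append(String message) { messages.addLast(message); }
    public int size() { return messages.size(); }
}

// Rolling variant: retains only the newest maxRetained messages.
final class RollingLog implements MessageLog {
    private final Deque<String> messages = new ArrayDeque<>();
    private final int maxRetained;
    RollingLog(int maxRetained) { this.maxRetained = maxRetained; }
    public void append(String message) {
        messages.addLast(message);
        if (messages.size() > maxRetained) {
            messages.removeFirst(); // drop the oldest entry
        }
    }
    public int size() { return messages.size(); }
}

// Caller written against the interface only; swapping implementations
// changes the constructor argument and nothing else.
final class MessagePump {
    private final MessageLog log;
    MessagePump(MessageLog log) { this.log = log; }
    int publish(int count) {
        for (int i = 0; i < count; i++) {
            log.append("event-" + i);
        }
        return log.size();
    }
}
```

Constructing new MessagePump(new AppendOnlyLog()) or new MessagePump(new RollingLog(3)) is the only difference between the two setups, mirroring the switch from the indexed to the vanilla builder shown above.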