Components for developing and testing streaming applications using Apache Storm.

Storm Gadgets: Components for Developing and Testing Streaming Applications Using Apache Storm

Utility Spouts and Bolts

Bolts

Class             Purpose
ProjectionBolt    Filters all but the named fields from each tuple.
SplittingBolt     Splits the fields of each tuple across a number of different output streams. Fields may be dropped entirely, reordered and/or duplicated across multiple streams.
CacheJoiningBolt  Joins each of one or more streams with a distinguished stream that is used to populate a cache.

Common Characteristics

All of the components require their output field signatures (names of output fields) to be specified in the constructor parameter list. In the components where the names of output streams are flexible, they too need to be configured in the parameter list.

Since a bolt can take input from multiple other components, often with different sets of output fields, it is useful to keep in mind that its input tuples may not all have the same fields. Furthermore, fields may have null values.

All of the components attempt to play well with guaranteed delivery, if configured. They acknowledge all received tuples, and anchor all the tuples they generate.

ProjectionBolt

The ProjectionBolt re-publishes each tuple with a new set of output fields, whose names are passed into the constructor either as strings or as a Fields instance. For each of the named fields, in the given order:

  • If a field of the same name appears in the input tuple, it is copied to the output.
  • If a field does not appear in the input tuple, it is given a null value in the output.
  • If any fields are present in the input that were not specified for the output, they are dropped.

There is no restriction on the number of times an input field can appear in the output, or on the order in which output fields appear.
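
The projection rules above can be sketched in plain Java. This is only a model of the described behaviour, not the bolt's actual implementation: the class `ProjectionSketch`, the method `project`, and the representation of a tuple as a field-name-to-value map are all assumptions made for illustration, and no Storm classes are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the projection rules; not part of storm-gadgets.
public class ProjectionSketch {

    // Builds the output values for one input tuple, modelled here as a
    // field-name -> value map rather than a Storm Tuple.
    public static List<Object> project(Map<String, Object> input,
                                       List<String> outputFields) {
        List<Object> output = new ArrayList<>();
        for (String field : outputFields) {
            // Map.get returns null for an absent field, which matches the
            // rule that missing fields become null in the output.
            output.add(input.get(field));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new HashMap<>();
        input.put("a", "one");
        input.put("b", "two");
        input.put("c", "three");

        // "a" and "c" are dropped, "d" is absent and becomes null,
        // and "b" is allowed to appear twice.
        System.out.println(project(input, Arrays.asList("b", "d", "b")));
        // prints [two, null, two]
    }
}
```

Input fields that are not named in the output list are never looked up, so they are dropped without any explicit filtering step.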

Streams: No distinction is made among different input streams. All output uses the default stream.

Reliability: All emitted tuples are anchored by the corresponding input tuple. Each input tuple is acknowledged once the corresponding output tuple has been emitted.

Concurrency: There are no restrictions on concurrency and any grouping strategy can be used for inputs.

A number of obvious extensions may appear in the future:

  • Field renaming
  • New fields with constant values or calculated values (on the basis of named input fields)

Example:

// Field names passed as strings:
ProjectionBolt projection = new ProjectionBolt("b", "d");

// Equivalent form, passing a Fields instance:
ProjectionBolt sameProjection = new ProjectionBolt(new Fields("b", "d"));

Both of these forms are equivalent. Every tuple emitted will have fields "b" and "d", copied from the input tuple, or null if not present in the input tuple.

SplittingBolt

The SplittingBolt splits the data in the input tuples across multiple output streams. The constructor takes a variable number of Fields instances, each specifying the field names for one output stream.

For each stream, for each of the named fields, in the given order:

  • If a field of the same name appears in the input tuple, it is copied to the output.
  • If a field does not appear in the input tuple, it is given a null value in the output.
  • If any fields are present in the input that were not specified for any of the output streams, they are dropped.

There is no restriction on the number of times an input field can appear in the output, or on the order in which output fields appear. A field may be repeated in the output for a particular output stream, and/or it may be repeated across multiple output streams.

Streams: No distinction is made among different input streams. The output tuples are emitted on different streams, named stream1, stream2 etc., in the order in which the Fields instances are given in the bolt constructor.

Reliability: Each output tuple is anchored by the input tuple from which it was constructed, and each input tuple is acknowledged once all of the corresponding output tuples have been emitted.

Concurrency: There are no restrictions on concurrency and any grouping strategy can be used for inputs.

Example:

SplittingBolt splitter =
    new SplittingBolt(
        new Fields("a", "b"),
        new Fields("c", "d"));

In this case stream1 would have fields a and b, and stream2 would have fields c and d.
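
The splitting rules described in this section can be modelled in plain Java as follows. This is a sketch under stated assumptions rather than the bolt's real code: `SplittingSketch` and `split` are hypothetical names, a tuple is modelled as a field-name-to-value map, and the result is a map from output stream name to the values emitted on that stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the splitting rules; not part of storm-gadgets.
public class SplittingSketch {

    // Returns stream name -> output values for one input tuple.
    public static Map<String, List<Object>> split(Map<String, Object> input,
                                                  List<List<String>> fieldsPerStream) {
        Map<String, List<Object>> outputs = new LinkedHashMap<>();
        for (int i = 0; i < fieldsPerStream.size(); i++) {
            List<Object> values = new ArrayList<>();
            for (String field : fieldsPerStream.get(i)) {
                values.add(input.get(field)); // null when the field is absent
            }
            // Streams are numbered from 1, in constructor-argument order.
            outputs.put("stream" + (i + 1), values);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new HashMap<>();
        input.put("a", 1);
        input.put("b", 2);
        input.put("c", 3);
        input.put("e", 5); // not named for any stream, so it is dropped

        System.out.println(split(input, Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d"))));
        // prints {stream1=[1, 2], stream2=[3, null]}
    }
}
```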

CacheJoiningBolt

The CacheJoiningBolt creates a cache of data based on the input tuples received from a distinguished input component, specified in the constructor. The cache is keyed on one input field, also specified in the constructor, and the cache always keeps one tuple for each value of the key field -- the one received most recently.

Tuples received from all other components are joined with the cache tuples, using either an inner join or a left outer join -- specified in the constructor. The cache is always on the right side of the join. The join is always based on equality between the cache key and a specified field for all other tuples -- it can be a different field for each source component. The fields can be renamed in the output.

Streams: Input tuples are treated differently depending on the component from which they originate. One component is the source of cache records, while each of the others supplies the left side of the join described by its join specifier. The join specifier determines the name of the corresponding output stream. Tuples used to update the cache are also emitted on a separate output stream called "cache", with their original field names.

No distinction is made among different input streams of the same component; only the originating component matters.

Reliability: The result of each join is anchored only by the left side tuple that produced it. When a cache tuple is consumed and results in a cache update (that is, whenever the key is present in the tuple), the tuple emitted on the "cache" stream is anchored by that input tuple. Every consumed tuple is acknowledged, even if it is used neither to update the cache nor to join. This may not be the desired behavior, especially for the cached data, and anybody using this bolt in a topology that employs guaranteed message processing needs to think carefully about the cases where tuple replay is, or is not, desired. In particular, replay of cached tuples may be questionable in many scenarios.

Concurrency: If multiple instances of this bolt are to execute, the cache input should probably use the "All" grouping so that each instance gets all cache entries. Adventurous users may consider using a "fields" grouping instead, but then they will also have to use a "fields" grouping for ALL the other inputs, specifying the join key for that particular input as the grouping field. It's not clear that this will always result in the left side tuples being routed to the correct cache partition.

Example:

Assume a component (spout or bolt) called "prices" is supplying tuples containing stock quotes for symbols, which contain at least the fields "symbol" and "price". Another component, called "trades" is supplying stock trades, with each tuple containing exactly the fields "symbol" and "quantity". Every time a "trades" tuple comes along it makes sense to join on the latest price quote for that symbol. Thus it makes sense to store the most recent "prices" record for each "symbol" in the cache.

The setup code follows. Most of the setup for the cache is in the constructor for the CacheJoiningBolt -- the cache key field, the component whose tuples will be cached, the fields to include in the cache entry, and finally an array of join specifiers -- in this case there is only one. The constructor for a JoinSpec requires the join key, the component producing the tuples, the name of the resulting output stream, whether this is to be an outer join, the new names of the fields from the left side (including the join key) and the new names of the fields from the cache (excluding the cache, or join, key).

CacheJoiningBolt.JoinSpec[] joins = {
    new CacheJoiningBolt.JoinSpec("symbol", "trades", "trade_with_price", false,
                                  new Fields("trade_symbol", "trade_quantity"),
                                  new Fields("trade_price"))
};

CacheJoiningBolt joiner =
    new CacheJoiningBolt("symbol", "prices",
                         new Fields("symbol", "price"), joins);

The joined tuples will be emitted on stream "trade_with_price", which will have the fields "trade_symbol", "trade_quantity" and "trade_price".

In general it is possible to use a single CacheJoiningBolt to join several streams of data with the cache, in which case there needs to be one entry for each of them in the array of join descriptors.
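
To make the join semantics of the prices/trades example concrete, here is a plain-Java model of a latest-value cache joined against incoming trades. It is a simplified sketch, not the bolt's implementation: `CacheJoinSketch`, `onPrice` and `onTrade` are hypothetical names, the cache stores only the price rather than a whole tuple, and it assumes the usual join behaviour in which an unmatched trade produces nothing for an inner join and a null price for a left outer join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the cache join; not part of storm-gadgets.
public class CacheJoinSketch {

    // Most recent price per symbol; a later quote overwrites an earlier one.
    private final Map<Object, Object> latestPrice = new HashMap<>();
    private final boolean outer;

    public CacheJoinSketch(boolean outer) {
        this.outer = outer;
    }

    // Models a tuple from "prices": it updates the cache only if the key is present.
    public void onPrice(Object symbol, Object price) {
        if (symbol != null) {
            latestPrice.put(symbol, price);
        }
    }

    // Models a tuple from "trades". Returns the values for
    // (trade_symbol, trade_quantity, trade_price), or empty when an
    // inner join finds no cached price for the symbol.
    public Optional<List<Object>> onTrade(Object symbol, Object quantity) {
        if (!outer && !latestPrice.containsKey(symbol)) {
            return Optional.empty();
        }
        // For an unmatched outer join, get() yields null for the price.
        List<Object> joined = new ArrayList<>(
                Arrays.asList(symbol, quantity, latestPrice.get(symbol)));
        return Optional.of(joined);
    }

    public static void main(String[] args) {
        CacheJoinSketch inner = new CacheJoinSketch(false);
        inner.onPrice("ACME", 10.0);
        inner.onPrice("ACME", 10.5); // replaces the earlier quote

        System.out.println(inner.onTrade("ACME", 100)); // joined with price 10.5
        System.out.println(inner.onTrade("XYZ", 5));    // no match: nothing emitted

        CacheJoinSketch leftOuter = new CacheJoinSketch(true);
        System.out.println(leftOuter.onTrade("XYZ", 5)); // emitted with a null price
    }
}
```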

Testing Utilities

Mock Spouts and Bolts

Class          Purpose
QueueSpout     A spout that reads from a queue.
QueueSinkBolt  A bolt that writes to a queue, and doesn't emit any tuples.

QueueSinkBolt

The QueueSinkBolt has roughly the character of org.apache.storm.testing.TupleCaptureBolt in the Storm APIs. It is registered under the bolt name specified in the topology, so that clients can address specific bolts, and its results are stored in an AsyncTupleQueue, making it somewhat easier to test asynchronous behavior. Test code can make assertions about the tuples a bolt receives without being concerned about whether they have already been received or will be received in the future.

Example:

Here, two tuples are expected from the QueueSinkBolt whose name is "sink", both with field names "b" and "d", with values "two" and "four" in the first tuple and "six" and "eight" in the second. Both tuples are expected within thirty seconds.

QueueSinkBolt.getQueue("sink")
             .waitForTuple(30,
                    new Fields("b", "d"),
                    new Values("two", "four"));
QueueSinkBolt.getQueue("sink")
             .waitForTuple(30,
                    new Fields("b", "d"),
                    new Values("six", "eight"));
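
The idea of asserting on a tuple regardless of whether it has already arrived can be illustrated with a standard java.util.concurrent.BlockingQueue. This is not the AsyncTupleQueue implementation, whose internals are not described here: `AsyncQueueSketch`, `add` and `waitFor` are hypothetical names, and the sketch assumes tuples are checked in arrival order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a wait-with-timeout tuple queue; not part of storm-gadgets.
public class AsyncQueueSketch {

    private final BlockingQueue<List<Object>> queue = new LinkedBlockingQueue<>();

    // Called by the producer (in a real test, the sink bolt's thread).
    public void add(List<Object> values) {
        queue.add(values);
    }

    // Waits up to timeoutSeconds for the next tuple and compares it with
    // the expected values. Returns false on timeout or mismatch.
    public boolean waitFor(long timeoutSeconds, List<Object> expected) {
        try {
            List<Object> next = queue.poll(timeoutSeconds, TimeUnit.SECONDS);
            return next != null && next.equals(expected);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncQueueSketch sink = new AsyncQueueSketch();
        sink.add(Arrays.<Object>asList("two", "four")); // already received

        // The second tuple arrives later, from another thread.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sink.add(Arrays.<Object>asList("six", "eight"));
        });
        producer.start();

        // The same call works for past and future arrivals.
        System.out.println(sink.waitFor(30, Arrays.<Object>asList("two", "four")));
        System.out.println(sink.waitFor(30, Arrays.<Object>asList("six", "eight")));
        producer.join();
    }
}
```

Because poll returns as soon as a tuple is available, the thirty-second timeout only costs time when an expected tuple never arrives.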

QueueSpout

The QueueSpout takes much the same approach as QueueSinkBolt, but for feeding data into a topology, and with no need for an asynchronous interface.

Example:

Since the QueueSpout emits tuples, it is configured with a Fields object through its constructor:

QueueSpout spout = new QueueSpout(new Fields("b", "d"));

When the spout has been added to a topology, in this case under the name "myspout", it can be fed tuples as follows:

QueueSpout.getQueue("myspout").add(new Values("two", "four"));
QueueSpout.getQueue("myspout").add(new Values("six", "eight"));

Testing Technique

As the components don't contain traditional business logic that is separable from their Storm API interactions, the testing approach used involves running them in "trivial" topologies.

Typically, a bolt is tested, at a minimum, in a topology consisting of a single instance of the bolt with parallelism hint 1, a number of QueueSpout instances to feed it, and a number of QueueSinkBolt instances to examine the outputs -- one per output stream.