Intro to Storm+Trident

Enter the Dragon: C&E Corp Gains a New Partner

Dragons are fast and sleek, and never have to sleep. They exist in some ways out of time — a dragon can perform a thousand actions in the blink of an eye, and yet a thousand years is to them a passing moment.

Intro: Storm+Trident Fundamentals

At this point, you have a good familiarity with Hadoop’s batch-processing power and the powerful inquiries it unlocks, both beyond and as a counterpart to the traditional database approach. Stream analytics is a third mode of data analysis and, it is becoming clear, one that is just as essential and transformative as massive-scale batch processing has been.

Storm is an open-source framework developed at Twitter that provides scalable stream processing. Trident draws on Storm’s powerful transport mechanism to provide exactly-once processing of records in windowed batches, for aggregating and persisting to an external data store.

The central challenge in building a system that can perform fallible operations on billions of records reliably is how to do so without yourself producing so much bookkeeping that it becomes its own scalable stream-processing challenge. Storm handles all the details of reliable transport and efficient routing for you, leaving you with only the business process at hand. (The remarkably elegant way Storm handles that bookkeeping challenge is one of its principal breakthroughs; you’ll learn about it in the later chapter on Storm internals.)

This takes Storm past the mere processing of records to stream analytics — with some limitations and some advantages, you have the same ability as in Hadoop to specify locality and write arbitrarily powerful general-purpose code to handle every record. A lot of Storm+Trident’s adoption is in application to real-time systems. [1]

But, just as importantly, the framework exhibits radical tolerance of latency. It’s perfectly reasonable, for every record, to read from a legacy data store, call an internet API and the like, even if those calls might have worst-case latencies of hundreds or thousands of milliseconds. That range of timescales is simply impractical within a batch processing run or database query. In the later chapter on the Lambda Architecture, you’ll learn how to use stream and batch analytics together for latencies that span from milliseconds to years.
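To make that concrete, here is a minimal sketch, not taken from the book, of a Trident function that makes a slow external call for every record. The endpoint URL is a hypothetical stand-in for your own legacy store or API; the point is only that a per-record call costing hundreds of milliseconds is acceptable inside a stream operation.

[source,java]
----
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Sketch only: enrich each record with the result of a slow external lookup.
public static class EnrichWithApiCall extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    try {
      // api.example.com is a hypothetical endpoint standing in for a legacy store or web API.
      URL url = new URL("https://api.example.com/lookup?id=" + tuple.getString(0));
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setConnectTimeout(2000);  // hundreds of ms of latency per record is tolerable here
      conn.setReadTimeout(2000);
      collector.emit(new Values(conn.getResponseCode()));
    } catch (IOException e) {
      // drop the tuple on error; throwing a FailedException instead would replay the batch
    }
  }
}
----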

As an example, one of the largest hard drive manufacturers in the world ingests sensor data from its manufacturing line, test data from quality assurance processes, reports from customer support, and post-mortem analysis of returned devices. It has been able to mine the accumulated millisecond-scale sensor data for patterns that predict flaws months and years later. Hadoop produces the "slow, deep" results, uncovering the patterns that predict failure. Storm+Trident produces the fast, relevant results: operational alerts when those anomalies are observed.

Things you should take away from this chapter:

* Understand the type of problems you solve using stream processing, and apply it to real examples using the best-in-class stream analytics frameworks.

* Acquire the practicalities of authoring, launching and validating a Storm+Trident flow.

* Understand Trident’s operators and how to use them: apply each of the `CombinerAggregator`s, `ReducerAggregator`s and `AccumulatingAggregator`s (generic aggregator?); a sketch of a `CombinerAggregator` follows this list.

* Persist records or aggregations directly to a backing database, or to Kafka for idempotent downstream storage.

* (probably not going to discuss how to do a streaming join, using either DRPC or a hashmap join)
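For a taste of what those operators look like in code, here is roughly how Trident’s built-in Count aggregator is written. A `CombinerAggregator` only has to answer three questions: what a single tuple contributes, how to combine two partial results, and what an empty result looks like.

[source,java]
----
import storm.trident.operation.CombinerAggregator;
import storm.trident.tuple.TridentTuple;

// Roughly how the built-in Count aggregator works.
public class Count implements CombinerAggregator<Long> {
  public Long init(TridentTuple tuple)      { return 1L; }          // each tuple contributes one
  public Long combine(Long val1, Long val2) { return val1 + val2; } // partial counts are summed
  public Long zero()                        { return 0L; }          // an empty partition counts zero
}
----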

Note
This chapter speaks of Storm+Trident only at a high level and from the outside. We won’t spend any time on how it makes all this work until the later chapter on Storm+Trident internals (to do: add cross-reference).

Your First Topology

Topologies in Storm are analogous to jobs in Hadoop - they define the path data takes through your system and the operations applied along the way. Topologies are compiled locally and then submitted to a Storm cluster, where they run indefinitely until stopped. You define your topology, and Storm handles all the hard parts — fault tolerance, retries, and distributing your code across the cluster, among other things.
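Jumping ahead slightly, launching looks roughly like the sketch below. It assumes a `TridentTopology` named `topology` like the one constructed later in this chapter; the topology name and worker count are illustrative values.

[source,java]
----
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;

// Sketch: launching a compiled topology. 'topology' is a TridentTopology like the
// one built below; "github-commits" and the worker count are illustrative.
Config conf = new Config();
conf.setNumWorkers(2);

// For local experimentation, run an in-process cluster:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("github-commits-local", conf, topology.build());

// On a real cluster you would package a jar and submit it instead (via `storm jar`):
// StormSubmitter.submitTopology("github-commits", conf, topology.build());
----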

For your first Storm+Trident topology, we’re going to create a topology to handle a typical streaming use case: accept a high rate event stream, process the events to power a realtime dashboard, and then store the records for later analysis. Specifically, we’re going to analyze the Github public timeline and monitor the number of commits per language.

A basic logical diagram of the topology looks like this:

image::89-intro-to-storm-topo.png[Logical diagram of the Github commit-monitoring topology]

Each node in the diagram above represents a specific operation on the data flow. Initially, JSON records are retrieved from Github and injected into the topology by the Github spout; from there they are transformed by a series of operations and eventually persisted to an external data store. Trident spouts are sources of streams of data — common use cases include pulling from a Kafka queue, a Redis queue, or some other external data source. Streams are in turn made up of tuples, which are just lists of values with a name attached to each field.
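To make "tuples with named fields" concrete, here is a small sketch using Trident’s FixedBatchSpout testing helper rather than the Github spout; the hard-coded JSON strings are made-up records shaped like the fields the topology below expects.

[source,java]
----
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.testing.FixedBatchSpout;

// Sketch: a tiny in-memory spout for local experimentation. It declares a single
// field named "line", and every Values(...) below becomes one tuple in the stream.
FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("line"), 2,
    new Values("{\"type\":\"PushEvent\",\"repository\":{\"language\":\"Scala\"},\"payload\":{\"size\":3}}"),
    new Values("{\"type\":\"WatchEvent\"}"));
testSpout.setCycle(true);  // replay the same batch forever, handy for testing
----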

The meat of the Java code that constructs this topology is as follows:

[source,java]
----
IBlobStore bs = new FileBlobStore("~/dev/github-data/test-data");
OpaqueTransactionalBlobSpout spout =
    new OpaqueTransactionalBlobSpout(bs, StartPolicy.EARLIEST, null);

TridentTopology topology = new TridentTopology();
topology.newStream("github-activities", spout)
    .each(new Fields("line"), new JsonParse(), new Fields("parsed-json"))
    .each(new Fields("parsed-json"), new ExtractLanguageCommits(), new Fields("language", "commits"))
    .groupBy(new Fields("language"))
    .persistentAggregate(new VisibleMemoryMapState.Factory(), new Count(), new Fields("commit-sum"));
----

The first two lines are responsible for constructing the spout. Instead of pulling directly from Github, we’ll be using a directory of downloaded JSON files, so as not to (a) unnecessarily burden Github or (b) unnecessarily complicate the code. You don’t need to worry about the specifics, but the OpaqueTransactionalBlobSpout reads each JSON file and feeds it line by line into the topology.

After creating the spout we construct the topology by calling new TridentTopology(). We then create the topology’s first (and only) stream by calling newStream and passing in the spout we instantiated earlier along with a name, “github-activities”. We can then chain a series of method calls off newStream() to tack on our logic after the spout.

The each method call, appropriately, applies an operation to each tuple in the stream. The important parameter in the each calls is the second one, an instance of the class that defines the operation to be applied to each tuple. The first each uses the JsonParse class, which parses the JSON coming off the spout and turns it into an object representation that we can work with more easily. Our second each uses ExtractLanguageCommits.class to pull the statistics we’re interested in from the parsed JSON objects, namely the language and number of commits. ExtractLanguageCommits.class is fairly straightforward, and it is instructive to digest it a bit:

[source,java]
----
public static class ExtractLanguageCommits extends BaseFunction {
  private static final Logger LOG = LoggerFactory.getLogger(ExtractLanguageCommits.class);

  public void execute(TridentTuple tuple, TridentCollector collector) {
    JsonNode node = (JsonNode) tuple.getValue(0);
    if (!node.get("type").toString().equals("\"PushEvent\"")) return;
    List values = new ArrayList(2);
    //grab the language and the action
    values.add(node.get("repository").get("language").asText());
    values.add(node.get("payload").get("size").asLong());
    collector.emit(values);
    return;
  }
}
----

There is only one method, execute, that accepts a tuple and a collector. The tuples coming into ExtractLanguageCommits have only one field, parsed-json, which contains a JsonNode, so the first thing we do is cast it. We then use the get method to access the various pieces of information we need.
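The JsonParse function used in the first each isn’t listed in the chapter; a minimal version is only a few lines. Here is a hedged sketch that assumes Jackson 2 for the parsing (the book’s actual class may use a different JSON library):

[source,java]
----
import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Sketch of JsonParse: turn the raw "line" string into a JsonNode for downstream operations.
public static class JsonParse extends BaseFunction {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    try {
      JsonNode node = MAPPER.readTree(tuple.getString(0));  // the "line" field
      collector.emit(new Values(node));                     // emitted as the "parsed-json" field
    } catch (IOException e) {
      // silently drop unparseable lines rather than failing the whole batch
    }
  }
}
----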

At the time of writing, the full schema for Github’s public stream is available here, but here are the important bits:

----
{
  "type": "PushEvent",   // can be one of .. finish JSON…
}
----

… finish this section …​

At this point the tuples in our stream might look something like this:

(“C”, 2), (“JavaScript”, 5), (“CoffeeScript”, 1), (“PHP”, 1), (“JavaScript”, 1), (“PHP”, 2)

We then group on the language and sum the counts, giving our final tuple stream which could look like this:

(“C”, 2), (“JavaScript”, 6), (“CoffeeScript”, 1), (“PHP”, 3)

The groupBy is exactly what you think it is - it ensures that every tuple with the same language is grouped together and passed through the same thread of execution, allowing you to perform the sum operation across all tuples in each group. After summing the commits, the final counts are stored in a database. Feel free to go ahead and try it out yourself.
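The chapter stops at persisting the counts, but it’s worth seeing how you could read them back out. The sketch below is not from the book: it captures the TridentState that persistentAggregate returns and exposes it to ad-hoc DRPC queries. The DRPC function name and the small Split helper are illustrative.

[source,java]
----
import backtype.storm.LocalDRPC;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.tuple.TridentTuple;

// A tiny helper (illustrative): split the DRPC argument string into one language per tuple.
public static class Split extends BaseFunction {
  public void execute(TridentTuple tuple, TridentCollector collector) {
    for (String language : tuple.getString(0).split(" ")) {
      collector.emit(new Values(language));
    }
  }
}

// The same chain as above, but this time holding on to the TridentState it returns.
TridentState commitCounts = topology.newStream("github-activities", spout)
    .each(new Fields("line"), new JsonParse(), new Fields("parsed-json"))
    .each(new Fields("parsed-json"), new ExtractLanguageCommits(), new Fields("language", "commits"))
    .groupBy(new Fields("language"))
    .persistentAggregate(new VisibleMemoryMapState.Factory(), new Count(), new Fields("commit-sum"));

// Expose the state to queries: ask for "JavaScript PHP" and get the current sums back.
LocalDRPC drpc = new LocalDRPC();
topology.newDRPCStream("commits-by-language", drpc)
    .each(new Fields("args"), new Split(), new Fields("language"))
    .groupBy(new Fields("language"))
    .stateQuery(commitCounts, new Fields("language"), new MapGet(), new Fields("commit-sum"))
    .each(new Fields("commit-sum"), new FilterNull());

System.out.println(drpc.execute("commits-by-language", "JavaScript PHP"));
----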

So What?

You might be thinking to yourself “So what, I can do that in Hadoop in 3 lines…​” and you’d be right — almost. It’s important to internalize the difference in focus between Hadoop and Storm+Trident — when using Hadoop you must have all your data sitting in front of you before you can start, and Hadoop won’t provide any results until processing all of the data is complete. The Storm+Trident topology you just built allows you to update your results as you receive your stream of data in real time, which opens up a whole set of applications you could only dream about with Hadoop.


1. For reasons you’ll learn in the Storm internals chapter, it’s not suitable for ultra-low-latency (below, say, single-digit milliseconds) Wall Street-type applications; but if latencies above that are real-time enough for you, Storm+Trident shines.