ivansun1010/storm-redis


kafka-storm-redis

This project implements an approach to making bolts in Storm stateful.

### The project uses the following open-source projects:

### Explanation

The project has two parts: the spout, which handles replaying messages, and the bolts, which manage the intermediate state of the main processing.

#### Spout

Kafka is the data source for the spout. Because Kafka retains messages, replaying them is easy, and the topology's spout does not need to keep track of messages itself. The spout used here extends BasePartitionedTransactionalSpout, which implements IPartitionedTransactionalSpout, so the user only has to define the transaction metadata as needed. Re-emitting messages then becomes straightforward. The user can also specify the size of each batch and the number of partitions used.
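To illustrate why defining only the metadata is enough, here is a JDK-only sketch (not this project's code): a batch is fully described by hypothetical metadata (`BatchMeta`: start offset and message count), so replaying a batch needs nothing but that metadata. In Storm's `IPartitionedTransactionalSpout`, this split between emitting a new batch and re-emitting a known one corresponds to the emitter's `emitPartitionBatchNew` and `emitPartitionBatch` methods. The names `PartitionLog`, `BatchMeta` and `PartitionEmitter` are assumptions, and an in-memory list stands in for a Kafka partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-partition batch metadata: where the batch starts in the
// partition and how many messages it covers. It fully determines the batch.
final class BatchMeta {
    final long offset;
    final int count;

    BatchMeta(long offset, int count) {
        this.offset = offset;
        this.count = count;
    }
}

// Stand-in for one Kafka partition: an append-only log addressed by offset.
final class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    void append(String message) {
        messages.add(message);
    }

    // Returns up to `count` messages starting at `offset` (fewer at the end of the log).
    List<String> read(long offset, int count) {
        int from = (int) Math.min(offset, messages.size());
        int to = (int) Math.min(offset + count, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

// Hypothetical emitter: creates the metadata for the next batch, or emits the
// messages for any batch whose metadata was stored earlier (a replay).
final class PartitionEmitter {
    private final PartitionLog log;
    private final int batchSize;

    PartitionEmitter(PartitionLog log, int batchSize) {
        this.log = log;
        this.batchSize = batchSize;
    }

    // A new batch starts where the previous one ended (offset 0 for the first batch).
    BatchMeta nextBatch(BatchMeta previous) {
        long start = previous == null ? 0 : previous.offset + previous.count;
        int available = log.read(start, batchSize).size();
        return new BatchMeta(start, available);
    }

    // Replaying only needs the stored metadata; no per-message tracking is required.
    List<String> emit(BatchMeta meta) {
        return log.read(meta.offset, meta.count);
    }
}
```

Because the log is immutable up to its end, calling `emit` twice with the same `BatchMeta` yields the same messages, which is exactly the property a transactional replay relies on.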

#### Bolts

Redis is used as the in-memory database that stores the intermediate state of the bolts. This project builds abstractions for bolts with fault-tolerant state, so if a task dies and is reassigned to another machine, it still has its state. Tuple trees left incomplete by a bolt task failure time out, and the spout replays the source tuple of each such tree; tuples that have already completed successfully are not replayed. Without stateful bolts, persistent state is usually kept in an external database, often by delaying ack() on tuples until a batch update to the database has completed. Stateful bolts are meant to be a much more efficient way of keeping a large amount of state at hand in a bolt.
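The timeout-and-replay behaviour described above comes from Storm's acking mechanism. The sketch below is a simplified, JDK-only model of the XOR-based tuple-tree tracking that Storm's acker performs; it is not code from this project, and `TupleTreeTracker` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of Storm-style tuple-tree tracking. Every tuple carries a
// random 64-bit id; each id is XORed in once when the tuple is emitted and once
// when it is acked, so a tree's value is zero once every tuple has been acked
// (up to a negligible chance of id collisions).
final class TupleTreeTracker {
    private final Map<Long, Long> ackVal = new HashMap<>();    // root message id -> XOR of pending tuple ids
    private final Map<Long, Long> startTime = new HashMap<>(); // root message id -> emit time (ms)
    private final long timeoutMillis;

    TupleTreeTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // The spout emitted the root tuple of a new tree.
    void rootEmitted(long rootMsgId, long rootTupleId, long nowMillis) {
        ackVal.put(rootMsgId, rootTupleId);
        startTime.put(rootMsgId, nowMillis);
    }

    // A bolt acked `tupleId` after emitting `children` anchored to it.
    // Returns true (and stops tracking the tree) when the whole tree is complete.
    boolean acked(long rootMsgId, long tupleId, long... children) {
        Long current = ackVal.get(rootMsgId);
        if (current == null) {
            return false; // unknown, already completed, or already timed out
        }
        long v = current ^ tupleId;
        for (long child : children) {
            v ^= child;
        }
        if (v == 0L) {
            ackVal.remove(rootMsgId);
            startTime.remove(rootMsgId);
            return true;
        }
        ackVal.put(rootMsgId, v);
        return false;
    }

    // Roots whose trees did not complete in time; the spout would replay these.
    List<Long> expire(long nowMillis) {
        List<Long> failed = new ArrayList<>();
        for (Map.Entry<Long, Long> e : startTime.entrySet()) {
            if (nowMillis - e.getValue() >= timeoutMillis) {
                failed.add(e.getKey());
            }
        }
        for (Long root : failed) {
            ackVal.remove(root);
            startTime.remove(root);
        }
        return failed;
    }
}
```

A tree whose bolt task died keeps a non-zero value until the timeout, so only that root is reported for replay; completed trees are already gone and are never replayed.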

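public interface IPersistentMap {
      // The Redis server URL (serverURL) is supplied to an implementation, e.g. via its
      // constructor; a Java interface cannot declare constructor parameters.
      Object getState(byte[] key);
      void setState(byte[] key, Object value);
}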
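For unit-testing a bolt without a running Redis server, an in-memory implementation of the interface can be handy. The sketch below is an assumption, not part of the project: `InMemoryPersistentMap` is a hypothetical name, and the interface is repeated so the example compiles on its own. Note that `byte[]` uses identity-based `equals`/`hashCode`, so keys are wrapped in a `ByteBuffer`, which compares contents.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

// Same contract as IPersistentMap above, repeated so this sketch is self-contained.
interface IPersistentMap {
    Object getState(byte[] key);
    void setState(byte[] key, Object value);
}

// Hypothetical in-memory stand-in for a Redis-backed IPersistentMap.
final class InMemoryPersistentMap implements IPersistentMap {
    private final ConcurrentHashMap<ByteBuffer, Object> store = new ConcurrentHashMap<>();

    // Wrap a copy so later changes to the caller's array cannot corrupt the stored key.
    private static ByteBuffer wrap(byte[] key) {
        return ByteBuffer.wrap(key.clone());
    }

    @Override
    public Object getState(byte[] key) {
        return store.get(wrap(key));
    }

    @Override
    public void setState(byte[] key, Object value) {
        if (value == null) {
            store.remove(wrap(key)); // ConcurrentHashMap rejects null values, so null clears the key
        } else {
            store.put(wrap(key), value);
        }
    }
}
```

A Redis-backed implementation would keep the same two methods and take the server URL in its constructor.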

The first implementation targets amounts of state that fit in memory, so re-initialization time is not a concern. Once much larger amounts of state need to be stored, this will have to be revisited. Bolt state is persisted periodically in Redis, an in-memory database that also persists to disk. Its data model is key-value, but many kinds of values are supported: strings, lists, sets, sorted sets, and hashes (http://redis.io).
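Periodic persistence combines naturally with the "ack only after the batch update" pattern mentioned above. The sketch below is a hypothetical illustration, not the project's bolt: a word counter (the workload is an assumption) keeps state in memory, writes it to a backing store every `flushEvery` tuples, and only then acks the buffered tuples. A plain `Map` stands in for Redis and a `Consumer<Long>` stands in for Storm's `OutputCollector.ack`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical checkpointing bolt logic. Un-acked tuples time out and are replayed
// if the task dies before a flush, so no update is lost. A crash between the
// store write and the acks still replays tuples already counted (at-least-once).
final class CheckpointingCounter {
    private final Map<String, Long> counts = new HashMap<>(); // in-memory working state
    private final Map<String, Long> store;                     // stand-in for Redis
    private final List<Long> pendingAcks = new ArrayList<>();  // tuples processed but not yet acked
    private final Consumer<Long> ack;                          // stand-in for OutputCollector.ack
    private final int flushEvery;

    CheckpointingCounter(Map<String, Long> store, Consumer<Long> ack, int flushEvery) {
        this.store = store;
        this.ack = ack;
        this.flushEvery = flushEvery;
        this.counts.putAll(store); // restore the last checkpoint after a restart
    }

    void execute(long tupleId, String word) {
        counts.merge(word, 1L, Long::sum);
        pendingAcks.add(tupleId);
        if (pendingAcks.size() >= flushEvery) {
            flush();
        }
    }

    void flush() {
        store.putAll(counts);     // persist the state first...
        pendingAcks.forEach(ack); // ...then ack the tuples it covers
        pendingAcks.clear();
    }
}
```

Larger `flushEvery` values mean fewer writes to Redis but more tuples to replay after a failure.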

## Dependencies

The project uses several dependencies for Kafka and Redis. All dependencies are provided as Maven dependencies.

Kafka requires the following dependencies. To run the topology inside Storm, download the JARs of these dependencies from the Maven repository and place them in Storm's lib directory.

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>3.2.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.2.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.0</version>
    </dependency>

    <dependency>
        <groupId>javax.inject</groupId>
        <artifactId>javax.inject</artifactId>
        <version>1</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.2</version>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>

    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>

Jedis, a Java client for Redis, is also provided as a Maven dependency:

<!-- Jedis Dependency -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.2.1</version>
    <type>jar</type>
    <scope>compile</scope>
</dependency>
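Because `IPersistentMap` takes `byte[]` keys and arbitrary `Object` values, a Jedis-backed implementation has to turn values into bytes before calling Jedis' binary `set(byte[], byte[])` and `get(byte[])` methods. The README does not say how values are encoded; the sketch below assumes plain Java serialization, and `StateCodec` is a hypothetical helper name. It uses only the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical codec for storing bolt state through a binary key-value API.
final class StateCodec {
    private StateCodec() {}

    // Serializes a value with Java serialization.
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // read after close so the stream is fully flushed
    }

    // Deserializes a value; a missing key (null from the store) decodes to null.
    static Object decode(byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this helper, a Jedis-backed `setState` could store `StateCodec.encode(value)` under the key and `getState` could return `StateCodec.decode(...)` of what Jedis returns. Java serialization is simple but verbose on the wire; a compact format may be preferable for large state.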

## Example Topology

### Explanation

The topology is laid out as follows:

           ____asking to replay batch__ _____failed signal to spout__
          |                            |                             |
          V                            V                             V
.-----------------.       .-----------------.        .-----------------.     .-----------------.
|       kafka     |------>|       spout     |------->|      bolt       |---->|      redis      |
'-----------------'       '-----------------'        '-----------------'     '-----------------'
  for replaying          |--------------the topology-----------------|          stores state
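Replaying a failed batch from Kafka gives at-least-once processing. To keep a replayed batch from being counted twice, Storm's transactional-topology documentation suggests storing the transaction id alongside each value and skipping updates from a batch that has already been applied. The sketch below applies that idea to a word count; it is not taken from this project, and `TransactionalCounter`, the word-count workload, and a `Map` standing in for Redis are all assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical exactly-once state updates under replay. Each stored value keeps
// the id of the last batch (transaction) applied to it. Batches commit in
// increasing txid order, so a replayed batch is detected and skipped per key.
final class TransactionalCounter {
    // Value as it would be stored in Redis: the count plus the last applied txid.
    static final class Entry {
        final long count;
        final long txid;

        Entry(long count, long txid) {
            this.count = count;
            this.txid = txid;
        }
    }

    private final Map<String, Entry> store; // stand-in for Redis

    TransactionalCounter(Map<String, Entry> store) {
        this.store = store;
    }

    // Applies one batch; txid must increase from one batch to the next.
    void commit(long txid, List<String> batch) {
        Map<String, Long> delta = new HashMap<>();
        for (String word : batch) {
            delta.merge(word, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> e : delta.entrySet()) {
            Entry old = store.get(e.getKey());
            if (old != null && old.txid >= txid) {
                continue; // this batch was already applied to this key
            }
            long base = old == null ? 0 : old.count;
            store.put(e.getKey(), new Entry(base + e.getValue(), txid));
        }
    }

    long count(String word) {
        Entry e = store.get(word);
        return e == null ? 0 : e.count;
    }
}
```

Because the check is per key, a batch that failed halfway through its writes can be replayed safely: keys already updated are skipped and the rest are applied.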

Data is written to the Kafka topic by a separate Java project, kafka-starter, which reads records from MongoDB and publishes them to a Kafka topic.

The data extracted from MongoDB is in JSON format.

The data in MongoDB is loaded by a Python script that uses the Twitter API (for an example, see this).

Together, MongoDB and Kafka simulate real-time streaming data: MongoDB stores a large volume of data, which is then published to Kafka so that the topology receives a high-volume stream, much like a firehose.

### Steps to Run the Topology

  1. Start the MongoDB server.
  2. Start ZooKeeper.
  3. Start the Kafka server.
  4. Run MongoClientTest from kafka-starter.
    • This puts data from MongoDB onto Kafka.
  5. Stop the MongoDB server, as it is no longer needed.
  6. Start Nimbus.
  7. Start the supervisor.
  8. Submit the topology.

### Note

  • For benchmarking, use the FailandBenchM branch.
  • To understand the benchmarking workflow, see the Benchmarking Wiki page.

TODO

  • Write the Kafka producer so that all partitions receive an equal number of messages.
  • Runtime failure
  • Restore state on failure
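For the first TODO item, one simple way to balance partitions is to assign messages in strict rotation instead of by key. A minimal, JDK-only sketch of that assignment logic follows; `RoundRobinPartitioner` is a hypothetical name, and wiring it into the Kafka producer is not shown.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin assignment: partition counts never differ by more than one.
final class RoundRobinPartitioner {
    private final int numPartitions;
    private final AtomicLong next = new AtomicLong(); // thread-safe message counter

    RoundRobinPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Returns the partition for the next message: 0, 1, ..., numPartitions - 1, 0, ...
    int partitionFor() {
        return (int) Math.floorMod(next.getAndIncrement(), (long) numPartitions);
    }
}
```

Rotation ignores message keys, so it trades per-key ordering for balance; that is acceptable here only if the topology does not rely on related messages landing in the same partition.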

About

Storm Bolt State Management
