
RabbitMQ Consumer Group

RabbitMQ is a well-known open-source software that provides robust messaging for applications.

One of the commonly implemented recipes using this software is a work queue. http://www.rabbitmq.com/tutorials/tutorial-four-java.html describes the use case where:

  • A producer sends a message with a routing key.
  • The message is routed to the queues whose binding key exactly matches the routing key of the message.
  • There are multiple consumers, and each consumer processes only a subset of the messages by binding to the keys it is interested in.

The example provided there describes how multiple consumers can be started to process all the tasks.

While this works, production systems need the following:

  • Failure handling: when a consumer fails, another consumer must be started, or the remaining consumers must take over the messages that the failed consumer would have processed.
  • Scaling: when the existing consumers cannot keep up with the task generation rate, new consumers are added, and the tasks must be redistributed among all of them.

In this sample app, we explain how this set of consumers can be grouped together so that consumer failures and expansion are handled automatically.

Mapping this use case to Helix is straightforward: the binding key/routing key is equivalent to a partition.

Let's take a concrete example. Say a topic has 6 partitions, and we have 2 consumers to process all the queues. We want the 6 queues to be divided evenly between the 2 consumers, so each processes 3 queues. When the system needs to scale, we add a third consumer; each consumer then processes tasks from 2 queues. Now suppose a consumer fails, reducing the number of active consumers to 2: each remaining consumer must then process 3 queues again.
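The arithmetic above can be sketched in a few lines. This is purely illustrative — Helix computes the actual partition-to-consumer mapping internally, and the class and method names below are invented for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: Helix computes the real mapping. This just
// reproduces the arithmetic from the example above.
public class EvenDistribution
{
  // Spread partitions 0..numPartitions-1 round-robin over the consumers.
  static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers)
  {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (String consumer : consumers)
    {
      assignment.put(consumer, new ArrayList<>());
    }
    for (int p = 0; p < numPartitions; p++)
    {
      assignment.get(consumers.get(p % consumers.size())).add(p);
    }
    return assignment;
  }

  public static void main(String[] args)
  {
    // 2 consumers -> 3 queues each; 3 consumers -> 2 queues each
    System.out.println(assign(6, Arrays.asList("consumer_0", "consumer_1")));
    System.out.println(assign(6, Arrays.asList("consumer_0", "consumer_1", "consumer_2")));
  }
}
```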

We showcase how such a dynamic application can be developed using Helix.

Try it

Before getting into the details of how to develop such an application using Helix, you can try the following steps to get a feel for it.

git clone git@github.com:linkedin/helix.git
cd helix
./build
export HELIX_PKG_ROOT=`pwd`/helix-core/target/helix-core-pkg

Or download the 0.5.28 release tarball from here:

tar -xzvf helix-core-pkg-0.5.28.tar.gz
export HELIX_PKG_ROOT=`pwd`/helix-core-pkg

Download the rabbitmq-consumer-group recipe from here

tar -xzvf rabbitmq-consumer-group-0.5.28.tar.gz
export HELIX_RABBITMQ_ROOT=`pwd`/rabbitmq-consumer-group/
chmod +x $HELIX_PKG_ROOT/bin/*
chmod +x $HELIX_RABBITMQ_ROOT/bin/*

Install RabbitMQ

Setting up RabbitMQ on a local box is straightforward. You can find the instructions here http://www.rabbitmq.com/download.html

Start ZK

Start ZooKeeper on port 2199:

$HELIX_PKG_ROOT/bin/start-standalone-zookeeper 2199

Set up the consumer group cluster

This creates a "rabbitmq-consumer-group" cluster and adds a resource with 6 queues.

$HELIX_RABBITMQ_ROOT/bin/setup-cluster.sh localhost:2199 

Add consumers

Start 2 consumers in 2 different terminals. Each consumer is given a unique id.

# start-consumer.sh zookeeperAddress (e.g. localhost:2181) consumerId rabbitmqServer (e.g. localhost)
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 0 localhost 
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 1 localhost 

Start HelixController

Now start a Helix controller that starts managing the "rabbitmq-consumer-group" cluster.

$HELIX_RABBITMQ_ROOT/bin/start-cluster-manager.sh localhost:2199

Send messages to the Topic

Start sending messages to the topic. This script randomly selects a routing key (1-6) and sends the message to the topic. Based on the key, messages get routed to the appropriate queue.

$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 20

After running this, you should see all 20 messages being processed by 2 consumers.

Add another consumer

Once a new consumer is started, Helix detects it. To balance the load across 3 consumers, it deallocates one partition from each of the existing consumers and allocates them to the new consumer. Each consumer now processes only 2 queues. Helix makes sure the old owner is asked to stop consuming a partition before the new consumer is asked to start consuming it, but the transitions for different partitions can happen in parallel.

$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 2 localhost
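The minimal-movement idea above can be sketched as follows. The names are invented for illustration and do not come from the Helix API — Helix's rebalancer performs the equivalent computation internally:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of minimal-movement rebalancing: when a new consumer
// joins, move just enough partitions from the overloaded consumers instead
// of recomputing the whole mapping from scratch.
public class MinimalRebalance
{
  // Returns the number of partitions that had to move.
  static int addConsumer(Map<String, List<Integer>> assignment, String newConsumer)
  {
    int total = assignment.values().stream().mapToInt(List::size).sum();
    List<Integer> newOwned = new ArrayList<>();
    assignment.put(newConsumer, newOwned);
    int target = total / assignment.size(); // e.g. 6 partitions / 3 consumers = 2
    int moved = 0;
    for (List<Integer> owned : assignment.values())
    {
      // Take one partition at a time from consumers that are above target.
      while (owned != newOwned && owned.size() > target && newOwned.size() < target)
      {
        newOwned.add(owned.remove(owned.size() - 1));
        moved++;
      }
    }
    return moved;
  }

  public static void main(String[] args)
  {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    assignment.put("consumer_0", new ArrayList<>(Arrays.asList(0, 1, 2)));
    assignment.put("consumer_1", new ArrayList<>(Arrays.asList(3, 4, 5)));
    int moved = addConsumer(assignment, "consumer_2");
    System.out.println(moved + " partitions moved: " + assignment);
  }
}
```

Running the sketch shows that only 2 of the 6 partitions change owners: one taken from each existing consumer.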

Send messages again to the topic.

$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 100

You should see that messages are now received by all 3 consumers.

Stop a consumer

In any terminal, press Ctrl+C and notice that Helix detects the consumer failure and redistributes the 2 partitions that the failed consumer was processing to the remaining 2 active consumers.

How does it work?

Find the entire code here.

Cluster setup

This step creates the znodes for the cluster in ZooKeeper and adds the state model. We use the OnlineOffline state model since no other states are needed: a consumer is either processing a queue or it is not.

It creates a resource called "rabbitmq-consumer-group" with 6 partitions. The execution mode is set to AUTO_REBALANCE, which means Helix controls the assignment of partitions to consumers and automatically distributes the partitions evenly among the active consumers. When a consumer is added or removed, Helix ensures that only a minimal number of partitions are shuffled.

zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);

// add cluster
admin.addCluster(clusterName, true);

// add state model definition
StateModelConfigGenerator generator = new StateModelConfigGenerator();
admin.addStateModelDef(clusterName, "OnlineOffline",
    new StateModelDefinition(generator.generateConfigForOnlineOffline()));

// add resource "rabbitmq-consumer-group" which has 6 partitions
String resourceName = "rabbitmq-consumer-group";
admin.addResource(clusterName, resourceName, 6, "OnlineOffline", "AUTO_REBALANCE");

Starting the consumers

The only things a consumer needs to know are the ZooKeeper address, the cluster name, and its consumer id; it does not need to know anything else.

_manager = HelixManagerFactory.getZKHelixManager(_clusterName,
                                                 _consumerId,
                                                 InstanceType.PARTICIPANT,
                                                 _zkAddr);

StateMachineEngine stateMach = _manager.getStateMachineEngine();
ConsumerStateModelFactory modelFactory =
    new ConsumerStateModelFactory(_consumerId, _mqServer);
stateMach.registerStateModelFactory("OnlineOffline", modelFactory);

_manager.connect();

Once the consumer has registered the state model and the controller is started, the consumer starts getting callbacks (onBecomeOnlineFromOffline) for the partitions it needs to host. All it needs to do as part of the callback is start consuming messages from the appropriate queue. Similarly, when the controller deallocates a partition from a consumer, it fires onBecomeOfflineFromOnline for that partition. As part of this transition, the consumer stops consuming from that queue.

@Transition(to = "ONLINE", from = "OFFLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
{
  LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);

  if (_thread == null)
  {
    LOG.debug("Starting ConsumerThread for " + _partition + "...");
    _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
    _thread.start();
    LOG.debug("Starting ConsumerThread for " + _partition + " done");
  }
}

@Transition(to = "OFFLINE", from = "ONLINE")
public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
    throws InterruptedException
{
  LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);

  if (_thread != null)
  {
    LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
    _thread.interrupt();
    _thread.join(2000);
    _thread = null;
    LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
  }
}
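The ConsumerThread referenced above is not shown on this page. The sketch below is a simplified, self-contained stand-in — an in-memory BlockingQueue replaces the RabbitMQ channel, and the class name and constructor are invented — that demonstrates why the interrupt()/join() pair in the OFFLINE transition works: the consumer loop blocks in an interruptible call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for the recipe's ConsumerThread: a BlockingQueue
// replaces the RabbitMQ channel so the interrupt-handling pattern can be
// shown without a broker.
public class QueueConsumerThread extends Thread
{
  private final BlockingQueue<String> queue; // stand-in for the RabbitMQ queue
  private final String partition;

  public QueueConsumerThread(BlockingQueue<String> queue, String partition)
  {
    this.queue = queue;
    this.partition = partition;
  }

  @Override
  public void run()
  {
    try
    {
      while (!Thread.currentThread().isInterrupted())
      {
        // take() blocks until a message arrives; interrupt() from the
        // ONLINE -> OFFLINE transition unblocks it with InterruptedException.
        String message = queue.take();
        System.out.println(partition + ": " + message);
      }
    }
    catch (InterruptedException e)
    {
      // The OFFLINE transition asked us to stop: exit the loop cleanly.
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    QueueConsumerThread thread = new QueueConsumerThread(queue, "topic_0");
    thread.start();
    queue.put("hello");
    Thread.sleep(100);
    thread.interrupt(); // mirrors what onBecomeOfflineFromOnline does
    thread.join(2000);
  }
}
```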