Monitoring

Tracking and monitoring the behavior of distributed stream processing applications is one of the real challenges aside from their implementation. To prevent devops and ops teams from operating a SPQR infrastructure blindfolded, the framework provides tunable monitoring features which support both in-depth and high-level views on different layers.

Basic requirements / setup

For any type of metrics gathered from SPQR components (infrastructure as well as pipelines), a unified layer is provided to receive and transport monitoring data. Technically speaking, this is implemented by dedicated topics on the default communication layer (e.g. Apache Kafka) that are accessible from any processing node and the resource manager.

Monitoring Setup (Overview)

A data processor attached to the communication layer reads all monitoring events and handles them according to its configuration, e.g. by simply forwarding them to Graphite.

Although each processing node is capable of forwarding its own monitoring data directly to a centralized handler (e.g. Graphite), the framework requires all components to ingest their monitoring data into a central transport mechanism. This way, changes to how monitoring data is processed are applied in one central place and are far easier to handle than touching each processing node individually.
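As an illustration of such a data processor, the sketch below reads events from a Kafka topic and pushes them to Graphite using the Dropwizard Graphite client. It is only a minimal sketch: the topic name, the assumed event layout ("<metric name> <value>") and the endpoint addresses are hypothetical and not prescribed by SPQR.

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.codahale.metrics.graphite.Graphite;

public class MonitoringForwarder {

  public static void main(String[] args) throws Exception {

    // plain Kafka consumer subscribed to the (assumed) monitoring topic
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "spqr-monitoring-forwarder");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("spqr-monitoring")); // hypothetical topic name

    // Dropwizard Graphite client pointing at the Carbon plaintext port
    Graphite graphite = new Graphite(new InetSocketAddress("localhost", 2003));
    graphite.connect();

    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for(ConsumerRecord<String, String> record : records) {
        // assumed event layout: "<metric name> <value>"
        String[] parts = record.value().split(" ");
        if(parts.length == 2)
          graphite.send(parts[0], parts[1], System.currentTimeMillis() / 1000);
      }
      graphite.flush();
    }
  }
}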

Monitoring layers

To provide a full view on all aspects of a SPQR setup the monitoring differentiates between two layers:

  • infrastructure (technical view)
  • pipelines (functional view)

Infrastructure monitoring

This layer focuses on monitoring infrastructure components like

  • operating system (cpu, memory, disk, network, ...)
  • JVM

All infrastructure monitoring features are turned on by default and must be deactivated explicitly, as they provide a first indicator for any type of issue.

Each processing node continuously collects monitoring data, adds node-specific identifiers to distinguish it from the monitoring data of other nodes and forwards it into the transport layer.

Pipeline monitoring

This layer focuses on monitoring components that contribute to pipelines running on top of SPQR, like

  • number of messages inside a pipeline
  • component specific processing duration
  • arrival rate
  • ...

Implementation

Instead of re-inventing the wheel and to keep things simple, SPQR utilizes the metrics library provided by Dropwizard to gather metrics on all layers mentioned above.

The following paragraphs describe for each component type what kind of metrics are gathered and how this is implemented.

Note: although the metric instances could be provided via component constructors, they are attached through setters. This way it is possible to attach and detach metrics at runtime, turning them on and off as required.
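A hypothetical component following this pattern might look like the sketch below; the class and method names are illustrative only and not taken from the SPQR code base.

import com.codahale.metrics.Counter;

public class SampleOperator {

  private Counter messageCounter = null;

  // attach (non-null) or detach (null) the counter at runtime
  public void setMessageCounter(Counter messageCounter) {
    this.messageCounter = messageCounter;
  }

  public void onMessage(Object message) {
    // ... actual message handling ...
    if(this.messageCounter != null)
      this.messageCounter.inc();
  }
}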

Queue

Each queue provides these metrics:

  • number of elements inserted into a queue (measured between two collection cycles)
  • number of elements consumed from a queue (measured between two collection cycles)

To export these values, the queue interface provides two setter methods for attaching counters to track the number of messages inserted to and retrieved from a queue.

As both the queue consumer and the queue producer may be fully detached from the underlying queue, each is provided with a setter for its counter as well.

On each message insertion and each message retrieval the associated counter is incremented:

/**
 * @see StreamingMessageQueueConsumer#next()
 */
public StreamingDataMessage next() {
		
  // check if a new message is available and read it from chronicle if possible
  if(queueReader.nextIndex()) {
    long timestamp = queueReader.readLong();
    int bytes = queueReader.readInt();
    byte[] body = new byte[bytes];
    queueReader.read(body);
    queueReader.finish();
	
    if(this.messageRetrievalCounter != null)
      this.messageRetrievalCounter.inc();
    
    return new StreamingDataMessage(body, timestamp);
  }
  // otherwise no message is available
  return null;
}

/**
 * @see StreamingMessageQueueProducer#insert(StreamingDataMessage)
 */
public boolean insert(StreamingDataMessage message) {

  if(message != null) {
    synchronized (queueProducer) {
      queueProducer.startExcerpt();
      queueProducer.writeLong(message.getTimestamp());
      queueProducer.writeInt(message.getBody().length);
      queueProducer.write(message.getBody());
      queueProducer.finish();
 
      if(this.messageInsertionCounter != null)
        this.messageInsertionCounter.inc();
			
      return true;
    }
  }
  return false;
}

To enable metrics on a queue, extend its configuration by adding attachInsertionCounter and attachRetrievalCounter:

{
  "id" : "webtrends-to-kafka",
  "queues" : [ {
    "id" : "webtrends-content",
    "attachInsertionCounter": true,
    "attachRetrievalCounter": true,
    "queueSettings" : null
  } ],
  "components" : [ { ... } 
  ]
}

Source

Each source provides these metrics (gathered by the surrounding source runtime environment):

  • number of processed messages (measured between two collection cycles)

To measure the number of processed messages between two collection cycles, the source environment receives a counter which is provided through the associated setter method.

If a counter is attached (be it during initialization or later), the environment increments it for each message received from the source:

public void onMessage(StreamingDataMessage message) {
  this.queueProducer.insert(message);
  this.queueProducer.getWaitStrategy().forceLockRelease();
  
  if(this.messageCounter != null)
    this.messageCounter.inc();
}

To enable metrics on sources, extend the configuration by adding attachMessageCounter:

{
  "id" : "webtrends-to-kafka",
  "queues" : [ { ... } ],
  "components" : [ {
    "id" : "webtrends-stream-reader",
    "type" : "SOURCE",
    "name" : "webtrendsSource",
    "version" : "0.0.1",
    "attachMessageCounter": true,
    "settings" : { ...
    },
    ...
  },

Direct Response Operator

Each direct response operator provides these metrics (gathered by the surrounding direct response operator runtime environment):

  • number of processed messages (measured between two collection cycles)
  • message processing duration

To measure the number of messages and their processing duration, the direct response operator environment receives a counter and a timer. Both are provided through the associated setter methods.

If one of them is attached, the environment gathers values for it:

public void run() {
  while(running) {
    try {				
      StreamingDataMessage message = this.consumerQueueWaitStrategy.waitFor(this.queueConsumer);
      if(message != null && message.getBody() != null) {
        Timer.Context timerContext = (this.messageProcessingTimer != null ? 
                          this.messageProcessingTimer.time() : null);

        StreamingDataMessage[] responseMessages = this.directResponseOperator.onMessage(message);
        // ... further handling of responseMessages elided ...
        if(timerContext != null)
          timerContext.stop();
        
        if(this.messageCounter != null)
          this.messageCounter.inc();
      }
    } catch(InterruptedException e) {  
    } catch(Exception e) {
    }
  }		
}	

To enable metrics on direct response operators, extend the configuration by adding attachMessageCounter and attachProcessingTimer:

{
  "id" : "webtrends-to-kafka",
  "queues" : [ { ... } ],
  "components" : [ {
    "id" : "webtrends-stream-filter",
    "type" : "DIRECT_RESPONSE_OPERATOR",
    "name" : "jsonContentFilter",
    "version" : "0.0.1",
    "attachMessageCounter": true,
    "attachProcessingTimer": true,
    "settings" : { ...
    },
    ...
  },

Delayed Response Operator

Each delayed response operator provides these metrics (gathered by the surrounding delayed response operator runtime environment):

  • number of processed messages (measured between two collection cycles)

In contrast to the direct response operator, this operator type measures only the number of received messages, as its processing duration is more or less fixed. In order to count messages between two collection cycles, the environment receives a counter through the associated setter method.

If the counter is attached, the environment gathers values for it:

public void run() {
  while(running) {
    try {
      StreamingDataMessage message = this.consumerQueueWaitStrategy.waitFor(this.queueConsumer); 
      if(message != null && message.getBody() != null) {
        this.delayedResponseOperator.onMessage(message);
        this.responseWaitStrategy.onMessage(message);
					
        if(this.messageCounter != null)
          this.messageCounter.inc();
      }
    } catch(InterruptedException e) {
    } catch(Exception e) {
    }
  }
}

To enable metrics on delayed response operators, extend the configuration by adding attachMessageCounter:

{
  "id" : "webtrends-to-kafka",
  "queues" : [ { ... } ],
  "components" : [ {
    "id" : "webtrends-stream-aggregator",
    "type" : "DELAYED_RESPONSE_OPERATOR",
    "name" : "jsonContentAggregator",
    "version" : "0.0.1",
    "attachMessageCounter": true,
    "settings" : { ...
    },
    ...
  },

Emitter

Each emitter provides these metrics (gathered by the surrounding emitter runtime environment):

  • number of received messages (measured between two collection cycles)
  • message emit duration

To measure the number of messages and their emit duration, the emitter environment receives a counter and a timer. Both are provided through the associated setter methods.

If one of them is attached, the environment gathers values for it:

public void run() {
  StreamingMessageQueueWaitStrategy queueWaitStrategy = this.queueConsumer.getWaitStrategy();
  while(running) {
    try {
      StreamingDataMessage message = queueWaitStrategy.waitFor(this.queueConsumer);
      if(message != null && message.getBody() != null) {
        Timer.Context timerContext = (this.messageEmitDurationTimer != null ? 
              this.messageEmitDurationTimer.time() : null);

        this.emitter.onMessage(message);
					
        if(timerContext != null)
          timerContext.stop();

        if(this.messageCounter != null)
          this.messageCounter.inc();
      } 
    } catch(InterruptedException e) {
    } catch(Exception e) {
    }
  }		
}

To enable metrics on emitters, extend the configuration by adding attachMessageCounter and attachProcessingTimer:

{
  "id" : "webtrends-to-kafka",
  "queues" : [ { ... } ],
  "components" : [ {
    "id" : "kafka-emitter",
    "type" : "EMITTER",
    "name" : "kafkaEmitter",
    "version" : "0.0.1",
    "attachMessageCounter": true,
    "attachProcessingTimer": true,
    "settings" : { ...
    },
    ...
  },

Reporter Activation (on pipeline)

To activate the configured metrics collection, the collected metrics need to be reported to a destination:

{
  "id" : "webtrends-to-kafka",
  "queues" : [ {
    "id" : "webtrends-content",
    "queueSettings" : null
  } ],
  "metricsReporter":[{
     "id":"kafka-export",
     "type":"KAFKA",
     "period":1,
     "settings":{
         "zookeeperConnect":"localhost:2181",
         "brokerList":"localhost:9092",
         "topicId":"testTopic",
         "clientId":"testClient"
     }
  }]
  ....

This configuration exports metrics (components and queues) to Kafka with a period of 1 second.

Processing Node / Operating System Layer

Aside from pipeline component monitoring, the framework supports gathering metrics from the underlying server implementation. These include metrics on

  • class loading
  • file descriptors
  • garbage collection
  • memory usage
  • thread state

To enable metrics on this layer, insert the following snippet into spqr-node.cfg underneath the spqrNode element:

spqrNode:
  ...
  spqrMetrics:
    attachMemoryUsageMetricCollector: true
    attachFileDescriptorMetricCollector: true
    attachClassLoadingMetricCollector: true
    attachGCMetricCollector: true
    attachThreadStateMetricCollector: true
    metricsReporter:
    - id: "kafka"
      type: "KAFKA"
      period: 5
      settings:
        zookeeperConnect: "localhost:2181"
        brokerList: "localhost:9092"
        topicId: "nodemetrics"
        clientId: "sampleClient"  

The snippet enables all five collectors and activates a reporter which exports all metrics to the configured Kafka topic every 5 seconds.
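These collectors presumably map to the JVM metric sets shipped with Dropwizard's metrics-jvm module. The sketch below shows roughly what such a setup registers; the method and metric names are illustrative and not taken from the SPQR sources.

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;

public static MetricRegistry createNodeMetricRegistry() {
  MetricRegistry registry = new MetricRegistry();
  registry.registerAll(new MemoryUsageGaugeSet());                    // attachMemoryUsageMetricCollector
  registry.register("jvm.fd.usage", new FileDescriptorRatioGauge()); // attachFileDescriptorMetricCollector
  registry.registerAll(new ClassLoadingGaugeSet());                  // attachClassLoadingMetricCollector
  registry.registerAll(new GarbageCollectorMetricSet());             // attachGCMetricCollector
  registry.registerAll(new ThreadStatesGaugeSet());                  // attachThreadStateMetricCollector
  return registry;
}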

Monitoring data processing
