This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Build your own SPQR Emitter

Christian Kreutzfeldt edited this page Apr 8, 2015 · 4 revisions

This tutorial shows you how to implement, annotate, and deploy your own emitter component. The snippets below show how to write a simple Kafka writer. You can find the complete listing in our repository.

The simple stuff

As with every component implementation, you must provide getter and setter methods for the component identifier and its type. Additionally, each emitter must provide a getter that returns the total number of processed messages.
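The accessors described above might look roughly like the following sketch. It is not taken from the repository: the class name `SketchEmitter`, the method name `getTotalNumOfMessages`, the helper `countMessage`, and the use of a plain `String` for the type are all assumptions made for illustration. Check the actual emitter interface for the exact signatures.

```java
/**
 * Minimal sketch of the bookkeeping part of an emitter.
 * All names (SketchEmitter, getTotalNumOfMessages, countMessage) are
 * hypothetical; the real SPQR interfaces may differ.
 */
class SketchEmitter {

  private String id;                // component identifier, assigned from outside
  private String type = "EMITTER";  // hypothetical: the real code may use an enum
  private long messageCounter = 0;  // total number of processed messages

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }

  public String getType() { return type; }
  public void setType(String type) { this.type = type; }

  /** Read-only view of the counter; there is deliberately no setter. */
  public long getTotalNumOfMessages() { return messageCounter; }

  /** Called by the message handler each time a message has been processed. */
  public void countMessage() { messageCounter++; }
}
```

The counter is exposed through a getter only, so that the component itself remains the single place where it is incremented.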

Lifecycle

Each component provides two lifecycle methods that are, as their names suggest, invoked when the component is initialized and when it is shut down.

initialize(Properties)

The initialize method does what its name says: it initializes the component instance. It receives all key/value pairs of the component configuration, which lives inside a pipeline definition. In this case, we read the Kafka client identifier, the ZooKeeper connect string, the topic, and the broker list from the properties, all of which are required. Additionally, the caller may provide an optional character set (UTF-8 is used by default) and a flag that switches on message acknowledgement.

public void initialize(Properties properties) throws RequiredInputMissingException, 
   ComponentInitializationFailedException {

  /////////////////////////////////////////////////////////////////////////
  // input extraction and validation
  if(properties == null)
    throw new RequiredInputMissingException("Missing required properties");
		
  clientId = StringUtils.lowerCase(StringUtils.trim(properties.getProperty(CFG_OPT_KAFKA_CLIENT_ID)));
  if(StringUtils.isBlank(clientId))
    throw new RequiredInputMissingException("Missing required kafka client id");
		
  zookeeperConnect = StringUtils.lowerCase(StringUtils.trim(
    properties.getProperty(CFG_OPT_ZOOKEEPER_CONNECT)));
  if(StringUtils.isBlank(zookeeperConnect))
    throw new RequiredInputMissingException("Missing required zookeeper connect");
		
  topicId = StringUtils.lowerCase(StringUtils.trim(properties.getProperty(CFG_OPT_TOPIC_ID)));
  if(StringUtils.isBlank(topicId))
    throw new RequiredInputMissingException("Missing required topic");

  brokerList = StringUtils.lowerCase(StringUtils.trim(properties.getProperty(CFG_OPT_BROKER_LIST)));
  if(StringUtils.isBlank(brokerList))
    throw new RequiredInputMissingException("Missing required broker list");
		
  String charsetName = StringUtils.trim(properties.getProperty(CFG_OPT_CHARSET));
  try {
    if(StringUtils.isNotBlank(charsetName))
      charset = Charset.forName(charsetName);
    else
      charset = Charset.forName("UTF-8");
  } catch(Exception e) {
     throw new RequiredInputMissingException("Invalid character set provided: " + 
       charsetName + ". Error: " + e.getMessage());
  }
		
  messageAcking = StringUtils.equalsIgnoreCase(StringUtils.trim(
    properties.getProperty(CFG_OPT_MESSAGE_ACKING)), "true");
  //
  /////////////////////////////////////////////////////////////////////////

  if(logger.isDebugEnabled())
    logger.debug("kafka emitter[id="+this.id+", client="+clientId+", topic="+topicId+
      ", broker="+brokerList+", zookeeper="+zookeeperConnect+", charset="+charset.name()+
      ", messageAck="+messageAcking+"]");
		
  // initialize the producer only if it does not exist yet (a test case typically assigns one up front)
  if(kafkaProducer == null) { 
    Properties props = new Properties();		
    props.put(CFG_ZK_CONNECT, zookeeperConnect);
    props.put(CFG_BROKER_LIST, brokerList);
    props.put(CFG_REQUEST_REQUIRED_ACKS, (messageAcking ? "1" : "0"));
    props.put(CFG_CLIENT_ID, clientId);			
    this.kafkaProducer = new Producer<>(new ProducerConfig(props));
  }
}
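The initialize method above repeats the same read, trim, and blank-check steps for every required option. As a sketch, that pattern could be factored into a small helper. The code below uses only the JDK so it runs on its own: `ConfigReader`, `required`, and `charsetOrDefault` are hypothetical names, and `IllegalArgumentException` stands in for `RequiredInputMissingException`. It also lower-cases with `Locale.ROOT`, which is a deliberate deviation from `StringUtils.lowerCase` to keep the result independent of the default locale.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Properties;

/** Hypothetical helper; not part of the SPQR code base. */
class ConfigReader {

  /** Returns the trimmed, lower-cased value or throws if it is missing or blank. */
  static String required(Properties props, String key) {
    if (props == null)
      throw new IllegalArgumentException("Missing required properties");
    String value = props.getProperty(key);
    if (value == null || value.trim().isEmpty())
      throw new IllegalArgumentException("Missing required option: " + key);
    return value.trim().toLowerCase(Locale.ROOT);
  }

  /** Returns the named charset, falling back to UTF-8 when none is configured. */
  static Charset charsetOrDefault(Properties props, String key) {
    String name = props.getProperty(key);
    if (name == null || name.trim().isEmpty())
      return StandardCharsets.UTF_8;
    // Charset.forName throws an IllegalArgumentException subclass
    // for illegal or unsupported charset names.
    return Charset.forName(name.trim());
  }
}
```

With such a helper, each required option shrinks to a single line, for example `topicId = ConfigReader.required(properties, CFG_OPT_TOPIC_ID);`.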

shutdown

The shutdown method is invoked by the surrounding micro pipeline. Typically this happens when the micro pipeline itself is shut down. However, if the micro pipeline tries to handle error situations by itself, it may also shut down and restart selected components.

When the shutdown method is triggered, it must ensure that all resources held by the component are released and that those resources are notified about the shutdown.

In the case of our Kafka emitter, the shutdown method looks like this:

public boolean shutdown() {
  if(this.kafkaProducer != null) {
    try {
      kafkaProducer.close();
    } catch(Exception e) {
      logger.error("Failed to close kafka producer [id="+id+"]. Reason: "+e.getMessage());
    }
  }
  return true;
}
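Because the micro pipeline may shut down and restart components, it can help to make shutdown safe to call more than once. The following sketch shows one way to do that. It is an illustration rather than the repository's implementation: `ClosableEmitterSketch` is a hypothetical class, and a plain `AutoCloseable` stands in for the Kafka producer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: an idempotent shutdown around a single resource. */
class ClosableEmitterSketch {

  private final AutoCloseable resource; // stands in for the Kafka producer
  private final AtomicBoolean closed = new AtomicBoolean(false);

  ClosableEmitterSketch(AutoCloseable resource) {
    this.resource = resource;
  }

  /**
   * Closes the resource at most once; later calls do nothing.
   * Like the method above, it reports true even if closing failed.
   */
  public boolean shutdown() {
    if (resource != null && closed.compareAndSet(false, true)) {
      try {
        resource.close();
      } catch (Exception e) {
        System.err.println("Failed to close resource. Reason: " + e.getMessage());
      }
    }
    return true;
  }
}
```

The `compareAndSet` call guarantees that only one caller ever reaches `close()`, even if two threads trigger the shutdown at the same time.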

Data Export

Each emitter is executed within a surrounding thread which hands over incoming messages via Emitter#onMessage(StreamingDataMessage):

public boolean onMessage(StreamingDataMessage message) {
  if(message != null && message.getBody() != null && message.getBody().length > 0) {
    this.kafkaProducer.send(new KeyedMessage<byte[], byte[]>(this.topicId, message.getBody()));
    this.messageCounter++;
  }
  return true;
}
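The message handling above can be exercised without a running broker if the producer is replaced by a test double, which is what the null check on the producer in initialize allows for. The sketch below illustrates the idea under simplifying assumptions: `OnMessageSketch` and its `Sink` interface are hypothetical stand-ins for the emitter and the Kafka producer, and the message is reduced to its body.

```java
/** Hypothetical sketch of the onMessage logic with an injectable sink. */
class OnMessageSketch {

  /** Stand-in for the Kafka producer; not a real Kafka type. */
  interface Sink {
    void send(String topic, byte[] body);
  }

  private final Sink sink;
  private final String topicId;
  private long messageCounter = 0;

  OnMessageSketch(Sink sink, String topicId) {
    this.sink = sink;
    this.topicId = topicId;
  }

  /** Forwards non-empty bodies to the sink and counts them; empty input is skipped. */
  public boolean onMessage(byte[] body) {
    if (body != null && body.length > 0) {
      sink.send(topicId, body);
      messageCounter++;
    }
    return true;
  }

  public long getTotalNumOfMessages() {
    return messageCounter;
  }
}
```

A test can pass a lambda that collects the bodies in a list and then check both the list and the counter. As in the original method, the counter is a plain field, which is sufficient as long as only the surrounding thread calls onMessage.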