This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Build your own SPQR Operator (delayed response)

Alexander Kolb edited this page Apr 13, 2015 · 10 revisions

This tutorial shows you how to implement, annotate and deploy your own delayed response operator component. The snippets below show how to write a simple content aggregator. You can find the complete listing in our repository.

The simple stuff

Like all component implementations, your operator must provide getter and setter methods for the component identifier and its type. Additionally, each operator must provide a getter for the total number of processed messages.

The delayed response operator adds another method which returns the number of messages processed since the last result retrieval. This value is required by wait strategies based upon message count.
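
The two counters can be sketched as follows. This is a minimal illustration, not the SPQR interface: the class name and all method names are assumptions made for this example. Only the behaviour (both counters grow with each message, and the second one is reset when a result is retrieved) is taken from the listings on this page.

```java
// Hypothetical sketch: class and method names are assumptions, not the SPQR API.
class CountingOperatorSketch {
  private String id;
  private long totalMessages = 0;
  private long messagesSinceLastResult = 0;

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }

  /** Total number of messages processed by this instance. */
  public long getTotalNumOfMessages() { return totalMessages; }

  /** Messages processed since the last result retrieval (used by count-based wait strategies). */
  public long getNumberOfMessagesSinceLastResult() { return messagesSinceLastResult; }

  /** Every incoming message increments both counters. */
  public void onMessage(byte[] body) {
    totalMessages++;
    messagesSinceLastResult++;
  }

  /** Retrieving a result resets only the "since last result" counter. */
  public void markResultRetrieved() {
    messagesSinceLastResult = 0;
  }
}
```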

Lifecycle

Each component provides two lifecycle methods that, as their names suggest, are invoked on component initialization and during shutdown.

initialize(Properties)

The initialize method does what its name says: it initializes the component instance. It receives all key/value pairs provided in the component configuration, which lives inside a pipeline definition.

In this case, we read the paths from which content is aggregated:

public void initialize(Properties properties) throws RequiredInputMissingException,
    ComponentInitializationFailedException {
		
  /////////////////////////////////////////////////////////////////////////////////////
  // assign and validate properties
  if(StringUtils.isBlank(this.id))
    throw new RequiredInputMissingException("Missing required component identifier");
		
  this.pipelineId = StringUtils.trim(properties.getProperty(CFG_PIPELINE_ID));		
  this.documentType = StringUtils.trim(properties.getProperty(CFG_DOCUMENT_TYPE));
  if(StringUtils.equalsIgnoreCase(properties.getProperty(CFG_FORWARD_RAW_DATA), "false"))
    this.storeForwardRawData = false;
	
  for(int i = 1; i < Integer.MAX_VALUE; i++) {
    String name = properties.getProperty(CFG_FIELD_PREFIX + i + ".name");
    if(StringUtils.isBlank(name))
      break;
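    String path = properties.getProperty(CFG_FIELD_PREFIX + i + ".path");
    // a field without a path cannot be extracted, and path.split would fail on null
    if(StringUtils.isBlank(path))
      throw new RequiredInputMissingException("Missing required path for field '" + name + "'");
    String valueType = properties.getProperty(CFG_FIELD_PREFIX + i + ".type");
    this.fields.add(new JsonContentAggregatorFieldSetting(
      name, path.split("\\."),
      StringUtils.equalsIgnoreCase("STRING", valueType) ?
           JsonContentType.STRING : JsonContentType.NUMERICAL));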
  }
  /////////////////////////////////////////////////////////////////////////////////////
		
  if(logger.isDebugEnabled())
    logger.debug("json content aggregator [id="+id+"] initialized");
}
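
To make the numbered field convention concrete, here is a small sketch of how such properties could be laid out and read. The prefix value "field." is an assumption (the actual value of CFG_FIELD_PREFIX is not shown on this page), and the class FieldSettingsSketch is hypothetical; it only mirrors the loop above using plain java.util classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch mirroring the field loop of initialize(Properties).
class FieldSettingsSketch {
  // Assumed value: the real CFG_FIELD_PREFIX constant is not shown in this tutorial.
  static final String CFG_FIELD_PREFIX = "field.";

  /** Returns one entry per configured field as {name, path, type}. */
  static List<String[]> readFields(Properties properties) {
    List<String[]> fields = new ArrayList<>();
    for (int i = 1; i < Integer.MAX_VALUE; i++) {
      String name = properties.getProperty(CFG_FIELD_PREFIX + i + ".name");
      if (name == null || name.trim().isEmpty())
        break; // the first gap in the numbering ends the list
      String path = properties.getProperty(CFG_FIELD_PREFIX + i + ".path");
      String type = properties.getProperty(CFG_FIELD_PREFIX + i + ".type");
      fields.add(new String[] { name.trim(), path, type });
    }
    return fields;
  }

  public static void main(String[] args) {
    Properties p = new Properties();
    p.setProperty("field.1.name", "country");
    p.setProperty("field.1.path", "geo.country");
    p.setProperty("field.1.type", "STRING");
    p.setProperty("field.2.name", "bytes");
    p.setProperty("field.2.path", "response.bytes");
    p.setProperty("field.2.type", "NUMERICAL");
    // field.3 is missing, so field.4 is never read
    p.setProperty("field.4.name", "ignored");
    System.out.println(readFields(p).size());
  }
}
```

Because the loop stops at the first missing name, field numbers must be consecutive and start at 1.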

shutdown

The shutdown method is invoked by the surrounding micro pipeline. Typically this happens when the micro pipeline itself is shut down. However, if the micro pipeline tries to handle error situations itself, it may shut down and restart selected components.

When the shutdown method is triggered, it must ensure that all consumed resources are freed and notified about the shutdown.

Our content aggregator holds no resources that need to be freed, so the shutdown method looks as follows:

public boolean shutdown() {
  return true;
}

Message processing

The operator is invoked for each incoming message, from which it extracts all values referenced in the previously provided configuration. To extract values, it provides two methods:

protected String getTextFieldValue(final JsonNode jsonNode, final String[] fieldPath) {
  int fieldAccessStep = 0;
  JsonNode contentNode = jsonNode;
  while(fieldAccessStep < fieldPath.length) {
    contentNode = contentNode.get(fieldPath[fieldAccessStep]);
    fieldAccessStep++;
  }	
  return contentNode.textValue();
}

protected long getNumericalFieldValue(final JsonNode jsonNode, final String[] fieldPath) {
  int fieldAccessStep = 0;
  JsonNode contentNode = jsonNode;
  while(fieldAccessStep < fieldPath.length) {
    contentNode = contentNode.get(fieldPath[fieldAccessStep]);
    fieldAccessStep++;
  }	
  return contentNode.asLong();
}
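
Both methods assume that every element of the path exists in the message. Jackson's JsonNode.get returns null for an absent field, so a missing element leads to a NullPointerException on the next step. A null-safe traversal could look like the following sketch. It uses nested java.util.Map instances as a stand-in for JsonNode so that it runs without Jackson; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: nested maps stand in for a parsed JSON document.
class PathLookupSketch {
  /** Walks fieldPath through nested maps; returns null as soon as an element is missing. */
  static Object lookup(Map<String, Object> document, String[] fieldPath) {
    Object current = document;
    for (String step : fieldPath) {
      if (!(current instanceof Map))
        return null; // path continues, but there is nothing left to descend into
      current = ((Map<?, ?>) current).get(step);
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, Object> user = new HashMap<>();
    user.put("name", "kolb");
    Map<String, Object> document = new HashMap<>();
    document.put("user", user);

    System.out.println(lookup(document, "user.name".split("\\.")));
    System.out.println(lookup(document, "user.age".split("\\.")));
  }
}
```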

Besides content extraction, the operator aggregates values over either a number of messages or a given timespan. To store the values found so far, it holds a variable that serves as temporary storage:

public class JsonContentAggregator implements DelayedResponseOperator {
  /** result document - reset after specified duration */
  private JsonContentAggregatorResult resultDocument = null;
  ...
}

For each message, the onMessage method is invoked. It reads the values for all configured paths and aggregates them according to its configuration. The result is temporarily stored in the result document shown above.

For string values, the aggregator simply counts occurrences. For numerical values, it sums the values and tracks the minimum and maximum over a given timespan (or number of messages).

The implementation looks as follows:

public void onMessage(StreamingDataMessage message) {		
  this.messageCount++;
  this.messagesSinceLastResult++;
		
  // do nothing if either the event or the body is empty
  if(message == null || message.getBody() == null || message.getBody().length < 1)
    return;
		
  JsonNode jsonNode = null;
  try {
    jsonNode = jsonMapper.readTree(message.getBody());
  } catch(IOException e) {
    logger.error("Failed to read message body to json node. Ignoring message. Error: " + e.getMessage());
  }
		
  // stop here if the message could not be parsed into
  // an object representation
  if(jsonNode == null)
    return;
		
  // initialize the result document if not already done
  if(this.resultDocument == null)
    this.resultDocument = new JsonContentAggregatorResult(this.pipelineId, this.documentType);
	
  Map<String, Object> rawData = new HashMap<>();
  // step through fields considered to be relevant, extract values and apply aggregation function
  for(final JsonContentAggregatorFieldSetting fieldSettings : fields) {
    // switch between string and numerical field values
    // string values may be counted only
    // numerical field values must be summed, min and max computed AND counted 
	
    // string values may be counted only
    if(fieldSettings.getValueType() == JsonContentType.STRING) {
      try {
        // read value into string representation and add it to raw data map
        String value = getTextFieldValue(jsonNode, fieldSettings.getPath());
        if(storeForwardRawData)
          rawData.put(fieldSettings.getField(), value);

        // count occurrences of value
        try {
          this.resultDocument.incAggregatedValue(fieldSettings.getField(), value, 1);
        } catch(RequiredInputMissingException e) {
          logger.error("Field '"+fieldSettings.getField()+"' not found in event. Ignoring value. Error: " +e.getMessage());
        }
      } catch(Exception e) {
        // value could not be read (e.g. path missing in message): skip this field
        if(logger.isDebugEnabled())
          logger.debug("Failed to read field '"+fieldSettings.getField()+"'. Ignoring value. Error: " +e.getMessage());
      }
    } else if(fieldSettings.getValueType() == JsonContentType.NUMERICAL) {
      try {
        // read value into numerical representation and add it to raw data map
        long value = getNumericalFieldValue(jsonNode, fieldSettings.getPath());
        if(storeForwardRawData)
          rawData.put(fieldSettings.getField(), value);

        // compute min, max and sum and add these values to result document
        try {
          this.resultDocument.evalMinAggregatedValue(fieldSettings.getField(), "min", value);
          this.resultDocument.evalMaxAggregatedValue(fieldSettings.getField(), "max", value);
          this.resultDocument.incAggregatedValue(fieldSettings.getField(), "sum", value);
        } catch(RequiredInputMissingException e) {
          logger.error("Field '"+fieldSettings.getField()+"' not found in event. Ignoring value. Error: " +e.getMessage());
        }
      } catch(Exception e) {
        // value could not be read (e.g. path missing in message): skip this field
        if(logger.isDebugEnabled())
          logger.debug("Failed to read field '"+fieldSettings.getField()+"'. Ignoring value. Error: " +e.getMessage());
      }
    }			
  }
	
  // add raw data to document
  if(storeForwardRawData)
    this.resultDocument.addRawData(rawData);
}
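
The internals of JsonContentAggregatorResult are not shown on this page. As a rough illustration of the aggregation rules used above (count occurrences for strings; min, max and sum for numbers), the following sketch keeps one map of named values per field. The class AggregationSketch and its storage layout are assumptions; only the three operation names are borrowed from the listing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one map of named aggregate values per field.
class AggregationSketch {
  final Map<String, Map<String, Long>> aggregated = new HashMap<>();

  private Map<String, Long> valuesFor(String field) {
    return aggregated.computeIfAbsent(field, f -> new HashMap<>());
  }

  /** Adds delta to the value stored under key (starts at delta if absent). */
  void incAggregatedValue(String field, String key, long delta) {
    valuesFor(field).merge(key, delta, Long::sum);
  }

  /** Keeps the smaller of the stored and the given value. */
  void evalMinAggregatedValue(String field, String key, long value) {
    valuesFor(field).merge(key, value, Long::min);
  }

  /** Keeps the larger of the stored and the given value. */
  void evalMaxAggregatedValue(String field, String key, long value) {
    valuesFor(field).merge(key, value, Long::max);
  }

  public static void main(String[] args) {
    AggregationSketch result = new AggregationSketch();
    // string field: count occurrences per value
    for (String country : new String[] { "DE", "FR", "DE" })
      result.incAggregatedValue("country", country, 1);
    // numerical field: track min, max and sum
    for (long bytes : new long[] { 5, 2, 9 }) {
      result.evalMinAggregatedValue("bytes", "min", bytes);
      result.evalMaxAggregatedValue("bytes", "max", bytes);
      result.incAggregatedValue("bytes", "sum", bytes);
    }
    System.out.println(result.aggregated);
  }
}
```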

Unlike direct response operators, these operators do not return a value when they process a message. Instead, the surrounding runtime environment requests the results. The request interval depends on the selected wait strategy and its configuration, e.g. after n messages or x seconds. The implementation therefore provides an access method to its aggregated content:

public StreamingDataMessage[] getResult() {
  this.messagesSinceLastResult = 0;		
  StreamingDataMessage message = null;
  try {			
    message = new StreamingDataMessage(
        jsonMapper.writeValueAsBytes(this.resultDocument), System.currentTimeMillis());
  } catch (JsonProcessingException e) {
    logger.error("Failed to convert result document into JSON");
  }
  this.resultDocument = new JsonContentAggregatorResult(this.pipelineId, this.documentType);
  return new StreamingDataMessage[]{message};
}
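
How the runtime environment drives a delayed response operator is not shown on this page. The following sketch illustrates the idea behind a message-count-based wait strategy under simplified assumptions: a stand-in operator sums numbers, and a loop requests the result once a configured number of messages has been processed. All class and method names here are hypothetical and do not reflect the SPQR runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of count-based result retrieval.
class CountBasedRetrievalSketch {
  /** Minimal stand-in for a delayed response operator. */
  static class SummingOperator {
    private long sum = 0;
    private long messagesSinceLastResult = 0;

    void onMessage(long value) {
      sum += value;
      messagesSinceLastResult++;
    }

    long getNumberOfMessagesSinceLastResult() {
      return messagesSinceLastResult;
    }

    /** Returns the aggregate and resets the state for the next interval. */
    long getResult() {
      long result = sum;
      sum = 0;
      messagesSinceLastResult = 0;
      return result;
    }
  }

  /** Feeds values to the operator and collects a result after every maxMessages messages. */
  static List<Long> run(long[] values, int maxMessages) {
    SummingOperator operator = new SummingOperator();
    List<Long> results = new ArrayList<>();
    for (long value : values) {
      operator.onMessage(value);
      if (operator.getNumberOfMessagesSinceLastResult() >= maxMessages)
        results.add(operator.getResult());
    }
    return results;
  }

  public static void main(String[] args) {
    // with maxMessages = 2 the fifth value stays pending in the operator
    System.out.println(run(new long[] { 1, 2, 3, 4, 5 }, 2));
  }
}
```

A time-based strategy would follow the same pattern, with the condition checking elapsed time instead of the message counter.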