Skip to content

AbstractEventProcessor

rmuthupandian edited this page Feb 11, 2015 · 1 revision

The first example shows how to implement an Event Processor by extending AbstractEventProcessor

Inside appwiring.xml:

<bean id="SampleProcessor" class="com.ebay.jetstream.samples.SampleProcessor">
		<property name="config" ref="SampleProcessorConfig" />
		<property name="eventSinkList" ref="sampleprocessorsinks" />		
</bean>
<bean id="SampleProcessorConfig" class="com.ebay.jetstream.samples.SampleProcessorConfig">		
		<property name="x" value="1" />
</bean>    
    
<bean id="sampleprocessorsinks" class="com.ebay.jetstream.event.EventSinkList">
		<property name="sinks">
			<list>
				<ref bean="outboundSampleChannel" />				
			</list>
		</property>
</bean>
package com.ebay.jetstream.samples;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;


import com.ebay.jetstream.config.ContextBeanChangedEvent;
import com.ebay.jetstream.event.EventException;
import com.ebay.jetstream.event.JetstreamEvent;

import com.ebay.jetstream.event.support.AbstractEventProcessor;
import com.ebay.jetstream.event.processor.EventProcessRequest;
import com.ebay.jetstream.event.processor.ratelimiter.RateLimiterProcessorConfig;
import com.ebay.jetstream.spring.beans.factory.BeanChangeAware;
import com.ebay.jetstream.xmlser.XSerializable;

import com.ebay.jetstream.config.ContextBeanChangedEvent;
import com.ebay.jetstream.event.EventException;
import com.ebay.jetstream.event.JetstreamEvent;
import com.ebay.jetstream.event.support.AbstractEventProcessor;
import com.ebay.jetstream.xmlser.XSerializable;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import com.ebay.jetstream.management.Management;

@ManagedResource(objectName = "Event/Processor", description = "SampleProcessor")
public class SampleProcessor extends AbstractEventProcessor implements XSerializable {

        private AtomicBoolean m_isPaused = new AtomicBoolean(false);
        private SampleProcessorConfig m_config;
        

        public void setConfig(SampleProcessorConfig config) {
             m_config = config;
        }

        public SampleProcessorConfig getConfig() {
             return m_config;
        }

	@Override
	public int getPendingEvents() {
		// This method belongs to ShutDownable interface and is
		// invoked by the Jetstream Framework Shutdown Orchestrator
		// when the application receives a termination signal.
		return 0;
	}

	@Override
	public void shutDown() {
		// This method belongs to ShutDownable interface and is
		// invoked by the Jetstream Framework Shutdown Orchestrator
		// when the application receives a termination signal.
		// Use this method as a signal to release all open resources 
		// held by this class.

		
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		// This will be called by spring framework after all properties have been set
                // for this class

                
                 // bind this bean to the stats folder.
                 Management.addBean(getBeanName(), this);		
                              
	}

	@Override
	protected void processApplicationEvent(ApplicationEvent event) {
		if (event instanceof ContextBeanChangedEvent) {
			ContextBeanChangedEvent bcInfo = (ContextBeanChangedEvent) event;
			// Calculate changes
			if (bcInfo.isChangedBean(getConfig())) { // check if your config changed
				SampleProcessorConfig newConfig = (SampleProcessorConfig) bcInfo.getChangedBean(); 
				setConfig(newConfig);
				
			}
		}
		
	}

	@Override
        @ManagedOperation
	public void pause() {

                if (isPaused())
			return;
		
		changeState(ProcessorOperationState.PAUSE);
		
		// handle pause here

               
				
	}

	
	@Override
        @ManagedOperation
	public void resume() {

                // this is a good way to implement resume

		if (!isPaused()) {
		      LOGGER.log(Level.WARNING, getBeanName() + " could not be resumed. It is already in resumed state");
	   	      return;
		}
		
		changeState(ProcessorOperationState.RESUME);
		
		// handle resume here
		
		
	}

	@Override
	public void sendEvent(JetstreamEvent event) throws EventException {

                 if (isPaused())
		{
                        // if you are in paused state throw exception
			throw new Exception("In Paused State");
		}
	
                 // This method is called when event is delivered to this class.
                 incrementEventRecievedCounter(); // do this for pipeline monitoring
	
                 // do your processing here

                 // if you want to then send events to your sinks do the following
  		
		
		try {
			fireSendEvent(event); // this will deliver event to framework for distribution
			incrementEventSentCounter(); // should do this for pipeline monitoring
		} catch (Throwable t) {
			incrementEventDroppedCounter(); // do this for pipeline monitoring
			
		}
                 
		
	}
}
Clone this wiki locally