Skip to content

Creating custom Outbound Channel

shmurthy62 edited this page Feb 11, 2015 · 2 revisions

Next we will show you how to build an OutboundChannel called SampleOutboundChannel

<bean id="SampleOutboundChannelBinder" class="com.ebay.jetstream.event.support.channel.ChannelBinding">
		<property name="channel" ref="outboundSampleChannel" />
	</bean>

<bean id="outboundSampleChannel" class="com.ebay.jetstream.samples.SampleOutboundChannel">
	<property name="address" ref="SampleChannelAddress" />
</bean>

<bean id="SampleChannelAddress" class="com.ebay.jetstream.samples.SampleChannelAddress">
	<property name="addresses">
		<list>
			<value>someaddress</value>
		</list>
	</property>
</bean>
package com.ebay.jetstream.samples;

import com.ebay.jetstream.event.AbstractOutboundChannel ;
import com.ebay.jetstream.config.ContextBeanChangedEvent;
import com.ebay.jetstream.event.EventException;
import com.ebay.jetstream.event.JetstreamEvent;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import com.ebay.jetstream.management.Management;

// The @ManagedResource annotation is needed to signal to management subsystem that this is a managed 
// resource and all stats for this resource is made available under the folder 'Event/Channl'

@ManagedResource(objectName = "Event/Channel", description = "SampleOutboundChannel")
public class SampleOutboundChannel extends AbstractOutboundChannel {

        private SampleChannelAddress m_address;

        public void setAddress(SampleChannelAddress address) {
            m_address = address;
        }

        @Override
	public ChannelAddress getAddress() {
		return m_address;
	}

        @Override
	public void open() throws EventException {
                // This is invoked by the ChannelBinder. This is a signal to 
                // open the pipe. 				

                super.open();
                  
                 // Now add binding to the stats folder
                 Management.addBean(getBeanName(), this);
	}


	@Override
	public void close() throws EventException {
			// This is invoked by the ChannelBinder. This is a signal to 	
                        // to close the channel

                super.close();

                 // Now remove binding with the stats folder
                 Management.removeBeanOrFolder(getBeanName(), this);
	}

	@Override
	public void flush() throws EventException {
		// This is invoked by the ChannelBinder. This is a signal to 
                // flush all queues, will be called before close 			
	}

	

	
      
        @Override
	public void sendEvent(JetstreamEvent event) throws EventException {
	
              // This method is called when event is delivered to this class
              incrementEventRecievedCounter(); // needed for pipeline monitoring
		
		try {

                   // do processing here and forward to downstream interface
                   // outbound channels do not have a sink

		   // we will print it to console
		   System.out.println(event.toString());
		
		   incrementEventSentCounter(); // needed for pipeline monitoring

                 } catch (Throwable t) {

                   // handle exception
                   incrementEventDroppedCounter(); // needed for pipeline monitoring

                 }
              // do processing here and forward to downstream interface
              // outbound channels do not have a sink
			
	}

	@Override
	public int getPendingEvents() {
		// This method belongs to ShutDownable interface and is
		// invoked by the Jetstream Framework Shutdown Orchestrator
		// when the application receives a termination signal.
		// This method must return a count of pending events in its 
                // queues. The orchestrator will wait till this count goes to 0
                // before shutting down the next stage
		return 0;
	}

	@Override
	public void shutDown() {

		// This method belongs to ShutDownable interface and is
		// invoked by the Jetstream Framework Shutdown Orchestrator
		// when the application receives a termination signal.
		// Use this method as a signal to release all open resources 
		// held by this class.

		
	}

	@Override
	public void processApplicationEvent(ApplicationEvent event) {
		if (event instanceof ContextBeanChangedEvent) {

      		
			ContextBeanChangedEvent bcInfo = (ContextBeanChangedEvent) event;

      		// check changes
      		if (bcInfo.isChangedBean(getAddress())) {

       			try {
     
         	 			close();
        			}
        			catch (Throwable e) {
         
         				// handle error

        			}
        			try {
						setAddress((ChannelAddress) bcInfo.getChangedBean());
					} catch (Exception e1) {
						// TODO Auto-generated catch block
						e1.printStackTrace();
					}

        			try {
          				open();
        			}
        			catch (Throwable e) {
          			// handle error

        			}
      		     }
    		}
		
	}


	@Override
	public void afterPropertiesSet() throws Exception {
		// This method belongs to InitializingBean interface and 
                // is invoked by SpringFramework after all setters for this
                // class have been called by Spring.  
		
	}

	// Here we show you how to raise an alarm so upstream stages see you in alarm state
        // and stop sending events to you. This method is shown for pure illustration purposes.
        // You should implement this based on your needs. It is assumed that processState is 
        // called from somewhere in your processing logic.

	public void processState() {
		switch(m_state) {
                    case trouble:
                       getAlarmListener().alarm(ChannelAlarm.OVERRUN);
                       break;
                    case recovered:
                       getAlarmListener().alarm(ChannelAlarm.CLEAR);
                       break;
                    default:
                       break;
		}		
	}

	

}

Address implementation for SampleChannel is same as Inbound Channel. The Spring bean definition and wiring for SampleOutboundChannel is shown below

Clone this wiki locally