Skip to content

Creating Custom Inbound Channel

shmurthy62 edited this page Feb 12, 2015 · 5 revisions

We will start by showing you how to build an InboundChannel called SampleInboundChannel. All InboundChannel implementations must extend a framework-provided class named AbstractInboundChannel. Follow along with the code below; the comments tell you all you need to know.

The Spring bean definitions and wiring for the channel and its address (together with the channel binder and sink list they depend on) are shown below:

<!-- Channel binding for the sample inbound channel. Its "channel" property refers to the
     "sampleinboundchannel" bean; per the comments in the channel class, the channel binder
     is what issues open() and close() on the channel. -->
<bean id="SampleInboundChannelBinder" scope="singleton"
	class="com.ebay.jetstream.event.support.channel.ChannelBinding">
	<property name="channel" ref="sampleinboundchannel" />
</bean>

<!-- The inbound channel itself. The class attribute must name the channel implementation
     shown on this page (SampleInboundChannel in package com.ebay.jetstream.samples).
     "address" is injected through setAddress(); "eventSinkList" wires the downstream sinks. -->
<bean id="sampleinboundchannel"
		class="com.ebay.jetstream.samples.SampleInboundChannel">
	<property name="address" ref="SampleChannelAddress" />
	<property name="eventSinkList" ref="sampleinboundchannelsinks" />
</bean>

<!-- Sink list referenced by the channel's "eventSinkList" property. Each <ref> names a
     sink bean; "somebeanthatisasink" is presumably a placeholder for the id of a real
     sink bean in your application context. -->
<bean id="sampleinboundchannelsinks" class="com.ebay.jetstream.event.EventSinkList">
	<property name="sinks">
		<list>				
			<ref bean="somebeanthatisasink" />		
		</list>
	</property>
</bean>	

<!-- Address bean referenced by the channel's "address" property. The "addresses" list is
     injected through SampleChannelAddress.setAddresses(List<String>); "someaddress" is
     presumably a placeholder value. -->
<bean id="SampleChannelAddress"	class="com.ebay.jetstream.samples.SampleChannelAddress">
	<property name="addresses">
		<list>
			<value>someaddress</value>
		</list>
	</property>
</bean>
package com.ebay.jetstream.samples;

import com.ebay.jetstream.event.channel.AbstractInboundChannel;
import com.ebay.jetstream.config.ContextBeanChangedEvent;
import com.ebay.jetstream.event.EventException;
import com.ebay.jetstream.event.JetstreamEvent;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import com.ebay.jetstream.management.Management;

// The @ManagedResource annotation is needed to signal to the management subsystem that this is a
// managed resource; all stats for this resource are made available under the folder 'Event/Channel'.

/**
 * Sample inbound channel illustrating what a custom InboundChannel must implement.
 *
 * <p>The channel is opened and closed by the channel binder, reacts to configuration changes of
 * its address bean, honours pause/resume signals from downstream stages, and forwards received
 * data to the framework as {@link JetstreamEvent}s.
 */
@ManagedResource(objectName = "Event/Channel", description = "SampleInboundChannel")
public class SampleInboundChannel extends AbstractInboundChannel {

	private ChannelAddress m_address;

	/**
	 * Injects the address this channel listens on.
	 *
	 * @param address the channel address bean
	 */
	public void setAddress(ChannelAddress address) {
		m_address = address;
	}

	/**
	 * @return the address currently configured for this channel
	 */
	@Override
	public ChannelAddress getAddress() {
		return m_address;
	}

	/**
	 * Hook for flush logic; it is called before {@link #close()}. Nothing to flush in this sample.
	 *
	 * @throws EventException if flushing fails
	 */
	@Override
	public void flush() throws EventException {
		// add any flush logic here – will get called before close()
	}

	/**
	 * Needed only if you want to inject an authenticator into your channel to authenticate
	 * callers. The expectation is that you have an implementation of an authenticator.
	 * This sample ignores the argument.
	 *
	 * @param authenticator the authenticator to use (ignored here)
	 */
	@Override
	public void setAuthenticator(ChannelAuthenticator authenticator) {
		// intentionally empty: this sample channel does not enforce security
	}

	/**
	 * Implement this only if the channel enforces security.
	 *
	 * @return always {@code null} in this sample
	 */
	@Override
	public ChannelAuthenticator getAuthenticator() {
		return null;
	}

	/**
	 * Closes the pipe; the channel binder issues this call.
	 *
	 * <p>The binding with the stats folder is removed in a {@code finally} block so that the
	 * management binding is released even when {@code super.close()} throws.
	 *
	 * @throws EventException if the underlying close fails
	 */
	@Override
	public void close() throws EventException {
		try {
			super.close();
		} finally {
			// Now remove binding with the stats folder
			Management.removeBeanOrFolder(getBeanName(), this);
		}
	}

	/**
	 * Opens the pipe; this is invoked by the channel binder as the signal to open.
	 *
	 * @throws EventException if the underlying open fails
	 */
	@Override
	public void open() throws EventException {
		super.open();

		// bind this bean to the stats folder /Event/Channel
		Management.addBean(getBeanName(), this);
	}

	/**
	 * Belongs to the ShutDownable interface and is invoked by the Jetstream Framework Shutdown
	 * Orchestrator when the application receives a termination signal. The orchestrator waits
	 * until this count goes to 0 before shutting down the next stage.
	 *
	 * @return the number of pending events held in internal queues; this sample has no queues
	 *         and therefore returns 0
	 */
	@Override
	public int getPendingEvents() {
		return 0;
	}

	/**
	 * Belongs to the ShutDownable interface and is invoked by the Jetstream Framework Shutdown
	 * Orchestrator when the application receives a termination signal. Use it as the signal to
	 * release all open resources held by this class.
	 */
	@Override
	public void shutDown() {
		// nothing to release in this sample
	}

	/**
	 * Belongs to the InitializingBean interface and is invoked by the Spring Framework after all
	 * setters for this class have been called.
	 *
	 * @throws Exception if initialization fails
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		// no post-construction initialization in this sample
	}

	/**
	 * Abstract method of AbstractEventSource, invoked when it receives an application event from
	 * the Spring framework. Application events are issued when there is a change in config.
	 *
	 * <p>If the changed bean is this channel's address, the channel is closed, the new address is
	 * installed, and the channel is reopened.
	 *
	 * @param event the application event to inspect
	 */
	@Override
	protected void processApplicationEvent(ApplicationEvent event) {
		if (!(event instanceof ContextBeanChangedEvent)) {
			return;
		}

		ContextBeanChangedEvent bcInfo = (ContextBeanChangedEvent) event;

		// check whether the change is meant for this bean instance
		if (!bcInfo.isChangedBean(getAddress())) {
			return;
		}

		try {
			close();
		} catch (Throwable e) {
			// handle error (left to the implementer; the address swap below still proceeds)
		}

		setAddress((ChannelAddress) bcInfo.getChangedBean());

		try {
			open();
		} catch (Throwable e) {
			// handle error (left to the implementer)
		}
	}

	/**
	 * Called when a downstream stage raises a pause. This is an indication to not send events
	 * downstream any more. Does nothing if the channel is already paused.
	 */
	@Override
	@ManagedOperation
	public void pause() {
		if (isPaused()) {
			return; // nothing to do
		}

		changeState(ChannelOperationState.PAUSE);

		// do needful to handle pause signal here
	}

	/**
	 * Called when a downstream stage raises a resume. This is an indication to start sending
	 * events to downstream stages. Does nothing if the channel is not paused.
	 */
	@Override
	@ManagedOperation
	public void resume() {
		if (!isPaused()) {
			return; // nothing to do
		}

		changeState(ChannelOperationState.RESUME);

		// do needful to handle resume signal here
	}

	/**
	 * Illustrative upstream handler: implement your own handler for wherever you receive events
	 * from. It shows how to forward an event to the framework, which distributes it to
	 * downstream components.
	 *
	 * @param map the received data, assumed to already be in map form; it is wrapped in a
	 *            {@link JetstreamEvent}. The type is spelled fully qualified so this class needs
	 *            no extra import.
	 */
	public void onReceivingEvent(java.util.Map<String, Object> map) {
		// do this before anything else so the pipeline can be monitored correctly
		incrementEventRecievedCounter();

		// next transform your data into a JetstreamEvent
		JetstreamEvent event = new JetstreamEvent(map);

		// do this so we can see the last event in the Monitoring page; useful for debugging
		setLastEvent(event);

		try {
			fireSendEvent(event); // framework method that forwards the event downstream
			incrementEventSentCounter(); // do this for pipeline monitoring
		} catch (Throwable t) {
			// handle error here
			incrementEventDroppedCounter(); // do this for pipeline monitoring
		}
	}

}

Address

Next, provide a ChannelAddress implementation for SampleInboundChannel:

package com.ebay.jetstream.samples;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import com.ebay.jetstream.event.channel.ChannelAddress;

/**
 * Address bean for the sample channel: holds the list of address strings injected through the
 * {@code addresses} property.
 */
public class SampleChannelAddress extends ChannelAddress {

	// Backing list; final so the reference is never reassigned, only its contents replaced.
	private final CopyOnWriteArrayList<String> m_addresses = new CopyOnWriteArrayList<String>();

	/** Creates an address bean with no addresses. */
	public SampleChannelAddress() {

	}

	/**
	 * Replaces the current addresses with the given ones.
	 *
	 * @param addresses the new addresses; {@code null} is treated as an empty list rather than
	 *                  clearing the list and then failing with a NullPointerException
	 */
	public void setAddresses(List<String> addresses) {
		m_addresses.clear();
		if (addresses != null) {
			m_addresses.addAll(addresses);
		}
	}

	/**
	 * @return a read-only view of the configured addresses
	 */
	public List<String> getAddresses() {
		return Collections.unmodifiableList(m_addresses);
	}
}
Clone this wiki locally