Skip to content

Creating Pipelines

shmurthy62 edited this page Feb 11, 2015 · 1 revision

In this section you will learn how to create a pipeline of stages. Jetstream relies on Spring IOC for pipeline creation - familiarize yourself with Spring IOC. You need to become familiar with this concept and syntax before you attempt to wire your application.

All stages in a Jetstream pipeline are implementations of Jetstream Channel and Processor abstractions which are nothing but Spring beans. Each bean needs to be defined through a declarative syntax that is understood by the Spring Container. Spring Container manages the lifecycle of these beans. Every bean definition is identified by a unique ID which is some meaning full name you assign to the bean, a specification of the bean’s implementation class and a set of properties required to be set by the implementation class.

Creating Pipelines in a Container:

The following example will illustrate how you can create a pipeline with two stages in a Jetstream Application container as shown below.

singlestagepipeline

The first stage in our pipeline is a traffic generator which generates events at a specified rate and outputs the generated events to a second stage which sends the events over the wire to a remote stage hosted in another container.

Let us first examine the class definition for GenericEventGenerator.

public class GenericEventGenerator extends AbstractEventSource implements InitializingBean {

	
	private int rate;

	public void setRate(int rate) {
		this.rate = rate;
	}
}

Next we define a bean definition for this class along with a simple pipeline beginning at this bean and ending at a bean directly connected to it called “OutboundMessageChannel”.

<bean id="EventGenerator" class="com.ebay.jetstream.devtools.GenericEventGenerator">
		<property name="rate" value="1000" />
		<property name="eventSinks" ref="eventgensinks" />
</bean>

<bean id="eventgensinks" class="com.ebay.jetstream.event.EventSinkList">
		<constructor-arg>
			<list>
				
				<ref bean="OutboundMessageChannel" />
				
			</list>
                </constructor-arg>
</bean>

<bean id="outboundMessageChannel"
		class="com.ebay.jetstream.event.channel.messaging.OutboundMessagingChannel"
		depends-on="MessageService">
		<property name="address" ref="outboundMessageChannelAddress" />
</bean>

In this example we have 3 bean definitions. The first bean is named “EventGenerator” and its implementation class is “com.ebay.jetstream.devtools.GenericEventGenerator”. This class has a property named “rate” and “sinkList” from the framework provided class AbstractEventSource. This property has a reference to another bean named “eventgensinks” whose implementation class is “com.ebay.jetstream.event.EventSinkList". This bean holds a list of sinks for the “EventGenerator” bean. In this example “outboundMessageChannel” bean is the sink for “EventGenerator” bean. The “sinkList” property defines the flow of events from “EventGenerator” bean to one or more downstream stages.

The downstream stages in turn will have similar bean definition which is not shown here.

Jetstream framework has EventSink and EventSource abstractions. In order to specify the path of event flow you need to set the “sinkList” property for all event sources in the pipeline specifying a list of event sinks to which events will be forwarded from an event source. Only Inbound Channels and Event Processors can have event sinks as they source events. Outbound Channels do not have any sinks. So far we have shown how one can define a pipeline within a Jetstream Application container.

Creating a Distributed Pipeline:

Pipelines can extend in to another Jetstream Application container as shown below.

twostagepipeline

This requires events to flow from one container to another container hosted either on same node or on separate nodes. This is achieved using Jetstream’s cluster messaging and two framework provided channel implementations viz. ” InboundMessagingChannel” and “OutboundMessagingChannel”. One can also achieve this using Kafka Channels also.

The Pipeline is wired with two framework supplied channel implementations viz. “InboundMessageChannel” and “OutboundMessageChannel”. These channel implementations layer on top of the framework message service using its pub/sub interface to send and receive messages over message topics as shown below.

Channels have a property named “address”. This property is made up of a list of message service topics. Events are published on these topics by the Outbound Message Channel and Inbound Message Channel subscribes to messages published on the same topics.

The following bean definition shows how an InboundMessageChannel and OutboundMessageChannel bean is defined in Spring syntax.

<bean id="InboundMessages"
		class="com.ebay.jetstream.event.channel.messaging.InboundMessagingChannel">
		<property name="address" ref="InboundChannelAddress" />

		<property name="eventSinks">
			<list>
				<!—add sinks here -->
			</list>
		</property>

</bean>
<bean id="InboundChannelAddress"
		class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">
		<property name="channelTopics">
			<list>
				<value>Jetstream.demo/rawEvent</value>

			</list>
		</property>
</bean>

An “InboundMessageChannel” bean can subscribe to zero or more topics. This bean has a property named “address” which has a reference to another bean named “InboundChannelAddress”. The “InboundChannelAddress” bean holds a list of message service topics subscribed to by the “InboundMessages” bean.

<bean id="outboundMessageChannel"
		class="com.ebay.jetstream.event.channel.messaging.OutboundMessagingChannel"
		depends-on="MessageService">
		<property name="address" ref="outboundMessageChannelAddress" />
</bean>

<bean id="outboundMessageChannelAddress"
		class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">
		<property name="channelTopics">
			<list>
				<value>Jetstream.demo/rawEvent</value>
			</list>
		</property>
</bean>

The bean definition shown above is an example of defining an outbound message channel in Spring syntax. This channel will publish an event on all provisioned topics unless the event contains hints to publish on one or more specific topics. If the topics specified through the hints match one of the provisioned topics then the event will be published on that topic.

In summary, the steps for defining the pipeline are the following:

  1. Create an application wiring.xml file for your application and add all static bean definitions to this file along with pipeline definition. You can also spread these bean definitions across multiple files.

  2. In order to set up messaging you need to provision MessageServiceProperties bean and it needs to refer to Zookeeper transport bean. If your application is using either Inbound Message Channel and or Outbound Message Channel, you will have to define a message service bean along with its properties. The convention is to create a file named messagecontext.xml file which would contain the MessageServiceProperties bean definition and zookeeper.xml which would contain the zookeeper transport bean definition.

    You may consider copying this file from another project and modify the context list for Netty transport to save some work as this is the only property that will be different across applications.

  • Create a folder named /buildrc/JetstreamConf under your project folder if one does not exist and drop all the files created in steps 1 & 2.
Clone this wiki locally