Batch Event Processing
Jetstream provides a set of interfaces and classes for batch event processing. Besides delivering events in batches, it defines generic operations and metadata for the processing. Here is the class diagram for batch processing.
BatchEventSink is the basic interface for batch event processing. It extends EventSink, so it can be connected to any EventSource. It provides the following method for sending events in batches:
```java
void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException;
```
This method receives a collection of Jetstream events and an EventMetaInfo. The EventMetaInfo carries information between the EventSource and the BatchEventSink. It contains the following:
- BatchSourceCommand: an enum set by the EventSource that tells the BatchEventSink which action to perform. Currently there are five commands, which map directly to the five abstract methods of AbstractBatchEventProcessor.
- BatchSource: the source information passed from the EventSource to the BatchEventSink while one batch is processed. The default definition is described below. Users can extend the class to include their own source information.
- BatchResponse: the response sent from the BatchEventSink back to the EventSource while one batch is processed. The default definition is described below. Users can extend the class to include more information.
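To make the round trip concrete, here is a minimal sketch of a sink that reads the command and source from the EventMetaInfo and answers with a BatchResponse. The classes are simplified stand-ins, not the real Jetstream types: the field names, constructors, and the five BatchSourceCommand constant names are assumptions, and so is the idea that the sink returns its response by setting it on the same EventMetaInfo object.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-ins for the Jetstream types (shapes are assumptions).
class JetstreamEvent {
    final String payload;
    JetstreamEvent(String payload) { this.payload = payload; }
}

class EventException extends Exception {
    EventException(String message) { super(message); }
}

// Hypothetical constant names: the page only says there are five commands,
// one per AbstractBatchEventProcessor method.
enum BatchSourceCommand { OnNextBatch, OnBatchProcessed, OnException, OnIdle, OnStreamTermination }

enum BatchEventSinkRequest { GetNextBatch, AdvanceAndGetNextBatch, RevertAndGetNextBatch }

class BatchSource {
    final String topic;
    final Object partition;
    final long headOffset;
    BatchSource(String topic, Object partition, long headOffset) {
        this.topic = topic;
        this.partition = partition;
        this.headOffset = headOffset;
    }
}

class BatchResponse {
    BatchEventSinkRequest request = BatchEventSinkRequest.GetNextBatch;
}

// Carries the command and source downstream and the response back upstream.
class EventMetaInfo {
    final BatchSourceCommand command;
    final BatchSource source;
    BatchResponse response; // assumed: set by the sink, read by the source afterwards
    EventMetaInfo(BatchSourceCommand command, BatchSource source) {
        this.command = command;
        this.source = source;
    }
}

// In Jetstream, BatchEventSink also extends EventSink; omitted here.
interface BatchEventSink {
    void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException;
}

// Hypothetical sink: keeps every batch and asks the source to advance the offset.
class CollectingSink implements BatchEventSink {
    final List<JetstreamEvent> received = new ArrayList<>();

    @Override
    public void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException {
        BatchResponse response = new BatchResponse();
        if (meta.command == BatchSourceCommand.OnNextBatch) {
            received.addAll(events);
            response.request = BatchEventSinkRequest.AdvanceAndGetNextBatch;
        }
        meta.response = response;
    }
}
```

For any command other than a new batch, this sink simply answers GetNextBatch.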
Usually a user who wants to create a processor that supports batch event processing does not need to implement BatchEventSink directly. The AbstractBatchEventProcessor (introduced later) is a template that provides a clearer and more straightforward interface. Implementing BatchEventSink directly still makes sense in these cases:
- The logic is very simple and the information provided by the EventMetaInfo is not needed.
- The user wants a pass-through processor that forwards the events and the meta info to the downstream sinks with little or no additional logic.
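For the pass-through case, a direct BatchEventSink implementation can be as small as the sketch below, which forwards each batch and its EventMetaInfo unchanged to every downstream sink. The types are minimal stand-ins with assumed shapes, and `addSink` is a hypothetical registration method, not Jetstream's wiring API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Minimal stand-ins for the Jetstream types (assumed shapes).
class JetstreamEvent {
    final String payload;
    JetstreamEvent(String payload) { this.payload = payload; }
}

class EventMetaInfo {
    final String description; // placeholder for command, source and response
    EventMetaInfo(String description) { this.description = description; }
}

class EventException extends Exception {
    EventException(String message) { super(message); }
}

interface BatchEventSink {
    void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException;
}

// Pass-through processor: no extra logic, so the meta info is forwarded as-is.
class PassThroughBatchSink implements BatchEventSink {
    private final List<BatchEventSink> downstream = new ArrayList<>();

    // Hypothetical registration method.
    void addSink(BatchEventSink sink) { downstream.add(sink); }

    @Override
    public void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException {
        for (BatchEventSink sink : downstream) {
            sink.sendEvents(events, meta);
        }
    }
}
```

Because the same EventMetaInfo instance reaches every downstream sink, whatever the sinks record in it stays visible to the upstream source.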
BatchSource is the source information sent from the EventSource to its downstream processors. The default implementation of BatchSource contains the following properties:
- topic: string, the Jetstream topic.
- partition: Object, the generic partition of the batch source. A typical example is a Kafka partition.
- headOffset: long, the head offset of the batch stream in each partition. A typical example would be Kafka offset.
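Because BatchSource can be extended, a processor can receive extra source information alongside the default properties. The sketch models the default properties with a stand-in class; the getter names, the `TimedBatchSource` subclass with its `fetchTimeMs` field, and the `HeadOffsetTracker` helper are hypothetical. It also shows that the generic `partition`, combined with `topic`, can serve as a key for per-stream state.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the default BatchSource; getter names are assumptions.
class BatchSource {
    private final String topic;
    private final Object partition;  // generic, e.g. a Kafka partition number
    private final long headOffset;   // head offset of the batch stream in this partition

    BatchSource(String topic, Object partition, long headOffset) {
        this.topic = topic;
        this.partition = partition;
        this.headOffset = headOffset;
    }

    String getTopic() { return topic; }
    Object getPartition() { return partition; }
    long getHeadOffset() { return headOffset; }
}

// Hypothetical extension carrying extra, user-defined source information.
class TimedBatchSource extends BatchSource {
    private final long fetchTimeMs;

    TimedBatchSource(String topic, Object partition, long headOffset, long fetchTimeMs) {
        super(topic, partition, headOffset);
        this.fetchTimeMs = fetchTimeMs;
    }

    long getFetchTimeMs() { return fetchTimeMs; }
}

// Hypothetical per-stream bookkeeping a processor might keep.
class HeadOffsetTracker {
    private final Map<String, Long> lastHead = new HashMap<>();

    // topic + partition identify a stream independently of its offset.
    static String key(BatchSource source) {
        return source.getTopic() + "/" + source.getPartition();
    }

    // Records the batch's head offset; returns the previous one, or -1 if none.
    long record(BatchSource source) {
        Long previous = lastHead.put(key(source), source.getHeadOffset());
        return previous == null ? -1L : previous;
    }
}
```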
BatchResponse contains the response information sent from the batch processor to its upstream EventSource. The default implementation of BatchResponse contains the following properties:
- BatchEventSinkRequest: enum, the next action the BatchEventSink requests the upstream EventSource to take. Currently there are three values:
  - GetNextBatch: get the next batch normally, with no additional actions.
  - AdvanceAndGetNextBatch: first advance the offset and save it to the offset store, then get the next batch.
  - RevertAndGetNextBatch: first revert to the last offset saved in the offset store (or to the one specified in the 'offset' property), then get the next batch.
- offset: long, an offset that overrides the current offset of one partition, so that the EventSource uses it when getting the next batch.
  - If the action is 'AdvanceAndGetNextBatch', the EventSource first stores the specified offset to the offset store.
  - If the action is 'RevertAndGetNextBatch', leaving this value empty makes the EventSource revert to the last saved offset, while setting it makes the action equivalent to GetNextBatch with an offset.

  A useful case for setting the offset: if the processor needs to drop a batch because it is not yet time to send it, it can ask the source to resend the batch later by specifying the head offset of the dropped batch.
- waitTimeInMs: long, set this to a value > 0 if the processor needs the EventSource to wait before sending the next batch.
- batchSizeBytes: long, the size of the next batch, in bytes.
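The sketch below models these properties and builds the response for the drop-and-resend case. It is a stand-in, not Jetstream's BatchResponse: the static factory methods, the fluent setters, the `Responses.resendLater` helper, and the use of a null `Long` to represent an empty offset are all assumptions.

```java
// Stand-in enum with the three values described above.
enum BatchEventSinkRequest { GetNextBatch, AdvanceAndGetNextBatch, RevertAndGetNextBatch }

// Stand-in for the default BatchResponse; method names are assumptions.
class BatchResponse {
    private final BatchEventSinkRequest request;
    private Long offset;          // null = not specified ("empty")
    private long waitTimeInMs;    // > 0 asks the source to wait before the next batch
    private long batchSizeBytes;  // 0 = leave the source's batch size unchanged (assumption)

    private BatchResponse(BatchEventSinkRequest request) { this.request = request; }

    static BatchResponse getNextBatch() { return new BatchResponse(BatchEventSinkRequest.GetNextBatch); }
    static BatchResponse advanceAndGetNextBatch() { return new BatchResponse(BatchEventSinkRequest.AdvanceAndGetNextBatch); }
    static BatchResponse revertAndGetNextBatch() { return new BatchResponse(BatchEventSinkRequest.RevertAndGetNextBatch); }

    BatchResponse setOffset(long offset) { this.offset = offset; return this; }
    BatchResponse setWaitTimeInMs(long waitTimeInMs) { this.waitTimeInMs = waitTimeInMs; return this; }
    BatchResponse setBatchSizeBytes(long batchSizeBytes) { this.batchSizeBytes = batchSizeBytes; return this; }

    BatchEventSinkRequest getRequest() { return request; }
    Long getOffset() { return offset; }
    long getWaitTimeInMs() { return waitTimeInMs; }
    long getBatchSizeBytes() { return batchSizeBytes; }
}

class Responses {
    // Drop the current batch and ask the source to resend it later: revert
    // with an explicit offset (equivalent to GetNextBatch with that offset)
    // to the dropped batch's head offset, after waiting delayMs.
    static BatchResponse resendLater(long headOffset, long delayMs) {
        return BatchResponse.revertAndGetNextBatch()
                .setOffset(headOffset)
                .setWaitTimeInMs(delayMs);
    }
}
```

Leaving the offset unset on a revert keeps the "revert to the last saved offset" meaning.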
AbstractBatchEventSource is the batch counterpart of AbstractEventSource. Users extend it to create an event source that supports batch event processing. It can be connected to one or more BatchEventSinks, and it provides the method fireSendEvents() for subclasses to call, which accepts exactly the same parameters as BatchEventSink.sendEvents(). It simply dispatches the events and metadata to its downstream BatchEventSinks.
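A minimal sketch of this fan-out, assuming simplified stand-in types: `addBatchEventSink` and the `ListBatchSource` subclass, which cuts an in-memory list into fixed-size batches, are hypothetical, and EventMetaInfo is reduced to a head offset.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Minimal stand-ins (assumed shapes, not the real Jetstream classes).
class JetstreamEvent {
    final int id;
    JetstreamEvent(int id) { this.id = id; }
}

class EventException extends Exception {
    EventException(String message) { super(message); }
}

// Reduced to the head offset of the batch for this sketch.
class EventMetaInfo {
    final long headOffset;
    EventMetaInfo(long headOffset) { this.headOffset = headOffset; }
}

interface BatchEventSink {
    void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException;
}

// Stand-in for AbstractBatchEventSource: fans every batch out to all connected sinks.
abstract class AbstractBatchEventSource {
    private final List<BatchEventSink> sinks = new ArrayList<>();

    // Hypothetical wiring method.
    void addBatchEventSink(BatchEventSink sink) { sinks.add(sink); }

    // Same parameters as BatchEventSink.sendEvents(); called by subclasses.
    protected void fireSendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException {
        for (BatchEventSink sink : sinks) {
            sink.sendEvents(events, meta);
        }
    }
}

// Hypothetical source that cuts an in-memory list into fixed-size batches.
class ListBatchSource extends AbstractBatchEventSource {
    private final List<JetstreamEvent> data;
    private final int batchSize;

    ListBatchSource(List<JetstreamEvent> data, int batchSize) {
        this.data = data;
        this.batchSize = batchSize;
    }

    void run() throws EventException {
        for (int start = 0; start < data.size(); start += batchSize) {
            int end = Math.min(start + batchSize, data.size());
            // Copy the slice so sinks never see a view of the backing list.
            fireSendEvents(new ArrayList<>(data.subList(start, end)), new EventMetaInfo(start));
        }
    }
}
```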
This is a batch-oriented implementation of AbstractInboundChannel. Unlike AbstractInboundChannel, it provides some batch-related counters. To create an inbound channel that supports batch event processing, extend this class.
InboundKafkaChannel is the inbound channel that loads data from Kafka. Detailed descriptions can be found here.
AbstractBatchEventProcessor is a template class that makes it easier to create a batch event processor. It implements sendEvents() of BatchEventSink and dispatches each call to one of five methods:
```java
public abstract BatchResponse onNextBatch(BatchSource source, List<JetstreamEvent> events) throws EventException;
public abstract void onBatchProcessed(BatchSource source);
public abstract BatchResponse onException(BatchSource source, Exception ex);
public abstract BatchResponse onIdle(BatchSource source);
public abstract BatchResponse onStreamTermination(BatchSource source) throws EventException;
```
These five methods correspond to the typical operations in batch processing:
- onNextBatch: invoked when the EventSource finishes receiving one batch of events. The EventSource passes the event list and the BatchSource to this method. The processor decides what happens next by returning the corresponding BatchResponse.
- onBatchProcessed: invoked when the EventSource finishes advancing the offset. The EventSource notifies the processor, which can then do any processing related to the offset advancement. It is only a notification, so it returns void. With this method, the processor can choose to handle offset advancement either before or after the EventSource advances the offset.
- onException: invoked when the EventSource encounters a problem that it needs to report to the downstream processor.
- onIdle: invoked when no events from the same BatchSource have reached the EventSource for a long time. The EventSource notifies the processor so that it can take some action, such as closing the stream.
- onStreamTermination: invoked when the EventSource stops consuming a stream, e.g. when the task is transferred to another node in the cluster.
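The sketch below shows how a processor might fill in the five methods. It relies on stand-in types: a BatchResponse reduced to its request, and an abstract base that declares only the five abstract methods (the real class also implements sendEvents()). The `CountingProcessor`, which counts events per partition and treats a count as committed only once the source reports that the offset was advanced, is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins (assumed shapes, not the real Jetstream classes).
class JetstreamEvent {}

class EventException extends Exception {
    EventException(String message) { super(message); }
}

class BatchSource {
    final String topic;
    final Object partition;
    final long headOffset;
    BatchSource(String topic, Object partition, long headOffset) {
        this.topic = topic;
        this.partition = partition;
        this.headOffset = headOffset;
    }
}

enum BatchEventSinkRequest { GetNextBatch, AdvanceAndGetNextBatch, RevertAndGetNextBatch }

// Reduced to the requested action for this sketch.
class BatchResponse {
    final BatchEventSinkRequest request;
    BatchResponse(BatchEventSinkRequest request) { this.request = request; }
}

// Declares only the five abstract methods; the real class also implements sendEvents().
abstract class AbstractBatchEventProcessor {
    public abstract BatchResponse onNextBatch(BatchSource source, List<JetstreamEvent> events) throws EventException;
    public abstract void onBatchProcessed(BatchSource source);
    public abstract BatchResponse onException(BatchSource source, Exception ex);
    public abstract BatchResponse onIdle(BatchSource source);
    public abstract BatchResponse onStreamTermination(BatchSource source) throws EventException;
}

// Hypothetical processor: counts events per partition and treats a count as
// committed only after the source reports that it advanced the offset.
class CountingProcessor extends AbstractBatchEventProcessor {
    final Map<Object, Long> pending = new HashMap<>();
    final Map<Object, Long> committed = new HashMap<>();

    @Override
    public BatchResponse onNextBatch(BatchSource source, List<JetstreamEvent> events) {
        pending.merge(source.partition, (long) events.size(), Long::sum);
        // Ask the source to advance the offset before fetching the next batch.
        return new BatchResponse(BatchEventSinkRequest.AdvanceAndGetNextBatch);
    }

    @Override
    public void onBatchProcessed(BatchSource source) {
        // The source has advanced its offset, so the pending count is now safe.
        Long count = pending.remove(source.partition);
        if (count != null) {
            committed.merge(source.partition, count, Long::sum);
        }
    }

    @Override
    public BatchResponse onException(BatchSource source, Exception ex) {
        // Drop uncommitted work and replay from the last saved offset.
        pending.remove(source.partition);
        return new BatchResponse(BatchEventSinkRequest.RevertAndGetNextBatch);
    }

    @Override
    public BatchResponse onIdle(BatchSource source) {
        // Nothing to flush in this example; keep polling.
        return new BatchResponse(BatchEventSinkRequest.GetNextBatch);
    }

    @Override
    public BatchResponse onStreamTermination(BatchSource source) {
        // The stream moved elsewhere (e.g. to another node): forget its state.
        pending.remove(source.partition);
        committed.remove(source.partition);
        return new BatchResponse(BatchEventSinkRequest.GetNextBatch);
    }
}
```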
Note: a user who extends AbstractBatchEventProcessor only needs to implement these five abstract methods. There is no need to override sendEvents(), as it is already implemented in the superclass.
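One way such a template can route sendEvents() calls to the five methods is a switch on the command carried in the EventMetaInfo. This is a hedged sketch, not Jetstream's source: the BatchSourceCommand constant names, carrying the exception in the meta info, and writing the response back into the meta info are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in types (assumed shapes, not the real Jetstream classes).
class JetstreamEvent {}

class EventException extends Exception {
    EventException(String message) { super(message); }
}

// Hypothetical constant names, one per abstract method.
enum BatchSourceCommand { OnNextBatch, OnBatchProcessed, OnException, OnIdle, OnStreamTermination }

class BatchSource {
    final String topic;
    BatchSource(String topic) { this.topic = topic; }
}

class BatchResponse {
    final String request; // reduced to a label for brevity
    BatchResponse(String request) { this.request = request; }
}

class EventMetaInfo {
    BatchSourceCommand command;
    BatchSource source;
    Exception exception;    // assumed: set by the source for OnException
    BatchResponse response; // assumed: filled in by the processor
}

interface BatchEventSink {
    void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException;
}

abstract class AbstractBatchEventProcessor implements BatchEventSink {
    public abstract BatchResponse onNextBatch(BatchSource source, List<JetstreamEvent> events) throws EventException;
    public abstract void onBatchProcessed(BatchSource source);
    public abstract BatchResponse onException(BatchSource source, Exception ex);
    public abstract BatchResponse onIdle(BatchSource source);
    public abstract BatchResponse onStreamTermination(BatchSource source) throws EventException;

    // Subclasses implement the five methods above; they do not override this.
    @Override
    public void sendEvents(Collection<JetstreamEvent> events, EventMetaInfo meta) throws EventException {
        BatchSource source = meta.source;
        switch (meta.command) {
            case OnNextBatch:
                meta.response = onNextBatch(source, new ArrayList<>(events));
                break;
            case OnBatchProcessed:
                onBatchProcessed(source); // notification only: no response
                break;
            case OnException:
                meta.response = onException(source, meta.exception);
                break;
            case OnIdle:
                meta.response = onIdle(source);
                break;
            case OnStreamTermination:
                meta.response = onStreamTermination(source);
                break;
        }
    }
}
```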
[SimpleKafkaProcessor](https://github.com/pulsarIO/Jetstream/blob/master/jetstream-messaging/src/main/java/com/ebay/jetstream/event/processor/kafka/SimpleKafkaProcessor.java) is another reference implementation of AbstractBatchEventProcessor. It provides a simple way to consume events when offset handling is not a concern; for example, it is used by KafkaReplayer. Detailed descriptions of the processor can be found [here](../wiki/Kafka-to-Jetstream-:-How-it-works#simplekafkaprocessor).