Skip to content
Browse files

Get serialization of events off main processing thread

  • Loading branch information...
1 parent cf685ab commit 49dc481a73afea8f69d6b9e429d253ae85b5468e Bruce Robbins committed Nov 17, 2011
Showing with 8 additions and 9 deletions.
  1. +8 −9 s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
View
17 s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
@@ -115,8 +115,7 @@ public void emit(int partitionId, EventWrapper eventWrapper) {
}
try {
- byte[] rawMessage = serDeser.serialize(eventWrapper);
- MessageHolder mh = new MessageHolder(partitionId, rawMessage);
+ MessageHolder mh = new MessageHolder(partitionId, eventWrapper);
queueMessage(mh);
} catch (RuntimeException rte) {
if (monitor != null) {
@@ -197,7 +196,7 @@ public void run() {
isSent = false;
try {
MessageHolder mh = messageQueue.take();
- byte[] rawMessage = mh.getRawMessage();
+ byte[] rawMessage = serDeser.serialize(mh.getEventWrapper());
if (listener == null) {
isSent = sender.send(rawMessage);
} else {
@@ -244,19 +243,19 @@ public void run() {
class MessageHolder {
private int partitionId;
- private byte[] rawMessage;
-
- MessageHolder(int partitionId, byte[] rawMessage) {
+ private EventWrapper eventWrapper;
+
+ MessageHolder(int partitionId, EventWrapper eventWrapper) {
this.partitionId = partitionId;
- this.rawMessage = rawMessage;
+ this.eventWrapper = eventWrapper;
}
int getPartitionId() {
return partitionId;
}
- byte[] getRawMessage() {
- return rawMessage;
+ EventWrapper getEventWrapper() {
+ return eventWrapper;
}
}
}

0 comments on commit 49dc481

Please sign in to comment.
Something went wrong with that request. Please try again.