Skip to content

Commit

Permalink
Added Message.setRollbackOnly() method to rollback message
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Apr 3, 2013
1 parent 0a617eb commit 0f8c7fc
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ private boolean processReceiveMessage(final FetchRequest request, final MessageI
final Message msg = it.next();
MessageAccessor.setPartition(msg, partition);
listener.recieveMessages(msg);
// rollback message if it is in rollback only state.
if (MessageAccessor.isRollbackOnly(msg)) {
it.setOffset(prevOffset);
break;
}
if (partition.isAutoAck()) {
count++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,50 @@ public Executor getExecutor() {
}


@Test
public void testProcessRequestMessageRollbackOnly() throws Exception {
// this.mockConsumerReInitializeFetchManager();
final String topic = "topic1";
final int maxSize = 1024;
final Partition partition = new Partition("0-0");
final long offset = 12;
final Broker broker = new Broker(0, "meta://localhost:0");
final byte[] data =
MessageUtils.makeMessageBuffer(1111,
new PutCommand(topic, partition.getPartition(), "hello".getBytes(), null, 0, 0)).array();
final FetchRequest request =
new FetchRequest(broker, 0, new TopicPartitionRegInfo(topic, partition, offset), maxSize);

final FetchRequestRunner runner = this.fetchManager.new FetchRequestRunner();

EasyMock.expect(this.consumer.fetch(request, -1, null)).andReturn(new MessageIterator(topic, data));
EasyMock.expect(this.consumer.getMessageListener(topic)).andReturn(new MessageListener() {

@Override
public void recieveMessages(final Message message) {
System.out.println("Rollback current message");
message.setRollbackOnly();
}


@Override
public Executor getExecutor() {
return null;
}
});

final FetchRequest newRequest =
new FetchRequest(broker, this.consumerConfig.getMaxDelayFetchTimeInMills() / 10,
new TopicPartitionRegInfo(topic, partition, offset, 1111), maxSize);
newRequest.incrementRetriesAndGet();

EasyMock.replay(this.consumer);
runner.processRequest(request);
EasyMock.verify(this.consumer);
assertEquals(newRequest, this.fetchManager.takeFetchRequest());
}


@Test
public void testProcessRequestDelayed_IncreaseMaxSize() throws Exception {
// this.mockConsumerReInitializeFetchManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class Message implements Serializable {
// added by dennis<killme2008@gmail.com>,2012-06-14
private transient boolean readOnly;

private transient boolean rollbackOnly = false;


void setId(final long id) {
this.id = id;
Expand All @@ -66,6 +68,22 @@ void setFlag(final int flag) {
}


boolean isRollbackOnly() {
return this.rollbackOnly;
}


/**
* Set message to be in rollback only state.The state is transient,it's only
* valid in current message instance.
*
* @since 1.4.5
*/
public void setRollbackOnly() {
this.rollbackOnly = true;
}


/**
* Returns whether the message is readonly.
*
Expand All @@ -79,7 +97,8 @@ public boolean isReadOnly() {

/**
* Set the message to be readonly,but metamorphosis client and server could
* modify message's id,flag,partition.
* modify message's id,flag,partition.The readonly state is transient,it
* will not be persist in broker.
*
* @since 1.4.4
* @param readOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public static void setFlag(Message message, int flag) {
}


public static boolean isRollbackOnly(Message message) {
return message.isRollbackOnly();
}


public static int getFlag(Message message) {
return message.getFlag();
}
Expand Down

0 comments on commit 0f8c7fc

Please sign in to comment.