Skip to content

Commit

Permalink
Fixed send retry policy in TCPDataSender #2198
Browse files Browse the repository at this point in the history
1. Fixed retry policy in TCPDataSender is not working properly.
2. Changed log level
  • Loading branch information
koo-taejin authored and emeroad committed Oct 28, 2016
1 parent 9471f72 commit 702de7a
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,80 @@

package com.navercorp.pinpoint.profiler.sender;

import org.apache.commons.lang.StringUtils;

/**
* @author emeroad
*/
public class RetryMessage {
private int retryCount;
private byte[] bytes;

public RetryMessage(int retryCount, byte[] bytes) {
private int retryCount = 0;
private final int maxRetryCount;

private final byte[] bytes;
private final String messageDescription;

public RetryMessage(int maxRetryCount, byte[] bytes) {
this(0, maxRetryCount, bytes, "");
}

public RetryMessage(int retryCount, int maxRetryCount, byte[] bytes) {
this(retryCount, maxRetryCount, bytes, "");
}

public RetryMessage(int maxRetryCount, byte[] bytes, String messageDescription) {
this(0, maxRetryCount, bytes, messageDescription);
}

public RetryMessage(int retryCount, int maxRetryCount, byte[] bytes, String messageDescription) {
if (retryCount < 0) {
throw new IllegalArgumentException("retryCount:" + retryCount + " must be positive number");
}
if (maxRetryCount < 0) {
throw new IllegalArgumentException("maxRetryCount:" + maxRetryCount + " must be positive number");
}
if (retryCount > maxRetryCount) {
throw new IllegalArgumentException("maxRetryCount(" + maxRetryCount + ") must be greater than retryCount(" + retryCount + ")");
}

this.retryCount = retryCount;
this.maxRetryCount = maxRetryCount;
this.bytes = bytes;
this.messageDescription = messageDescription;
}

public int getRetryCount() {
return retryCount;
}

public int getMaxRetryCount() {
return maxRetryCount;
}

public boolean isRetryAvailable() {
return retryCount < maxRetryCount;
}

public byte[] getBytes() {
return bytes;
}

public int fail() {
return ++retryCount;
}

@Override
public String toString() {
StringBuffer toString = new StringBuffer();
toString.append("RetryMessage{");
if (!StringUtils.isEmpty(messageDescription)) {
toString.append("message:" + messageDescription + ", ");
}
toString.append("size=" + bytes.length + ", ");
toString.append("retry=" + retryCount + "/" + maxRetryCount);
toString.append("}");

return toString.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public class RetryQueue {
// But PriorityQueue of JDK has no size limit, so let's do it without priority for now.
private final BlockingQueue<RetryMessage> queue;
private final int capacity;
private final int maxRetry;
private final int maxRetryCount;
private final int halfCapacity;


public RetryQueue(int capacity, int maxRetry) {
public RetryQueue(int capacity, int maxRetryCount) {
this.queue = new LinkedBlockingQueue<RetryMessage>();
this.capacity = capacity;
this.halfCapacity = capacity / 2;
this.maxRetry = maxRetry;
this.maxRetryCount = maxRetryCount;
}

public RetryQueue() {
Expand All @@ -51,9 +51,13 @@ public void add(RetryMessage retryMessage) {
throw new NullPointerException("retryMessage must not be null");
}

final int retryCount = retryMessage.getRetryCount();
if (retryCount >= this.maxRetry) {
logger.warn("discard retry message. retryCount:{}", retryCount);
if (!retryMessage.isRetryAvailable()) {
logger.warn("discard retry message({}).", retryMessage);
return;
}
int retryCount = retryMessage.getRetryCount();
if (retryCount >= this.maxRetryCount) {
logger.warn("discard retry message({}). queue-maxRetryCount:{}", retryMessage, maxRetryCount);
return;
}
final int queueSize = queue.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@
package com.navercorp.pinpoint.profiler.sender;


import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.thrift.TBase;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
Expand All @@ -28,18 +41,6 @@
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializerFactory;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializerFactory;
import org.apache.thrift.TBase;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author emeroad
Expand Down Expand Up @@ -142,7 +143,7 @@ protected void sendPacket(Object message) {
if (copy == null) {
return;
}

if (futureListener != null) {
doRequest(copy, futureListener);
} else {
Expand All @@ -162,7 +163,8 @@ private void doSend(byte[] copy) {
write.setListener(writeFailFutureListener);
}

private void doRequest(final byte[] requestPacket, final int retryCount, final Object targetClass) {
// Separate doRequest method to avoid creating unnecessary objects. (Generally, sending message is successed when firt attempt.)
private void doRequest(final byte[] requestPacket, final int maxRetryCount, final Object targetClass) {
FutureListener futureListener = (new FutureListener<ResponseMessage>() {
@Override
public void onComplete(Future<ResponseMessage> future) {
Expand All @@ -175,27 +177,61 @@ public void onComplete(Future<ResponseMessage> future) {
if (result.isSuccess()) {
logger.debug("result success");
} else {
logger.warn("request fail. clazz:{} Caused:{}", targetClass, result.getMessage());
retryRequest(requestPacket, retryCount, targetClass.getClass().getSimpleName());
logger.info("request fail. request:{} Caused:{}", targetClass, result.getMessage());
RetryMessage retryMessage = new RetryMessage(1, maxRetryCount, requestPacket, targetClass.getClass().getSimpleName());
retryRequest(retryMessage);
}
} else {
logger.warn("Invalid ResponseMessage. {}", response);
logger.warn("Invalid respose:{}", response);
// This is not retransmission. need to log for debugging
// it could be null
// retryRequest(requestPacket);
}
} else {
logger.warn("request fail. clazz:{} Caused:{}", targetClass, future.getCause().getMessage(), future.getCause());
retryRequest(requestPacket, retryCount, targetClass.getClass().getSimpleName());
logger.info("request fail. request:{} Caused:{}", targetClass, future.getCause().getMessage(), future.getCause());
RetryMessage retryMessage = new RetryMessage(1, maxRetryCount, requestPacket, targetClass.getClass().getSimpleName());
retryRequest(retryMessage);
}
}
});

doRequest(requestPacket, futureListener);
}

private void retryRequest(byte[] requestPacket, int retryCount, final String className) {
RetryMessage retryMessage = new RetryMessage(retryCount, requestPacket);
// Separate doRequest method to avoid creating unnecessary objects. (Generally, sending message is successed when firt attempt.)
private void doRequest(final RetryMessage retryMessage) {
FutureListener futureListener = (new FutureListener<ResponseMessage>() {
@Override
public void onComplete(Future<ResponseMessage> future) {
if (future.isSuccess()) {
// Should cache?
HeaderTBaseDeserializer deserializer = HeaderTBaseDeserializerFactory.DEFAULT_FACTORY.createDeserializer();
TBase<?, ?> response = deserialize(deserializer, future.getResult());
if (response instanceof TResult) {
TResult result = (TResult) response;
if (result.isSuccess()) {
logger.debug("result success");
} else {
logger.info("request fail. request:{}, Caused:{}", retryMessage, result.getMessage());
retryRequest(retryMessage);
}
} else {
logger.warn("Invalid response:{}", response);
// This is not retransmission. need to log for debugging
// it could be null
// retryRequest(requestPacket);
}
} else {
logger.info("request fail. request:{}, caused:{}", retryMessage, future.getCause().getMessage(), future.getCause());
retryRequest(retryMessage);
}
}
});

doRequest(retryMessage.getBytes(), futureListener);
}

private void retryRequest(RetryMessage retryMessage) {
retryQueue.add(retryMessage);
if (fireTimeout()) {
timer.newTimeout(new TimerTask() {
Expand All @@ -209,7 +245,7 @@ public void run(Timeout timeout) throws Exception {
return;
}
int fail = retryMessage.fail();
doRequest(retryMessage.getBytes(), fail, className);
doRequest(retryMessage);
}
}
}, 1000 * 10, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2016 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.sender;

import org.junit.Assert;
import org.junit.Test;

/**
* @author Taejin Koo
*/
public class RetryMessageTest {

@Test
public void availableTest1() throws Exception {
RetryMessage retryMessage = new RetryMessage(1, new byte[0]);
Assert.assertTrue(retryMessage.isRetryAvailable());

retryMessage.fail();
Assert.assertFalse(retryMessage.isRetryAvailable());
}

@Test
public void availableTest2() throws Exception {
RetryMessage retryMessage = new RetryMessage(1, 2, new byte[0]);
Assert.assertTrue(retryMessage.isRetryAvailable());

retryMessage.fail();
Assert.assertFalse(retryMessage.isRetryAvailable());
}

@Test
public void availableTest3() throws Exception {
RetryMessage retryMessage = new RetryMessage(2, 2, new byte[0]);
Assert.assertFalse(retryMessage.isRetryAvailable());
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgumentTest1() {
RetryMessage retryMessage = new RetryMessage(-1, new byte[0]);
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgumentTest2() {
RetryMessage retryMessage = new RetryMessage(-1, 5, new byte[0]);
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgumentTest3() {
RetryMessage retryMessage = new RetryMessage(10, 9, new byte[0]);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,19 @@
package com.navercorp.pinpoint.profiler.sender;

import org.junit.Assert;

import org.junit.Test;

import com.navercorp.pinpoint.profiler.sender.RetryMessage;
import com.navercorp.pinpoint.profiler.sender.RetryQueue;

/**
* @author emeroad
*/
public class RetryQueueTest {
@Test
public void size() {

RetryQueue retryQueue = new RetryQueue(1, 1);
retryQueue.add(new RetryMessage(0, new byte[0]));
retryQueue.add(new RetryMessage(0, new byte[0]));
retryQueue.add(new RetryMessage(1, new byte[0]));
retryQueue.add(new RetryMessage(1, new byte[0]));

Assert.assertEquals(1, retryQueue.size());

}

@Test
Expand All @@ -48,7 +42,6 @@ public void size2() {

@Test
public void maxRetryTest() {

RetryQueue retryQueue = new RetryQueue(3, 2);
RetryMessage retryMessage = new RetryMessage(0, new byte[0]);
retryMessage.fail();
Expand All @@ -62,12 +55,25 @@ public void maxRetryTest() {
}

@Test
public void add() {
public void maxRetryTest2() {
RetryQueue retryQueue = new RetryQueue(3, 1);
RetryMessage retryMessage = new RetryMessage(5, new byte[0]);
retryMessage.fail();
retryMessage.fail();


retryQueue.add(retryMessage);
retryQueue.add(retryMessage);

Assert.assertEquals(retryQueue.size(), 0);
}

@Test
public void add() {
RetryQueue retryQueue = new RetryQueue(3, 2);
retryQueue.add(new RetryMessage(0, new byte[0]));
retryQueue.add(new RetryMessage(1, new byte[0]));
// If we add a failed message and it makes the queue filled more than half, the queue must discard it.
RetryMessage retryMessage = new RetryMessage(0, new byte[0]);
RetryMessage retryMessage = new RetryMessage(1, new byte[0]);
retryMessage.fail();
retryQueue.add(retryMessage);

Expand Down
Loading

0 comments on commit 702de7a

Please sign in to comment.