Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/com/stackify/api/common/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class HttpClient {
/**
* READ_TIMEOUT
*/
private static final int READ_TIMEOUT = 5000;
private static final int READ_TIMEOUT = 15000;

/**
* API configuration
Expand Down
60 changes: 52 additions & 8 deletions src/main/java/com/stackify/api/common/http/HttpResendQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,22 @@ public class HttpResendQueue {
*/
private static final Logger LOGGER = LoggerFactory.getLogger(HttpResendQueue.class);

/**
* Try posting message 3 times before skipping it
*/
private static final int MAX_POST_ATTEMPTS = 3;

/**
* The queue of requests to be retransmitted
*/
private final Queue<byte[]> resendQueue;
private final Queue<HttpResendQueueItem> resendQueue;

/**
* Constructor
* @param maxSize Maximum size of the queue
*/
public HttpResendQueue(final int maxSize) {
this.resendQueue = new SynchronizedEvictingQueue<byte[]>(maxSize);
this.resendQueue = new SynchronizedEvictingQueue<HttpResendQueueItem>(maxSize);
}

/**
Expand All @@ -62,7 +67,7 @@ public int size() {
* @param e IOException
*/
public void offer(final byte[] request, final IOException e) {
resendQueue.offer(request);
resendQueue.offer(new HttpResendQueueItem(request));
}

/**
Expand All @@ -72,7 +77,7 @@ public void offer(final byte[] request, final IOException e) {
*/
public void offer(final byte[] request, final HttpException e) {
if (!e.isClientError()) {
resendQueue.offer(request);
resendQueue.offer(new HttpResendQueueItem(request));
}
}

Expand All @@ -92,15 +97,54 @@ public void drain(final HttpClient httpClient, final String path) {
* @param gzip True if the post should be gzipped, false otherwise
*/
public void drain(final HttpClient httpClient, final String path, final boolean gzip) {

if (!resendQueue.isEmpty()) {

// queued items are available for retransmission

try {
// drain resend queue until empty or first exception

LOGGER.info("Attempting to retransmit {} requests", resendQueue.size());

while (!resendQueue.isEmpty()) {
byte[] jsonBytes = resendQueue.peek();
httpClient.post(path, jsonBytes, gzip);
resendQueue.remove();
Threads.sleepQuietly(250, TimeUnit.MILLISECONDS);

// get next item off queue

HttpResendQueueItem item = resendQueue.peek();

try {

// retransmit queued request

byte[] jsonBytes = item.getJsonBytes();
httpClient.post(path, jsonBytes, gzip);

// retransmission successful
// remove from queue and sleep for 250ms

resendQueue.remove();

Threads.sleepQuietly(250, TimeUnit.MILLISECONDS);

} catch (Throwable t) {

// retransmission failed
// increment the item's counter

item.failed();

// remove it from the queue if we have had MAX_POST_ATTEMPTS (3) failures for the same request

if (MAX_POST_ATTEMPTS <= item.getNumFailures())
{
resendQueue.remove();
}

// rethrow original exception from retransmission

throw t;
}
}
} catch (Throwable t) {
LOGGER.info("Failure retransmitting queued requests", t);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2015 Stackify
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stackify.api.common.http;

/**
* HttpResendQueueItem
* @author Eric Martin
*/
public class HttpResendQueueItem {

/**
* JSON bytes
*/
private final byte[] jsonBytes;

/**
* Number of failures for the item;
*/
private int numFailures;

/**
* Constructor
* @param jsonBytes JSON bytes
*/
public HttpResendQueueItem(final byte[] jsonBytes) {
this.jsonBytes = jsonBytes;
this.numFailures = 1;
}

/**
* @return the jsonBytes
*/
public byte[] getJsonBytes() {
return jsonBytes;
}

/**
* @return the numFailures
*/
public int getNumFailures() {
return numFailures;
}

/**
* Increment the number of failures
*/
public void failed() {
++numFailures;
}
}
7 changes: 6 additions & 1 deletion src/main/java/com/stackify/api/common/log/LogAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/
public class LogAppender<T> implements Closeable {

/**
* Internal package prefix
*/
private static final String COM_DOT_STACKIFY = "com.stackify.";

/**
* Logger project name
*/
Expand Down Expand Up @@ -132,7 +137,7 @@ public void append(final T event) {
String className = eventAdapter.getClassName(event);

if (className != null) {
if (className.startsWith("com.stackify.api.")) {
if (className.startsWith(COM_DOT_STACKIFY)) {
return;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/stackify/api/common/log/LogSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public class LogSender {
private final ObjectMapper objectMapper;

/**
* The queue of requests to be retransmitted (max of 100 batches of 100 messages)
* The queue of requests to be retransmitted (max of 20 batches of 100 messages)
*/
private final HttpResendQueue resendQueue = new HttpResendQueue(100);
private final HttpResendQueue resendQueue = new HttpResendQueue(20);

/**
* Default constructor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2015 Stackify
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stackify.api.common.http;

import org.junit.Assert;
import org.junit.Test;

/**
* HttpResendQueueItem JUnit Test
* @author Eric Martin
*/
public class HttpResendQueueItemTest {

/**
* testConstrcutorAndGetters
*/
@Test
public void testConstrcutorAndGetters() {
byte[] jsonBytes = "{\"method\": \"testConstrcutorAndGetters\"}".getBytes();

HttpResendQueueItem item = new HttpResendQueueItem(jsonBytes);
Assert.assertNotNull(item);

Assert.assertEquals(jsonBytes, item.getJsonBytes());
Assert.assertEquals(1, item.getNumFailures());
}

/**
* testIncrementFailures
*/
@Test
public void testIncrementFailures() {
byte[] jsonBytes = "{\"method\": \"testIncrementRetries\"}".getBytes();

HttpResendQueueItem item = new HttpResendQueueItem(jsonBytes);
Assert.assertEquals(1, item.getNumFailures());

item.failed();
Assert.assertEquals(2, item.getNumFailures());

item.failed();
Assert.assertEquals(3, item.getNumFailures());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,27 @@ public void testDrainWithException() throws Exception {

Assert.assertEquals(0, resendQueue.size());
}

/**
* testDrainWithExceptionAndSkip
* @throws Exception
*/
@Test
public void testDrainWithExceptionAndSkip() throws Exception {
byte[] request1 = new byte[]{1};

HttpResendQueue resendQueue = new HttpResendQueue(3);
resendQueue.offer(request1, new IOException());

Assert.assertEquals(1, resendQueue.size());

HttpClient httpClient = Mockito.mock(HttpClient.class);
Mockito.when(httpClient.post("/path", request1, false)).thenThrow(new RuntimeException());

resendQueue.drain(httpClient, "/path");
Assert.assertEquals(1, resendQueue.size());

resendQueue.drain(httpClient, "/path");
Assert.assertEquals(0, resendQueue.size());
}
}