diff --git a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java index 0dff7ea25..ef434c799 100644 --- a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java @@ -7,6 +7,7 @@ import io.odpf.firehose.exception.NeedToRetry; import io.odpf.firehose.metrics.Instrumentation; import io.odpf.firehose.sink.AbstractSink; +import io.odpf.firehose.sink.http.SerializableHttpResponse; import joptsimple.internal.Strings; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -49,7 +50,8 @@ public List execute() throws Exception { response = httpClient.execute(httpRequest); getInstrumentation().logInfo("Response Status: {}", statusCode(response)); if (shouldLogResponse(response)) { - printResponse(response); + SerializableHttpResponse serializableHttpResponse = new SerializableHttpResponse(response); + getInstrumentation().logDebug("Response Body: {}", serializableHttpResponse); } if (shouldLogRequest(response)) { printRequest(httpRequest); @@ -125,14 +127,6 @@ private void printRequest(HttpEntityEnclosingRequestBase httpRequest) throws IOE inputStream.reset(); } - private void printResponse(HttpResponse response) throws IOException { - InputStream inputStream = response.getEntity().getContent(); - String entireRequest = String.format("Response Body: %s", - Strings.join(readContent(inputStream), "\n")); - getInstrumentation().logDebug(entireRequest); - inputStream.reset(); - } - protected abstract List readContent(InputStream inputStream) throws IOException; protected abstract void captureMessageDropCount(HttpResponse response, HttpEntityEnclosingRequestBase httpRequest) throws IOException; diff --git a/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java b/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java new file mode 100644 index 000000000..06b43e527 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java @@ -0,0 +1,36 @@ +package io.odpf.firehose.sink.http; + +import org.apache.http.HttpResponse; +import org.apache.logging.log4j.util.Strings; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +public class SerializableHttpResponse implements Serializable { + private HttpResponse httpResponse; + + public SerializableHttpResponse(HttpResponse httpResponse) { + this.httpResponse = httpResponse; + } + + @Override + public String toString() { + InputStream inputStream = null; + try { + inputStream = httpResponse.getEntity().getContent(); + } catch (IOException e) { + e.printStackTrace(); + } + return Strings.join(readContent(inputStream), '\n'); + } + + private List readContent(InputStream inputStream) { + return new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).lines().collect(Collectors.toList()); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java index bb2425239..30d1b4502 100644 --- a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java @@ -423,13 +423,12 @@ public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception { when(request.build(messages)).thenReturn(httpRequests); when(httpClient.execute(httpPut)).thenReturn(response); when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); - when(response.getEntity()).thenReturn(httpEntity); HttpSink httpSink = new HttpSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); httpSink.execute(); verify(instrumentation, times(1)).logDebug( - "Response Body: [{\"key\":\"value1\"},{\"key\":\"value2\"}]"); + eq("Response Body: {}"), any(SerializableHttpResponse.class)); } } diff --git a/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java b/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java new file mode 100644 index 000000000..12b38e161 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java @@ -0,0 +1,39 @@ +package io.odpf.firehose.sink.http; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.tools.ant.filters.StringInputStream; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SerializableHttpResponseTest { + + @Mock + private HttpResponse httpResponse; + @Mock + private HttpEntity httpEntity; + private SerializableHttpResponse serializableHttpResponse; + + @Before + public void setUp() throws Exception { + serializableHttpResponse = new SerializableHttpResponse(httpResponse); + } + + @Test + public void shouldReturnTheResponseBodyStringInCaseOfNonNullResponse() throws IOException { + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]")); + + String responseBody = serializableHttpResponse.toString(); + Assert.assertEquals("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]", responseBody); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java b/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java index 60634d30c..fd9cd7a88 100644 --- a/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java @@ -92,8 +92,6 @@ public void shouldPrepareRequestDuringPreparationAndCallItDuringExecution() thro when(statusLine.getStatusCode()).thenReturn(200); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); promSink.prepare(messages); @@ -110,8 +108,6 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); @@ -181,7 +177,6 @@ public void shouldLogEntireRequestIfInStatusCodeRangeAndCaptureDroppedMessages() when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-505")); @@ -207,7 +202,6 @@ public void shouldNotLogEntireRequestIfNotInStatusCodeRange() throws Exception { when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-499")); @@ -229,7 +223,6 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-499"), requestLogStatusCodeRanges); @@ -246,8 +239,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws E when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-600"), requestLogStatusCodeRanges); @@ -263,8 +254,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs200() throws Exc when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -281,8 +270,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs201() throws Exc when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -300,8 +287,6 @@ public void shouldCaptureResponseStatusCount() throws Exception { when(httpPost.getURI()).thenReturn(uri); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -315,8 +300,6 @@ public void shouldCaptureResponseStatusCount() throws Exception { public void shouldReadSnappyCompressedContent() throws Exception { String body = "[timeseries {\n labels {\n name: \"__name__\"\n value: \"test_metric\"\n }\n samples {\n value: 10.0\n timestamp_ms: 1000000\n }\n}\n]"; InputStream inputStream = new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())); - when(httpPost.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(inputStream); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);