From 8448f6fcde6301899a6c65ab8bbfb14f00eef124 Mon Sep 17 00:00:00 2001 From: "mayur.gubrele" <2310-mayur.gubrele@users.noreply.source.golabs.io> Date: Thu, 19 Aug 2021 15:07:51 +0530 Subject: [PATCH 1/6] feat: log http response body when log_level equals debug --- .../sink/common/AbstractHttpSink.java | 7 ++++++ .../odpf/firehose/sink/http/HttpSinkTest.java | 25 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java index 25f4ee618..08cc20ac4 100644 --- a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java @@ -49,6 +49,9 @@ public List execute() throws Exception { response = httpClient.execute(httpRequest); List contentStringList = null; getInstrumentation().logInfo("Response Status: {}", statusCode(response)); + if (shouldLogResponse(response)) { + printResponse(response); + } if (shouldLogRequest(response)) { contentStringList = readContent(httpRequest); printRequest(httpRequest, contentStringList); @@ -88,6 +91,10 @@ private boolean shouldLogRequest(HttpResponse response) { return response == null || getRequestLogStatusCodeRanges().containsKey(response.getStatusLine().getStatusCode()); } + private boolean shouldLogResponse(HttpResponse response) { + return response != null; + } + private boolean shouldRetry(HttpResponse response) { return response == null || getRetryStatusCodeRanges().containsKey(response.getStatusLine().getStatusCode()); } diff --git a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java index a1ef684ca..64060b8e8 100644 --- a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java @@ -407,4 +407,29 @@ public void shouldCaptureResponseStatusCount() throws Exception { verify(instrumentation, times(1)).captureCountWithTags("firehose_sink_http_response_code_total", 1, "status_code=" + statusLine.getStatusCode(), "url=" + uri.getPath()); } + + @Test + public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception { + when(response.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(200); + + List httpRequests = Collections.singletonList(httpPut); + + when(httpPut.getMethod()).thenReturn("PUT"); + when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); + when(httpPut.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(httpPut.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"},{\"key\":\"value2\"}]")); + when(request.build(messages)).thenReturn(httpRequests); + when(httpClient.execute(httpPut)).thenReturn(response); + when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(response.getEntity()).thenReturn(httpEntity); + + HttpSink httpSink = new HttpSink(instrumentation, request, httpClient, stencilClient, + retryStatusCodeRange, requestLogStatusCodeRanges); + httpSink.prepare(messages); + httpSink.execute(); + verify(instrumentation, times(1)).logDebug( + "Response Body: {}", "[{\"key\":\"value1\"},{\"key\":\"value2\"}]"); + } } From ec33d83a88c9a39707b38d7aa4a1acad7cdfd111 Mon Sep 17 00:00:00 2001 From: "mayur.gubrele" <2310-mayur.gubrele@users.noreply.source.golabs.io> Date: Mon, 23 Aug 2021 13:18:46 +0530 Subject: [PATCH 2/6] refactor: reset input stream after logging response --- .../io/odpf/firehose/sink/common/AbstractHttpSink.java | 8 ++++++++ .../java/io/odpf/firehose/sink/http/HttpSinkTest.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java index 08cc20ac4..8e961e6fd 100644 --- a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java @@ -126,6 +126,14 @@ private void printRequest(HttpEntityEnclosingRequestBase httpRequest, List readContent(HttpEntityEnclosingRequestBase httpRequest) throws IOException; protected abstract void captureMessageDropCount(HttpResponse response, List contentString) throws IOException; diff --git a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java index 64060b8e8..164baf1e5 100644 --- a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java @@ -430,6 +430,6 @@ public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception { httpSink.prepare(messages); httpSink.execute(); verify(instrumentation, times(1)).logDebug( - "Response Body: {}", "[{\"key\":\"value1\"},{\"key\":\"value2\"}]"); + "Response Body: [{\"key\":\"value1\"},{\"key\":\"value2\"}]"); } } From 4f38738bde43bfea5e69f19a8d71f49b6a71fbd1 Mon Sep 17 00:00:00 2001 From: "mayur.gubrele" <2310-mayur.gubrele@users.noreply.source.golabs.io> Date: Mon, 23 Aug 2021 14:02:05 +0530 Subject: [PATCH 3/6] refactor: fix prom sink tests by mocking response body --- .../firehose/sink/prometheus/PromSinkTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java b/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java index a4ee14ac3..34667c9db 100644 --- a/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java @@ -93,6 +93,8 @@ public void shouldPrepareRequestDuringPreparationAndCallItDuringExecution() thro when(statusLine.getStatusCode()).thenReturn(200); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); promSink.prepare(messages); @@ -109,6 +111,8 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); @@ -178,6 +182,7 @@ public void shouldLogEntireRequestIfInStatusCodeRangeAndCaptureDroppedMessages() when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-505")); @@ -203,6 +208,7 @@ public void shouldNotLogEntireRequestIfNotInStatusCodeRange() throws Exception { when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-499")); @@ -224,6 +230,7 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-499"), requestLogStatusCodeRanges); @@ -240,6 +247,8 @@ public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws E when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-600"), requestLogStatusCodeRanges); @@ -255,6 +264,8 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs200() throws Exc when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -271,6 +282,8 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs201() throws Exc when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -288,6 +301,8 @@ public void shouldCaptureResponseStatusCount() throws Exception { when(httpPost.getURI()).thenReturn(uri); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); From 71994b47be9c8dcb6b8a299fd2b95c36a909cd32 Mon Sep 17 00:00:00 2001 From: "mayur.gubrele" <2310-mayur.gubrele@users.noreply.source.golabs.io> Date: Thu, 26 Aug 2021 16:35:55 +0530 Subject: [PATCH 4/6] refactor: print response body via toString method --- .../sink/common/AbstractHttpSink.java | 12 ++---- .../sink/http/SerializableHttpResponse.java | 36 +++++++++++++++++ .../odpf/firehose/sink/http/HttpSinkTest.java | 3 +- .../http/SerializableHttpResponseTest.java | 39 +++++++++++++++++++ .../sink/prometheus/PromSinkTest.java | 15 ------- 5 files changed, 79 insertions(+), 26 deletions(-) create mode 100644 src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java create mode 100644 src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java diff --git a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java index 8e961e6fd..15756e2c8 100644 --- a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java @@ -8,6 +8,7 @@ import io.odpf.firehose.metrics.Instrumentation; import io.odpf.firehose.sink.AbstractSink; import com.gojek.de.stencil.client.StencilClient; +import io.odpf.firehose.sink.http.SerializableHttpResponse; import joptsimple.internal.Strings; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -50,7 +51,8 @@ public List execute() throws Exception { List contentStringList = null; getInstrumentation().logInfo("Response Status: {}", statusCode(response)); if (shouldLogResponse(response)) { - printResponse(response); + SerializableHttpResponse serializableHttpResponse = new SerializableHttpResponse(response); + getInstrumentation().logDebug("Response Body: {}", serializableHttpResponse); } if (shouldLogRequest(response)) { contentStringList = readContent(httpRequest); @@ -126,14 +128,6 @@ private void printRequest(HttpEntityEnclosingRequestBase httpRequest, List readContent(HttpEntityEnclosingRequestBase httpRequest) throws IOException; protected abstract void captureMessageDropCount(HttpResponse response, List contentString) throws IOException; diff --git a/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java b/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java new file mode 100644 index 000000000..06b43e527 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java @@ -0,0 +1,36 @@ +package io.odpf.firehose.sink.http; + +import org.apache.http.HttpResponse; +import org.apache.logging.log4j.util.Strings; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +public class SerializableHttpResponse implements Serializable { + private HttpResponse httpResponse; + + public SerializableHttpResponse(HttpResponse httpResponse) { + this.httpResponse = httpResponse; + } + + @Override + public String toString() { + InputStream inputStream = null; + try { + inputStream = httpResponse.getEntity().getContent(); + } catch (IOException e) { + e.printStackTrace(); + } + return Strings.join(readContent(inputStream), '\n'); + } + + private List readContent(InputStream inputStream) { + return new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).lines().collect(Collectors.toList()); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java index 164baf1e5..4961031c2 100644 --- a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java @@ -423,13 +423,12 @@ public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception { when(request.build(messages)).thenReturn(httpRequests); when(httpClient.execute(httpPut)).thenReturn(response); when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); - when(response.getEntity()).thenReturn(httpEntity); HttpSink httpSink = new HttpSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); httpSink.execute(); verify(instrumentation, times(1)).logDebug( - "Response Body: [{\"key\":\"value1\"},{\"key\":\"value2\"}]"); + eq("Response Body: {}"), any(SerializableHttpResponse.class)); } } diff --git a/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java b/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java new file mode 100644 index 000000000..12b38e161 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java @@ -0,0 +1,39 @@ +package io.odpf.firehose.sink.http; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.tools.ant.filters.StringInputStream; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SerializableHttpResponseTest { + + @Mock + private HttpResponse httpResponse; + @Mock + private HttpEntity httpEntity; + private SerializableHttpResponse serializableHttpResponse; + + @Before + public void setUp() throws Exception { + serializableHttpResponse = new SerializableHttpResponse(httpResponse); + } + + @Test + public void shouldReturnTheResponseBodyStringInCaseOfNonNullResponse() throws IOException { + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]")); + + String responseBody = serializableHttpResponse.toString(); + Assert.assertEquals("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]", responseBody); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java b/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java index 34667c9db..a4ee14ac3 100644 --- a/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java @@ -93,8 +93,6 @@ public void shouldPrepareRequestDuringPreparationAndCallItDuringExecution() thro when(statusLine.getStatusCode()).thenReturn(200); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); promSink.prepare(messages); @@ -111,8 +109,6 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); @@ -182,7 +178,6 @@ public void shouldLogEntireRequestIfInStatusCodeRangeAndCaptureDroppedMessages() when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-505")); @@ -208,7 +203,6 @@ public void shouldNotLogEntireRequestIfNotInStatusCodeRange() throws Exception { when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-499")); @@ -230,7 +224,6 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-499"), requestLogStatusCodeRanges); @@ -247,8 +240,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws E when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-600"), requestLogStatusCodeRanges); @@ -264,8 +255,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs200() throws Exc when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -282,8 +271,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs201() throws Exc when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -301,8 +288,6 @@ public void shouldCaptureResponseStatusCount() throws Exception { when(httpPost.getURI()).thenReturn(uri); when(request.build(messages)).thenReturn(httpPostList); when(httpClient.execute(httpPost)).thenReturn(response); - when(response.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()))); PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); From 6a85e679e92b8ef7b488a45ed519369f2ef474e0 Mon Sep 17 00:00:00 2001 From: "mayur.gubrele" <2310-mayur.gubrele@users.noreply.source.golabs.io> Date: Thu, 9 Sep 2021 16:42:28 +0530 Subject: [PATCH 5/6] feat: log http response body if debug is enabled --- .../firehose/metrics/Instrumentation.java | 3 +++ .../sink/common/AbstractHttpSink.java | 21 +++++++++++++++---- .../odpf/firehose/sink/http/HttpSinkTest.java | 12 +++++------ 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/odpf/firehose/metrics/Instrumentation.java b/src/main/java/io/odpf/firehose/metrics/Instrumentation.java index eea683f92..7c821771a 100644 --- a/src/main/java/io/odpf/firehose/metrics/Instrumentation.java +++ b/src/main/java/io/odpf/firehose/metrics/Instrumentation.java @@ -75,6 +75,9 @@ public void logError(String template, Object... t) { logger.error(template, t); } + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } // ============== FILTER MESSAGES ============== /** diff --git a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java index 15756e2c8..a67603015 100644 --- a/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java @@ -8,19 +8,23 @@ import io.odpf.firehose.metrics.Instrumentation; import io.odpf.firehose.sink.AbstractSink; import com.gojek.de.stencil.client.StencilClient; -import io.odpf.firehose.sink.http.SerializableHttpResponse; import joptsimple.internal.Strings; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; import org.apache.http.util.EntityUtils; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static io.odpf.firehose.metrics.Metrics.SINK_HTTP_RESPONSE_CODE_TOTAL; @@ -51,8 +55,7 @@ public List execute() throws Exception { List contentStringList = null; getInstrumentation().logInfo("Response Status: {}", statusCode(response)); if (shouldLogResponse(response)) { - SerializableHttpResponse serializableHttpResponse = new SerializableHttpResponse(response); - getInstrumentation().logDebug("Response Body: {}", serializableHttpResponse); + printResponse(response); } if (shouldLogRequest(response)) { contentStringList = readContent(httpRequest); @@ -94,7 +97,7 @@ private boolean shouldLogRequest(HttpResponse response) { } private boolean shouldLogResponse(HttpResponse response) { - return response != null; + return getInstrumentation().isDebugEnabled() && response != null; } private boolean shouldRetry(HttpResponse response) { @@ -128,6 +131,16 @@ private void printRequest(HttpEntityEnclosingRequestBase httpRequest, List readContent(HttpEntityEnclosingRequestBase httpRequest) throws IOException; protected abstract void captureMessageDropCount(HttpResponse response, List contentString) throws IOException; diff --git a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java index 4961031c2..7843edebc 100644 --- a/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java @@ -409,7 +409,7 @@ public void shouldCaptureResponseStatusCount() throws Exception { } @Test - public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception { + public void shouldLogResponseBodyWhenDebugIsEnabledAndNonNullResponse() throws Exception { when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(200); @@ -417,18 +417,16 @@ public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception { when(httpPut.getMethod()).thenReturn("PUT"); when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); - when(httpPut.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); - when(httpPut.getEntity()).thenReturn(httpEntity); + when(httpClient.execute(httpPut)).thenReturn(response); + when(response.getEntity()).thenReturn(httpEntity); when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"},{\"key\":\"value2\"}]")); when(request.build(messages)).thenReturn(httpRequests); - when(httpClient.execute(httpPut)).thenReturn(response); - when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(instrumentation.isDebugEnabled()).thenReturn(true); HttpSink httpSink = new HttpSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); httpSink.execute(); - verify(instrumentation, times(1)).logDebug( - eq("Response Body: {}"), any(SerializableHttpResponse.class)); + verify(instrumentation, times(1)).logDebug("Response Body: [{\"key\":\"value1\"},{\"key\":\"value2\"}]"); } } From e6d394e099cb0770d48b2348fa46ef1ba3a2a6da Mon Sep 17 00:00:00 2001 From: "mayur.gubrele" <2310-mayur.gubrele@users.noreply.source.golabs.io> Date: Thu, 9 Sep 2021 16:46:05 +0530 Subject: [PATCH 6/6] refactor: remove serializableHttpResponse class --- .../sink/http/SerializableHttpResponse.java | 36 ----------------- .../http/SerializableHttpResponseTest.java | 39 ------------------- 2 files changed, 75 deletions(-) delete mode 100644 src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java delete mode 100644 src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java diff --git a/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java b/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java deleted file mode 100644 index 06b43e527..000000000 --- a/src/main/java/io/odpf/firehose/sink/http/SerializableHttpResponse.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.odpf.firehose.sink.http; - -import org.apache.http.HttpResponse; -import org.apache.logging.log4j.util.Strings; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.stream.Collectors; - -public class SerializableHttpResponse implements Serializable { - private HttpResponse httpResponse; - - public SerializableHttpResponse(HttpResponse httpResponse) { - this.httpResponse = httpResponse; - } - - @Override - public String toString() { - InputStream inputStream = null; - try { - inputStream = httpResponse.getEntity().getContent(); - } catch (IOException e) { - e.printStackTrace(); - } - return Strings.join(readContent(inputStream), '\n'); - } - - private List readContent(InputStream inputStream) { - return new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).lines().collect(Collectors.toList()); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java b/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java deleted file mode 100644 index 12b38e161..000000000 --- a/src/test/java/io/odpf/firehose/sink/http/SerializableHttpResponseTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.odpf.firehose.sink.http; - -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.tools.ant.filters.StringInputStream; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -import java.io.IOException; - -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableHttpResponseTest { - - @Mock - private HttpResponse httpResponse; - @Mock - private HttpEntity httpEntity; - private SerializableHttpResponse serializableHttpResponse; - - @Before - public void setUp() throws Exception { - serializableHttpResponse = new SerializableHttpResponse(httpResponse); - } - - @Test - public void shouldReturnTheResponseBodyStringInCaseOfNonNullResponse() throws IOException { - when(httpResponse.getEntity()).thenReturn(httpEntity); - when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]")); - - String responseBody = serializableHttpResponse.toString(); - Assert.assertEquals("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]", responseBody); - } -}