Skip to content

Commit

Permalink
refactor: print response body via toString method
Browse files Browse the repository at this point in the history
  • Loading branch information
mayur.gubrele committed Aug 26, 2021
1 parent 4b60fb1 commit 017d027
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 28 deletions.
12 changes: 3 additions & 9 deletions src/main/java/io/odpf/firehose/sink/common/AbstractHttpSink.java
Expand Up @@ -7,6 +7,7 @@
import io.odpf.firehose.exception.NeedToRetry;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.http.SerializableHttpResponse;
import joptsimple.internal.Strings;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
Expand Down Expand Up @@ -49,7 +50,8 @@ public List<Message> execute() throws Exception {
response = httpClient.execute(httpRequest);
getInstrumentation().logInfo("Response Status: {}", statusCode(response));
if (shouldLogResponse(response)) {
printResponse(response);
SerializableHttpResponse serializableHttpResponse = new SerializableHttpResponse(response);
getInstrumentation().logDebug("Response Body: {}", serializableHttpResponse);
}
if (shouldLogRequest(response)) {
printRequest(httpRequest);
Expand Down Expand Up @@ -125,14 +127,6 @@ private void printRequest(HttpEntityEnclosingRequestBase httpRequest) throws IOE
inputStream.reset();
}

private void printResponse(HttpResponse response) throws IOException {
InputStream inputStream = response.getEntity().getContent();
String entireRequest = String.format("Response Body: %s",
Strings.join(readContent(inputStream), "\n"));
getInstrumentation().logDebug(entireRequest);
inputStream.reset();
}

protected abstract List<String> readContent(InputStream inputStream) throws IOException;

protected abstract void captureMessageDropCount(HttpResponse response, HttpEntityEnclosingRequestBase httpRequest) throws IOException;
Expand Down
@@ -0,0 +1,36 @@
package io.odpf.firehose.sink.http;

import org.apache.http.HttpResponse;
import org.apache.logging.log4j.util.Strings;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class SerializableHttpResponse implements Serializable {
private HttpResponse httpResponse;

public SerializableHttpResponse(HttpResponse httpResponse) {
this.httpResponse = httpResponse;
}

@Override
public String toString() {
InputStream inputStream = null;
try {
inputStream = httpResponse.getEntity().getContent();
} catch (IOException e) {
e.printStackTrace();
}
return Strings.join(readContent(inputStream), '\n');
}

private List<String> readContent(InputStream inputStream) {
return new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).lines().collect(Collectors.toList());
}
}
3 changes: 1 addition & 2 deletions src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java
Expand Up @@ -423,13 +423,12 @@ public void shouldLogResponseBodyInCaseOfNonNullResponse() throws Exception {
when(request.build(messages)).thenReturn(httpRequests);
when(httpClient.execute(httpPut)).thenReturn(response);
when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")});
when(response.getEntity()).thenReturn(httpEntity);

HttpSink httpSink = new HttpSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, requestLogStatusCodeRanges);
httpSink.prepare(messages);
httpSink.execute();
verify(instrumentation, times(1)).logDebug(
"Response Body: [{\"key\":\"value1\"},{\"key\":\"value2\"}]");
eq("Response Body: {}"), any(SerializableHttpResponse.class));
}
}
@@ -0,0 +1,39 @@
package io.odpf.firehose.sink.http;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.tools.ant.filters.StringInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;

import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class SerializableHttpResponseTest {

@Mock
private HttpResponse httpResponse;
@Mock
private HttpEntity httpEntity;
private SerializableHttpResponse serializableHttpResponse;

@Before
public void setUp() throws Exception {
serializableHttpResponse = new SerializableHttpResponse(httpResponse);
}

@Test
public void shouldReturnTheResponseBodyStringInCaseOfNonNullResponse() throws IOException {
when(httpResponse.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]"));

String responseBody = serializableHttpResponse.toString();
Assert.assertEquals("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]", responseBody);
}
}
17 changes: 0 additions & 17 deletions src/test/java/io/odpf/firehose/sink/prometheus/PromSinkTest.java
Expand Up @@ -92,8 +92,6 @@ public void shouldPrepareRequestDuringPreparationAndCallItDuringExecution() thro
when(statusLine.getStatusCode()).thenReturn(200);
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
promSink.prepare(messages);
Expand All @@ -110,8 +108,6 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws
when(httpPost.getURI()).thenReturn(new URI("http://dummy.com"));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges);
Expand Down Expand Up @@ -181,7 +177,6 @@ public void shouldLogEntireRequestIfInStatusCodeRangeAndCaptureDroppedMessages()
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-505"));
Expand All @@ -207,7 +202,6 @@ public void shouldNotLogEntireRequestIfNotInStatusCodeRange() throws Exception {
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, new RangeToHashMapConverter().convert(null, "400-499"));
Expand All @@ -229,7 +223,6 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-499"), requestLogStatusCodeRanges);
Expand All @@ -246,8 +239,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws E
when(httpPost.getURI()).thenReturn(new URI("http://dummy.com"));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-600"), requestLogStatusCodeRanges);
Expand All @@ -263,8 +254,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs200() throws Exc
when(httpPost.getURI()).thenReturn(new URI("http://dummy.com"));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, requestLogStatusCodeRanges);
Expand All @@ -281,8 +270,6 @@ public void shouldNotCaptureDroppedMessagesMetricsIfStatusCodeIs201() throws Exc
when(httpPost.getURI()).thenReturn(new URI("http://dummy.com"));
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, requestLogStatusCodeRanges);
Expand All @@ -300,8 +287,6 @@ public void shouldCaptureResponseStatusCount() throws Exception {
when(httpPost.getURI()).thenReturn(uri);
when(request.build(messages)).thenReturn(httpPostList);
when(httpClient.execute(httpPost)).thenReturn(response);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray())));

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, requestLogStatusCodeRanges);
Expand All @@ -315,8 +300,6 @@ public void shouldCaptureResponseStatusCount() throws Exception {
public void shouldReadSnappyCompressedContent() throws Exception {
String body = "[timeseries {\n labels {\n name: \"__name__\"\n value: \"test_metric\"\n }\n samples {\n value: 10.0\n timestamp_ms: 1000000\n }\n}\n]";
InputStream inputStream = new ByteArrayInputStream(Snappy.compress(writeRequest.toByteArray()));
when(httpPost.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(inputStream);

PromSink promSink = new PromSink(instrumentation, request, httpClient, stencilClient,
retryStatusCodeRange, requestLogStatusCodeRanges);
Expand Down

0 comments on commit 017d027

Please sign in to comment.