Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rest] stream json without starting a new thread #4136

Merged
merged 1 commit into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;

import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -79,15 +81,15 @@ public void beforeEach() {
public void shouldReturnAllConfigDescriptions() throws IOException {
Response response = resource.getAll(null, null);
assertThat(response.getStatus(), is(200));
assertThat(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8), is(
assertThat(toString(response.getEntity()), is(
"[{\"uri\":\"system:i18n\",\"parameters\":[{\"default\":\"test\",\"name\":\"name\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]},{\"uri\":\"system:ephemeris\",\"parameters\":[{\"name\":\"country\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]}]"));
}

@Test
public void shouldReturnAConfigDescription() throws IOException {
Response response = resource.getByURI(null, CONFIG_DESCRIPTION_SYSTEM_I18N_URI);
assertThat(response.getStatus(), is(200));
assertThat(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8), is(
assertThat(toString(response.getEntity()), is(
"{\"uri\":\"system:i18n\",\"parameters\":[{\"default\":\"test\",\"name\":\"name\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]}"));
}

Expand All @@ -96,4 +98,17 @@ public void shouldReturnStatus404() {
Response response = resource.getByURI(null, "uri:invalid");
assertThat(response.getStatus(), is(404));
}

public String toString(Object entity) throws IOException {
byte[] bytes;
if (entity instanceof StreamingOutput streaming) {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
streaming.write(buffer);
bytes = buffer.toByteArray();
}
} else {
bytes = ((InputStream) entity).readAllBytes();
}
return new String(bytes, StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.StreamingOutput;

import org.openhab.core.library.types.DateTimeType;
import org.slf4j.Logger;
Expand All @@ -31,7 +32,6 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonWriter;

Expand All @@ -40,6 +40,7 @@
*
* @author Joerg Plewe - Initial contribution
* @author Henning Treu - Provide streaming capabilities
* @author Jörg Sautter - Improve streaming capabilities
*/
public class JSONResponse {

Expand Down Expand Up @@ -152,33 +153,15 @@ private Response createResponse(Response.StatusType status, final Object entity)
return rp.build();
}

// The PipedOutputStream will only be closed by the writing thread
// since closing it during this method call would be too early.
// The receiver of the response will read from the pipe after this method returns.
PipedOutputStream out = new PipedOutputStream();

try {
// we will not actively close the PipedInputStream since it is read by the receiving end
// and will be GC'ed once the response is consumed.
PipedJSONInputStream in = new PipedJSONInputStream(out);
rp.entity(in);
} catch (IOException e) {
throw new IllegalStateException(e);
}
rp.entity((StreamingOutput) (target) -> {
// target must not be closed, see javadoc of javax.ws.rs.ext.MessageBodyWriter
JsonWriter jsonWriter = new JsonWriter(
new BufferedWriter(new OutputStreamWriter(target, StandardCharsets.UTF_8)));

Thread writerThread = new Thread(() -> {
try (JsonWriter jsonWriter = new JsonWriter(
new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)))) {
gson.toJson(entity, entity.getClass(), jsonWriter);
jsonWriter.flush();
} catch (IOException | JsonIOException e) {
logger.debug("Error streaming JSON through PipedInputStream / PipedOutputStream.", e);
}
gson.toJson(entity, entity.getClass(), jsonWriter);
jsonWriter.flush();
});

writerThread.setDaemon(true); // daemonize thread to permit the JVM shutdown even if we stream JSON.
writerThread.start();

return rp.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import static org.hamcrest.object.IsCompatibleType.typeCompatibleWith;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -27,6 +27,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -92,12 +93,11 @@ public void shouldCreateSuccessResponseWithStreamEntity() throws IOException {
assertThat(response.getMediaType(), is(MediaType.APPLICATION_JSON_TYPE));

Object entity = response.getEntity();
assertThat(entity.getClass(), is(typeCompatibleWith(InputStream.class)));
assertThat(entity.getClass(), is(typeCompatibleWith(StreamingOutput.class)));

try (InputStream entityInStream = (InputStream) entity) {
byte[] entityValue = new byte[ENTITY_JSON_VALUE.length()];
entityInStream.read(entityValue);
assertThat(new String(entityValue), is(ENTITY_JSON_VALUE));
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
((StreamingOutput) entity).write(buffer);
assertThat(new String(buffer.toByteArray(), StandardCharsets.UTF_8), is(ENTITY_JSON_VALUE));
}
}

Expand All @@ -120,10 +120,11 @@ public void shouldCreateSuccessResponseWithLargeStreamEntity() throws IOExceptio
assertThat(response.getMediaType(), is(MediaType.APPLICATION_JSON_TYPE));

Object entity = response.getEntity();
assertThat(entity.getClass(), is(typeCompatibleWith(InputStream.class)));
assertThat(entity.getClass(), is(typeCompatibleWith(StreamingOutput.class)));

try (InputStream entityInStream = (InputStream) entity) {
String largeEntityJSON = new String(entityInStream.readAllBytes(), StandardCharsets.UTF_8);
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
((StreamingOutput) entity).write(buffer);
String largeEntityJSON = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
assertThat(largeEntityJSON, is(notNullValue()));
assertTrue(largeEntityJSON.startsWith("{"));
assertTrue(largeEntityJSON.endsWith("}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -33,6 +34,7 @@
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

Expand Down Expand Up @@ -204,8 +206,7 @@ public void shouldIncludeRequestedFieldsOnly() throws Exception {
Response response = itemResource.getItems(uriInfoMock, httpHeadersMock, request, null, null, "MyTag", null,
false, "type,name", false);

JsonElement result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
JsonElement result = JsonParser.parseString(toString(response.getEntity()));
JsonElement expected = JsonParser.parseString("[{type: \"Switch\", name: \"Switch\"}]");
assertEquals(expected, result);
}
Expand All @@ -227,12 +228,12 @@ public void shouldProvideReturnCodesForTagHandling() {
}

private List<String> readItemNamesFromResponse(Response response) throws IOException {
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
return JsonPath.read(jsonResponse, "$..name");
}

private List<String> readItemLabelsFromResponse(Response response) throws IOException, TransformationException {
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
return JsonPath.read(jsonResponse, "$..label");
}

Expand All @@ -256,7 +257,7 @@ public void addMultipleItems() throws IOException {
items = itemList.toArray(items);
Response response = itemResource.createOrUpdateItems(items);

String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
List<String> statusCodes = JsonPath.read(jsonResponse, "$..status");

// expect 2x created
Expand All @@ -274,7 +275,7 @@ public void addMultipleItems() throws IOException {
items = itemList.toArray(items);
response = itemResource.createOrUpdateItems(items);

jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
jsonResponse = toString(response.getEntity());
statusCodes = JsonPath.read(jsonResponse, "$..status");

// expect error and updated
Expand Down Expand Up @@ -380,4 +381,17 @@ public void findTagTest(String itemName, String semanticClassName, @Nullable Mat
assertThat(response.getStatus(), is(404));
}
}

public String toString(Object entity) throws IOException {
byte[] bytes;
if (entity instanceof StreamingOutput streaming) {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
streaming.write(buffer);
bytes = buffer.toByteArray();
}
} else {
bytes = ((InputStream) entity).readAllBytes();
}
return new String(bytes, StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -27,6 +28,7 @@

import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

Expand Down Expand Up @@ -148,29 +150,39 @@ public void shouldReturnLink() throws Exception {
public void shouldIncludeEditableFields() throws IOException, JsonSyntaxException {
managedItemChannelLinkProvider.add(link1);
Response response = itemChannelLinkResource.getLink(ITEM_NAME1, CHANNEL_UID1);
JsonElement result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
JsonElement result = JsonParser.parseString(toString(response.getEntity()));
JsonElement expected = JsonParser.parseString("{channelUID:\"" + CHANNEL_UID1
+ "\", configuration:{}, editable:true, itemName:\"" + ITEM_NAME1 + "\"}");
assertEquals(expected, result);

response = itemChannelLinkResource.getAll(CHANNEL_UID1, ITEM_NAME1);
result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
result = JsonParser.parseString(toString(response.getEntity()));
expected = JsonParser.parseString("[{channelUID:\"" + CHANNEL_UID1
+ "\", configuration:{}, editable:true, itemName:\"" + ITEM_NAME1 + "\"}]");
assertEquals(expected, result);

response = itemChannelLinkResource.getLink(ITEM_NAME2, CHANNEL_UID2);
result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
result = JsonParser.parseString(toString(response.getEntity()));
expected = JsonParser.parseString("{channelUID:\"" + CHANNEL_UID2
+ "\", configuration:{}, editable:false, itemName:\"" + ITEM_NAME2 + "\", configuration:{}}");
assertEquals(expected, result);
}

private List<String> readItemNamesFromResponse(Response response) throws IOException {
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
return JsonPath.read(jsonResponse, "$..itemName");
}

public String toString(Object entity) throws IOException {
byte[] bytes;
if (entity instanceof StreamingOutput streaming) {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
streaming.write(buffer);
bytes = buffer.toByteArray();
}
} else {
bytes = ((InputStream) entity).readAllBytes();
}
return new String(bytes, StandardCharsets.UTF_8);
}
}