diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index 9290b751..02b57969 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -97,7 +97,9 @@ docs/WriteAuthorizationModelResponse.md docs/WriteRequest.md docs/WriteRequestDeletes.md docs/WriteRequestWrites.md +src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java src/main/java/dev/openfga/sdk/api/OpenFgaApi.java +src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java src/main/java/dev/openfga/sdk/api/model/AbortedMessageResponse.java src/main/java/dev/openfga/sdk/api/model/AbstractOpenApiSchema.java src/main/java/dev/openfga/sdk/api/model/Any.java @@ -157,6 +159,7 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java src/main/java/dev/openfga/sdk/api/model/SourceInfo.java src/main/java/dev/openfga/sdk/api/model/Status.java src/main/java/dev/openfga/sdk/api/model/Store.java +src/main/java/dev/openfga/sdk/api/model/StreamResult.java src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/Tuple.java diff --git a/examples/streamed-list-objects/Makefile b/examples/streamed-list-objects/Makefile index 5651d28d..ee7a5d6b 100644 --- a/examples/streamed-list-objects/Makefile +++ b/examples/streamed-list-objects/Makefile @@ -6,10 +6,10 @@ openfga_version=latest language=java build: - ./gradlew -P language=$(language) build + ../../gradlew -P language=$(language) build run: - ./gradlew -P language=$(language) run + ../../gradlew -P language=$(language) run run-openfga: docker pull docker.io/openfga/openfga:${openfga_version} && \ diff --git a/src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java b/src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java new file mode 100644 index 00000000..52b25a6d --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java @@ -0,0 +1,182 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api; + +import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.model.Status; +import dev.openfga.sdk.api.model.StreamResult; +import dev.openfga.sdk.errors.ApiException; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** + * Base class for handling streaming API responses. + * This class provides generic streaming functionality that can be reused across + * different streaming endpoints by handling the common streaming parsing and error handling logic. + * + * @param The type of response objects in the stream + */ +public abstract class BaseStreamingApi { + protected final Configuration configuration; + protected final ApiClient apiClient; + protected final ObjectMapper objectMapper; + protected final TypeReference> streamResultTypeRef; + + /** + * Constructor for BaseStreamingApi + * + * @param configuration The API configuration + * @param apiClient The API client for making HTTP requests + * @param streamResultTypeRef TypeReference for deserializing StreamResult + */ + protected BaseStreamingApi( + Configuration configuration, ApiClient apiClient, TypeReference> streamResultTypeRef) { + this.configuration = configuration; + this.apiClient = apiClient; + this.objectMapper = apiClient.getObjectMapper(); + this.streamResultTypeRef = streamResultTypeRef; + } + + /** + * Process a streaming response asynchronously. + * Each line in the response is parsed and delivered to the consumer callback. + * + * @param request The HTTP request to execute + * @param consumer Callback to handle each response object (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes + */ + protected CompletableFuture processStreamingResponse( + HttpRequest request, Consumer consumer, Consumer errorConsumer) { + + // Use async HTTP client with streaming body handler + // ofLines() provides line-by-line streaming + return apiClient + .getHttpClient() + .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) + .thenCompose(response -> { + // Check response status + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + ApiException apiException = + new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); + return CompletableFuture.failedFuture(apiException); + } + + // Process the stream - this runs on HttpClient's executor thread + try (Stream lines = response.body()) { + lines.forEach(line -> { + if (!isNullOrWhitespace(line)) { + processLine(line, consumer, errorConsumer); + } + }); + return CompletableFuture.completedFuture((Void) null); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + }) + .handle((result, throwable) -> { + if (throwable != null) { + // Unwrap CompletionException to get the original exception + Throwable actualException = throwable; + if (throwable instanceof java.util.concurrent.CompletionException + && throwable.getCause() != null) { + actualException = throwable.getCause(); + } + + if (errorConsumer != null) { + errorConsumer.accept(actualException); + } + // Re-throw to keep the CompletableFuture in failed state + if (actualException instanceof RuntimeException) { + throw (RuntimeException) actualException; + } + throw new RuntimeException(actualException); + } + return result; + }); + } + + /** + * Process a single line from the stream + * + * @param line The JSON line to process + * @param consumer Callback to handle the parsed result + * @param errorConsumer Optional callback to handle errors + */ + private void processLine(String line, Consumer consumer, Consumer errorConsumer) { + try { + // Parse the JSON line to extract the object + StreamResult streamResult = objectMapper.readValue(line, streamResultTypeRef); + + if (streamResult.getError() != null) { + // Handle error in stream + if (errorConsumer != null) { + Status error = streamResult.getError(); + String errorMessage = error.getMessage() != null + ? "Stream error: " + error.getMessage() + : "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown"); + errorConsumer.accept(new ApiException(errorMessage)); + } + } else if (streamResult.getResult() != null) { + // Deliver the response object to the consumer + T result = streamResult.getResult(); + if (result != null) { + consumer.accept(result); + } + } + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + } + } + + /** + * Build an HTTP request for the streaming endpoint + * + * @param method HTTP method (e.g., "POST") + * @param path The API path + * @param body The request body + * @param configuration The configuration to use + * @return HttpRequest ready to execute + * @throws ApiException if request building fails + * @throws FgaInvalidParameterException if parameters are invalid + */ + protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + try { + byte[] bodyBytes = objectMapper.writeValueAsBytes(body); + HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration); + + // Apply request interceptors if any + var interceptor = apiClient.getRequestInterceptor(); + if (interceptor != null) { + interceptor.accept(requestBuilder); + } + + return requestBuilder.build(); + } catch (Exception e) { + throw new ApiException(e); + } + } +} diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index bf4dfd5a..2421f277 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -1,39 +1,42 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + package dev.openfga.sdk.api; -import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace; import static dev.openfga.sdk.util.Validation.assertParamExists; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; import dev.openfga.sdk.api.client.ApiClient; import dev.openfga.sdk.api.configuration.Configuration; import dev.openfga.sdk.api.configuration.ConfigurationOverride; import dev.openfga.sdk.api.model.ListObjectsRequest; -import dev.openfga.sdk.api.model.Status; -import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; +import dev.openfga.sdk.api.model.StreamResult; import dev.openfga.sdk.api.model.StreamedListObjectsResponse; import dev.openfga.sdk.errors.ApiException; import dev.openfga.sdk.errors.FgaInvalidParameterException; import dev.openfga.sdk.util.StringUtil; import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -import java.util.stream.Stream; /** * API layer for handling streaming responses from the streamedListObjects endpoint. - * This class provides true asynchronous streaming with consumer callbacks using CompletableFuture - * and Java 11's HttpClient async streaming capabilities. + * This class extends BaseStreamingApi to provide true asynchronous streaming with consumer callbacks + * using CompletableFuture and Java 11's HttpClient async streaming capabilities. */ -public class StreamedListObjectsApi { - private final Configuration configuration; - private final ApiClient apiClient; - private final ObjectMapper objectMapper; +public class StreamedListObjectsApi extends BaseStreamingApi { public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) { - this.configuration = configuration; - this.apiClient = apiClient; - this.objectMapper = apiClient.getObjectMapper(); + super(configuration, apiClient, new TypeReference>() {}); } /** @@ -142,54 +145,7 @@ private CompletableFuture streamedListObjects( try { HttpRequest request = buildHttpRequest("POST", path, body, configuration); - - // Use async HTTP client with streaming body handler - // ofLines() provides line-by-line streaming which is perfect for NDJSON - return apiClient - .getHttpClient() - .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) - .thenCompose(response -> { - // Check response status - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - ApiException apiException = - new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); - return CompletableFuture.failedFuture(apiException); - } - - // Process the stream - this runs on HttpClient's executor thread - try (Stream lines = response.body()) { - lines.forEach(line -> { - if (!isNullOrWhitespace(line)) { - processLine(line, consumer, errorConsumer); - } - }); - return CompletableFuture.completedFuture((Void) null); - } catch (Exception e) { - return CompletableFuture.failedFuture(e); - } - }) - .handle((result, throwable) -> { - if (throwable != null) { - // Unwrap CompletionException to get the original exception - Throwable actualException = throwable; - if (throwable instanceof java.util.concurrent.CompletionException - && throwable.getCause() != null) { - actualException = throwable.getCause(); - } - - if (errorConsumer != null) { - errorConsumer.accept(actualException); - } - // Re-throw to keep the CompletableFuture in failed state - if (actualException instanceof RuntimeException) { - throw (RuntimeException) actualException; - } - throw new RuntimeException(actualException); - } - return result; - }); - + return processStreamingResponse(request, consumer, errorConsumer); } catch (Exception e) { if (errorConsumer != null) { errorConsumer.accept(e); @@ -197,55 +153,4 @@ private CompletableFuture streamedListObjects( return CompletableFuture.failedFuture(e); } } - - /** - * Process a single line from the NDJSON stream - */ - private void processLine( - String line, Consumer consumer, Consumer errorConsumer) { - try { - // Parse the JSON line to extract the object - StreamResultOfStreamedListObjectsResponse streamResult = - objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class); - - if (streamResult.getError() != null) { - // Handle error in stream - if (errorConsumer != null) { - Status error = streamResult.getError(); - String errorMessage = error.getMessage() != null - ? "Stream error: " + error.getMessage() - : "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown"); - errorConsumer.accept(new ApiException(errorMessage)); - } - } else if (streamResult.getResult() != null) { - // Deliver the response object to the consumer - StreamedListObjectsResponse result = streamResult.getResult(); - if (result != null) { - consumer.accept(result); - } - } - } catch (Exception e) { - if (errorConsumer != null) { - errorConsumer.accept(e); - } - } - } - - private HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration) - throws ApiException, FgaInvalidParameterException { - try { - byte[] bodyBytes = objectMapper.writeValueAsBytes(body); - HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration); - - // Apply request interceptors if any - var interceptor = apiClient.getRequestInterceptor(); - if (interceptor != null) { - interceptor.accept(requestBuilder); - } - - return requestBuilder.build(); - } catch (Exception e) { - throw new ApiException(e); - } - } } diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamResult.java b/src/main/java/dev/openfga/sdk/api/model/StreamResult.java new file mode 100644 index 00000000..d6e54361 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamResult.java @@ -0,0 +1,118 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; + +/** + * Generic wrapper for streaming results that can contain either a result or an error. + * This class is used to deserialize streaming responses where each line may contain + * either a successful result or an error status. + * + * @param The type of the result object + */ +@JsonPropertyOrder({StreamResult.JSON_PROPERTY_RESULT, StreamResult.JSON_PROPERTY_ERROR}) +public class StreamResult { + public static final String JSON_PROPERTY_RESULT = "result"; + private T result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResult() {} + + public StreamResult result(T result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public T getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(T result) { + this.result = result; + } + + public StreamResult error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResult streamResult = (StreamResult) o; + return Objects.equals(this.result, streamResult.result) && Objects.equals(this.error, streamResult.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResult {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/src/test/java/dev/openfga/sdk/api/StreamingApiTest.java b/src/test/java/dev/openfga/sdk/api/StreamingApiTest.java new file mode 100644 index 00000000..235f0765 --- /dev/null +++ b/src/test/java/dev/openfga/sdk/api/StreamingApiTest.java @@ -0,0 +1,252 @@ +package dev.openfga.sdk.api; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.model.ListObjectsRequest; +import dev.openfga.sdk.api.model.StreamedListObjectsResponse; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Test class demonstrating the generic streaming implementation. + * This test shows how the BaseStreamingApi provides reusable functionality + * for any streaming endpoint that returns line-delimited JSON format. + */ +class StreamingApiTest { + + @Mock + private HttpClient mockHttpClient; + + @Mock + private HttpResponse> mockHttpResponse; + + @Mock + private Configuration mockConfiguration; + + @Mock + private ApiClient mockApiClient; + + private StreamedListObjectsApi streamingApi; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + + objectMapper = new ObjectMapper(); + when(mockApiClient.getObjectMapper()).thenReturn(objectMapper); + when(mockApiClient.getHttpClient()).thenReturn(mockHttpClient); + + when(mockConfiguration.getApiUrl()).thenReturn("https://api.fga.example"); + when(mockConfiguration.getReadTimeout()).thenReturn(Duration.ofSeconds(10)); + when(mockConfiguration.getCredentials()).thenReturn(null); + when(mockConfiguration.override(any())).thenReturn(mockConfiguration); + + streamingApi = new StreamedListObjectsApi(mockConfiguration, mockApiClient); + } + + @Test + void testStreamedListObjects_successfulStream() throws Exception { + // Arrange: Create response with multiple objects + String resp = "{\"result\":{\"object\":\"document:1\"}}\n" + + "{\"result\":{\"object\":\"document:2\"}}\n" + + "{\"result\":{\"object\":\"document:3\"}}\n"; + + Stream lineStream = resp.lines(); + when(mockHttpResponse.body()).thenReturn(lineStream); + when(mockHttpResponse.statusCode()).thenReturn(200); + + when(mockHttpClient.sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(CompletableFuture.completedFuture(mockHttpResponse)); + + // Act: Collect streamed objects + List receivedObjects = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch latch = new CountDownLatch(3); + + ListObjectsRequest request = + new ListObjectsRequest().type("document").relation("viewer").user("user:anne"); + + CompletableFuture future = streamingApi.streamedListObjects("store123", request, response -> { + receivedObjects.add(response.getObject()); + latch.countDown(); + }); + + // Assert: Wait for completion and verify results + assertTrue(latch.await(5, TimeUnit.SECONDS), "Should receive all objects"); + future.join(); + + assertEquals(3, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0)); + assertEquals("document:2", receivedObjects.get(1)); + assertEquals("document:3", receivedObjects.get(2)); + } + + @Test + void testStreamedListObjects_withErrorInStream() throws Exception { + // Arrange: Streaming response with an error + String resp = "{\"result\":{\"object\":\"document:1\"}}\n" + + "{\"error\":{\"code\":400,\"message\":\"Something went wrong\"}}\n"; + + Stream lineStream = resp.lines(); + when(mockHttpResponse.body()).thenReturn(lineStream); + when(mockHttpResponse.statusCode()).thenReturn(200); + + when(mockHttpClient.sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(CompletableFuture.completedFuture(mockHttpResponse)); + + // Act: Collect objects and errors + List receivedObjects = Collections.synchronizedList(new ArrayList<>()); + List receivedErrors = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch latch = new CountDownLatch(1); + + ListObjectsRequest request = + new ListObjectsRequest().type("document").relation("viewer").user("user:anne"); + + CompletableFuture future = streamingApi.streamedListObjects( + "store123", request, response -> receivedObjects.add(response.getObject()), error -> { + receivedErrors.add(error); + latch.countDown(); + }); + + // Assert: Should receive one object and one error + assertTrue(latch.await(5, TimeUnit.SECONDS), "Should receive error"); + future.join(); + + assertEquals(1, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0)); + assertEquals(1, receivedErrors.size()); + assertTrue(receivedErrors.get(0).getMessage().contains("Something went wrong")); + } + + @Test + void testStreamedListObjects_httpError() throws Exception { + // Arrange: HTTP error response + when(mockHttpResponse.statusCode()).thenReturn(400); + when(mockHttpResponse.headers()) + .thenReturn(java.net.http.HttpHeaders.of(Collections.emptyMap(), (k, v) -> true)); + + when(mockHttpClient.sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(CompletableFuture.completedFuture(mockHttpResponse)); + + // Act: Try to stream + List receivedObjects = Collections.synchronizedList(new ArrayList<>()); + AtomicBoolean errorReceived = new AtomicBoolean(false); + + ListObjectsRequest request = + new ListObjectsRequest().type("document").relation("viewer").user("user:anne"); + + CompletableFuture future = streamingApi.streamedListObjects( + "store123", + request, + response -> receivedObjects.add(response.getObject()), + error -> errorReceived.set(true)); + + // Assert: Should fail with ApiException + assertThrows(Exception.class, future::join); + assertTrue(errorReceived.get(), "Error consumer should be called"); + assertEquals(0, receivedObjects.size()); + } + + @Test + void testStreamedListObjects_emptyStream() throws Exception { + String streamResponse = ""; + + Stream lineStream = streamResponse.lines(); + when(mockHttpResponse.body()).thenReturn(lineStream); + when(mockHttpResponse.statusCode()).thenReturn(200); + + when(mockHttpClient.sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(CompletableFuture.completedFuture(mockHttpResponse)); + + // Act: Stream with no results + List receivedObjects = Collections.synchronizedList(new ArrayList<>()); + + ListObjectsRequest request = + new ListObjectsRequest().type("document").relation("viewer").user("user:anne"); + + CompletableFuture future = streamingApi.streamedListObjects( + "store123", request, response -> receivedObjects.add(response.getObject())); + + // Assert: Should complete without errors + future.join(); + assertEquals(0, receivedObjects.size()); + } + + @Test + void testStreamedListObjects_largeStream() throws Exception { + // Arrange: Large streaming response with many objects + StringBuilder streamBuilder = new StringBuilder(); + int objectCount = 1000; + for (int i = 0; i < objectCount; i++) { + streamBuilder + .append("{\"result\":{\"object\":\"document:") + .append(i) + .append("\"}}\n"); + } + + Stream lineStream = streamBuilder.toString().lines(); + when(mockHttpResponse.body()).thenReturn(lineStream); + when(mockHttpResponse.statusCode()).thenReturn(200); + + when(mockHttpClient.sendAsync(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(CompletableFuture.completedFuture(mockHttpResponse)); + + List receivedObjects = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch latch = new CountDownLatch(objectCount); + + ListObjectsRequest request = + new ListObjectsRequest().type("document").relation("viewer").user("user:anne"); + + CompletableFuture future = streamingApi.streamedListObjects("store123", request, response -> { + receivedObjects.add(response.getObject()); + latch.countDown(); + }); + + // Assert: Should handle large stream efficiently + assertTrue(latch.await(10, TimeUnit.SECONDS), "Should receive all " + objectCount + " objects"); + future.join(); + + assertEquals(objectCount, receivedObjects.size()); + assertEquals("document:0", receivedObjects.get(0)); + assertEquals("document:999", receivedObjects.get(objectCount - 1)); + } + + @Test + void testGenericStreamResult_deserialization() throws Exception { + // Test that StreamResult properly deserializes for different types + ObjectMapper mapper = new ObjectMapper(); + + // Test with result + String jsonWithResult = "{\"result\":{\"object\":\"document:1\"}}"; + var resultType = mapper.getTypeFactory() + .constructParametricType( + dev.openfga.sdk.api.model.StreamResult.class, StreamedListObjectsResponse.class); + + Object streamResult = mapper.readValue(jsonWithResult, resultType); + assertNotNull(streamResult); + + // Test with error - code should be an integer + String jsonWithError = "{\"error\":{\"code\":400,\"message\":\"Error occurred\"}}"; + Object streamResultWithError = mapper.readValue(jsonWithError, resultType); + assertNotNull(streamResultWithError); + } +}