From 4456e66b2dfa7eb36b051a3d5910d8105b229164 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 22 Oct 2025 10:53:12 +0400 Subject: [PATCH 1/2] Require plain json object as metadata --- docs/api/appending-events.md | 11 +-- .../kurrent/dbclient/DynamicValueMapper.java | 92 +++---------------- .../kurrent/dbclient/MultiStreamAppend.java | 1 - .../dbclient/MultiStreamAppendTests.java | 55 ++++++++--- 4 files changed, 60 insertions(+), 99 deletions(-) diff --git a/docs/api/appending-events.md b/docs/api/appending-events.md index 652209f8..3a519a17 100644 --- a/docs/api/appending-events.md +++ b/docs/api/appending-events.md @@ -256,18 +256,17 @@ This feature is only available in KurrentDB 25.1 and later. You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails. ::: warning -Currently, metadata must be valid JSON. Binary metadata will not be supported in -this version. This limitation ensures compatibility with KurrentDB's metadata -handling and will be removed in the next major release. +Metadata must be a valid JSON object, using string keys and string values only. +Binary metadata is not supported in this version to maintain compatibility with +KurrentDB's metadata handling. This restriction will be lifted in the next major +release. ::: ```java JsonMapper mapper = new JsonMapper(); Map metadata = new HashMap<>(); -metadata.put("timestamp", Instant.now().toString()); - metadata.put("source", "OrderProcessingSystem"); -metadata.put("version", 1.0); +metadata.put("source", "OrderProcessingSystem"); byte[] metadataBytes = mapper.writeValueAsBytes(metadata); diff --git a/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java b/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java index ab84d552..5da0227b 100644 --- a/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java +++ b/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java @@ -2,14 +2,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.json.JsonMapper; -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import com.google.protobuf.Duration; import com.google.protobuf.Value; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZonedDateTime; import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; @@ -25,6 +19,7 @@ public class DynamicValueMapper { * * @param jsonMetadata the source metadata as JSON bytes * @return a map with DynamicValue objects + * @throws IllegalArgumentException if any metadata value is not a string */ public static Map mapJsonToValueMap(byte[] jsonMetadata) { if (jsonMetadata == null || jsonMetadata.length == 0) @@ -35,7 +30,7 @@ public static Map mapJsonToValueMap(byte[] jsonMetadata) { }); return mapToValueMap(metadata); } catch (Exception e) { - return Collections.emptyMap(); + throw new IllegalArgumentException(e); } } @@ -44,85 +39,26 @@ public static Map mapJsonToValueMap(byte[] jsonMetadata) { * * @param metadata the source metadata map * @return a map with DynamicValue objects + * @throws IllegalArgumentException if any metadata value is not a string */ - public static Map mapToValueMap(Map metadata) { + public static Map mapToValueMap(Map metadata) { if (metadata == null) { return Collections.emptyMap(); } - return metadata.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> mapToValue(entry.getValue()) - )); - } - - /** - * Converts a Java object to a DynamicValue protobuf message. - * - * @param source the source object - * @return the corresponding DynamicValue - */ - public static Value mapToValue(Object source) { - if (source == null) { - return Value.newBuilder() - .setNullValue(com.google.protobuf.NullValue.NULL_VALUE) - .build(); + for (Map.Entry entry : metadata.entrySet()) { + if (entry.getValue() != null && !(entry.getValue() instanceof String)) { + throw new IllegalArgumentException( + String.format("Metadata value for key '%s' must be a string, but was %s", + entry.getKey(), + entry.getValue().getClass().getSimpleName()) + ); + } } Value.Builder builder = Value.newBuilder(); - if (source instanceof String) { - return builder.setStringValue((String) source).build(); - } else if (source instanceof Boolean) { - return builder.setBoolValue((Boolean) source).build(); - } else if (source instanceof Integer) { - return builder.setNumberValue((Integer) source).build(); - } else if (source instanceof Long) { - return builder.setNumberValue((Long) source).build(); - } else if (source instanceof Float) { - return builder.setNumberValue((Float) source).build(); - } else if (source instanceof Double) { - return builder.setNumberValue((Double) source).build(); - } else if (source instanceof Instant) { - Instant instant = (Instant) source; - return builder.setStringValue( - Timestamp.newBuilder() - .setSeconds(instant.getEpochSecond()) - .setNanos(instant.getNano()) - .build().toString() - ).build(); - } else if (source instanceof LocalDateTime) { - LocalDateTime localDateTime = (LocalDateTime) source; - Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant(); - return builder.setStringValue( - Timestamp.newBuilder() - .setSeconds(instant.getEpochSecond()) - .setNanos(instant.getNano()) - .build().toString() - ).build(); - } else if (source instanceof ZonedDateTime) { - ZonedDateTime zonedDateTime = (ZonedDateTime) source; - Instant instant = zonedDateTime.toInstant(); - return builder.setStringValue( - Timestamp.newBuilder() - .setSeconds(instant.getEpochSecond()) - .setNanos(instant.getNano()) - .build().toString() - ).build(); - } else if (source instanceof java.time.Duration) { - java.time.Duration duration = (java.time.Duration) source; - return builder.setStringValue( - Duration.newBuilder() - .setSeconds(duration.getSeconds()) - .setNanos(duration.getNano()) - .build().toString() - ).build(); - } else if (source instanceof byte[]) { - return builder.setStringValue(ByteString.copyFrom((byte[]) source).toStringUtf8()).build(); - } else { - // For any other type, convert to string - return builder.setStringValue(source.toString()).build(); - } + return metadata.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> builder.setStringValue((String) entry.getValue()).build())); } } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 842b29df..e544eaba 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -2,7 +2,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Value; -import com.google.rpc.ErrorInfo; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.grpc.stub.ClientResponseObserver; diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java index 6cc2c315..ab31a4d1 100644 --- a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.time.Instant; import java.util.*; import java.util.concurrent.ExecutionException; @@ -36,7 +35,7 @@ public static void cleanup() { } @Test - public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException { + public void testBadEventMetadata() throws ExecutionException, InterruptedException, IOException { KurrentDBClient client = getDefaultClient(); Optional version = client.getServerVersion().get(); @@ -48,16 +47,50 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept // Arrange String streamName1 = generateName(); - String streamName2 = generateName(); Map metadata = new HashMap<>(); metadata.put("stringProperty", "hello world"); metadata.put("intProperty", 42); - metadata.put("longProperty", 9876543210L); - metadata.put("booleanProperty", true); - metadata.put("doubleProperty", 3.14159); - metadata.put("nullProperty", null); - metadata.put("timestampProperty", Instant.now().toString()); + + byte[] metadataBytes = mapper.writeValueAsBytes(metadata); + + EventData event1 = EventData.builderAsJson("event-a", "{\"data\":\"test1\"}".getBytes()) + .metadataAsBytes(metadataBytes) + .build(); + + List events1 = Collections.singletonList(event1); + + List requests = Collections.singletonList( + new AppendStreamRequest(streamName1, events1.iterator(), StreamState.noStream()) + ); + + // Act & Assert + Assertions.assertThrows(IllegalArgumentException.class, () -> { + try { + client.multiStreamAppend(requests.iterator()).get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + }); + } + + @Test + public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + + // Arrange + String streamName1 = generateName(); + String streamName2 = generateName(); + + Map metadata = new HashMap<>(); + metadata.put("stringProperty", "hello world"); byte[] metadataBytes = mapper.writeValueAsBytes(metadata); @@ -94,12 +127,6 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept Map deserializedMetadata = mapper.readValue(readMetadata, Map.class); Assertions.assertEquals(metadata.get("stringProperty"), deserializedMetadata.get("stringProperty")); - Assertions.assertEquals(metadata.get("intProperty"), deserializedMetadata.get("intProperty")); - Assertions.assertEquals(metadata.get("longProperty"), ((Number) deserializedMetadata.get("longProperty")).longValue()); - Assertions.assertEquals(metadata.get("booleanProperty"), deserializedMetadata.get("booleanProperty")); - Assertions.assertEquals((Double) metadata.get("doubleProperty"), ((Number) deserializedMetadata.get("doubleProperty")).doubleValue(), 0.00001); - Assertions.assertEquals(metadata.get("timestampProperty"), deserializedMetadata.get("timestampProperty")); - Assertions.assertNull(deserializedMetadata.get("nullProperty")); List readEvents2 = client.readStream(streamName2, ReadStreamOptions.get()).get().getEvents(); Assertions.assertEquals(1, readEvents2.size()); From abde71e2d6d512f6a9d425ef45f0eac646e50291 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 22 Oct 2025 19:01:17 +0400 Subject: [PATCH 2/2] Fix test --- .../StreamsTracingInstrumentationTests.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java index 2fad7b33..4e91ed0f 100644 --- a/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java +++ b/src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java @@ -344,7 +344,7 @@ default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws T } @Test - default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable { + default void testMultiStreamAppendIsInstrumentedWithErrors() throws Throwable { KurrentDBClient client = getDefaultClient(); Optional version = client.getServerVersion().get(); @@ -377,20 +377,22 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable StreamState.streamExists() ); - MultiStreamAppendResponse result = client.multiStreamAppend( - Arrays.asList(request1, request2).iterator() - ).get(); + WrongExpectedVersionException actualException = null; + try { + MultiStreamAppendResponse result = client.multiStreamAppend( + Arrays.asList(request1, request2).iterator() + ).get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof WrongExpectedVersionException) + actualException = (WrongExpectedVersionException) e.getCause(); + } - Assertions.assertNotNull(result); - Assertions.assertFalse(result.getResults().isEmpty()); - Assertions.assertTrue(result.getPosition() > 0); + // Ensure WrongExpectedVersionException was thrown. + Assertions.assertNotNull(actualException); List spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND); Assertions.assertEquals(1, spans.size()); - ReadableSpan span = spans.get(0); - - Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode()); - Assertions.assertEquals(SpanKind.CLIENT, span.getKind()); + assertErroneousSpanHasExpectedAttributes(spans.get(0), actualException); } }