Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions docs/api/appending-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,17 @@ This feature is only available in KurrentDB 25.1 and later.
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.

::: warning
Currently, metadata must be valid JSON. Binary metadata will not be supported in
this version. This limitation ensures compatibility with KurrentDB's metadata
handling and will be removed in the next major release.
Metadata must be a valid JSON object, using string keys and string values only.
Binary metadata is not supported in this version to maintain compatibility with
KurrentDB's metadata handling. This restriction will be lifted in the next major
release.
:::

```java
JsonMapper mapper = new JsonMapper();

Map<String, Object> metadata = new HashMap<>();
metadata.put("timestamp", Instant.now().toString());
metadata.put("source", "OrderProcessingSystem");
metadata.put("version", 1.0);
metadata.put("source", "OrderProcessingSystem");

byte[] metadataBytes = mapper.writeValueAsBytes(metadata);

Expand Down
92 changes: 14 additions & 78 deletions src/main/java/io/kurrent/dbclient/DynamicValueMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.Duration;
import com.google.protobuf.Value;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -25,6 +19,7 @@ public class DynamicValueMapper {
*
* @param jsonMetadata the source metadata as JSON bytes
* @return a map with DynamicValue objects
* @throws IllegalArgumentException if any metadata value is not a string
*/
public static Map<String, Value> mapJsonToValueMap(byte[] jsonMetadata) {
if (jsonMetadata == null || jsonMetadata.length == 0)
Expand All @@ -35,7 +30,7 @@ public static Map<String, Value> mapJsonToValueMap(byte[] jsonMetadata) {
});
return mapToValueMap(metadata);
} catch (Exception e) {
return Collections.emptyMap();
throw new IllegalArgumentException(e);
}
}

Expand All @@ -44,85 +39,26 @@ public static Map<String, Value> mapJsonToValueMap(byte[] jsonMetadata) {
*
* @param metadata the source metadata map
* @return a map with DynamicValue objects
* @throws IllegalArgumentException if any metadata value is not a string
*/
public static Map<String, Value> mapToValueMap(Map<String, Object> metadata) {
public static Map<String, Value> mapToValueMap(Map<String, ?> metadata) {
if (metadata == null) {
return Collections.emptyMap();
}

return metadata.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> mapToValue(entry.getValue())
));
}

/**
* Converts a Java object to a DynamicValue protobuf message.
*
* @param source the source object
* @return the corresponding DynamicValue
*/
public static Value mapToValue(Object source) {
if (source == null) {
return Value.newBuilder()
.setNullValue(com.google.protobuf.NullValue.NULL_VALUE)
.build();
for (Map.Entry<String, ?> entry : metadata.entrySet()) {
if (entry.getValue() != null && !(entry.getValue() instanceof String)) {
throw new IllegalArgumentException(
String.format("Metadata value for key '%s' must be a string, but was %s",
entry.getKey(),
entry.getValue().getClass().getSimpleName())
);
}
}

Value.Builder builder = Value.newBuilder();

if (source instanceof String) {
return builder.setStringValue((String) source).build();
} else if (source instanceof Boolean) {
return builder.setBoolValue((Boolean) source).build();
} else if (source instanceof Integer) {
return builder.setNumberValue((Integer) source).build();
} else if (source instanceof Long) {
return builder.setNumberValue((Long) source).build();
} else if (source instanceof Float) {
return builder.setNumberValue((Float) source).build();
} else if (source instanceof Double) {
return builder.setNumberValue((Double) source).build();
} else if (source instanceof Instant) {
Instant instant = (Instant) source;
return builder.setStringValue(
Timestamp.newBuilder()
.setSeconds(instant.getEpochSecond())
.setNanos(instant.getNano())
.build().toString()
).build();
} else if (source instanceof LocalDateTime) {
LocalDateTime localDateTime = (LocalDateTime) source;
Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant();
return builder.setStringValue(
Timestamp.newBuilder()
.setSeconds(instant.getEpochSecond())
.setNanos(instant.getNano())
.build().toString()
).build();
} else if (source instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) source;
Instant instant = zonedDateTime.toInstant();
return builder.setStringValue(
Timestamp.newBuilder()
.setSeconds(instant.getEpochSecond())
.setNanos(instant.getNano())
.build().toString()
).build();
} else if (source instanceof java.time.Duration) {
java.time.Duration duration = (java.time.Duration) source;
return builder.setStringValue(
Duration.newBuilder()
.setSeconds(duration.getSeconds())
.setNanos(duration.getNano())
.build().toString()
).build();
} else if (source instanceof byte[]) {
return builder.setStringValue(ByteString.copyFrom((byte[]) source).toStringUtf8()).build();
} else {
// For any other type, convert to string
return builder.setStringValue(source.toString()).build();
}
return metadata.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> builder.setStringValue((String) entry.getValue()).build()));
}
}
1 change: 0 additions & 1 deletion src/main/java/io/kurrent/dbclient/MultiStreamAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.Value;
import com.google.rpc.ErrorInfo;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.grpc.stub.ClientResponseObserver;
Expand Down
55 changes: 41 additions & 14 deletions src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -36,7 +35,7 @@ public static void cleanup() {
}

@Test
public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException {
public void testBadEventMetadata() throws ExecutionException, InterruptedException, IOException {
KurrentDBClient client = getDefaultClient();

Optional<ServerVersion> version = client.getServerVersion().get();
Expand All @@ -48,16 +47,50 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept

// Arrange
String streamName1 = generateName();
String streamName2 = generateName();

Map<String, Object> metadata = new HashMap<>();
metadata.put("stringProperty", "hello world");
metadata.put("intProperty", 42);
metadata.put("longProperty", 9876543210L);
metadata.put("booleanProperty", true);
metadata.put("doubleProperty", 3.14159);
metadata.put("nullProperty", null);
metadata.put("timestampProperty", Instant.now().toString());

byte[] metadataBytes = mapper.writeValueAsBytes(metadata);

EventData event1 = EventData.builderAsJson("event-a", "{\"data\":\"test1\"}".getBytes())
.metadataAsBytes(metadataBytes)
.build();

List<EventData> events1 = Collections.singletonList(event1);

List<AppendStreamRequest> requests = Collections.singletonList(
new AppendStreamRequest(streamName1, events1.iterator(), StreamState.noStream())
);

// Act & Assert
Assertions.assertThrows(IllegalArgumentException.class, () -> {
try {
client.multiStreamAppend(requests.iterator()).get();
} catch (ExecutionException e) {
throw e.getCause();
}
});
}

@Test
public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException {
KurrentDBClient client = getDefaultClient();

Optional<ServerVersion> version = client.getServerVersion().get();

Assumptions.assumeTrue(
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0),
"Multi-stream append is not supported server versions below 25.0.0"
);

// Arrange
String streamName1 = generateName();
String streamName2 = generateName();

Map<String, String> metadata = new HashMap<>();
metadata.put("stringProperty", "hello world");

byte[] metadataBytes = mapper.writeValueAsBytes(metadata);

Expand Down Expand Up @@ -94,12 +127,6 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept

Map deserializedMetadata = mapper.readValue(readMetadata, Map.class);
Assertions.assertEquals(metadata.get("stringProperty"), deserializedMetadata.get("stringProperty"));
Assertions.assertEquals(metadata.get("intProperty"), deserializedMetadata.get("intProperty"));
Assertions.assertEquals(metadata.get("longProperty"), ((Number) deserializedMetadata.get("longProperty")).longValue());
Assertions.assertEquals(metadata.get("booleanProperty"), deserializedMetadata.get("booleanProperty"));
Assertions.assertEquals((Double) metadata.get("doubleProperty"), ((Number) deserializedMetadata.get("doubleProperty")).doubleValue(), 0.00001);
Assertions.assertEquals(metadata.get("timestampProperty"), deserializedMetadata.get("timestampProperty"));
Assertions.assertNull(deserializedMetadata.get("nullProperty"));

List<ResolvedEvent> readEvents2 = client.readStream(streamName2, ReadStreamOptions.get()).get().getEvents();
Assertions.assertEquals(1, readEvents2.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws T
}

@Test
default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable {
default void testMultiStreamAppendIsInstrumentedWithErrors() throws Throwable {
KurrentDBClient client = getDefaultClient();

Optional<ServerVersion> version = client.getServerVersion().get();
Expand Down Expand Up @@ -377,20 +377,22 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable
StreamState.streamExists()
);

MultiStreamAppendResponse result = client.multiStreamAppend(
Arrays.asList(request1, request2).iterator()
).get();
WrongExpectedVersionException actualException = null;
try {
MultiStreamAppendResponse result = client.multiStreamAppend(
Arrays.asList(request1, request2).iterator()
).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof WrongExpectedVersionException)
actualException = (WrongExpectedVersionException) e.getCause();
}

Assertions.assertNotNull(result);
Assertions.assertFalse(result.getResults().isEmpty());
Assertions.assertTrue(result.getPosition() > 0);
// Ensure WrongExpectedVersionException was thrown.
Assertions.assertNotNull(actualException);

List<ReadableSpan> spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND);
Assertions.assertEquals(1, spans.size());

ReadableSpan span = spans.get(0);

Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode());
Assertions.assertEquals(SpanKind.CLIENT, span.getKind());
assertErroneousSpanHasExpectedAttributes(spans.get(0), actualException);
}
}
Loading