From 54c05f32be9db01bcf44107b6883eba5f26c515e Mon Sep 17 00:00:00 2001 From: Subodh Kant Chaturvedi Date: Fri, 16 Feb 2024 00:36:48 +0530 Subject: [PATCH] destination-async-framework: move the state emission logic into GlobalAsyncStateManager (#35240) --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../destination_async/FlushWorkers.java | 20 +- .../buffers/MemoryAwareMessageBatch.java | 14 +- .../state/GlobalAsyncStateManager.java | 23 +- .../src/main/resources/version.properties | 2 +- .../buffers/BufferDequeueTest.java | 1 - .../state/GlobalAsyncStateManagerTest.java | 396 +++++++++++++----- .../destination-bigquery/build.gradle | 2 +- .../destination-bigquery/metadata.yaml | 2 +- .../destination-snowflake/build.gradle | 2 +- .../destination-snowflake/metadata.yaml | 2 +- docs/integrations/destinations/bigquery.md | 3 +- docs/integrations/destinations/snowflake.md | 1 + 13 files changed, 312 insertions(+), 157 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f8d2a961fee453..d982ff531bd3bf 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. | | 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. | | 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | output logs to files in addition to stdout when running tests | | 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java index 4b3d9f489519ae..32b01c5702917b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java @@ -8,11 +8,8 @@ import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta; import io.airbyte.cdk.integrations.destination_async.state.FlushFailure; import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; -import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats; -import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -67,8 +64,6 @@ public class FlushWorkers implements AutoCloseable { private final AtomicBoolean isClosing; private final GlobalAsyncStateManager stateManager; - private final Object LOCK = new Object(); - public FlushWorkers(final BufferDequeue bufferDequeue, final DestinationFlushFunction flushFunction, final Consumer outputRecordCollector, @@ -172,7 +167,7 @@ private void flush(final StreamDescriptor desc, final UUID flushWorkerId) { AirbyteFileUtils.byteCountToDisplaySize(batch.getSizeInBytes())); flusher.flush(desc, batch.getData().stream().map(MessageWithMeta::message)); - emitStateMessages(batch.flushStates(stateIdToCount)); + batch.flushStates(stateIdToCount, outputRecordCollector); } log.info("Flush Worker ({}) -- Worker finished flushing. Current queue size: {}", @@ -222,7 +217,7 @@ public void close() throws Exception { log.info("Closing flush workers -- all buffers flushed"); // before shutting down the supervisor, flush all state. - emitStateMessages(stateManager.flushStates()); + stateManager.flushStates(outputRecordCollector); supervisorThread.shutdown(); while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) { log.info("Waiting for flush worker supervisor to shut down"); @@ -239,17 +234,6 @@ public void close() throws Exception { debugLoop.shutdownNow(); } - private void emitStateMessages(final List partials) { - synchronized (LOCK) { - for (final PartialStateWithDestinationStats partial : partials) { - final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class); - message.getState().setDestinationStats(partial.stats()); - log.info("State with arrival number {} emitted from thread {}", partial.stateArrivalNumber(), Thread.currentThread().getName()); - outputRecordCollector.accept(message); - } - } - } - private static String humanReadableFlushWorkerId(final UUID flushWorkerId) { return flushWorkerId.toString().substring(0, 5); } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java index 591837196c1ae9..213f30e7768e7f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java @@ -7,9 +7,10 @@ import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta; import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager; -import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats; +import io.airbyte.protocol.models.v0.AirbyteMessage; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,16 +58,13 @@ public void close() throws Exception { } /** - * For the batch, marks all the states that have now been flushed. Also returns states that can be - * flushed. This method is descriptrive, it assumes that whatever consumes the state messages emits - * them, internally it purges the states it returns. message that it can. + * For the batch, marks all the states that have now been flushed. Also writes the states that can + * be flushed back to platform via stateManager. *

- * - * @return list of states that can be flushed */ - public List flushStates(final Map stateIdToCount) { + public void flushStates(final Map stateIdToCount, final Consumer outputRecordCollector) { stateIdToCount.forEach(stateManager::decrement); - return stateManager.flushStates(); + stateManager.flushStates(outputRecordCollector); } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java index fe4ea4a82c167f..845dfdd629ea0d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java @@ -10,13 +10,13 @@ import com.google.common.base.Strings; import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; +import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateStats; import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.ArrayList; +import java.time.Instant; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -153,16 +154,12 @@ public void decrement(final long stateId, final long count) { } /** - * Returns state messages with no more inflight records i.e. counter = 0 across all streams. + * Flushes state messages with no more inflight records i.e. counter = 0 across all streams. * Intended to be called by {@link io.airbyte.cdk.integrations.destination_async.FlushWorkers} after * a worker has finished flushing its record batch. *

- * The return list of states should be emitted back to the platform. - * - * @return list of state messages with no more inflight records. */ - public List flushStates() { - final List output = new ArrayList<>(); + public void flushStates(final Consumer outputRecordCollector) { Long bytesFlushed = 0L; synchronized (LOCK) { for (final Map.Entry> entry : descToStateIdQ.entrySet()) { @@ -195,8 +192,13 @@ public List flushStates() { if (allRecordsCommitted) { final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft(); final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue(); - output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(), - new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState), stateMessage.arrivalNumber())); + + log.info("State with arrival number {} emitted from thread {} at {}", stateMessage.arrivalNumber(), Thread.currentThread().getName(), + Instant.now().toString()); + final AirbyteMessage message = Jsons.deserialize(stateMessage.partialAirbyteStateMessage.getSerialized(), AirbyteMessage.class); + message.getState().setDestinationStats(new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState)); + outputRecordCollector.accept(message); + bytesFlushed += oldestState.getRight(); // cleanup @@ -212,7 +214,6 @@ public List flushStates() { } freeBytes(bytesFlushed); - return output; } private Long getStateIdAndIncrement(final StreamDescriptor streamDescriptor, final long increment) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 92e02bc716a568..55d88e2da2a25d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.8 \ No newline at end of file +version=0.20.9 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java index eb565b90ec6d3d..669579c7af9681 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java @@ -22,7 +22,6 @@ public class BufferDequeueTest { private static final int RECORD_SIZE_20_BYTES = 20; private static final String DEFAULT_NAMESPACE = "foo_namespace"; - public static final String RECORD_20_BYTES = "abc"; private static final String STREAM_NAME = "stream1"; private static final StreamDescriptor STREAM_DESC = new StreamDescriptor().withName(STREAM_NAME); private static final PartialAirbyteMessage RECORD_MSG_20_BYTES = new PartialAirbyteMessage() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java index fa1c60b1f0e2da..b77c4419cd1cb5 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManagerTest.java @@ -8,19 +8,17 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteStateMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteStreamState; import io.airbyte.protocol.models.Jsons; +import io.airbyte.protocol.models.v0.*; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; -import io.airbyte.protocol.models.v0.AirbyteStateStats; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -45,58 +43,105 @@ class GlobalAsyncStateManagerTest { private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE1 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL)); + .withType(AirbyteStateType.GLOBAL)) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.GLOBAL, Jsons.jsonNode(ImmutableMap.of("cursor", 1)))); private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE2 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL)); + .withType(AirbyteStateType.GLOBAL)) + .withSerialized(serializedState(STREAM2_DESC, AirbyteStateType.GLOBAL, Jsons.jsonNode(ImmutableMap.of("cursor", 2)))); + private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE3 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL)); + .withType(AirbyteStateType.GLOBAL)) + .withSerialized(serializedState(STREAM3_DESC, AirbyteStateType.GLOBAL, Jsons.jsonNode(ImmutableMap.of("cursor", 2)))); private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE1 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 1)))); private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE2 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 2)))); private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE3 = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC))) + .withSerialized(serializedState(STREAM1_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 3)))); private static final PartialAirbyteMessage STREAM2_STATE_MESSAGE = new PartialAirbyteMessage() .withType(Type.STATE) .withState(new PartialAirbyteStateMessage() .withType(AirbyteStateType.STREAM) - .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM2_DESC))); + .withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM2_DESC))) + .withSerialized(serializedState(STREAM2_DESC, AirbyteStateType.STREAM, Jsons.jsonNode(ImmutableMap.of("cursor", 4)))); + + public static String serializedState(final StreamDescriptor streamDescriptor, final AirbyteStateType type, final JsonNode state) { + switch (type) { + case GLOBAL -> { + return Jsons.serialize(new AirbyteMessage().withType(Type.STATE).withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.GLOBAL) + .withGlobal(new AirbyteGlobalState() + .withSharedState(state) + .withStreamStates(Collections.singletonList(new AirbyteStreamState() + .withStreamState(Jsons.emptyObject()) + .withStreamDescriptor(streamDescriptor)))))); + + } + case STREAM -> { + return Jsons.serialize(new AirbyteMessage().withType(Type.STATE).withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamState(state) + .withStreamDescriptor(streamDescriptor)))); + } + default -> throw new RuntimeException("LEGACY STATE NOT SUPPORTED"); + } + } @Test void testBasic() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var firstStateId = stateManager.getStateIdAndIncrementCounter(STREAM1_DESC); final var secondStateId = stateManager.getStateIdAndIncrementCounter(STREAM1_DESC); assertEquals(firstStateId, secondStateId); stateManager.decrement(firstStateId, 2); + stateManager.flushStates(emittedStatesFromDestination::add); // because no state message has been tracked, there is nothing to flush yet. - final Map stateWithStats = - stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); assertEquals(0, stateWithStats.size()); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats2 = - stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(2.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(2.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats2.values().stream().toList()); + } + + public AirbyteMessage attachDestinationStateStats(final AirbyteMessage stateMessage, final AirbyteStateStats airbyteStateStats) { + stateMessage.getState().withDestinationStats(airbyteStateStats); + return stateMessage; } @Nested @@ -104,21 +149,32 @@ class GlobalState { @Test void testEmptyQueuesGlobalState() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); // GLOBAL stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(0.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + // + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); assertThrows(IllegalArgumentException.class, () -> stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE)); } @Test void testConversion() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var preConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); @@ -129,68 +185,114 @@ void testConversion() { // Since this is actually a global state, we can only flush after all streams are done. stateManager.decrement(preConvertId0, 10); - assertEquals(List.of(), stateManager.flushStates()); + stateManager.flushStates(emittedStatesFromDestination::add); + assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(preConvertId1, 10); - assertEquals(List.of(), stateManager.flushStates()); + stateManager.flushStates(emittedStatesFromDestination::add); + assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(preConvertId2, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(30.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(30.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); } @Test void testCorrectFlushingOneStream() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(10.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + + emittedStatesFromDestination.clear(); final var afterConvertId1 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId1, 10); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats2.values().stream().toList()); } @Test void testZeroRecordFlushing() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(10.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(0.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); + emittedStatesFromDestination.clear(); final var afterConvertId2 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId2, 10); - final Map stateWithStats3 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE3), stateWithStats3.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats3.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final Map stateWithStats3 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE3.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats3.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats3.values().stream().toList()); } @Test void testCorrectFlushingManyStreams() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var preConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); @@ -198,20 +300,32 @@ void testCorrectFlushingManyStreams() { stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(preConvertId0, 10); stateManager.decrement(preConvertId1, 10); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(20.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(20.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); final var afterConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); final var afterConvertId1 = simulateIncomingRecords(STREAM2_DESC, 10, stateManager); assertEquals(afterConvertId0, afterConvertId1); stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(afterConvertId0, 20); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(20.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(GLOBAL_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats2.values().stream().toList()); } } @@ -221,89 +335,148 @@ class PerStreamState { @Test void testEmptyQueues() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); // GLOBAL stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(0.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); assertThrows(IllegalArgumentException.class, () -> stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE)); } @Test void testCorrectFlushingOneStream() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 3); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + + emittedStatesFromDestination.clear(); stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 10); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(10.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals(List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); } @Test void testZeroRecordFlushing() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 3); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE); - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(0.0); + assertEquals(List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE2.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); + emittedStatesFromDestination.clear(); stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stateId, 10); - final Map stateWithStats3 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE3), stateWithStats3.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats3.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final Map stateWithStats3 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + final AirbyteStateStats expectedDestinationStats3 = new AirbyteStateStats().withRecordCount(10.0); + assertEquals(List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE3.getSerialized(), AirbyteMessage.class), expectedDestinationStats3)), + stateWithStats3.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats3), stateWithStats3.values().stream().toList()); } @Test void testCorrectFlushingManyStream() { - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final var stream1StateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager); final var stream2StateId = simulateIncomingRecords(STREAM2_DESC, 7, stateManager); stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stream1StateId, 3); - final Map stateWithStats = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats = new AirbyteStateStats().withRecordCount(3.0); + final Map stateWithStats = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM1_STATE_MESSAGE1.getSerialized(), AirbyteMessage.class), expectedDestinationStats)), + stateWithStats.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats), stateWithStats.values().stream().toList()); + emittedStatesFromDestination.clear(); stateManager.decrement(stream2StateId, 4); - assertEquals(List.of(), stateManager.flushStates()); + stateManager.flushStates(emittedStatesFromDestination::add); + assertEquals(List.of(), emittedStatesFromDestination); stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE, DEFAULT_NAMESPACE); stateManager.decrement(stream2StateId, 3); // only flush state if counter is 0. - final Map stateWithStats2 = stateManager.flushStates().stream() - .collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats)); - assertEquals(List.of(STREAM2_STATE_MESSAGE), stateWithStats2.keySet().stream().toList()); - assertEquals(List.of(new AirbyteStateStats().withRecordCount(7.0)), stateWithStats2.values().stream().toList()); + stateManager.flushStates(emittedStatesFromDestination::add); + final AirbyteStateStats expectedDestinationStats2 = new AirbyteStateStats().withRecordCount(7.0); + final Map stateWithStats2 = + emittedStatesFromDestination.stream() + .collect(Collectors.toMap(c -> c, c -> c.getState().getDestinationStats())); + assertEquals( + List.of( + attachDestinationStateStats(Jsons.deserialize(STREAM2_STATE_MESSAGE.getSerialized(), AirbyteMessage.class), expectedDestinationStats2)), + stateWithStats2.keySet().stream().toList()); + assertEquals(List.of(expectedDestinationStats2), stateWithStats2.values().stream().toList()); } } @@ -318,21 +491,18 @@ private static long simulateIncomingRecords(final StreamDescriptor desc, final l @Test void flushingRecordsShouldNotReduceStatsCounterForGlobalState() { - final PartialAirbyteMessage globalState = new PartialAirbyteMessage() - .withState(new PartialAirbyteStateMessage().withType(AirbyteStateType.GLOBAL)) - .withSerialized(Jsons.serialize(ImmutableMap.of("cursor", "1"))) - .withType(Type.STATE); - final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); - + final List emittedStatesFromDestination = new ArrayList<>(); + final GlobalAsyncStateManager stateManager = + new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)); final long stateId = simulateIncomingRecords(STREAM1_DESC, 6, stateManager); stateManager.decrement(stateId, 4); - stateManager.trackState(globalState, 1, STREAM1_DESC.getNamespace()); - final List stateBeforeAllRecordsAreFlushed = stateManager.flushStates(); - assertEquals(0, stateBeforeAllRecordsAreFlushed.size()); + stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1, STREAM1_DESC.getNamespace()); + stateManager.flushStates(emittedStatesFromDestination::add); + assertEquals(0, emittedStatesFromDestination.size()); stateManager.decrement(stateId, 2); - List stateAfterAllRecordsAreFlushed = stateManager.flushStates(); - assertEquals(1, stateAfterAllRecordsAreFlushed.size()); - assertEquals(6.0, stateAfterAllRecordsAreFlushed.get(0).stats().getRecordCount()); + stateManager.flushStates(emittedStatesFromDestination::add); + assertEquals(1, emittedStatesFromDestination.size()); + assertEquals(6.0, emittedStatesFromDestination.getFirst().getState().getDestinationStats().getRecordCount()); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 25099f1d30b77e..f5d1b05d4b5479 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.8' + cdkVersionRequired = '0.20.9' features = [ 'db-destinations', 'datastore-bigquery', diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index e73aa2723d434e..04c3fbdd22a2b5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.4.9 + dockerImageTag: 2.4.10 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 44c45c891d739e..3cc7265e2df9df 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.2' + cdkVersionRequired = '0.20.9' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index dd0c15064808d7..2f49fddbe29421 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.5.11 + dockerImageTag: 3.5.12 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index e942362f8393f0..1a6a55fa40e011 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -210,7 +210,8 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.4.9 | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285) | Adopt CDK 0.20.8 | +| 2.4.10 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | +| 2.4.9 | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285) | Adopt CDK 0.20.8 | | 2.4.8 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 | | 2.4.7 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 | | 2.4.6 | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575) | Adopt CDK 0.20.0 | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index d572da987adb04..3160d8c39dd92e 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.5.12 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | | 3.5.11 | 2024-02-12 | [35194](https://github.com/airbytehq/airbyte/pull/35194) | Reorder auth options | | 3.5.10 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 | | 3.5.9 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |