archival: Improve archival STM concurrency control #16774

Merged

Conversation

Lazin
Contributor

@Lazin Lazin commented Feb 28, 2024

Currently, the archival STM doesn't propagate errors from the background loop to the caller. The background loop applies record batches to the in-memory state of the STM. Recently, an admission control mechanism was introduced: the background loop checks whether a batch can be safely applied before making any changes to the in-memory state of the STM. This is needed to avoid metadata inconsistencies. If the batch can't be safely applied, it gets discarded. The problem is that the caller never receives the error code.

This PR fixes error propagation. The caller now receives an inconsistent_stm_update error if a metadata inconsistency is detected.
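
As a rough illustration of the mechanism (the names maybe_notify_waiter and _active_operation_res appear in the diffs below; the exact promise type and call sites here are assumptions, not the real code), the apply path hands the result back to the waiting caller roughly like this:

// Sketch only, not the actual Redpanda implementation: the apply loop
// completes a one-shot promise that the in-flight replicate() call awaits.
void archival_metadata_stm::maybe_notify_waiter(std::error_code ec) noexcept {
    if (_active_operation_res.has_value()) {
        // Deliver the result of applying the current batch to the caller
        // blocked in replicate(), then clear the "mailbox".
        _active_operation_res->set_value(ec);
        _active_operation_res.reset();
    }
}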

Apart from this, the PR changes the sync method implementation. The sync method of the persisted_stm assumes that it will be called once at the beginning of the term. If the STM is responsible for creating its own commands (i.e. it provides the write interface), this guarantees that no commands were replicated in the current term, so sync guarantees that the in-memory state of the STM is up to date.

The archival STM maintains a complex data structure (the manifest) and the ntp_archiver makes decisions based on this state. Because of that it's crucial to guarantee that the state is up to date; otherwise the ntp_archiver may try to replicate inconsistent metadata. Previously, we called sync at the beginning of the upload loop, which was started at the beginning of the term.

Since then, mid-term restart was added to the ntp_archiver. In some cases we can restart the archiver mid-term, which may lead to metadata inconsistencies (the sync call doesn't help in this case because it only guarantees that all batches from previous terms are applied). We modified sync so it would work as expected after a mid-term restart: the archival STM stored the current replicate future (implemented as a shared future), and in the sync call we would check whether there was an ongoing replicate call, wait for that future to be ready, and apply some other logic based on the current term/offset of the Raft group.

This PR changes the implementation of sync. It removes the cached shared_future from the archival STM. To guarantee that there is no in-flight replication request, the code acquires the lock (all replication operations related to the STM are performed under the lock). Then it waits until the current committed_offset of the Raft group is applied to the in-memory state of the STM.
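
A minimal sketch of that flow, under stated assumptions (wait_for_applied is a stand-in for whatever helper the STM uses to wait on the apply fiber; the lock and offset accessors are simplified):

// Sketch only; mirrors the described behaviour, not the exact implementation.
ss::future<std::optional<model::offset>>
archival_metadata_stm::sync(model::timeout_clock::duration timeout) {
    // Make sure all batches from previous terms are applied.
    if (!co_await raft::persisted_stm<>::sync(timeout)) {
        co_return std::nullopt; // not the leader, or timed out
    }
    // Every replication path takes this lock, so holding it means there is
    // no in-flight replicate() call.
    auto units = co_await _lock.get_units();
    // Wait until everything the Raft group has committed is applied to the
    // in-memory state of the STM.
    co_await wait_for_applied(_raft->committed_offset(), timeout);
    // The applied offset doubles as the read-write-fence value (see below).
    co_return _manifest->get_applied_offset();
}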

There is another concurrency control improvement in this PR. To guarantee that we can't apply inconsistent metadata even if it somehow passes the check, a read-write-fence command was added. First, the manifest now tracks the offset of the last applied STM command in the applied_offset field. The read-write-fence command carries the applied offset from the manifest. When the background loop processes this command, it compares the offset in the manifest to the offset in the command and, if they differ, skips the batch. The idea is that the ntp_archiver should cache the applied offset before the upload is created. When the upload is completed, it should add a read-write-fence command to the batch that contains the archival metadata. This guarantees that the metadata update happens only if the manifest didn't change concurrently during the upload.
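
A hedged sketch of how a caller such as the ntp_archiver is intended to combine the two mechanisms (batch_start, add_segments and the error codes are simplified assumptions; read_write_fence and replicate are the builder methods discussed in this PR):

// Illustrative flow only, not the actual ntp_archiver code.
ss::future<std::error_code> commit_upload(
  archival_metadata_stm& stm,
  std::vector<cloud_storage::segment_meta> uploaded,
  ss::lowres_clock::time_point deadline,
  ss::abort_source& as) {
    // Capture the fence (last applied offset) before the upload starts.
    auto fence = co_await stm.sync(std::chrono::seconds(10));
    if (!fence.has_value()) {
        co_return errc::timeout; // leadership lost or sync timed out
    }
    // ... upload the segments to cloud storage here ...
    // The fence is added to the batch first: if the manifest changed while
    // the upload was running, the whole metadata update is rejected instead
    // of being applied on top of stale state.
    auto builder = stm.batch_start(deadline, as);
    builder.read_write_fence(*fence);
    builder.add_segments(std::move(uploaded));
    co_return co_await builder.replicate();
}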

Updates

Force push - rebase with dev
Force push - make the sync method return optional<offset>, which contains nullopt on failure or the read-write-fence offset on success. The offset is supposed to be used to add a fence to the command batch.
Force push - avoid triggering the per-operation promise when the STM is catching up (the commit has a detailed description)

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.3.x
  • v23.2.x
  • v23.1.x

Release Notes

Improvements

  • Improve concurrency control and metadata consistency in Tiered-Storage.

@Lazin Lazin added the area/cloud-storage Shadow indexing subsystem label Feb 28, 2024
Comment on lines 311 to 312
config::shard_local_cfg()
.cloud_storage_disable_metadata_consistency_checks.set_value(false);
Contributor

Consider using a scoped_config for this so it gets reset when the test exits

Contributor Author

I totally forgot about the problem. Thanks!

vassert(
!_lock.try_get_units().has_value(),
"Attempt to replicate STM command while not under lock");
vassert(
_active_operation_res.has_value() == false, "Concurrency violation");
Contributor

I think this needs to be cleared if there is an error (e.g. timeout). Otherwise it seems like the next attempt to replicate will hit this assertion

Contributor Author

fixed

}

co_return synced;
}
Contributor

I'm wondering if it makes sense to add an assert that _active_operation_res here. In theory we should be guaranteed it has completed, right?

Contributor Author

it makes sense, we don't want it to be carried over from previous operation

@@ -190,9 +190,13 @@ struct archival_metadata_stm::update_highest_producer_id_cmd {
using value = model::producer_id;
};

// Serde format description
// v4
Contributor

nit: 5?

Contributor Author

fixed

@@ -1367,6 +1402,21 @@ void archival_metadata_stm::apply_reset_metadata() {
_manifest->unsafe_reset();
}

bool archival_metadata_stm::apply_read_write_fence(
const archival_metadata_stm::read_write_fence_cmd& cmd) noexcept {
if (_manifest->get_applied_offset() > cmd.last_applied_offset) {
Contributor

Is it possible/expected to be able to have get_applied_offset() < cmd.last_applied_offset? Does it imply some kind of gap in the log or something?

Contributor Author

I don't think it's possible in our use case.

/// Add update_highest_producer_id_cmd command
command_batch_builder&
update_highest_producer_id(model::producer_id highest_pid);
Contributor

nit: meant for a different PR? Or is this just added for completeness?

Contributor Author

yes

@Lazin Lazin force-pushed the feature/propagate-archival-stm-apply-error branch 2 times, most recently from 8862052 to 58532ac on March 8, 2024 18:18
@Lazin Lazin requested a review from andrwng March 8, 2024 18:18
@Lazin Lazin force-pushed the feature/propagate-archival-stm-apply-error branch from 98c431f to 74676d8 on March 10, 2024 09:18
@vbotbuildovich
Collaborator

vbotbuildovich commented Mar 10, 2024

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e27ee-8983-4da8-b311-b41c519c7422:

"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_from_cloud.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.shadow_indexing_compacted_topic_test.ShadowIndexingCompactedTopicTest.test_upload.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast3.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e27ee-898d-4b17-b8ec-dfed2cb4417e:

"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_from_cloud.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e27ee-898a-45a1-969d-c07471959f74:

"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.offset_for_leader_epoch_archival_test.OffsetForLeaderEpochArchivalTest.test_querying_archive"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast2.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e27ee-8986-43f4-8202-bf582bb41e0f:

"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.tiered_storage_model_test.TieredStorageTest.test_tiered_storage.cloud_storage_type=CloudStorageType.ABS.test_case=.TS_Delete==True.SpilloverManifestUploaded==True.TS_Spillover_ManifestDeleted==True"
"rptest.tests.shadow_indexing_compacted_topic_test.ShadowIndexingCompactedTopicTest.test_upload.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast2.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e2800-a3c0-43b7-a728-77227dd03eda:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=off.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast2.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_size_based_retention.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e2800-a3c3-4d59-a5f5-c392cff998d7:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=off.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact.delete"
"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.NO_TIRED_STORAGE.cleanup_policy=compact.delete"
"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=off.test_mode=TestMode.TIRED_STORAGE.cleanup_policy=compact.delete"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast2.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e2800-a3bd-4922-8f51-dc28a7f27975:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.TIRED_STORAGE.cleanup_policy=compact.delete"
"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact.delete"
"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast1.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast3.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_time_based_retention.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/45948#018e2800-a3c6-4ce0-b872-f2ae6fffd81f:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact"
"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.TIRED_STORAGE.cleanup_policy=compact"
"rptest.tests.random_node_operations_test.RandomNodeOperationsTest.test_node_operations.enable_failures=False.num_to_upgrade=0.with_tiered_storage=True"
"rptest.tests.random_node_operations_test.RandomNodeOperationsTest.test_node_operations.enable_failures=True.num_to_upgrade=0.with_tiered_storage=True"
"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.topic_recovery_test.TopicRecoveryTest.test_fast1.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45949#018e28d8-a2c4-4e04-b951-0733e8752b4a:

"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/45949#018e28d8-a2ce-444e-9c71-81cc326fe89f:

"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45949#018e28d8-a2cb-4783-8b3b-5ca346402b56:

"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/45949#018e28d8-a2c8-4d51-bce2-771b47b8c9d7:

"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45949#018e28ea-2690-42de-b463-6a0d02aad556:

"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/45949#018e28ea-2686-4091-a9bb-8e85a9c60cf9:

"rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/46565#018e61a6-786d-4753-b052-bebcb9e55ebe:

"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_from_cloud.cloud_storage_type=CloudStorageType.ABS"

@Lazin Lazin force-pushed the feature/propagate-archival-stm-apply-error branch from 74676d8 to c119bac on March 10, 2024 13:33
@Lazin Lazin force-pushed the feature/propagate-archival-stm-apply-error branch 3 times, most recently from aaf9295 to e35b079 on March 11, 2024 13:06
const bool synced = co_await persisted_stm::sync(timeout);
ss::future<bool> archival_metadata_stm::do_sync(
model::timeout_clock::duration timeout, ss::abort_source* as) {
co_await raft::persisted_stm<>::sync(timeout);
Contributor

Shouldn't we handle the result here? persisted_stm::sync will return false in a couple of cases, like not being the leader or a timeout.

Contributor Author

fixed

@@ -1311,6 +1265,7 @@ void archival_metadata_stm::apply_add_segment(const segment& segment) {
"Can't add segment: {}, previous segment: {}",
meta,
last);
maybe_notify_waiter(errc::inconsistent_stm_update);
Contributor

Shouldn't we throw here instead and let the try-catch block in apply propagate the error?

Contributor Author

it will be registered and propagated as an exception, and we need an error code
this can be done of course, but it will require more code and a separate code path to call maybe_notify_waiter with the error code

Contributor

Can't we then have a case where the STM is a bit behind, we call replicate (we even include the fence command) and set _active_operation_res, but then ... maybe_notify_waiter is called for an older command? What's preventing this?

Contributor Author

the protocol is to call sync first and only after that call replicate
so basically, you're calling sync, getting read-write-fence value and then replicating data

Contributor Author

do you think some external failsafe mechanism should be added to prevent incorrect usage?

Contributor

Yeah, I believe this implementation is still problematic. Is there anything that prevents the caller from calling sync, then doing some other async thing, and then calling replicate? The correctness of the STM still relies on assumptions, which is not better than what we had, from what I understand.

Contributor Author

I guess one way to fix this is to return some value from the sync method and then require the user to pass this value to the replicate method. I didn't go for it because for now I want to keep the original ntp_archiver mostly intact, and this would require a lot of changes there.

Another idea that I had (and I also had an implementation, which I removed later because it only got more confusing) is to have some sort of cursor that can be used to write to the STM. You can only obtain such an object using the sync call, and while such an object exists nothing else can write to the STM.

I'll think about something that can be done here; maybe we can rely on the retry-chain-node instance to check the identity of the waiter.

Contributor Author

I think I solved this problem by forcing a sync in case the STM is catching up. So we only set the promise when the STM is up to date, and we're holding the lock, so it's guaranteed that the promise will get the correct result.

// All keys other than mark clean make the manifest dirty
_last_dirty_at = base_offset + model::offset{r.offset_delta()};
}
auto on_exit = ss::defer(
Contributor

Remind me please, Is the STM re-created on leadership change?

Contributor Author

It's not re-created.

Contributor

Follow up question in wrong thread: #16774 (comment)

@nvartolomei
Contributor

/cdt

@Lazin Lazin force-pushed the feature/propagate-archival-stm-apply-error branch from e35b079 to 25e0cd3 on March 20, 2024 13:53
@Lazin Lazin requested a review from nvartolomei March 20, 2024 13:53
Currently, the 'ntp_archiver' calls the 'sync' method in every iteration to
avoid inconsistencies. It uses a custom implementation that takes into
account the in-flight replication operation, the current state of the Raft
group (term, committed offset), and the state of the manifest.

This commit changes this custom 'sync' implementation. Instead of
storing the in-flight operation we now just hold the lock and wait
for the data in the log to be applied to the in-memory state of the STM.

The applied_offset field contains the offset of the last applied command.
It's advanced by the 'archival_metadata_stm' every time a command is
applied to the in-memory state of the STM.

The commit contains a unit test that checks that the field is serialized
correctly. The field is intentionally not added to the JSON manifest. The
JSON compatibility is tricky and it's OK for this field to be
default-initialized after the manifest reset.

The read-write-fence command is supposed to be used as a concurrency control
mechanism. The user of the archival STM (ntp_archiver) should read the
applied_offset from the manifest before making any changes. Then, before
the segments (or any other changes) are added to the command builder,
the user should add a `read_write_fence` command using that applied offset.
When the command is replicated, the user will get an error if there was
any change to the in-memory state of the STM between the moment the
applied_offset was acquired and the moment the commands generated
by the user are applied. In other words, the read-write-fence guarantees
that changes to the in-memory state are not made based on stale
state. It guarantees that there is only one active writer.

The 'concurrent_modification_error' is returned if the property of the
read-write-fence is violated. The fence command is only applied to the
current batch: it can only abort commands that are added to the record
batch after it. If the read-write-fence command is added to the record
batch first, it aborts the entire batch. If the command is added to
the record batch last, it has no effect.
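
A self-contained toy model of the fence semantics described above (this is not the Redpanda implementation; it only shows why a fence placed first can abort the whole batch while a fence placed last is a no-op):

#include <algorithm>
#include <cstdint>
#include <vector>

struct cmd {
    int64_t offset;            // position of this record in the log
    bool is_fence = false;
    int64_t fence_offset = -1; // applied_offset captured by the writer
    int payload = 0;           // stands in for an archival metadata command
};

struct toy_stm {
    int64_t applied_offset = -1;
    std::vector<int> manifest;

    // Returns false if the fence detected a concurrent modification and the
    // rest of the batch was skipped.
    bool apply_batch(const std::vector<cmd>& batch) {
        for (const auto& c : batch) {
            if (c.is_fence && applied_offset > c.fence_offset) {
                return false; // stale writer: abort the remaining commands
            }
            if (!c.is_fence) {
                manifest.push_back(c.payload);
            }
            // Mirrors advance_applied_offset(): advance monotonically.
            applied_offset = std::max(applied_offset, c.offset);
        }
        return true;
    }
};

In this model, if the state advanced past the captured fence offset before the batch is applied, a batch of the form {fence, add_segment, ...} is rejected in its entirety, while {add_segment, ..., fence} has already applied the segments by the time the fence is checked.
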
Add a new method to command_batch_builder. The method adds the
'update_highest_producer_id_cmd' command to the record batch.

Add a wait_for_apply method that waits until all messages in the log are
applied. Invoke this method in tests to guarantee graceful shutdown.
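
For example, a test teardown might look roughly like this (everything except wait_for_apply is an assumed name):

// Drain the apply fiber before stopping the STM so that shutdown doesn't
// race with an in-flight apply.
ss::future<> shutdown_stm(archival_metadata_stm& stm) {
    co_await stm.wait_for_apply();
    co_await stm.stop();
}
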
@Lazin Lazin force-pushed the feature/propagate-archival-stm-apply-error branch from 25e0cd3 to 2dc4dbc on March 21, 2024 14:42
Contributor

@andrwng andrwng left a comment

Getting there. Just clarifying / pointing out some behaviors that are worth thinking about, but no meaningful requests for changes

@@ -1314,6 +1269,7 @@ void archival_metadata_stm::apply_add_segment(const segment& segment) {
"Can't add segment: {}, previous segment: {}",
meta,
last);
maybe_notify_waiter(errc::inconsistent_stm_update);
Contributor

Maybe not a part of this change, but should we be doing this for most/every apply_* function that logs an error?

Contributor Author

The idea is that this method is called explicitly in a few places where we have the error code, only to return an inconsistency or concurrency-violation error. The remaining errors are exceptions and are handled in the try/catch in one place.

Member

The remaining errors are exceptions and are handled in the try/catch in one place.

if you threw, could the error handling be unified in one place?

Comment on lines +1034 to +1035
on_exit.cancel();
maybe_notify_waiter(std::current_exception());
Contributor

Just noting down some of the remaining odd semantics that are maybe possible? Though not 100% sure.

  • wait_no_throw() returns after replicating, even though the op that was being replicated didn't get applied, because we're a stale follower that previously thought we were leader. I don't think this is problematic, since things downstream of the wait_no_throw() will likely detect we're no longer leader. Still a rough edge IMO.
  • sync() returns success but nothing is preventing more data from coming in and being applied before the call to replicate(). Not necessarily a bug, but is error prone. Perhaps sync() should return a token that can be passed to replicate() that's only valid while nothing else is applied to archival.
  • some ops succeed, while others fail in the same batch. As written, it seems like we'll notify waiters on first failure, but again, probably no practical impact since we also wait_no_throw() before awaiting on the future

Contributor Author

The assumption is that the caller of sync is the only caller. It's not enforced now because I want to keep the existing ntp_archiver as is. Also, I'll try to make a change and return the fence offset from the sync method.

In a future PR I'll try to enforce the one-caller invariant at compile time (thinking about acquire/release semantics for it, so the archiver will acquire the STM and then call sync). This PR adds the mechanism (fence and error propagation) without changing the interface.

Contributor Author

Thinking about this a bit more, I think we don't even need to do this. When we replicate a config batch we acquire the lock. Then, while holding the lock, we set the promise, replicate the batch, call wait_no_throw, and then wait for the promise to be set. The entire lifecycle of the promise happens under the lock, so such a violation is not possible. A concurrent fiber may try to replicate something while the promise is created but not yet set, but it will get stuck waiting for the lock.

Comment on lines 1271 to +1272
last);
maybe_notify_waiter(errc::inconsistent_stm_update);
Contributor

If there are multiple ops to apply in the batch, and we fail on the first one, it seems like we will notify waiters immediately without waiting for the rest of the batch to be applied. Is this intentional?

Contributor Author

yes, the fence should be the first in the batch and is supposed to abort the batch on failure

/// applied to the in-memory state of the STM if the 'applied_offset' is
/// greater than the 'offset'. This mechanism is supposed to be used as a
/// concurrency-control mechanism.
command_batch_builder& read_write_fence(model::offset offset);
Contributor

Just curious, is the only reason to allow an input offset for testing? It may be less error prone to just call into the stm for the last applied offset, and have tests explicitly interleave construction of the builders (don't feel strongly about it though)

Contributor Author

It's not for testing. The STM doesn't know when you accessed the manifest so it can't start building the batch at the right moment. The user is supposed to read the fence offset manually and then use this method to add it to the batch.

Contributor Author

You're also allowed not to use it. For instance, in the unsafe manifest reset you should just avoid using it and let a possible concurrent operation be interrupted.

Contributor

Ah that makes sense. Thanks for explaining

Comment on lines +100 to +101
/// applied to the in-memory state of the STM if the 'applied_offset' is
/// greater than the 'offset'. This mechanism is supposed to be used as a
Contributor

This isn't quite true though, right? It should exactly match the offset?

Also maybe it makes sense to mention this should always be the first record written to an archival batch? Perhaps we could even assert that to be the case.

Member

@Lazin can you respond here?

andrwng
andrwng previously approved these changes Mar 21, 2024
Contributor

@andrwng andrwng left a comment

I think the suggestion to have an explicit ordering requirement between sync() and replicate() makes sense, but I don't think it should block this change.

The questions I have aren't blocking if the behaviors pointed out are intentional.

Would be great to correct the fence comment though

Member

@dotnwat dotnwat left a comment

Should this be 3 separate PRs, one for each of the separate logical changes described in the cover letter?

Comment on lines 1094 to 963
};
});
switch (key) {
case add_segment_cmd::key:
Member

formatting in a separate commit?

Contributor Author

Formatting changed due to the extra indent level from the try-catch block. I don't understand why this needs to be a separate commit. It's part of the change. All this code uses a different exception handling approach now, and it's visible in the commit history.

Member

I don't understand why this needs to be a separate commit. It's part of the change.

It doesn't have to be, but the importance of making commits easy to review increases with the size/complexity of the commit itself. You could add that try-catch block as a commit before this one as a mechanical change. Then it would lower the cognitive complexity of the next commit. It'd be a silly commit, but it would lower complexity.

@Lazin
Contributor Author

Lazin commented Mar 22, 2024

Should this be 3 separate PRs, one for each of the separate logical changes described in the cover letter?

It could be, but the PR is moderately sized (only 500 lines) and all the changes are related. Error propagation is needed so the caller can see the error from the read-write fence.

The "sync" method returns optional<offset> value which contains last
applied offset from the manifest on success or nullopt on failure.
The caller is supposed to use this offset to add a read_write_fence
command to the archival configuration batch.
The 'replicate' method of the archival_metadata_stm using the following
algorithm:
1. Acquire the lock.
2. Create the promise.
3. Replicate the configuration batch and get its offset.
4. Wait until the offset is applied.
5. Wait until the future associated with the created promise is set.
6. Return the result and release the lock.

The background fiber of the archival STM applies every record batch
to the in-memory state. If the promise is created, the background loop
will
- set it to 'errc::success' when the batch is applied, and reset it
- set it to an error value if the batch can't be applied

Note that the promise is used as a one-time mailbox. The replicate
method creates a promise and replicates the batch, and then the
background fiber applies this record batch and sends the result to the
mailbox (our promise).

This is correct under the assumption that the in-memory state of the STM is
up to date (the sync method was called before calling replicate). This
assumption is not always correct.

If, between the 'sync' and 'replicate' calls, another fiber invoked
'replicate', there is no error, because 'replicate' waits until
all changes are applied to the STM. The only problem we may have is
when the caller invokes 'replicate' without calling 'sync' first. In
this case some old batch may trigger the promise and we will get an
incorrect error code in the 'replicate' method.

To avoid this, this commit adds an extra step to the 'replicate' method. If
the 'insync' offset of the manifest is lower than the 'committed' offset of
the Raft group, it calls 'do_sync'. After that it proceeds to step 2
in the algorithm described above. The final algorithm (sketched in code
after the list) looks like this:

1. Acquire the lock.
2. Call sync if the in-sync offset is less than the committed offset.
3. Create the promise.
4. Replicate the configuration batch and get its offset.
5. Wait until the offset is applied.
6. Wait until the future associated with the created promise is set.
7. Return the result and release the lock.
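
A sketch of those steps in code (wait_for_applied, _sync_timeout, and the exact replicate call are assumptions; only the ordering matters here):

// Sketch only; not the actual archival_metadata_stm implementation.
ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
  model::record_batch batch, ss::abort_source& as) {
    // 1. Acquire the lock: one writer at a time.
    auto units = co_await _lock.get_units();
    // 2. If the apply fiber is behind, catch up first so that an old batch
    //    can't complete the promise we are about to create.
    if (_manifest->get_insync_offset() < _raft->committed_offset()) {
        co_await do_sync(_sync_timeout, &as);
    }
    // 3. Create the one-shot "mailbox" promise.
    _active_operation_res.emplace();
    auto applied = _active_operation_res->get_future();
    // 4. Replicate the configuration batch and note its offset.
    auto res = co_await _raft->replicate(std::move(batch)); // simplified call
    if (!res) {
        _active_operation_res.reset();
        co_return res.error();
    }
    // 5. Wait until the apply fiber has processed that offset.
    co_await wait_for_applied(res.value().last_offset);
    // 6. Collect the per-operation result posted by the apply fiber.
    auto ec = co_await std::move(applied);
    // 7. Return the result; the lock units are released on scope exit.
    co_return ec;
}
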
@dotnwat
Member

dotnwat commented Mar 22, 2024

It could be but the PR is moderately sized (only 500 lines) and all changes are related.

PR "size" is unrelated to LOC. Size is measured IMO as the amount of accidental complexity.

Comment on lines +238 to +239
ss::future<std::optional<model::offset>>
sync(model::timeout_clock::duration timeout);
Contributor

nit: could you please add comments about how users are supposed to use/think about the returned offset?

@@ -730,17 +730,29 @@ ss::future<std::error_code> archival_metadata_stm::process_anomalies(
co_return co_await builder.replicate();
}

ss::future<bool>
ss::future<std::optional<model::offset>>
Contributor

The caller is supposed to use this offset to add a read_write_fence
command to the archival configuration batch.

This will be in a follow-up change, right?

Member

@Lazin ?

Comment on lines +805 to +806
// If the caller didn't invoke `sync` before calling `replicate` we
// might have some batches which are not applied to the STM yet. These
Contributor

Thinking generally about how to make it clearer via tests that this behavior is correct... it's tough, because instantiating the archival stm in isolation and stressing it to exercise these code paths is so hard.

The only problem that we may have is when the caller invokes 'replicate' without calling 'sync' first. In this case some old batch may trigger the promise and we will get incorrect error code in the 'replicate' method.

Thinking about this specifically, I wonder if there's a way we could artificially/randomly delay the apply fiber and artificially replicate()/sync() from many different fibers in a test. Then the invariant we want to uphold is that all replicate() calls that succeeded are reflected in the manifest, and all that failed are not.

I guess it'd be tough to test with the archival stm's current interface because there are so many rules baked into archival operations, but just thinking out loud. It'd be great if this logic were encapsulated in the persisted stm -- then we could implement a dummy stm that's just a kvstore to test the concurrency mechanics without also having to work around the stm's logical invariants.

Member

@dotnwat dotnwat left a comment

When the command is replicated the user will get an error if there was
any change to the in-memory state of the STM between the moment when the
applied_offset was acquired and the moment when the commands generated
by the user are applied. In other words, the read-write-fence guarantees
that the changes to the in-memory state are not made based on the stale
state. It guarantees that there is only one active writer.

Makes sense. Is the manifest applied offset updated in lockstep with actually applying offsets, and is it also backed up into S3? That's the part I don't quite understand.

Comment on lines -722 to -723
if (res || _last_replicate->term < _raft->term()) {
_last_replicate = std::nullopt;
Member

Is it possible for the result to have been 'success' but the term also changed? If so, what would that mean?

@@ -1314,6 +1269,7 @@ void archival_metadata_stm::apply_add_segment(const segment& segment) {
"Can't add segment: {}, previous segment: {}",
meta,
last);
maybe_notify_waiter(errc::inconsistent_stm_update);
Member

The remaining errors are exceptions and are handled in the try/catch in one place.

if you threw, could the error handling be unified in one place?

"Unexpected error while applying changes to "
"archival_metadata_stm: {}",
std::current_exception());
on_exit.cancel();
Member

The RAII thing is nice, but the catch-all handler here deals with it correctly, in that you could move the notification for success down below the catch handler, right? Or is there a control flow there I'm missing? Maybe it's just conservative / for safety?


/// Advance last applied STM command offset
void advance_applied_offset(model::offset o) noexcept {
_applied_offset = std::max(_applied_offset, o);
Member

please document what race is occurring to require the max here.

@@ -582,6 +589,7 @@ ss::future<> archival_metadata_stm::make_snapshot(
.start_kafka_offset = m.get_start_kafka_offset_override(),
.spillover_manifests = std::move(spillover),
.highest_producer_id = m.highest_producer_id(),
.applied_offset = m.get_applied_offset(),
Member

Is the behavior change of the system and the dependence on get_applied_offset going to necessitate a feature flag?

Member

cloud_storage: Add 'applied_offset' to the manifest
The field contains an offset of the last applied command. It's advanced
by the 'archival_metadata_stm' every time the command is applied to the
in-memory state of the STM.

This commit message is missing the critical component of why this is being added. An STM always knows its last applied offset because it's the one applying it. So the commit message needs to explain why that information should be durable. Presumably that is because the STM has external effects?

How does concurrency work? For example, are we creating a new manifest for every command applied? If so, does the manifest get uploaded too before the next command is applied? If a new manifest isn't created for every command, then what does the field really mean?

Comment on lines +1057 to +1063
if (apply_read_write_fence(
serde::from_iobuf<read_write_fence_cmd>(
r.release_value()))) {
// This means that there is a concurrency violation. The
// fence was created before some other command was
// applied. We can't apply the commands from this batch.
return ss::stop_iteration::yes;
Member

Nice. So is this resumable? You skip the batch, or does this indicate a terrible situation?

Comment on lines +100 to +101
/// applied to the in-memory state of the STM if the 'applied_offset' is
/// greater than the 'offset'. This mechanism is supposed to be used as a
Member

@Lazin can you respond here?

@@ -730,17 +730,29 @@ ss::future<std::error_code> archival_metadata_stm::process_anomalies(
co_return co_await builder.replicate();
}

ss::future<bool>
ss::future<std::optional<model::offset>>
Member

@Lazin ?

Comment on lines +365 to +367
auto is_synced = co_await _parent.archival_meta_stm()->sync(
sync_timeout);
if (!is_synced) {
if (!is_synced.has_value()) {
Member

is_synced isn't really a good name now, right? It's also odd to have an optional engaged check but ignore the stored value. Per Andrew's comment, is that changing in a subsequent PR?

Member

Note that the promise is used as a one time mailbox. The replicate
method creates a promise and replicates the batch and then the
background fiber applies this record batch and sends result to the
mailbox (our promise).

This is correct under the assumption that the in-memory state of the STM is
up to date (the sync method was called before calling replicate). This
assumption is not always correct.

If, between the 'sync' and 'replicate' calls, another fiber invoked
'replicate'

Ok, but I thought the point of the mutex was to deal with this?

@dotnwat dotnwat merged commit 0d37b5a into redpanda-data:dev Mar 23, 2024
18 checks passed