Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions lib/si-layer-cache/tests/integration_test/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,124 @@ async fn activities_subscribe_partial() {
.into_parts();
assert_eq!(rebase_finished_activity, restored_activity);
}

#[tokio::test]
async fn subscribe_rebaser_requests_work_queue() {
let token = CancellationToken::new();

let tempdir_slash = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir");
let tempdir_axl = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir");
let tempdir_duff = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir");
let db = setup_pg_db("subscribe_rebaser_requests_work_queue").await;

// we need a layerdb for slash, which will be a consumer of our work queue
let (ldb_slash, _): (TestLayerDb, _) = LayerDb::initialize(
tempdir_slash,
db.clone(),
setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await,
token.clone(),
)
.await
.expect("cannot create layerdb");
ldb_slash.pg_migrate().await.expect("migrate layerdb");

// we need a layerdb for axl, who will also be a consumer for our work queue
let (ldb_axl, _): (TestLayerDb, _) = LayerDb::initialize(
tempdir_axl,
db.clone(),
setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await,
token.clone(),
)
.await
.expect("cannot create layerdb");
ldb_axl.pg_migrate().await.expect("migrate layerdb");

// we need a layerdb for duff, who will also be a consumer for our work queue
let (ldb_duff, _): (TestLayerDb, _) = LayerDb::initialize(
tempdir_duff,
db,
setup_nats_client(Some("subscribe_rebaser_requests_work_queue".to_string())).await,
token.clone(),
)
.await
.expect("cannot create layerdb");
ldb_duff.pg_migrate().await.expect("migrate layerdb");

// Subscribe to a work queue of rebase activities on axl and slash
let mut axl_work_queue = ldb_axl
.subscribe_rebaser_requests_work_queue()
.await
.expect("cannot retrieve a work queue");
let mut slash_work_queue = ldb_slash
.subscribe_rebaser_requests_work_queue()
.await
.expect("cannot retrieve a work queue");

// Send a rebase request activity from duff
let rebase_request = RebaseRequest::new(Ulid::new(), Ulid::new(), Ulid::new());
let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new());
let actor = Actor::System;
let metadata = LayeredEventMetadata::new(tenancy, actor);
let rebase_request_activity = Activity::rebase(rebase_request, metadata);
// Publish an activity
ldb_duff
.publish_activity(&rebase_request_activity)
.await
.expect("cannot publish activity");

// Send a rebase finished activity
let rebase_finished = RebaseFinished::new(
si_layer_cache::activities::rebase::RebaseStatus::Error {
message: "poop".to_string(),
},
Ulid::new(),
Ulid::new(),
);
let tenancy = Tenancy::new(WorkspacePk::new(), ChangeSetId::new());
let actor = Actor::System;
let metadata = LayeredEventMetadata::new(tenancy, actor);
let rebase_finished_activity = Activity::rebase_finished(rebase_finished, metadata);
ldb_duff
.publish_activity(&rebase_finished_activity)
.await
.expect("cannot publish activity");

let which = tokio::select! {
maybe_result = slash_work_queue.next() => {
let request = maybe_result.expect("had no messages").expect("cannot retrieve the ack rebase request");
assert_eq!(request.payload, rebase_request);
request.ack().await.expect("cannot ack message");
"slash".to_string()
},
maybe_result = axl_work_queue.next() => {
let request = maybe_result.expect("had no messages").expect("cannot retrieve the ack rebase request");
assert_eq!(request.payload, rebase_request);
request.ack().await.expect("cannot ack message");
"axl".to_string()
},
};

// This is long enough to confirm that we get once-and-only-once delivery.
// It isn't long enough to confirm that we didn't ack the payload, but that
// is totally fine - we don't need to test that NATS works as directed.
let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(100));
tokio::pin!(sleep);

if which == "slash" {
tokio::select! {
maybe_result = axl_work_queue.next() => {
assert!(maybe_result.is_none(), "expected no work, but there is some work to do");
},
_ = &mut sleep => {
}
}
} else {
tokio::select! {
maybe_result = slash_work_queue.next() => {
assert!(maybe_result.is_none(), "expected no work, but there is some work to do");
},
_ = &mut sleep => {
}
}
}
}