storage controller: robustness improvements #7027

Merged 16 commits on Mar 7, 2024. The diff below shows the changes from one commit.
control_plane/attachment_service/src/service.rs: 35 changes (22 additions, 13 deletions)
@@ -200,7 +200,8 @@ impl Service {
     async fn startup_reconcile(self: &Arc<Service>) {
         // For all tenant shards, a vector of observed states on nodes (where None means
         // indeterminate, same as in [`ObservedStateLocation`])
-        let mut observed = HashMap::new();
+        let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
+            HashMap::new();

         let mut nodes_online = HashSet::new();
@@ -235,7 +236,8 @@ impl Service {
             nodes_online.insert(node_id);

             for (tenant_shard_id, conf_opt) in tenant_shards {
-                observed.insert(tenant_shard_id, (node_id, conf_opt));
+                let shard_observations = observed.entry(tenant_shard_id).or_default();
+                shard_observations.push((node_id, conf_opt));
             }
         }

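A minimal standalone sketch of the accumulation pattern in the hunk above, using placeholder types rather than the real TenantShardId/LocationConfig: during startup several nodes may each report a location for the same tenant shard, and entry(...).or_default() collects all of them, whereas the old insert() call kept only the last report.

use std::collections::HashMap;

fn main() {
    // Placeholder types for the sketch: shard ids and configs are strings, node ids are u64.
    let mut observed: HashMap<&str, Vec<(u64, Option<&str>)>> = HashMap::new();

    // Two nodes report the same shard, e.g. mid-migration.
    let reports = [("shard-0", 1, Some("attached")), ("shard-0", 2, Some("secondary"))];

    for (shard, node_id, conf) in reports {
        // or_default() creates an empty Vec the first time a shard is seen, then
        // appends; with insert(), the second report would overwrite the first.
        observed.entry(shard).or_default().push((node_id, conf));
    }

    assert_eq!(observed["shard-0"].len(), 2);
}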
@@ -257,21 +259,22 @@ impl Service {
         }
         *nodes = Arc::new(new_nodes);

-        for (tenant_shard_id, (node_id, observed_loc)) in observed {
-            let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
-                cleanup.push((tenant_shard_id, node_id));
-                continue;
-            };
-
-            tenant_state
-                .observed
-                .locations
-                .insert(node_id, ObservedStateLocation { conf: observed_loc });
+        for (tenant_shard_id, shard_observations) in observed {
+            for (node_id, observed_loc) in shard_observations {
+                let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
+                    cleanup.push((tenant_shard_id, node_id));
+                    continue;
+                };
+                tenant_state
+                    .observed
+                    .locations
+                    .insert(node_id, ObservedStateLocation { conf: observed_loc });
+            }
         }

         // Populate each tenant's intent state
         for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
-            tenant_state.intent_from_observed();
+            tenant_state.intent_from_observed(scheduler);
             if let Err(e) = tenant_state.schedule(scheduler) {
                 // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
                 // not enough pageservers are available. The tenant may well still be available
@@ -933,6 +936,12 @@ impl Service {
         // Ordering: we must persist generation number updates before making them visible in the in-memory state
         let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;

+        tracing::info!(
+            node_id=%reattach_req.node_id,
+            "Incremented {} tenant shards' generations",
+            incremented_generations.len()
+        );
+
        // Apply the updated generation to our in-memory state
        let mut locked = self.inner.write().unwrap();
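The ordering comment above is the crux of this hunk: generation increments must reach the database before they become visible to readers of the in-memory map. A sketch of that persist-then-publish shape, using invented stand-in types (Controller, persist_generations) rather than the PR's real Service/Persistence API:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type TenantShardId = u64; // placeholder; the real type is a struct
type Generation = u32;

struct Controller {
    // In-memory state, published to readers only after the durable write succeeds.
    inner: Arc<RwLock<HashMap<TenantShardId, Generation>>>,
}

impl Controller {
    // Stand-in for the database round-trip that durably increments generations.
    fn persist_generations(&self) -> std::io::Result<Vec<(TenantShardId, Generation)>> {
        Ok(vec![(1, 42)])
    }

    fn re_attach(&self) -> std::io::Result<()> {
        // 1. Persist first: if we crash after this point, the database already
        //    holds the higher generations, so a stale generation is never reissued.
        let incremented = self.persist_generations()?;

        // 2. Only then take the write lock and make the updates visible in memory.
        let mut locked = self.inner.write().unwrap();
        for (shard, generation) in incremented {
            locked.insert(shard, generation);
        }
        Ok(())
    }
}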
control_plane/attachment_service/src/tenant_state.rs: 61 changes (58 additions, 3 deletions)
@@ -373,7 +373,12 @@ impl TenantState {
     /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
     /// to get an intent state that complies with placement policy. The overall goal is to do scheduling
     /// in a way that makes use of any configured locations that already exist in the outside world.
-    pub(crate) fn intent_from_observed(&mut self) {
+    pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
+        tracing::info!("Observed:");
+        for (node_id, loc) in &self.observed.locations {
+            tracing::info!("{node_id}: {:?}", loc.conf);
+        }
+
         // Choose an attached location by filtering observed locations, and then sorting to get the highest
         // generation
         let mut attached_locs = self
@@ -398,7 +403,7 @@ impl TenantState {

         attached_locs.sort_by_key(|i| i.1);
         if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
-            self.intent.attached = Some(*node_id);
+            self.intent.set_attached(scheduler, Some(*node_id));
         }

         // All remaining observed locations generate secondary intents. This includes None
@@ -409,7 +414,7 @@ impl TenantState {
         // will take care of promoting one of these secondaries to be attached.
         self.observed.locations.keys().for_each(|node_id| {
             if Some(*node_id) != self.intent.attached {
-                self.intent.secondary.push(*node_id);
+                self.intent.push_secondary(scheduler, *node_id);
             }
         });
     }
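Both hunks in this file replace direct field writes (intent.attached = ..., intent.secondary.push(...)) with methods that take the scheduler. A plausible reading, sketched below with invented types: the scheduler tracks per-node shard counts, and routing every intent mutation through it keeps those counts in step with intent state (which is what the consistency_check call in the new test verifies).

use std::collections::HashMap;

type NodeId = u64; // placeholder for the real NodeId newtype

#[derive(Default)]
struct Scheduler {
    // How many shards (attached or secondary) each node currently carries.
    shard_counts: HashMap<NodeId, usize>,
}

impl Scheduler {
    fn node_inc_ref(&mut self, node: NodeId) {
        *self.shard_counts.entry(node).or_default() += 1;
    }

    fn node_dec_ref(&mut self, node: NodeId) {
        if let Some(count) = self.shard_counts.get_mut(&node) {
            *count = count.saturating_sub(1);
        }
    }
}

#[derive(Default)]
struct IntentState {
    attached: Option<NodeId>,
    secondary: Vec<NodeId>,
}

impl IntentState {
    // Replacing the attached node adjusts both the old and new nodes' counts,
    // so the scheduler's view never drifts from the intent state.
    fn set_attached(&mut self, scheduler: &mut Scheduler, node: Option<NodeId>) {
        if let Some(old) = self.attached.take() {
            scheduler.node_dec_ref(old);
        }
        if let Some(new) = node {
            scheduler.node_inc_ref(new);
        }
        self.attached = node;
    }

    fn push_secondary(&mut self, scheduler: &mut Scheduler, node: NodeId) {
        scheduler.node_inc_ref(node);
        self.secondary.push(node);
    }
}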
@@ -930,4 +935,54 @@ pub(crate) mod tests {

         Ok(())
     }
+
+    #[test]
+    fn intent_from_observed() -> anyhow::Result<()> {
+        let nodes = make_test_nodes(3);
+        let mut scheduler = Scheduler::new(nodes.values());
+
+        let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
+
+        tenant_state.observed.locations.insert(
+            NodeId(3),
+            ObservedStateLocation {
+                conf: Some(LocationConfig {
+                    mode: LocationConfigMode::AttachedMulti,
+                    generation: Some(2),
+                    secondary_conf: None,
+                    shard_number: tenant_state.shard.number.0,
+                    shard_count: tenant_state.shard.count.literal(),
+                    shard_stripe_size: tenant_state.shard.stripe_size.0,
+                    tenant_conf: TenantConfig::default(),
+                }),
+            },
+        );
+
+        tenant_state.observed.locations.insert(
+            NodeId(2),
+            ObservedStateLocation {
+                conf: Some(LocationConfig {
+                    mode: LocationConfigMode::AttachedStale,
+                    generation: Some(1),
+                    secondary_conf: None,
+                    shard_number: tenant_state.shard.number.0,
+                    shard_count: tenant_state.shard.count.literal(),
+                    shard_stripe_size: tenant_state.shard.stripe_size.0,
+                    tenant_conf: TenantConfig::default(),
+                }),
+            },
+        );
+
+        tenant_state.intent_from_observed(&mut scheduler);
+
+        // The attached location with the highest generation becomes the intended attached location
+        assert_eq!(tenant_state.intent.attached, Some(NodeId(3)));
+        // All other observed locations become secondaries
+        assert_eq!(tenant_state.intent.secondary, vec![NodeId(2)]);
+
+        scheduler.consistency_check(nodes.values(), [&tenant_state].into_iter())?;
+
+        tenant_state.intent.clear(&mut scheduler);
+        Ok(())
+    }
 }