Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move admin and meta component into cluster controller role #1180

Merged
merged 9 commits into from
Feb 15, 2024

Conversation

tillrohrmann
Copy link
Contributor

This PR moves the Admin and Meta components from the worker role into the cluster controller role. For this to happen, we needed to do the following things:

  1. Expose invocation control API on Worker grpc service and modify Admin service to use it for killing/cancelling of invocations
  2. Add Schema grpc service which allows to fetch schema information and update worker role to periodically fetch schema information
  3. Update subscriptions based on the latest schema information (start new and stop old subscriptions)
  4. Expose storage query API on Worker grpc service and update Admin to use this API to query the storage. The query results are sent as arrow-flight encoded RecordBatches.

Currently, the grpc address is hard-coded to 127.0.0.1:5122 which prevents the system from being run as separate processes.

@tillrohrmann
Copy link
Contributor Author

e2e test are failing: https://github.com/restatedev/restate/actions/runs/7888143370/job/21525412929?pr=1180. Investigating what I broke.

@tillrohrmann
Copy link
Contributor Author

The e2e tests were broken because of restatedev/e2e#265. They should be fixed now.

@tillrohrmann
Copy link
Contributor Author

Still failing. Investigating what is going wrong with the e2e tests.

@tillrohrmann
Copy link
Contributor Author

I suspect that there is race condition between the e2e test invoking a service and the schema information having propagated to the ingress.

Copy link
Contributor

@AhmedSoliman AhmedSoliman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @tillrohrmann. The changes look good to me, but I'd love to get clarification on the couple of questions about potential data-loss.

invocation_termination: InvocationTermination,
) -> Result<(), Error> {
let invocation_termination =
bincode::serde::encode_to_vec(invocation_termination, bincode::config::standard())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a transitional step or do you think we will continue with the nested serialization for the long-run?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of it as a transitional step to simplify things for the time being. I guess once we introduce the Admin grpc service we will specify the InvocationTermination request properly and then could reuse it here.

crates/node/src/roles/cluster_controller.rs Show resolved Hide resolved
crates/node/src/roles/worker.rs Outdated Show resolved Hide resolved
crates/node/src/roles/worker.rs Outdated Show resolved Hide resolved
.map_err(WorkerRoleError::Worker),
);
component_set.spawn(
Self::reload_schemas(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed we will need to perform a blocking initial schema fetch before starting the worker. Do we have a risk of partition processors failing/ignoring invocations because they can't see the schema on startup, or if it takes time? I'm worried about the risk of data loss if PP drops an invocation because it thinks the service have been removed (vs. that we are running with old Meta information).

Copy link
Contributor Author

@tillrohrmann tillrohrmann Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PP won't drop any invocations it does not know about. What can happen is that the ingress rejects for some time new invocations until it learns about the updated schema information. What can also happen, and this is probably not so nice, is that the PP will try to invoke some invocations for which it does not know the deployment information. This would then lead to a couple of retries which causes some noise in the logs. To prevent this from happening trying to fetch the schema before starting the worker is probably a good idea. This won't solve the problem in all cases but in most.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a fragile design assumption that might not stand the test of time. The good news is that it won't cause problems at the moment, but I think we can have a more robust approach with the following design:

  • Schema is versioned.
  • When schema is updated on an admin node, the admin node can inform all nodes with "ingress" role with the update. This is either a notification of the higher version number, or a full push of the new schema object. Ideally, we finish up the ingress version notification before we respond to user-requested schema changes, this is to achieve monotonic read.
  • Ingress writes its last-seen schema version in all messages going through bifrost or in RPC to workers. A worker processing an invocation message will compare the schema version, if it's running with an older version, it pulls the new schema before moving forward. If the worker has the same schema as the incoming message (whether from another PP or ingress) it can more confidently reject invocations if we need to do that in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending the schema information version with the invocation messages is a good idea to achieve monotonic reads :-)

}
}
Err(err) => {
debug!("Failed fetching schema information: {err}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a debug in the single-node setup? Why do we expect failures to happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the single-node setup this should indeed not happen. It is a bit of a preparational step for allowing the cluster controller and the worker role to be run as two separate processes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather make this scream loud if we are confident that it shouldn't happen :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By allowing to separate the admin from the worker role which will be the next step I intend to implement, it will be possible that the worker cannot reach the admin.

crates/node-services/proto/schema.proto Outdated Show resolved Hide resolved
crates/node-services/proto/worker.proto Outdated Show resolved Hide resolved

package dev.restate.schema;

service Schema {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this can be the seed for the Metadata service? I also think we should postfix all grpc services with something to make it distinct from potential message types, perhaps with Svc at the end or something.

Note: The postfix comment isn't related to this particular PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought so too concerning the Metadata service. Should we start right away calling it MetadataSvc instead? Given that these services are currently only internally used, it is probably also possible to rename them later.

Re postfix: +1 will change it.

crates/node/src/server/service.rs Show resolved Hide resolved
AhmedSoliman added a commit that referenced this pull request Feb 14, 2024
This introduces a central system to manage long-running and background restate async tasks.
The core of this proposal is to help us lean more towards spawning self-contained tasks that
are addressable, trackable, and cancellable, than current deep future poll trees.

It also allows nice possibilities like:
- Limited structured concurrency (no auto-waiting for children though)
- Graceful cancellations/shutdown reduces the risk of cancellation-unsafe drops
- Potentially less memory, deep nested future state machines become shallower.
- A single place where we can auto-propagate tracing context, flag tasks by priority, schedule tasks on different runtime, provide observability of what kind of tasks are running, etc.
- Scoped tasks allow scoped cancellation. (cooperatively cancel all tasks for a partition id, or a specific tenant in the future, or be specific and filter specific kinds of tasks)
- Limit concurrency of certain tasks by kind, partition, or tenant, etc.
- Distributing tasks among multiple tokio runtimes based on the task kind
- Support for different abort-policy based on the task kind.

A the moment, I only migrated a small pieces of our code to this system to make the merging with #1180 easier.
Once #1180 is merged, I'll move the rest of our services and bifrost to use it.
@AhmedSoliman AhmedSoliman mentioned this pull request Feb 14, 2024
@tillrohrmann
Copy link
Contributor Author

The current CI run fails because of conflicting changes on main. Will rebase onto the latest main to resolve the problem.

Copy link
Contributor

@AhmedSoliman AhmedSoliman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

AhmedSoliman added a commit that referenced this pull request Feb 15, 2024
This introduces a central system to manage long-running and background restate async tasks.
The core of this proposal is to help us lean more towards spawning self-contained tasks that
are addressable, trackable, and cancellable, than current deep future poll trees.

It also allows nice possibilities like:
- Limited structured concurrency (no auto-waiting for children though)
- Graceful cancellations/shutdown reduces the risk of cancellation-unsafe drops
- Potentially less memory, deep nested future state machines become shallower.
- A single place where we can auto-propagate tracing context, flag tasks by priority, schedule tasks on different runtime, provide observability of what kind of tasks are running, etc.
- Scoped tasks allow scoped cancellation. (cooperatively cancel all tasks for a partition id, or a specific tenant in the future, or be specific and filter specific kinds of tasks)
- Limit concurrency of certain tasks by kind, partition, or tenant, etc.
- Distributing tasks among multiple tokio runtimes based on the task kind
- Support for different abort-policy based on the task kind.

A the moment, I only migrated a small pieces of our code to this system to make the merging with #1180 easier.
Once #1180 is merged, I'll move the rest of our services and bifrost to use it.
As a first step to move the Meta service into the cluster controller role,
this commit removes the restate_worker_api::Handle dependency from this
component.
This commit decouples the meta and admin components from the worker and
moves them over to the cluster controller role. The way the worker learns
about schema information changes is by periodically asking the schema
service for updates. At the moment, every schema information fetch request
will send the whole list of schema update commands over.

Currently, it is not possible to query the storage via the Admin service.
This will be fixed with a follow-up commit.
The QueryStorage grpc method allows to query the storage of a worker.
It returns the result as arrow-flight encoded RecordBatches. The admin
service uses this service to forward queries it receives on /query to
create a response for them.
This commit renames the following grpc services:

* Worker -> WorkerSvc
* ClusterController -> ClusterControllerSvc
* NodeCtrl -> NodeCtrlSvc
This commit enables the admin service to push schema updates to the worker
in case of schema changes.
@tillrohrmann tillrohrmann merged commit 429b74a into restatedev:main Feb 15, 2024
@tillrohrmann tillrohrmann deleted the separate-admin-from-worker branch February 15, 2024 15:14
AhmedSoliman added a commit that referenced this pull request Feb 15, 2024
This introduces a central system to manage long-running and background restate async tasks.
The core of this proposal is to help us lean more towards spawning self-contained tasks that
are addressable, trackable, and cancellable, than current deep future poll trees.

It also allows nice possibilities like:
- Limited structured concurrency (no auto-waiting for children though)
- Graceful cancellations/shutdown reduces the risk of cancellation-unsafe drops
- Potentially less memory, deep nested future state machines become shallower.
- A single place where we can auto-propagate tracing context, flag tasks by priority, schedule tasks on different runtime, provide observability of what kind of tasks are running, etc.
- Scoped tasks allow scoped cancellation. (cooperatively cancel all tasks for a partition id, or a specific tenant in the future, or be specific and filter specific kinds of tasks)
- Limit concurrency of certain tasks by kind, partition, or tenant, etc.
- Distributing tasks among multiple tokio runtimes based on the task kind
- Support for different abort-policy based on the task kind.

A the moment, I only migrated a small pieces of our code to this system to make the merging with #1180 easier.
Once #1180 is merged, I'll move the rest of our services and bifrost to use it.
AhmedSoliman added a commit that referenced this pull request Feb 16, 2024
This introduces a central system to manage long-running and background restate async tasks.
The core of this proposal is to help us lean more towards spawning self-contained tasks that
are addressable, trackable, and cancellable, than current deep future poll trees.

It also allows nice possibilities like:
- Limited structured concurrency (no auto-waiting for children though)
- Graceful cancellations/shutdown reduces the risk of cancellation-unsafe drops
- Potentially less memory, deep nested future state machines become shallower.
- A single place where we can auto-propagate tracing context, flag tasks by priority, schedule tasks on different runtime, provide observability of what kind of tasks are running, etc.
- Scoped tasks allow scoped cancellation. (cooperatively cancel all tasks for a partition id, or a specific tenant in the future, or be specific and filter specific kinds of tasks)
- Limit concurrency of certain tasks by kind, partition, or tenant, etc.
- Distributing tasks among multiple tokio runtimes based on the task kind
- Support for different abort-policy based on the task kind.

A the moment, I only migrated a small pieces of our code to this system to make the merging with #1180 easier.
Once #1180 is merged, I'll move the rest of our services and bifrost to use it.
AhmedSoliman added a commit that referenced this pull request Feb 16, 2024
This introduces a central system to manage long-running and background restate async tasks.
The core of this proposal is to help us lean more towards spawning self-contained tasks that
are addressable, trackable, and cancellable, than current deep future poll trees.

It also allows nice possibilities like:
- Limited structured concurrency (no auto-waiting for children though)
- Graceful cancellations/shutdown reduces the risk of cancellation-unsafe drops
- Potentially less memory, deep nested future state machines become shallower.
- A single place where we can auto-propagate tracing context, flag tasks by priority, schedule tasks on different runtime, provide observability of what kind of tasks are running, etc.
- Scoped tasks allow scoped cancellation. (cooperatively cancel all tasks for a partition id, or a specific tenant in the future, or be specific and filter specific kinds of tasks)
- Limit concurrency of certain tasks by kind, partition, or tenant, etc.
- Distributing tasks among multiple tokio runtimes based on the task kind
- Support for different abort-policy based on the task kind.

A the moment, I only migrated a small pieces of our code to this system to make the merging with #1180 easier.
Once #1180 is merged, I'll move the rest of our services and bifrost to use it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants