[RFC] Admission Controller framework for OpenSearch #8910

Open
bharath-techie opened this issue Jul 27, 2023 · 10 comments
Labels
distributed framework, enhancement, feedback needed, RFC, Roadmap:StAR (Stability, Availability, Resiliency)

Comments

@bharath-techie
Contributor

bharath-techie commented Jul 27, 2023

Background

Currently, OpenSearch has various rejection mechanisms for in-flight and new requests when the cluster is overloaded.
We have backpressure mechanisms that react to node duress conditions, circuit breakers on data nodes that reject requests based on real-time memory usage, and queue-size based rejections. But each of these solutions has gaps.
We don't have a state-based admission control framework capable of rejecting incoming requests before the execution chain starts.

Existing throttling solutions

1. Search backpressure

Search backpressure in OpenSearch currently cancels resource-intensive tasks when a node is in duress. The coordinator node cancels search tasks and the data nodes cancel the shard-level tasks.

Challenges

2. Indexing backpressure

OpenSearch has node-level and shard-level indexing backpressure that dynamically rejects indexing requests when the cluster is under strain.

Challenges

3. Circuit breaker

We have circuit breakers on data nodes that reject requests based on real-time memory usage. They are the last line of defence and prevent nodes from degrading further.

Challenges

  • 3.1 Circuit breaker limits cannot be configured per request type.
  • 3.2 Circuit breakers currently work only on memory usage and don't support other resource parameters such as CPU, I/O, etc.

4. Queue size based rejection

In OpenSearch, we currently have different queues for different operations such as search and indexing, and we reject new requests if the respective queue is full.

Challenges

  • 4.1 Queue size is not an accurate representation of the load. For example, bulk requests can vary widely in request size and number of documents to be indexed. Similarly, search requests can vary widely in resource consumption based on the type of query, e.g. aggregation queries, large term queries, etc.
  • 4.2 Queue size limits are intentionally not honoured for a few framework-level actions, such as ReplicationAction, to maintain consistency and reduce wasted work. For example, TransportReplicationAction allows replica nodes to enqueue replication requests even if the write queue is full or beyond its defined limit.

Proposal

We propose to implement an admission control framework for OpenSearch that rejects incoming requests based on the resource utilization stats of the nodes. This will allow real-time, state-based admission control on the nodes.
We will build a new admission control core plugin that can intercept and reject requests in the REST layer and the transport layer. We will extend the ‘ResponseCollector’ service to maintain the performance utilization of downstream nodes on the coordinator node.

Goals and benefits

  1. Build an extensible admission control framework, as an OpenSearch core plugin / module, that can intercept requests at both the REST layer and transport layer entry points and reject them when the cluster is overloaded.
  2. Build a resource utilization / health status view of downstream nodes on the coordinator nodes.
    • This allows us to fail requests fast on the coordinator when the associated target nodes are under stress.
    • This helps cases where rejection has to be based on the target nodes' resource utilization, such as I/O.
    • This can help optimize routing decisions in the search flow.
  3. Intelligently route search requests away from stressed nodes where possible by enhancing existing routing methods such as ARS. We will also adjust the stats of stressed nodes so that new requests are periodically retried on them.
    • This will improve search request performance as we route away from stressed nodes.
  4. The framework should be configurable per request type and resource type, and extensible to any new resource type parameter we may reject on in the future (a hypothetical settings sketch follows this list).
    • Rejections will be fairer, since limits and rejections are based on each request type (solves 3.1).
  5. The framework should provide a half-open state to retry requests on stressed nodes in a time-based / count-based manner.
    • This helps coordinator-side rejections: requests are retried once the stressed nodes have recovered.
  6. The framework will initially provide admission control based on CPU, JVM, I/O and request size.
    • This can extend existing rejection solutions by considering CPU, I/O, etc. (solves 2.1, 3.2).
    • I/O will be a completely new backpressure / admission control resource parameter which we will build from the ground up.
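
To make goal 4 concrete, here is a minimal sketch of what per-request-type, per-resource thresholds could look like as dynamic cluster settings. The setting names, defaults and class are hypothetical illustrations, not part of the proposal:

```java
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

/**
 * Hypothetical per-request-type, per-resource admission control thresholds.
 * Setting names and defaults are illustrative only.
 */
public final class AdmissionControlSettings {

    // Reject search requests when the target node's CPU utilization crosses this fraction.
    public static final Setting<Double> SEARCH_CPU_LIMIT = Setting.doubleSetting(
        "admission_control.search.cpu_limit", 0.90, 0.0, Property.Dynamic, Property.NodeScope);

    // Reject indexing requests when the target node's JVM heap usage crosses this fraction.
    public static final Setting<Double> INDEXING_JVM_LIMIT = Setting.doubleSetting(
        "admission_control.indexing.jvm_limit", 0.85, 0.0, Property.Dynamic, Property.NodeScope);

    // Fraction of requests allowed through while a target node is in the HALF_OPEN state.
    public static final Setting<Double> HALF_OPEN_ALLOW_FRACTION = Setting.doubleSetting(
        "admission_control.half_open.allow_fraction", 0.10, 0.0, Property.Dynamic, Property.NodeScope);

    private AdmissionControlSettings() {}
}
```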

High level design

Admission control plugin

We can add a new admission control core OpenSearch module / plugin that extends ‘NetworkPlugin’ and intercepts requests at the REST layer and transport layer to perform rejections.

  1. The plugin can override ‘getRestHandlerWrapper’ to wrap incoming requests and perform admission control on them. We'll initially perform AC only for search and indexing REST calls and can add more based on need.
  2. The plugin can override ‘getTransportInterceptors’ to add a new AC transport interceptor which intercepts requests based on the transport action name and performs admission control (see the sketch below).

(Diagram: admission controller end-to-end request flow)
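
Below is a minimal sketch of the two interception points, assuming the plugin implements both NetworkPlugin and ActionPlugin. The admission decision (shouldReject) is a stub, and exact import paths / signatures may differ across OpenSearch versions:

```java
import java.util.List;
import java.util.function.UnaryOperator;

import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestStatus;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;

public class AdmissionControlPlugin extends Plugin implements NetworkPlugin, ActionPlugin {

    // Wrap every REST handler so that an overloaded node can answer 429 before any work is done.
    @Override
    public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadContext) {
        return handler -> (request, channel, client) -> {
            if (shouldReject(request.path())) {
                channel.sendResponse(new BytesRestResponse(RestStatus.TOO_MANY_REQUESTS, "rejected by admission control"));
                return;
            }
            handler.handleRequest(request, channel, client);
        };
    }

    // Intercept transport handlers by action name and reject before the actual handler runs.
    @Override
    public List<TransportInterceptor> getTransportInterceptors(
            NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
        return List.of(new TransportInterceptor() {
            @Override
            public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
                    String action, String executor, boolean forceExecution, TransportRequestHandler<T> actualHandler) {
                return (request, channel, task) -> {
                    if (shouldReject(action)) {
                        throw new OpenSearchRejectedExecutionException("rejected by admission control: " + action);
                    }
                    actualHandler.messageReceived(request, channel, task);
                };
            }
        });
    }

    // Stub decision: a real implementation would consult the admission control service and the
    // per-node resource utilization view described below.
    private boolean shouldReject(String actionOrPath) {
        return false;
    }
}
```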

Admission control service

  1. This service will aid in rejecting incoming requests based on the resource utilization view of the nodes.
  2. We will utilize the response collector service to get the performance stats of the target nodes.
  3. We will maintain a state machine that helps reject requests when performance thresholds are breached, with an option to retry requests periodically to check whether the target node is still in duress (see the sketch after the diagram below).
    1. CLOSED → This state allows all incoming requests (default).
    2. OPEN → This state rejects all incoming requests.
    3. HALF_OPEN → This state allows X percent (configurable) of the requests and rejects the rest.

(Diagram: admission controller state management)
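
A minimal sketch of the state machine described above; the class and method names are illustrative, and the transition triggers would be wired to the resource utilization thresholds:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Minimal sketch of the per-node admission control state machine.
 * All names are illustrative, not the actual implementation.
 */
public class AdmissionControlStateMachine {

    public enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final double halfOpenAllowFraction; // e.g. 0.10 = let 10% of requests through

    public AdmissionControlStateMachine(double halfOpenAllowFraction) {
        this.halfOpenAllowFraction = halfOpenAllowFraction;
    }

    /** Returns true if the request should be admitted in the current state. */
    public boolean shouldAdmit() {
        switch (state.get()) {
            case CLOSED:
                return true;   // default: admit everything
            case OPEN:
                return false;  // node in duress: reject everything
            case HALF_OPEN:
            default:
                // Probabilistically admit a configurable fraction of requests so that
                // recovery of the target node can be detected.
                return ThreadLocalRandom.current().nextDouble() < halfOpenAllowFraction;
        }
    }

    /** Transitions driven by threshold breaches and retry windows. */
    public void onThresholdBreached() { state.set(State.OPEN); }
    public void onRetryWindowElapsed() { state.compareAndSet(State.OPEN, State.HALF_OPEN); }
    public void onNodeRecovered() { state.set(State.CLOSED); }
}
```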

Building resource utilization view

Response collector service

We'll extend the existing ‘ResponseCollectorService’ to collect performance statistics such as CPU, JVM and I/O of downstream nodes on the coordinator node.
We will also record node unresponsiveness / timeouts when a request fails, and treat those with higher severity.
The coordinator can use this service at any time to get the resource utilization of the downstream nodes.
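
As a rough illustration, the extended collector could keep a per-node snapshot like the following (class and field names are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical per-node resource view maintained on the coordinator, alongside the
 * queue size / service time already tracked by ResponseCollectorService.
 */
public class NodeResourceUsageTracker {

    public static final class NodeResourceStats {
        public final double cpuPercent;
        public final double jvmHeapPercent;
        public final double ioUtilizationPercent;
        public final long timestampMillis; // staleness check: old stats are treated as unknown

        public NodeResourceStats(double cpuPercent, double jvmHeapPercent,
                                 double ioUtilizationPercent, long timestampMillis) {
            this.cpuPercent = cpuPercent;
            this.jvmHeapPercent = jvmHeapPercent;
            this.ioUtilizationPercent = ioUtilizationPercent;
            this.timestampMillis = timestampMillis;
        }
    }

    private final Map<String, NodeResourceStats> statsByNodeId = new ConcurrentHashMap<>();

    /** Called when a response (or timeout) from a downstream node carries fresh stats. */
    public void updateNodeStats(String nodeId, NodeResourceStats stats) {
        statsByNodeId.put(nodeId, stats);
    }

    /** Returns the last known stats for a node, or null if nothing has been observed yet. */
    public NodeResourceStats getNodeStats(String nodeId) {
        return statsByNodeId.get(nodeId);
    }
}
```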

Local node resource monitoring

We will reuse the node stats monitors such as process, jvm and fs, which already monitor node resources at a 1-second interval.

Track the resource utilization of the downstream nodes

We will enhance the search and indexing flows to fetch the downstream nodes' performance stats.

Approach 1 - Use the thread context to get the required stats from downstream nodes

  1. For every indexing / search request, we will add the performance stats to the thread context response headers on all the nodes (primary & replica) where the request is processed.
  2. Once the request completes, we will read the perf stats from the thread context and update them in the response collector service on the coordinator node. After that, we will filter these perf stats out of the response headers before returning the response to the client (a sketch follows this list).
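
A sketch of what Approach 1 could look like on top of the existing ThreadContext response-header APIs; the header name and encoding are placeholders, and the security review mentioned under Risks still applies:

```java
import java.util.List;
import java.util.Map;

import org.opensearch.common.util.concurrent.ThreadContext;

/** Illustrative only: header name and stats encoding are placeholders. */
public final class PerfStatsPropagation {

    static final String PERF_STATS_HEADER = "perf_stats"; // hypothetical response header key

    // On the node that executes the shard-level request: attach local stats to the response headers.
    static void attachLocalStats(ThreadContext threadContext, double cpuPercent, double jvmHeapPercent) {
        threadContext.addResponseHeader(PERF_STATS_HEADER, cpuPercent + "," + jvmHeapPercent);
    }

    // On the coordinator, once the request completes: read the stats so the response collector
    // service can be updated. The header must also be filtered out before the response is
    // returned to the client (elided here).
    static double[] readDownstreamStats(ThreadContext threadContext) {
        Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
        List<String> values = responseHeaders.get(PERF_STATS_HEADER);
        if (values == null || values.isEmpty()) {
            return null; // no stats reported; fall back to data-node-side rejections
        }
        String[] parts = values.get(0).split(",");
        return new double[] { Double.parseDouble(parts[0]), Double.parseDouble(parts[1]) };
    }

    private PerfStatsPropagation() {}
}
```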

Pros

This approach has no regression / backward-compatibility risk as we don't alter any schema.

Risks

We need to check whether there are any security implications in carrying perf stats as part of the thread context.

Approach 2 - Schema change

Search flow

  1. Enhance the ‘QuerySearchResult’ schema to return the target node's resource utilization stats.
  2. We already get queue size, service time, etc. as part of ‘QuerySearchResult’, so it is a good fit for additional node performance stats.

Indexing flow

  1. Enhance ‘BulkShardResponse’ to return the target node's resource utilization stats.

Risks

  1. Currently the ‘BulkShardResponse’ schema doesn't carry any perf parameters of the target nodes. We have to make serialization / deserialization changes to hide the perf stats from the user (a hedged sketch follows).
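
For illustration, a hedged sketch of the usual version-gated wire-format pattern such a change would follow; the field, the gating version constant and the class are hypothetical, not the actual BulkShardResponse change:

```java
import java.io.IOException;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

/**
 * Illustrative only: shows a version-gated new wire field so that mixed-version clusters keep
 * working. The new stats would also have to be excluded from the client-facing response.
 */
public class PerfStatsWireFieldExample {

    private double nodeCpuPercent = -1; // -1 = not reported

    void writeTo(StreamOutput out) throws IOException {
        // ... existing fields written here ...
        if (out.getVersion().onOrAfter(Version.V_2_10_0)) { // hypothetical gating version
            out.writeDouble(nodeCpuPercent);
        }
    }

    void readFrom(StreamInput in) throws IOException {
        // ... existing fields read here ...
        if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
            nodeCpuPercent = in.readDouble();
        }
    }
}
```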

Other approaches considered

We could enhance the follower check / leader check APIs to propagate the performance stats of the nodes to all other nodes.

Cons

This builds a dependency on the cluster manager and might impact the cluster manager node's performance.
These health checks are critical, so any regression would be quite problematic.

Search flow enhancements - #8913
Indexing flow enhancements - #8911

Co-authored by @ajaymovva

@Bukhtawar
Collaborator

Tagging @jainankitk @reta @sohami @dblock

@reta
Collaborator

reta commented Aug 1, 2023

💯 see the usefulness of such a feature, thanks @bharath-techie, at high level it looks great

We’ll extend the existing ‘ResponseCollectorService’ to collect performance statistics such as CPU, JVM and IO of downstream nodes in coordinator node.

This looks more like reactive and not proactive approach. Could we collect the metric from the nodes periodically and base admission decisions on that? (something like adaptive load balancing [1] as an example). The issues I have seen in the past is one single query could bring all affected nodes into endless GC cycles but it seems like we won't be able to detect that till the responses from the nodes are received.

[1] https://doc.akka.io/docs/akka/2.8.2/cluster-metrics.html#adaptive-load-balancing

We propose to implement Admission control framework for OpenSearch which rejects incoming requests based on the resource utilization stats of the nodes.

So we have these new types of nodes, the extensions nodes (more generally, extensions as separate processes), and they could significantly impact search or indexing if they are part of the relevant processing pipeline. It might be too much to ask extension developers to provide resource utilization (can we bake it into the SDK?), but I believe the extensions nodes should be factored into the admission controller in some way (track latency?).

What do you think?

@mgodwan
Member

mgodwan commented Aug 1, 2023

Thanks @bharath-techie for the proposal. This should help with increasing the overall resiliency of the OpenSearch cluster.

  • Since the proposal is to introduce checks at the REST layer as well as the transport layer, can you document the behavior for various modes where requests may fail partially? e.g. what happens if indexing succeeds on the primary but fails while sending to the replica due to admission control.

  • Will this framework apply for operator triggered background operations like re-index, update by query, etc which result in calling Bulk/Search internally?

  • Since the proposal suggests applying admission control on the client side of the transport layer as well, is there a possibility that the coordinating node ends up unable to update the downstream perf stats, because its current view of the stats prevents it from sending the indexing/search request (which is what we rely on to update the downstream node's perf stats)? Instead, can we evaluate a periodic check mechanism (like the periodic ConnectionChecker)?

@bharath-techie
Contributor Author

This looks more like reactive and not proactive approach. Could we collect the metric from the nodes periodically and base admission decisions on that? (something like adaptive load balancing [1] as an example). The issues I have seen in the past is one single query could bring all affected nodes into endless GC cycles but it seems like we won't be able to detect that till the responses from the nodes are received.

Yeah, agreed on it being a bit reactive. We evaluated making separate periodic calls to downstream nodes as well, but we thought we can live with letting the first search query pass through the AC framework when we don't have data, and then rely on the existing search backpressure to cancel such resource-intensive queries on the node. Let me know your thoughts on this.

I believe the extensions nodes should be factored into admission controller in some way (track latency?)

Yeah since we are planning to extend ResponseCollectorService, we can see how to utilize/extend existing data such as 'ResponseTime', 'ServiceTime' in such cases.

@bharath-techie
Contributor Author

bharath-techie commented Aug 1, 2023

Since the proposal is to introduce checks at the REST layer as well as the transport layer, can you document the behavior for various modes where requests may fail partially? e.g. what happens if indexing succeeds on the primary but fails while sending to the replica due to admission control.

Some of this is covered in the indexing AC doc - please refer to #8911. cc: @ajaymovva

Will this framework apply for operator triggered background operations like re-index, update by query, etc which result in calling Bulk/Search internally?

We are planning to add support based on transport action names (so we can use a prefix for any search / index actions if needed) on data nodes, or on any REST endpoints in the case of the coordinator.

Instead, can we evaluate a periodic check mechanism (like the periodic ConnectionChecker)?

We have a half-open state throughout - the AC framework will allow requests periodically (or based on request count) to check if target nodes are still under stress - which should solve this to some extent.
But agreed, we can also evaluate a periodic check mechanism or a separate API that gets stats + health status from downstream nodes.

@sohami
Collaborator

sohami commented Aug 2, 2023

Thanks @bharath-techie for the proposal. This looks great.

Build resource utilization / health status view of downstream nodes in coordinator nodes. This allows us to fail fast the requests in coordinator when the associated target nodes are in stress.

Intelligently route search requests away from stressed nodes if possible by enhancing existing routing methods such as ARS. We also will adjust stats of the stressed nodes so that new requests are retried on stressed nodes periodically.

I am thinking that if each node takes the local decision based on its own resource view, that should be good enough for the admission control decision at the node level. Building the downstream-nodes view of resource utilization may not be needed for the admission control mechanism. Even if we have it, it may be out of sync with the downstream node's current utilization and probably not accurate enough to make AC decisions? Note: I am assuming that admission control will come in after the routing decision is made, to either fail or admit the request.

However, it can be useful for routing decisions and will be relevant to certain request types. For example, in the case of search it can help with ranking the shard replicas, whereas for others like indexing it doesn't necessarily matter since we need to route the request to all the shards. Also, with the SegRep model it may not be needed for indexing requests.

Approach 1 - Use the thread context to get the required stats from downstream nodes

  • Similar to the feedback from @reta and @mgodwan, this also depends on the node frequently communicating with other nodes to keep an updated view of their resource utilization. We probably need a proactive mechanism as well to update the downstream resource utilization when a node is not interacting with another node very often (like an idle-timeout behavior). So it seems we will need a hybrid approach such that we don't incur the overhead of polling by default (1 to many) but do it on an on-demand basis.
  • Have we looked into the distributed tracing framework to see if it can be used to build the downstream-nodes view?

@Gaganjuneja
Contributor

@sohami I don't think the distributed tracing framework can be used to build the node view, since it is also reactive and never collects the state back from the downstream node. (Though we are discussing in proposal #8918 whether traces can be communicated back to the coordinator node.)
@bharath-techie Did we also explore a health check framework like the leader/follower check to build this state, as this communication between nodes is already in place?

@bharath-techie
Contributor Author

Thanks for the comments @sohami. Yeah, the overhead of polling (all-node-to-all-node communication) is one of the reasons we didn't go for periodic API / checks. It doesn't scale well with cluster size.

The tradeoff is that rejections are best-effort on the coordinator.
The coordinator will reject requests if it has recent data about the downstream nodes; otherwise we need to rely on data node rejections.

We can look into the hybrid approach incrementally; we can make the decision based on whether benchmarks show a lack of accuracy.

@bharath-techie
Contributor Author

bharath-techie commented Aug 2, 2023

Did we also explore the health check framework

Yes, the 'Other approaches' section has information on that.

@ajaymovva
Contributor

indexing it doesn't necessarily matter since we need to route the request to all the shards

Yes, for indexing we need to route the request to all the shards, but this approach will help proactively reject the request at the coordinator before it lands on the target node. For every indexing request, we will evaluate the stats of the nodes (primary and replica) where the request will land, based on the coordinator's cluster state, and take action at the coordinator.
As you said earlier, we can reject at the node level, but if indexing succeeds on the primary while the replica nodes are under stress, and we reject the indexing action on the replica nodes, it results in shard failures. So if we have the stats for all the downstream nodes at the coordinator, we can reject such requests upfront at the coordinator.

Also with SegRep model it may not be needed for indexing request.

For plain SegRep, we still write to the replica translogs, so this approach will be useful.
But with SegRep and remote store, we don't index the data into replicas, so we can take a call to skip the replica checks at the coordinator. Even here, we can still save the transport call from the coordinator to the primary node.
