Commit
Merge pull request #5528 from oasisprotocol/kostko/feature/rt-load-balancing

go/runtime: Add load-balancer runtime provisioner
kostko committed Jan 22, 2024
2 parents 4104c5e + 8782933 commit af2692c
Showing 15 changed files with 412 additions and 110 deletions.
1 change: 1 addition & 0 deletions .changelog/5528.feature.md
@@ -0,0 +1 @@
go/runtime: Add load-balancer runtime provisioner
2 changes: 2 additions & 0 deletions docs/oasis-node/metrics.md
@@ -97,6 +97,8 @@ oasis_worker_aborted_batch_count | Counter | Number of aborted batches. | runtim
oasis_worker_batch_processing_time | Summary | Time it takes for a batch to finalize (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_batch_runtime_processing_time | Summary | Time it takes for a batch to be processed by the runtime (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_batch_size | Summary | Number of transactions in a batch. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
oasis_worker_client_lb_healthy_instance_count | Gauge | Number of healthy instances in the load balancer. | runtime | [runtime/host/loadbalance](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/host/loadbalance/metrics.go)
oasis_worker_client_lb_requests | Counter | Number of requests processed by the given load balancer instance. | runtime, lb_instance | [runtime/host/loadbalance](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/host/loadbalance/metrics.go)
oasis_worker_epoch_number | Gauge | Current epoch number as seen by the worker. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_epoch_transition_count | Counter | Number of epoch transitions. | runtime | [worker/common/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/common/committee/node.go)
oasis_worker_execution_discrepancy_detected_count | Counter | Number of detected execute discrepancies. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/metrics.go)
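The metrics.go file referenced in the table is part of this PR but not shown in this excerpt. Purely as a minimal sketch, assuming the metric names and label sets from the table and the identifiers used in loadbalance.go further down (lbHealthyInstanceCount, lbRequestCount, initMetrics), the collectors could be declared along these lines:

package loadbalance

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Gauge of currently healthy instances, labelled by runtime.
	lbHealthyInstanceCount = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "oasis_worker_client_lb_healthy_instance_count",
			Help: "Number of healthy instances in the load balancer.",
		},
		[]string{"runtime"},
	)
	// Counter of requests served, labelled by runtime and instance index.
	lbRequestCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "oasis_worker_client_lb_requests",
			Help: "Number of requests processed by the given load balancer instance.",
		},
		[]string{"runtime", "lb_instance"},
	)

	metricsOnce sync.Once
)

// initMetrics registers the collectors with the default Prometheus registry exactly once,
// so that provisioning multiple load-balanced runtimes does not double-register them.
func initMetrics() {
	metricsOnce.Do(func() {
		prometheus.MustRegister(lbHealthyInstanceCount, lbRequestCount)
	})
}
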
19 changes: 18 additions & 1 deletion go/runtime/config/config.go
@@ -94,6 +94,9 @@ type Config struct {
// AttestInterval is the interval for periodic runtime re-attestation. If not specified
// a default will be used.
AttestInterval time.Duration `yaml:"attest_interval,omitempty"`

// LoadBalancer is the load balancer configuration.
LoadBalancer LoadBalancerConfig `yaml:"load_balancer,omitempty"`
}

// PruneConfig is the history pruner configuration structure.
@@ -106,6 +109,13 @@ type PruneConfig struct {
NumKept uint64 `yaml:"num_kept"`
}

// LoadBalancerConfig is the load balancer configuration.
type LoadBalancerConfig struct {
// NumInstances is the number of runtime instances to provision for load-balancing. Setting it
// to zero (default) or one disables load balancing.
NumInstances uint64 `yaml:"num_instances,omitempty"`
}

// Validate validates the configuration settings.
func (c *Config) Validate() error {
switch c.Provisioner {
@@ -140,6 +150,10 @@ func (c *Config) Validate() error {
return fmt.Errorf("unknown runtime history pruner strategy: %s", c.Environment)
}

if c.LoadBalancer.NumInstances > 128 {
return fmt.Errorf("cannot specify more than 128 instances for load balancing")
}

return nil
}

@@ -161,10 +175,13 @@ func DefaultConfig() Config {
TxPool: tpConfig.Config{
MaxPoolSize: 50_000,
MaxLastSeenCacheSize: 100_000,
MaxCheckTxBatchSize: 1000,
MaxCheckTxBatchSize: 128,
RecheckInterval: 5,
RepublishInterval: 60 * time.Second,
},
PreWarmEpochs: 3,
LoadBalancer: LoadBalancerConfig{
NumInstances: 0,
},
}
}
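For a rough sense of the new configuration surface, and assuming the import path github.com/oasisprotocol/oasis-core/go/runtime/config implied by the file path above, load balancing is enabled by setting load_balancer.num_instances (the NumInstances field) to two or more:

package main

import (
	"fmt"

	runtimeConfig "github.com/oasisprotocol/oasis-core/go/runtime/config"
)

func main() {
	// Start from the defaults, which leave load balancing disabled (NumInstances is 0).
	cfg := runtimeConfig.DefaultConfig()

	// Provision four runtime instances per hosted runtime. Zero or one keeps the
	// single-instance behaviour; anything above 128 is rejected by Validate.
	cfg.LoadBalancer.NumInstances = 4

	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid runtime configuration:", err)
		return
	}
	fmt.Println("load-balancer instances:", cfg.LoadBalancer.NumInstances)
}
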
6 changes: 3 additions & 3 deletions go/runtime/host/host.go
@@ -67,10 +67,10 @@ type Runtime interface {
UpdateCapabilityTEE()

// WatchEvents subscribes to runtime status events.
WatchEvents(ctx context.Context) (<-chan *Event, pubsub.ClosableSubscription, error)
WatchEvents() (<-chan *Event, pubsub.ClosableSubscription)

// Start attempts to start the runtime.
Start() error
// Start starts the runtime.
Start()

// Abort attempts to abort a runtime so that it will be ready to service new requests.
// In case abort fails or force flag is set, the runtime will be restarted.
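With these signature changes, both WatchEvents and Start become infallible from the caller's point of view: subscribe to events first, then start the runtime, and react to the emitted events. The sketch below is a hypothetical consumer of the updated interface (watchRuntime is illustrative only; the event field names match those used in loadbalance.go below):

package example

import (
	"github.com/oasisprotocol/oasis-core/go/runtime/host"
)

// watchRuntime subscribes to runtime events before starting the runtime so that the
// Started event cannot be missed, then tracks availability until the runtime stops.
func watchRuntime(rt host.Runtime) {
	evCh, sub := rt.WatchEvents()
	defer sub.Close()

	rt.Start()

	for ev := range evCh {
		switch {
		case ev.Started != nil:
			// The runtime is ready to service requests.
		case ev.FailedToStart != nil, ev.Stopped != nil:
			// The runtime is no longer available.
			return
		}
	}
}
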
300 changes: 300 additions & 0 deletions go/runtime/host/loadbalance/loadbalance.go
@@ -0,0 +1,300 @@
// Package loadbalance implements a runtime provisioner that internally load-balances requests among
// multiple runtime instances. This is especially useful on client nodes handling queries.
package loadbalance

import (
"context"
"errors"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/runtime/host"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

// Config is the configuration for the runtime load balancer.
type Config struct {
// NumInstances is the number of runtime instances to provision.
NumInstances int
}

type lbRuntime struct {
id common.Namespace
instances []host.Runtime

l sync.Mutex
nextIdx int
healthyInstances map[int]struct{}

startOnce sync.Once
stopOnce sync.Once
stopCh chan struct{}

logger *logging.Logger
}

// Implements host.Runtime.
func (lb *lbRuntime) ID() common.Namespace {
return lb.id
}

// Implements host.Runtime.
func (lb *lbRuntime) GetInfo(ctx context.Context) (*protocol.RuntimeInfoResponse, error) {
return lb.instances[0].GetInfo(ctx)
}

// Implements host.Runtime.
func (lb *lbRuntime) GetCapabilityTEE() (*node.CapabilityTEE, error) {
// TODO: This won't work when registration of all client runtimes is required.
return lb.instances[0].GetCapabilityTEE()
}

// shouldPropagateToAll checks whether the given runtime request should be propagated to all
// instances.
func shouldPropagateToAll(body *protocol.Body) bool {
switch {
case body.RuntimeConsensusSyncRequest != nil:
// Consensus view of all instances should be up to date as otherwise signed attestations
// will be stale, resulting in them being rejected.
return true
case body.RuntimeKeyManagerPolicyUpdateRequest != nil,
body.RuntimeKeyManagerStatusUpdateRequest != nil,
body.RuntimeKeyManagerQuotePolicyUpdateRequest != nil:
// Key manager updates should be propagated.
return true
default:
return false
}
}

// Implements host.Runtime.
func (lb *lbRuntime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error) {
switch {
case shouldPropagateToAll(body):
// Propagate call to all instances.
type result struct {
rsp *protocol.Body
err error
}
resCh := make(chan *result)
for _, rt := range lb.instances {
rt := rt // Make sure goroutine below operates on the right instance.

go func() {
rsp, err := rt.Call(ctx, body)
resCh <- &result{
rsp: rsp,
err: err,
}
}()
}

var (
anyErr error
rsp *protocol.Body
)
for range lb.instances {
res := <-resCh
// Return the response of the instance that finished last. Note that currently all of
// the propagated methods return a `protocol.Empty` response so this does not matter.
rsp = res.rsp
anyErr = errors.Join(anyErr, res.err)
}
if anyErr != nil {
return nil, anyErr
}
return rsp, nil
case body.RuntimeQueryRequest != nil, body.RuntimeCheckTxBatchRequest != nil:
// Load-balance queries.
idx, err := lb.selectInstance()
if err != nil {
return nil, err
}

lbRequestCount.With(prometheus.Labels{
"runtime": lb.id.String(),
"lb_instance": fmt.Sprintf("%d", idx),
}).Inc()

return lb.instances[idx].Call(ctx, body)
default:
// Propagate only to the first instance.
return lb.instances[0].Call(ctx, body)
}
}

func (lb *lbRuntime) selectInstance() (int, error) {
lb.l.Lock()
defer lb.l.Unlock()

for attempt := 0; attempt < len(lb.instances); attempt++ {
idx := lb.nextIdx
lb.nextIdx = (lb.nextIdx + 1) % len(lb.instances)

if _, healthy := lb.healthyInstances[idx]; healthy {
return idx, nil
}
}

return 0, fmt.Errorf("host/loadbalance: no healthy instances available")
}

// Implements host.Runtime.
func (lb *lbRuntime) UpdateCapabilityTEE() {
for _, rt := range lb.instances {
rt.UpdateCapabilityTEE()
}
}

// Implements host.Runtime.
func (lb *lbRuntime) WatchEvents() (<-chan *host.Event, pubsub.ClosableSubscription) {
return lb.instances[0].WatchEvents()
}

// Implements host.Runtime.
func (lb *lbRuntime) Start() {
lb.startOnce.Do(func() {
for idx, rt := range lb.instances {
idx := idx
rt := rt // Make sure goroutine below operates on the right instance.

// Subscribe to runtime events before starting runtime to make sure we don't miss the
// started event.
evCh, sub := rt.WatchEvents()

// Start a goroutine to monitor whether an instance is healthy.
go func() {
defer sub.Close()

for {
select {
case ev := <-evCh:
switch {
case ev.Started != nil:
// Mark instance as available.
lb.logger.Info("instance is available",
"instance", idx,
)

lb.l.Lock()
lb.healthyInstances[idx] = struct{}{}
lb.l.Unlock()
case ev.FailedToStart != nil, ev.Stopped != nil:
// Mark instance as failed.
lb.logger.Warn("instance is no longer available",
"instance", idx,
)

lb.l.Lock()
delete(lb.healthyInstances, idx)
lb.l.Unlock()
default:
}

// Update healthy instance count metrics.
lb.l.Lock()
healthyInstanceCount := len(lb.healthyInstances)
lb.l.Unlock()

lbHealthyInstanceCount.With(prometheus.Labels{
"runtime": lb.id.String(),
}).Set(float64(healthyInstanceCount))
case <-lb.stopCh:
return
}
}
}()

rt.Start()
}
})
}

// Implements host.Runtime.
func (lb *lbRuntime) Abort(ctx context.Context, force bool) error {
// We don't know which instance to abort, so we abort all instances.
errCh := make(chan error)
for _, rt := range lb.instances {
rt := rt // Make sure goroutine below operates on the right instance.

go func() {
errCh <- rt.Abort(ctx, force)
}()
}

var anyErr error
for range lb.instances {
err := <-errCh
anyErr = errors.Join(anyErr, err)
}
return anyErr
}

// Implements host.Runtime.
func (lb *lbRuntime) Stop() {
lb.stopOnce.Do(func() {
close(lb.stopCh)

for _, rt := range lb.instances {
rt.Stop()
}
})
}

type lbProvisioner struct {
inner host.Provisioner
cfg Config
}

// Implements host.Provisioner.
func (lb *lbProvisioner) NewRuntime(cfg host.Config) (host.Runtime, error) {
if lb.cfg.NumInstances < 2 {
// This should never happen as the provisioner constructor made sure, but just to be safe.
return nil, fmt.Errorf("host/loadbalance: number of instances must be at least two")
}

// Use the inner provisioner to provision multiple runtimes.
var instances []host.Runtime
for i := 0; i < lb.cfg.NumInstances; i++ {
rt, err := lb.inner.NewRuntime(cfg)
if err != nil {
return nil, fmt.Errorf("host/loadbalance: failed to provision instance %d: %w", i, err)
}

instances = append(instances, rt)
}

return &lbRuntime{
id: cfg.Bundle.Manifest.ID,
instances: instances,
healthyInstances: make(map[int]struct{}),
stopCh: make(chan struct{}),
logger: logging.GetLogger("runtime/host/loadbalance").With("runtime_id", cfg.Bundle.Manifest.ID),
}, nil
}

// Implements host.Provisioner.
func (lb *lbProvisioner) Name() string {
return fmt.Sprintf("load-balancer[%d]/%s", lb.cfg.NumInstances, lb.inner.Name())
}

// New creates a load-balancing runtime provisioner.
func New(inner host.Provisioner, cfg Config) host.Provisioner {
if cfg.NumInstances < 2 {
// If there is only a single instance configured just return the inner provisioner.
return inner
}

initMetrics()

return &lbProvisioner{
inner: inner,
cfg: cfg,
}
}
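Usage-wise, the provisioner is meant to wrap whatever inner provisioner is already in use (for example the sandbox provisioner). The helper below is a hypothetical sketch, not part of this PR:

package example

import (
	"github.com/oasisprotocol/oasis-core/go/runtime/host"
	"github.com/oasisprotocol/oasis-core/go/runtime/host/loadbalance"
)

// newLoadBalancedRuntime wraps an existing provisioner with the load balancer and
// provisions a runtime from it. Each NewRuntime call creates numInstances inner
// runtime instances; queries and CheckTx batches are round-robined across healthy
// instances, consensus sync and key manager updates are propagated to all of them,
// and everything else goes to the first instance.
func newLoadBalancedRuntime(inner host.Provisioner, rtCfg host.Config, numInstances int) (host.Runtime, error) {
	p := loadbalance.New(inner, loadbalance.Config{
		NumInstances: numInstances,
	})
	return p.NewRuntime(rtCfg)
}

With fewer than two configured instances, New simply returns the inner provisioner, so callers do not need a separate code path for the disabled case.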
