# Merge pull request #13408 from SimonRichardson/2.9-develop-21-10-11

Usual conflicts, but some tricky ones around metrics.

## Merged commits

33a867b (upstream/2.9, origin/2.9, 2.9) Merge pull request #13356 from SimonRichardson/raft-client-wireup
8712c9a (manadart/2.9, achilleasa/2.9) Merge pull request #13391 from hmlanigan/backport-metrics
9dbed19 Merge pull request #13396 from SimonRichardson/backport-mongo-4.4-fixes
7283465 Merge pull request #13389 from SimonRichardson/batching-fsm
eaa218b Merge pull request #13403 from wallyworld/better-k8s-model-message
90e5368 Merge pull request #13400 from ycliuhw/enhance-ghcr
f970413 Merge pull request #13394 from tlm/aws-instance-profile
526ee49 Merge pull request #13401 from jujubot/increment-to-2.9.17
b84c559 (tag: juju-2.9.16) Merge pull request #13399 from wallyworld/mongo-version-parsing
9983864 Merge pull request #13393 from hpidcock/fix-caas-iaas-tools
3d09733 Merge pull request #13386 from ycliuhw/feature/aks

## Conflicts

CONFLICT (content): Merge conflict in version/version.go
CONFLICT (content): Merge conflict in state/enableha.go
CONFLICT (modify/delete): state/backups/restore_test.go deleted in HEAD and modified in 33a867b. Version 33a867b of state/backups/restore_test.go left in tree.
CONFLICT (content): Merge conflict in snap/snapcraft.yaml
CONFLICT (content): Merge conflict in scripts/win-installer/setup.iss
CONFLICT (content): Merge conflict in mongo/mongodfinder_test.go
CONFLICT (content): Merge conflict in mongo/mongodfinder.go
CONFLICT (content): Merge conflict in go.sum
CONFLICT (content): Merge conflict in go.mod
CONFLICT (content): Merge conflict in feature/flags.go
CONFLICT (content): Merge conflict in core/charm/repository/charmhub_test.go
CONFLICT (content): Merge conflict in core/charm/repository/charmhub.go
CONFLICT (content): Merge conflict in cmd/jujud/agent/agenttest/agent.go
CONFLICT (content): Merge conflict in charmhub/transport/refresh.go
CONFLICT (content): Merge conflict in charmhub/refresh_test.go
CONFLICT (content): Merge conflict in charmhub/refresh.go
CONFLICT (content): Merge conflict in apiserver/facades/controller/charmrevisionupdater/updater_test.go
CONFLICT (content): Merge conflict in apiserver/facades/controller/charmrevisionupdater/updater.go
CONFLICT (content): Merge conflict in apiserver/facades/controller/charmrevisionupdater/mocks/mocks.go
CONFLICT (content): Merge conflict in apiserver/facades/controller/charmrevisionupdater/interface.go
CONFLICT (content): Merge conflict in apiserver/facades/controller/charmrevisionupdater/charmhub.go
CONFLICT (content): Merge conflict in apiserver/facades/client/application/updateseries_mocks_test.go
CONFLICT (content): Merge conflict in apiserver/facades/client/application/application_test.go
CONFLICT (content): Merge conflict in apiserver/facades/client/application/application.go
jujubot committed Oct 11, 2021
2 parents 236bfbc + 50a2281 commit 0a1c0b6
Showing 84 changed files with 2,139 additions and 662 deletions.
197 changes: 114 additions & 83 deletions api/raftlease/client.go
@@ -5,7 +5,6 @@ package raftlease

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
@@ -31,11 +30,13 @@ import (
type Logger interface {
Errorf(string, ...interface{})
Debugf(string, ...interface{})
Tracef(string, ...interface{})
}

// Remote defines an interface for managing remote connections for the client.
type Remote interface {
worker.Worker
ID() string
Address() string
SetAddress(string)
Request(context.Context, *raftlease.Command) error
@@ -77,6 +78,9 @@ func (config Config) Validate() error {
if config.Random == nil {
return errors.NotValidf("nil Random")
}
if config.Clock == nil {
return errors.NotValidf("nil Clock")
}
return nil
}

@@ -133,20 +137,6 @@ func NewClient(config Config) (*Client, error) {
return nil, errors.Trace(err)
}

// Wait for at least one server connection.
if err := client.initServers(); err != nil {
unsubscribe()
return nil, errors.Trace(err)
}

// Add all the remote servers to the catacomb.
for _, remote := range client.servers {
if err := client.catacomb.Add(remote); err != nil {
unsubscribe()
return nil, errors.Trace(err)
}
}

return client, nil
}

@@ -157,8 +147,12 @@ func (c *Client) Request(ctx context.Context, command *raftlease.Command) error

remote, err := c.selectRemote()
if err != nil {
// TODO (stickupkid): If we find no remotes, should we force an attempt
// of a connection?
// If we can't find a remote server for any reason, then return an
// ErrDropped. This will cause the lease manager to correctly retry.
if errors.IsNotFound(err) {
c.config.Logger.Errorf("Masking %q with lease.ErrDropped to allow for retries", err)
return lease.ErrDropped
}
return errors.Trace(err)
}

@@ -177,40 +171,10 @@ func (c *Client) Request(ctx context.Context, command *raftlease.Command) error
}

err := remote.Request(ctx, command)

// If the error is nil, we've done it successfully.
remote, err = c.handleRetryRequestError(command, remote, err)
if err == nil {
// We had a successful connection against that remote, set it to
// the lastKnownRemote.
c.mutex.Lock()
c.lastKnownRemote = remote
c.mutex.Unlock()

c.record(command.Operation, "success", start)
return nil
}

// If the remote is no longer the leader, go and attempt to get it from
// the error. If it's not in the error, just select one at random.
if apiservererrors.IsNotLeaderError(err) {
// Grab the underlying not leader error.
notLeaderError := errors.Cause(err).(*apiservererrors.NotLeaderError)

remote, err = c.selectRemoteFromError(remote.Address(), err)
if err == nil && remote != nil {
// If we've got an remote, then attempt the request again.
return errors.Annotatef(notLeaderError, "not the leader, trying again")
}
// If we're not the leader and we don't have a remote to select from
// just return back.
if notLeaderError.ServerAddress() == "" {
// The raft instance isn't clustered, we don't have a way
// forward, so send back a dropped error.
c.config.Logger.Errorf("No leader found and no cluster available, dropping command: %v", command)
return lease.ErrDropped
}
}

return errors.Trace(err)
},
IsFatalError: func(err error) bool {
@@ -224,10 +188,88 @@ func (c *Client) Request(ctx context.Context, command *raftlease.Command) error
Clock: c.config.Clock,
})

// If the retry has stopped, then we've been cancelled, so we need to tell
// the lease manager that we've timedout.
if retry.IsRetryStopped(err) {
return c.handleRequestError(command, start, err)
}

func (c *Client) handleRetryRequestError(command *raftlease.Command, remote Remote, err error) (Remote, error) {
// If the error is nil, we've done it successfully.
if err == nil {
// We had a successful connection against that remote, set it to
// the lastKnownRemote.
c.mutex.Lock()
c.lastKnownRemote = remote
c.mutex.Unlock()
return remote, nil
}

// If the remote is no longer the leader, go and attempt to get it from
// the error. If it's not in the error, just select one at random.
if apiservererrors.IsNotLeaderError(err) {
// Grab the underlying not leader error.
notLeaderError := errors.Cause(err).(*apiservererrors.NotLeaderError)

remote, err = c.selectRemoteFromError(remote.Address(), err)
if err == nil && remote != nil {
// If we've got a remote, then attempt the request again.
return remote, errors.Annotatef(notLeaderError, "not the leader, trying again")
}
// If we're not the leader and we don't have a remote to select from
// just return back.
if notLeaderError.ServerAddress() == "" {
// The raft instance isn't clustered, we don't have a way
// forward, so send back a dropped error.
c.config.Logger.Errorf("No leader found and no cluster available, dropping command: %v", command)
return remote, lease.ErrDropped
}
} else if apiservererrors.IsDeadlineExceededError(err) {
// Enqueuing into the queue just timed out, we should just
// log this error and try again if possible. The lease manager
// will know if a retry at that level is possible.
c.config.Logger.Errorf("Deadline exceeded enqueuing command.")
}

// If we can't find a remote, we should just return that the error was
// dropped.
if remote == nil {
return nil, lease.ErrDropped
}
return remote, errors.Trace(err)
}

func (c *Client) handleRequestError(command *raftlease.Command, start time.Time, err error) error {
if err == nil {
c.record(command.Operation, "success", start)
return nil
}

switch {
case lease.IsLeaseError(err):
// We want to see this when debugging the raft implementation, but not
// in daily running.
c.config.Logger.Tracef("Lease Error %q", err)

case retry.IsRetryStopped(err), retry.IsAttemptsExceeded(err):
// If the retry or attempt is exceeded, check to see if the underlying
// error is a lease error. If it is, then just trace the error and
// propagate it up.
if underlyingErr := retry.LastError(err); lease.IsLeaseError(underlyingErr) {
c.config.Logger.Tracef("Lease Error %q", err)

// Ensure we expose the real error here.
err = underlyingErr
break
}

// If the retry has stopped or the number of attempts has been
// exceeded, we need to tell the lease manager that we've timed out.
c.config.Logger.Errorf("Masking %q with lease.ErrTimeout to allow for retries", err)
err = lease.ErrTimeout

case errors.IsNotFound(err):
// If we can't find a remote server for any reason, then return an
// ErrDropped. This will cause the lease manager to correctly retry.
c.config.Logger.Errorf("Masking %q with lease.ErrDropped to allow for retries", err)
err = lease.ErrDropped
}

c.record(command.Operation, "error", start)
@@ -314,48 +356,17 @@ func (c *Client) loop() error {
select {
case <-c.catacomb.Dying():
return c.catacomb.ErrDying()

case details := <-c.serverDetails:
// Get the primary address for each server ID.
addresses := c.gatherAddresses(details)
if len(addresses) == 0 {
// If there are no addresses, then nothing is routable. In this
// case, we'll continue to use the current addresses.
c.config.Logger.Errorf("no server addresses found, will continue to use old addresses")
continue
}

if err := c.ensureServers(addresses); err != nil {
return errors.Trace(err)
}
}
}
}

func (c *Client) initServers() error {
if len(c.config.APIInfo.Addrs) == 0 {
return errors.NotFoundf("api addresses")
}

for k, address := range c.config.APIInfo.Addrs {
info := *c.config.APIInfo
info.Addrs = []string{address}

remote := c.config.NewRemote(RemoteConfig{
APIInfo: &info,
Clock: c.config.Clock,
Logger: c.config.Logger,
})

// In reality it doesn't matter what these get called, as a later down
// the line they'll be replaced via the request from the
// apiserver.Details result.
key := fmt.Sprintf("%d", k)
c.servers[key] = remote
}

return nil
}

// gatherAddresses turns a series of data addresses into a map of server ids
// and internal address. If no internal address is found, then fallback to
// the external sorted addresses.
@@ -402,6 +413,7 @@ func (c *Client) ensureServers(addresses map[string]string) error {

remote := c.config.NewRemote(RemoteConfig{
APIInfo: &info,
ID: id,
Clock: c.config.Clock,
Logger: c.config.Logger,
})
@@ -435,11 +447,25 @@ func (c *Client) ensureServers(addresses map[string]string) error {
// remote Wait might have failed.
delete(c.servers, id)
}

// When we get the data from the APIDetails, sometimes the details may have
// no data in them or the lastKnownRemote has been removed from the server
// map. In this case we should drop the current lastKnownRemote and retry
// from a clean slate.
if len(c.servers) == 0 {
c.config.Logger.Tracef("resetting last known remote, no servers are available")
c.lastKnownRemote = nil
} else if c.lastKnownRemote != nil && !witnessed.Contains(c.lastKnownRemote.ID()) {
c.config.Logger.Tracef("resetting last known remote, server %q was removed from server list", c.lastKnownRemote.ID())
c.lastKnownRemote = nil
}

return nil
}

// RemoteConfig defines the configuration for creating a NewRemote.
type RemoteConfig struct {
ID string
APIInfo *api.Info
Clock clock.Clock
Logger Logger
@@ -469,6 +495,11 @@ type remote struct {
client RaftLeaseApplier
}

// ID returns the server ID associated with the remote.
func (r *remote) ID() string {
return r.config.ID
}

// Address returns the current remote server address.
func (r *remote) Address() string {
r.mutex.Lock()
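The core behavioural change in this file is that request failures are now normalised before they reach the lease manager: retry exhaustion is surfaced as lease.ErrTimeout and a missing remote as lease.ErrDropped, both of which the lease manager treats as retryable. The standalone sketch below illustrates only that mapping; it is not the Juju code, and errDropped/errTimeout are local stand-ins for the real lease.ErrDropped and lease.ErrTimeout, with the retry/not-found checks reduced to plain booleans.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for lease.ErrDropped and lease.ErrTimeout; the names and
// messages here are illustrative only.
var (
	errDropped = errors.New("lease operation dropped")
	errTimeout = errors.New("lease operation timed out")
)

// maskError mirrors the shape of handleRequestError above: nil passes
// through, retry exhaustion surfaces as a timeout, a missing remote as a
// dropped operation, and anything else is returned unchanged.
func maskError(err error, attemptsExceeded, remoteNotFound bool) error {
	switch {
	case err == nil:
		return nil
	case attemptsExceeded:
		// The lease manager knows how to retry after a timeout.
		return errTimeout
	case remoteNotFound:
		// No routable raft API server yet; a dropped operation is retried too.
		return errDropped
	default:
		return err
	}
}

func main() {
	fmt.Println(maskError(errors.New("attempt count exceeded"), true, false))
	fmt.Println(maskError(errors.New("api addresses not found"), false, true))
	fmt.Println(maskError(nil, false, false))
}
```

A second, smaller change follows the same defensive theme: each remote now exposes an ID, and ensureServers clears lastKnownRemote whenever that ID is no longer in the witnessed server set, so a stale leader hint cannot keep routing requests to a server that has been removed.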
