Skip to content

Commit

Permalink
Merge pull request #54 from hexfusion/bump-v3.4.9-final-4.5
Browse files Browse the repository at this point in the history
Bug 1859201: bump etcd v3.4.9
  • Loading branch information
openshift-merge-robot committed Jul 28, 2020
2 parents e78ee7f + bdfc5da commit 9b0a7f7
Show file tree
Hide file tree
Showing 108 changed files with 3,681 additions and 3,330 deletions.
1 change: 1 addition & 0 deletions .words
Expand Up @@ -76,6 +76,7 @@ consistentIndex
todo
saveWALAndSnap

SHA
subconns
nop
SubConns
Expand Down
42 changes: 42 additions & 0 deletions auth/metrics.go
@@ -0,0 +1,42 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"github.com/prometheus/client_golang/prometheus"
"sync"
)

var (
currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "auth",
Name: "revision",
Help: "The current revision of auth store.",
},
func() float64 {
reportCurrentAuthRevMu.RLock()
defer reportCurrentAuthRevMu.RUnlock()
return reportCurrentAuthRev()
},
)
// overridden by auth store initialization
reportCurrentAuthRevMu sync.RWMutex
reportCurrentAuthRev = func() float64 { return 0 }
)

func init() {
prometheus.MustRegister(currentAuthRevision)
}
93 changes: 79 additions & 14 deletions auth/store.go
Expand Up @@ -94,6 +94,9 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}

// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)

// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
Expand Down Expand Up @@ -186,6 +189,9 @@ type AuthStore interface {

// HasRole checks that user has role
HasRole(user, role string) bool

// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}

type TokenProvider interface {
Expand All @@ -209,10 +215,14 @@ type authStore struct {

rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions

tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
bcryptCost int // the algorithm cost / strength for hashing auth passwords
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
Expand Down Expand Up @@ -269,6 +279,7 @@ func (as *authStore) AuthDisable() {
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
tx.Unlock()
b.ForceCommit()

Expand Down Expand Up @@ -335,17 +346,27 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
return 0, ErrAuthNotEnabled
}

tx := as.be.BatchTx()
tx.Lock()
defer tx.Unlock()
var user *authpb.User
// CompareHashAndPassword is very expensive, so we use closures
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.BatchTx()
tx.Lock()
defer tx.Unlock()

user := getUser(as.lg, tx, username)
if user == nil {
return 0, ErrAuthFailed
}
user = getUser(as.lg, tx, username)
if user == nil {
return 0, ErrAuthFailed
}

if user.Options != nil && user.Options.NoPassword {
return 0, ErrAuthFailed
if user.Options != nil && user.Options.NoPassword {
return 0, ErrAuthFailed
}

return getRevision(tx), nil
}()
if err != nil {
return 0, err
}

if bcrypt.CompareHashAndPassword(user.Password, []byte(password)) != nil {
Expand All @@ -356,7 +377,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
}
return 0, ErrAuthFailed
}
return getRevision(tx), nil
return revision, nil
}

func (as *authStore) Recover(be backend.Backend) {
Expand Down Expand Up @@ -430,6 +451,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
putUser(as.lg, tx, newUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info("added a user", zap.String("user-name", r.Name))
Expand Down Expand Up @@ -461,6 +483,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
delUser(tx, r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -513,6 +536,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
putUser(as.lg, tx, updatedUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -569,6 +593,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
as.invalidateCachedPerm(r.User)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info(
Expand Down Expand Up @@ -655,6 +680,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
as.invalidateCachedPerm(r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info(
Expand Down Expand Up @@ -729,6 +755,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info(
Expand Down Expand Up @@ -788,6 +815,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info("deleted a role", zap.String("role-name", r.Role))
Expand Down Expand Up @@ -818,6 +846,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
putRole(as.lg, tx, newRole)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info("created a role", zap.String("role-name", r.Name))
Expand Down Expand Up @@ -881,6 +910,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

if as.lg != nil {
as.lg.Info(
Expand All @@ -904,8 +934,21 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
if revision == 0 {
return ErrUserEmpty
}

if revision < as.Revision() {
rev := as.Revision()
if revision < rev {
if as.lg != nil {
as.lg.Warn("request auth revision is less than current node auth revision",
zap.Uint64("current node auth revision", rev),
zap.Uint64("request auth revision", revision),
zap.ByteString("request key", key),
zap.Error(ErrAuthOldRevision))
} else {
plog.Warningf("request auth revision is less than current node auth revision,"+
"current node auth revision is %d,"+
"request auth revision is %d,"+
"request key is %s, "+
"err is %v", rev, revision, key, ErrAuthOldRevision)
}
return ErrAuthOldRevision
}

Expand Down Expand Up @@ -1145,6 +1188,8 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
as.commitRevision(tx)
}

as.setupMetricsReporter()

tx.Unlock()
be.ForceCommit()

Expand Down Expand Up @@ -1419,3 +1464,23 @@ func (as *authStore) HasRole(user, role string) bool {
func (as *authStore) BcryptCost() int {
return as.bcryptCost
}

func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
} else {
if as.lg != nil {
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
} else {
plog.Error("failed to save consistentIndex,syncConsistentIndex is nil")
}
}
}

func (as *authStore) setupMetricsReporter() {
reportCurrentAuthRevMu.Lock()
reportCurrentAuthRev = func() float64 {
return float64(as.Revision())
}
reportCurrentAuthRevMu.Unlock()
}
2 changes: 1 addition & 1 deletion clientv3/balancer/picker/err.go
Expand Up @@ -34,6 +34,6 @@ func (ep *errPicker) String() string {
return ep.p.String()
}

func (ep *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
}
2 changes: 1 addition & 1 deletion clientv3/balancer/picker/roundrobin_balanced.go
Expand Up @@ -52,7 +52,7 @@ type rrBalanced struct {
func (rb *rrBalanced) String() string { return rb.p.String() }

// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
Expand Down
4 changes: 2 additions & 2 deletions clientv3/balancer/resolver/endpoint/endpoint.go
Expand Up @@ -111,7 +111,7 @@ func (e *ResolverGroup) Close() {
}

// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if len(target.Authority) < 1 {
return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func epsToAddrs(eps ...string) (addrs []resolver.Address) {
return addrs
}

func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
func (*Resolver) ResolveNow(o resolver.ResolveNowOptions) {}

func (r *Resolver) Close() {
es, err := bldr.getResolverGroup(r.endpointID)
Expand Down
21 changes: 17 additions & 4 deletions clientv3/maintenance.go
Expand Up @@ -20,6 +20,7 @@ import (
"io"

pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"

"google.golang.org/grpc"
)
Expand Down Expand Up @@ -68,13 +69,15 @@ type Maintenance interface {
}

type maintenance struct {
lg *zap.Logger
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
remote pb.MaintenanceClient
callOpts []grpc.CallOption
}

func NewMaintenance(c *Client) Maintenance {
api := &maintenance{
lg: c.lg,
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.Dial(endpoint)
if err != nil {
Expand All @@ -93,6 +96,7 @@ func NewMaintenance(c *Client) Maintenance {

func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
api := &maintenance{
lg: c.lg,
dial: func(string) (pb.MaintenanceClient, func(), error) {
return remote, func() {}, nil
},
Expand Down Expand Up @@ -193,23 +197,32 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
return nil, toErr(ctx, err)
}

m.lg.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe()
go func() {
for {
resp, err := ss.Recv()
if err != nil {
switch err {
case io.EOF:
m.lg.Info("completed snapshot read; closing")
default:
m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
}
pw.CloseWithError(err)
return
}
if resp == nil && err == nil {
break
}

// can "resp == nil && err == nil"
// before we receive snapshot SHA digest?
// No, server sends EOF with an empty response
// after it sends SHA digest at the end

if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
return
}
}
pw.Close()
}()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
}
Expand Down

0 comments on commit 9b0a7f7

Please sign in to comment.