ETCD-349: add etcdmemberlister controller #962
Conversation
```go
func (c *ClusterMemberController) allNodesMapToVotingMembers(nodes []*corev1.Node) ([]*corev1.Node, error) {

// allNodesMapToNonVotingMembers returns nodes that don't map to voting members (i.e. non learner) in the etcd cluster membership.
// The voting members are read from the etcd cluster membership.
func (c *ClusterMemberController) allNodesMapToNonVotingMembers(nodes []*corev1.Node) ([]*corev1.Node, error) {
```
```go
type EtcdMemberListerController struct {
	operatorClient v1helpers.OperatorClient
	etcdClient     etcdcli.EtcdClient
	membersCache   map[string]*etcdserverpb.Member
```
I think this will need some kind of locking, given this will be consumed by other controllers?
```go
err := c.syncEtcdMembers(ctx, syncCtx.Recorder())
if err != nil {
	_, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
		Type: "EtcdMemberListerControllerDegraded",
```
we should only degrade if this fails several consecutive times; a transient failure every 30s shouldn't be a big enough issue to degrade the whole operator.
done 👍🏽
@tjungblu thanks a lot for your review. I just want to clarify what this controller is going to do: cache all etcd members 👍. Also please correct me if I am wrong :)
```go
	errKeyNotExist = errors.New("key does not exist")
)

type membersCache struct {
```
@tjungblu would this make sense? I am going to embed this within the controller.
I think the struct makes sense; I'm not sure why you wanted to have it as a dedicated controller (or maybe that was my suggestion). It seems more sensible to me to put this into the etcdClientGetter struct in etcdcli; then MemberList can always return a cached response, or fetch when outdated.
Alternatively, to give different consumers a choice, we could also have another layer on top of etcdcli that implements a cache for the MemberLister / HealthyMemberLister / UnhealthyMemberLister interfaces in the same package.
something along those lines:

```go
package etcdcli

import (
	"context"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
)

type CachedMembers struct {
	// some caching logic
}

func (c CachedMembers) MemberList(ctx context.Context) ([]*etcdserverpb.Member, error) {
	//TODO implement me
	panic("implement me")
}

func (c CachedMembers) VotingMemberList(ctx context.Context) ([]*etcdserverpb.Member, error) {
	//TODO implement me
	panic("implement me")
}

func (c CachedMembers) HealthyMembers(ctx context.Context) ([]*etcdserverpb.Member, error) {
	//TODO implement me
	panic("implement me")
}

func (c CachedMembers) HealthyVotingMembers(ctx context.Context) ([]*etcdserverpb.Member, error) {
	//TODO implement me
	panic("implement me")
}

func (c CachedMembers) UnhealthyMembers(ctx context.Context) ([]*etcdserverpb.Member, error) {
	//TODO implement me
	panic("implement me")
}

func (c CachedMembers) UnhealthyVotingMembers(ctx context.Context) ([]*etcdserverpb.Member, error) {
	//TODO implement me
	panic("implement me")
}
```
force-pushed from 18709d2 to eadaf17
/label tide/merge-method-squash
force-pushed from de78602 to 4065364
/retest-required
```go
func (mc *membersCache) add(key string, value *etcdserverpb.Member) (bool, error) {
	if mc == nil || mc.cache == nil {
		return false, errNilMembersCache
	}
	mc.mux.Lock()
	defer mc.mux.Unlock()

	_, ok := mc.cache[key]
	if ok {
		return false, errKeyExist
	}
	// safe to add
	mc.cache[key] = *value
	return true, nil
}

func (mc *membersCache) update(key string, value *etcdserverpb.Member) (bool, error) {
	if mc == nil || mc.cache == nil {
		return false, errNilMembersCache
	}
	mc.mux.Lock()
	defer mc.mux.Unlock()

	_, ok := mc.cache[key]
	if !ok {
		return false, errKeyNotExist
	}
	// safe to update
	mc.cache[key] = *value
	return true, nil
}
```
Do we really need add/update/delete/get methods on the member cache if we're just outright replacing the cache when the membership changes?
If not, then we can remove these.
Yes, you're right. Initially I was using them to update the cache with new entries and remove stale ones, but we don't need them now; removing...
```go
// build map from current live members
currentMembersMap, errs := c.currentMembersMap(members)
if len(errs) > 0 {
	syncErrs = append(syncErrs, errs...)
```
Why are we continuing to check the cache here if we have errors from building the current members map?
Shouldn't we return the errors?
Well, I did not know what to do, tbh :) I can either proceed with building the map while accumulating errors, as I do now, or fail fast once I encounter an error. Wdyt?
cc @tjungblu
fail-fast, but only degrade the operator after several consecutive failures.
done
force-pushed from d83c682 to c99c538
```go
// check membership changes against members cache
if !reflect.DeepEqual(currentMembersMap, c.membersCache) {
	klog.V(2).Infof("detected changes in etcd cluster membership, updating controller cache")
```
can you add the previous members and the current members to the log line? I've done something similar in the client, and it's super useful when going through must-gathers.
```go
var (
	errNilMembersCache = errors.New("member cache is nil")
	errKeyExist        = errors.New("key already exist")
	errKeyNotExist     = errors.New("key does not exist")
```
I think the errors can also be deleted now that you've removed the diff/add/remove logic
done
```go
c.lastConsecutiveFailures++
if c.lastConsecutiveFailures >= 5 {
	// reset counter before going degraded
	c.lastConsecutiveFailures = 0
```
this doesn't count consecutive failures; it accumulates all failures that ever happened until you reset after five of them. You need to reset the counter when a sync succeeds, too.
+1, thanks a lot 👍🏽
done
/retest-required
force-pushed from 9eac35d to eec479c
```
@@ -52,6 +55,9 @@ type ClusterMemberController struct {

	masterNodeLister   corev1listers.NodeLister
	masterNodeSelector labels.Selector
	memberListerCtrl   *etcdmemberlistercontroller.EtcdMemberListerController
	members            []etcdserverpb.Member
```
This controller has two usages of MemberList():
- one validates whether a member is already part of the cluster
- one iterates over the members and attempts to promote learners

Therefore I added both of these fields, and I update them on notifications from the etcdmemberlistercontroller.
pkg/operator/starter.go (outdated)

```
@@ -316,6 +323,7 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle
	configInformers.Config().V1().Networks(),
	etcdClient,
	controllerContext.EventRecorder,
	&etcdmemberlistercontroller,
```
This line does not compile, and I don't know how to fix it :/ NewClusterMemberController returns a factory.Controller, but I need a reference to the etcdmemberlistercontroller to update the local cache in each controller upon notifications. Please let me know wdyt.
I suggest you create an interface that NewEtcdMemberListerController returns instead. You're also missing actually running that controller. Again, just mimic what envVarController already does here.
fixed 👍🏽
```go
if err != nil {
	return fmt.Errorf("could not get etcd member list: %v", err)
}
```
This call will be removed with my changes; instead of issuing a request to the etcd server, the controller retrieves the members locally. The etcdmemberlistercontroller makes sure other controllers do not have stale cached data.
```
@@ -9,6 +9,7 @@ import (
	"go.etcd.io/etcd/api/v3/etcdserverpb"

	errorsutil "k8s.io/apimachinery/pkg/util/errors"
	interfaces "k8s.io/apiserver/pkg/server/dynamiccertificates"
```
that looks like a weird thing to import, why is that needed?
I found Notifier and Listener already defined there, so I used them. I can also create my own interface.
what's the envvar controller using?
Its own interfaces. I created mine; I could use others, or maybe move them into a common package?
```go
	}
}
c.listeners = append(c.listeners, listener)
return
```
why return?
+1
```go
members := make([]etcdserverpb.Member, 0, len(c.membersCache.cache))
defer c.membersCache.mux.RUnlock()

c.membersCache.mux.RLock()
```
it's not wrong what you wrote here, but for better readability I would put the locking at the start of the method:

```go
c.membersCache.mux.RLock()
defer c.membersCache.mux.RUnlock()

members := make([]etcdserverpb.Member, 0, len(c.membersCache.cache))
...
```

So when people add more lines later, they have an easier time reasoning about the scope of the lock.
+1
```go
	if peerURL == m.PeerURLs[0] {
		return true
	}

func (c *ClusterMemberController) Enqueue() {
```
I think this can create a race condition. Imagine this controller running every 10s on a different goroutine than the member lister controller: the updates done in this Enqueue method might not be fully visible during the controller sync. I like this idea with the set, but you may just want to implement it in the boring way, by listing all members when the controller sync runs and reusing that return value throughout the whole lifecycle of the sync.
OR alternatively, add some locking around it; I'm not sure it's really worth it, because the controller runs regardless. If you were to tie the controller sync period to only run when the member listing changes, that would be useful, but it's likely not necessary.
I am a bit confused now :D
So Enqueue is invoked by the etcdmemberlistercontroller only when the member list changes. It updates its own cache and then syncs the listeners' caches by invoking Enqueue(). Moreover, Enqueue calls (c *EtcdMemberListerController) GetMembers(), which takes the lock before reading the cache and returns a copy to the listener. The listener's sync will then use the locally updated cache to reconcile.
Where is the race condition?
Simply put, you're adjusting the internal structure of clustermembercontroller here, which runs on a different controller goroutine. Enqueue is called from the etcdmemberlistercontroller goroutine, so you need to lock in this controller as well if you want to update the struct it's reading. Otherwise you'd get stale reads from the CPU cache; that's the race condition.
I'm sure that if you ran both controllers in goroutines in a test with -race, it would flag this.
I see it now, but what is the right way then, without extra locking? I think we should reduce that, or?
In clustermembercontroller.sync I would just call getMembers(), as it was with the client. That is properly locked, and you always get consistent data back for the lifetime of the sync call. We can add the notification fanciness later, to only run the controller when the member list changes.
Yes, I agree. I am also using []Member instead of map[string]Member, since the controller doesn't need to look up members by member IP. I am also adding GetHealthyMember and GetUnhealthyMembers so that other controllers need not use the etcd client directly for any member listing.
WIP .. :D
force-pushed from eec479c to d71b42f
/retest-required
1 similar comment
/retest-required
i gave up :D
/retest-required
Issues go stale after 90d of inactivity. Mark the issue as fresh by commenting /remove-lifecycle stale. If this issue is safe to close now, please do so with /close. /lifecycle stale
Stale issues rot after 30d of inactivity. Mark the issue as fresh by commenting /remove-lifecycle rotten. If this issue is safe to close now, please do so with /close. /lifecycle rotten
Rotten issues close after 30d of inactivity. Reopen the issue by commenting /reopen. /close
@openshift-bot: Closed this PR.
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/reopen |
/remove-lifecycle rotten |
/reopen |
@Elbehery: This pull request references ETCD-349, which is a valid Jira issue. Warning: the referenced Jira issue has an invalid target version for the branch this PR targets: expected the story to target the "4.16.0" version, but no target version was set.
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the openshift-eng/jira-lifecycle-plugin repository.
@Elbehery: Reopened this PR.
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: Elbehery. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing /approve in a comment.
force-pushed from 8f8dac4 to 058308c
force-pushed from 058308c to 374195c
resolves https://issues.redhat.com/browse/ETCD-349
cc @tjungblu @hasbro17: a minimal controller to discuss the right direction on this PR.