change kubeletEndpoint port when cloudstream connected #3277
Signed-off-by: chenchunxiu <chenchunxiu_yewu@cmss.chinamobile.com>
a96c160
to
4174812
Compare
Does anybody have time to review? @fisherxu
Thanks for fixing. Is it because the node has been created, and then
I have some doubts. The original logic is to update the What is the difference between them?
For edge nodes, only the address of LB. e.g. maybe This is my understanding of your explanation, is that the case?
This is caused by inconsistent load balancing across multiple requests from the same edge node.
CloudStream/EdgeStream use one connection, and cloudhub/edgehub use another. The two connections may land on two different servers behind the LB, and since we use the cloud/edge hub to update the node status, CloudStream/EdgeStream will not work. @zc2638
Looks good, please resolve the conflict :) @chenchunxiu
fc3d411
to
4174812
Compare
Done
getNode.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(s.tunnelPort)
node, err := client.GetKubeClient().CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metav1.UpdateOptions{})
if err != nil {
	klog.Errorf("update node KubeletEndpoint Port failed with error: %s, node: %v, tunnelPort:%v",
We should retry a few times, and only then exit directly if the error still occurs in this func. @chenchunxiu
Ref: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/nodelifecycle/node_lifecycle_controller.go#L792
Done
if errors.IsNotFound(err) {
	klog.Warningf("node %s not found", nodeName)
	return false, nil
}
if err != nil {
	klog.Errorf("Failed while getting a Node to retry updating node KubeletEndpoint Port. error: %v", nodeName, err)
	return false, err
}
- if errors.IsNotFound(err) {
- 	klog.Warningf("node %s not found", nodeName)
- 	return false, nil
- }
- if err != nil {
- 	klog.Errorf("Failed while getting a Node to retry updating node KubeletEndpoint Port. error: %v", nodeName, err)
- 	return false, err
- }
+ if err != nil {
+ 	klog.Errorf("Failed while getting a Node to retry updating node KubeletEndpoint Port, node: %s, error: %v", nodeName, err)
+ 	return false, nil
+ }
Done
getNode.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(s.tunnelPort)
_, err = client.GetKubeClient().CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metav1.UpdateOptions{})
if err != nil {
	klog.Errorf("update node KubeletEndpoint Port failed with error: %s, node: %v, tunnelPort: %v",
- klog.Errorf("update node KubeletEndpoint Port failed with error: %s, node: %v, tunnelPort: %v",
+ klog.Errorf("Failed to update node KubeletEndpoint Port, node: %s, tunnelPort: %d, err: %v", getNode.Name, s.tunnelPort, err)
Done
	klog.Errorf("Update KubeletEndpoint Port of Node '%v' error: %v. ", nodeName, err)
	os.Exit(1)
}
klog.Infof("update node KubeletEndpoint Port success. node: %s, tunnelPort: %v", nodeName, s.tunnelPort)
- klog.Infof("update node KubeletEndpoint Port success. node: %s, tunnelPort: %v", nodeName, s.tunnelPort)
+ klog.V(2).Infof("Update node KubeletEndpoint Port successfully, node: %s, tunnelPort: %d", nodeName, s.tunnelPort)
V(4) may be a better fit; otherwise a large number of edge nodes will print lots of success logs.
Done
	}
	return true, nil
}); err != nil {
	klog.Errorf("Update KubeletEndpoint Port of Node '%v' error: %v. ", nodeName, err)
- klog.Errorf("Update KubeletEndpoint Port of Node '%v' error: %v. ", nodeName, err)
+ klog.Errorf("Failed to update node KubeletEndpoint Port, node: %s, tunnelPort: %d, err: %v", nodeName, s.tunnelPort, err)
Done
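A side note on the format verbs in these log messages: tunnelPort is an int field, so the verb matters. The sketch below shows how fmt renders an int under %s, %d and %v. formatPort is a hypothetical helper and 10351 is an arbitrary port chosen for illustration; the format string is assembled at run time only so that the mismatched verb is not rejected by go vet.

```go
package main

import "fmt"

// formatPort is a hypothetical helper that renders a port with the given verb.
// The format string is built dynamically so that the deliberately mismatched
// "%s" case still gets past static printf checks.
func formatPort(verb string, port int) string {
	return fmt.Sprintf("tunnelPort: "+verb, port)
}

func main() {
	fmt.Println(formatPort("%s", 10351)) // tunnelPort: %!s(int=10351)
	fmt.Println(formatPort("%d", 10351)) // tunnelPort: 10351
	fmt.Println(formatPort("%v", 10351)) // tunnelPort: 10351
}
```

For an int, %d or %v prints the number, while %s prints a "bad verb" marker instead.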
"github.com/kubeedge/kubeedge/pkg/stream" | ||
)

const (
	// The amount of time the tunnelserver should sleep between retrying node status updates
	retrySleepTime = 20 * time.Millisecond
- retrySleepTime = 20 * time.Millisecond
+ retrySleepTime = 20 * time.Second
The retry interval can be 20s here. If edge node creation is delayed, we need to wait longer.
Done
@@ -169,3 +184,30 @@ func (s *TunnelServer) Start() {
		return
	}
}

func (s *TunnelServer) updateNodeKubeletEndpoint(nodeName string) {
	if err := wait.PollImmediate(retrySleepTime, nodeStatusUpdateRetry, func() (bool, error) {
- if err := wait.PollImmediate(retrySleepTime, nodeStatusUpdateRetry, func() (bool, error) {
+ if err := wait.Poll(retrySleepTime, nodeStatusUpdateRetry, func() (bool, error) {
Let's use wait.Poll here. PollImmediate will exit when an error occurs, and here we need to retry.
Thanks for the reply.
I tested this on my machine: if an error happens, the retry is still executed.
wait.PollImmediate runs the func immediately, whereas wait.Poll waits for the interval before running the func.
Okay, sounds good. In that case we need to return false, nil as I commented above; otherwise the retry will exit directly.
Thanks for the reply, I have changed the return false, err to return false, nil.
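To make the semantics discussed in this thread concrete, here is a minimal, stdlib-only sketch. pollImmediate is a simplified stand-in written for illustration, not the real wait.PollImmediate from k8s.io/apimachinery; it only mimics the contract that the condition is checked once before the first wait and that a non-nil error stops polling. countAttempts is likewise a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// conditionFunc has the same shape as the callback used in the PR:
// (true, nil) stops with success, a non-nil error stops with failure,
// and (false, nil) asks for another attempt after the interval.
type conditionFunc func() (done bool, err error)

// pollImmediate is a simplified, hypothetical stand-in for wait.PollImmediate.
func pollImmediate(interval, timeout time.Duration, cond conditionFunc) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err // an error from the condition ends the polling at once
		}
		if done {
			return nil
		}
		if time.Now().Add(interval).After(deadline) {
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

// countAttempts runs a condition that fails twice before succeeding and
// reports how often it was called. swallow selects whether a transient
// failure is reported as (false, nil) or as (false, err).
func countAttempts(swallow bool) (attempts int, err error) {
	err = pollImmediate(time.Millisecond, 100*time.Millisecond, func() (bool, error) {
		attempts++
		if attempts < 3 {
			if swallow {
				return false, nil
			}
			return false, errors.New("transient failure")
		}
		return true, nil
	})
	return attempts, err
}

func main() {
	fmt.Println(countAttempts(true))  // 3 <nil>
	fmt.Println(countAttempts(false)) // 1 transient failure
}
```

Under these assumptions, returning false, nil keeps the loop going until the update succeeds or the timeout expires, while returning false, err ends it on the first failure.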
type TunnelServer struct {
	container *restful.Container
	upgrader websocket.Upgrader
	sync.Mutex
	sessions map[string]*Session
	nodeNameIP sync.Map
	tunnelPort int
Stupid question 😄 Does this tunnelPort come from the configuration file, and do users need to configure it themselves?
No, users do not need to configure it manually.
tunnelPort is generated automatically the first time cloudcore starts and is saved in a configmap. On later starts, cloudcore reads it directly from the configmap.
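The "generate once, then reuse" behaviour described in this reply can be sketched as follows. This is only an illustration: portStore is a hypothetical in-memory stand-in for the configmap, keying the stored port by instance name is an assumption, and the base port 10350 is an arbitrary value, none of which is taken from the cloudcore code.

```go
package main

import "fmt"

// portStore is a hypothetical stand-in for the configmap that persists ports.
type portStore map[string]int

// loadOrAllocate returns the port already saved for instance. On the first
// start it picks the lowest port >= basePort that no other instance holds,
// saves it, and returns it.
func loadOrAllocate(store portStore, instance string, basePort int) int {
	if port, ok := store[instance]; ok {
		return port // later starts: read the saved value back
	}
	used := make(map[int]bool, len(store))
	for _, p := range store {
		used[p] = true
	}
	port := basePort
	for used[port] {
		port++
	}
	store[instance] = port // first start: persist the generated value
	return port
}

func main() {
	store := portStore{}
	fmt.Println(loadOrAllocate(store, "cloudcore1", 10350)) // 10350
	fmt.Println(loadOrAllocate(store, "cloudcore1", 10350)) // 10350
	fmt.Println(loadOrAllocate(store, "cloudcore2", 10350)) // 10351
}
```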
Signed-off-by: chenchunxiu <chenchunxiu_yewu@cmss.chinamobile.com>
fedff6d
to
30a32b5
Compare
/lgtm
/approve
Sorry for the delay.
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: fisherxu
Signed-off-by: chenchunxiu chenchunxiu_yewu@cmss.chinamobile.com
What type of PR is this?
/kind bug
What this PR does / why we need it:
If cloudcore is deployed multi-active (cloudcore1, cloudcore2, cloudcore3) behind an LB, kubectl exec/logs fails in the following case:
The LB forwards the cloudhub connection (from edge-node-01) to cloudcore1. --- cloudhub port 10000
The LB forwards the cloudstream connection (from edge-node-01) to cloudcore2. --- cloudstream port 10004
So we should change kubeletEndpoint when cloudstream connects, not when cloudhub connects.
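The failure case above can be reproduced with a small simulation. It is a sketch under stated assumptions, not cloudcore code: the cloudcore and cluster types, the simulate helper and the tunnel ports 10351 and 10352 are all hypothetical, and it assumes that each instance has its own tunnel port and that an exec/logs request reaches the instance owning the port recorded as the node's kubeletEndpoint.

```go
package main

import "fmt"

// cloudcore models one instance behind the LB (hypothetical type).
type cloudcore struct {
	name       string
	tunnelPort int
	streams    map[string]bool // edge nodes with a live stream session here
}

// cluster holds the instances and the kubeletEndpoint port recorded per node.
type cluster struct {
	instances   []*cloudcore
	kubeletPort map[string]int
}

// canExec reports whether the instance that owns the recorded port also
// holds the stream session for the node.
func (c *cluster) canExec(node string) bool {
	port := c.kubeletPort[node]
	for _, cc := range c.instances {
		if cc.tunnelPort == port {
			return cc.streams[node]
		}
	}
	return false
}

// simulate replays the scenario: the LB sends the hub connection to
// cloudcore1 and the stream connection to cloudcore2. updateOnStream
// selects which connection event records the port.
func simulate(updateOnStream bool) bool {
	cc1 := &cloudcore{name: "cloudcore1", tunnelPort: 10351, streams: map[string]bool{}}
	cc2 := &cloudcore{name: "cloudcore2", tunnelPort: 10352, streams: map[string]bool{}}
	c := &cluster{instances: []*cloudcore{cc1, cc2}, kubeletPort: map[string]int{}}
	const node = "edge-node-01"

	if !updateOnStream {
		c.kubeletPort[node] = cc1.tunnelPort // old: port set on hub connect
	}
	cc2.streams[node] = true // stream session lands on cloudcore2
	if updateOnStream {
		c.kubeletPort[node] = cc2.tunnelPort // new: port set on stream connect
	}
	return c.canExec(node)
}

func main() {
	fmt.Println(simulate(false)) // false
	fmt.Println(simulate(true))  // true
}
```

Recording the port on the stream connection ties the kubeletEndpoint to the instance that actually holds the session, which is the point of this change.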
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing change?: