Decouple read remote from write to server. #310
Conversation
Thanks for your work, left a few comments.
pkg/agent/client.go (outdated diff):
```go
	defer a.sendLock.Unlock()

	err := a.stream.Send(pkt)
	/*err := a.stream.Send(pkt)
```
Should we just remove this commented-out code?
Yes. (My bad for not removing prior to sending this out)
```go
	if err != nil && err != io.EOF {
		metrics.Metrics.ObserveFailure(metrics.DirectionToServer)
		a.cs.RemoveClient(a.serverID)
		a.serverError = err
```
When will serverError be recovered/reset?
I'm making an assumption that this channel/connection to the server is no longer functional. By removing the serverID from the ClientSet, we are enabling the client set to establish a new connection to that server.
```go
	if err := cs.connectOnce(); err != nil {
```
Thanks for your explanation. Do you mind adding a log when removing the server ID happens, like around https://github.com/kubernetes-sigs/apiserver-network-proxy/pull/310/files#diff-c2eff45b2c7db0ae864aecb2e8b733dfbdfe7070ffe6f7feb1204e086c403a0aR277
I can add a UT to protect the path which removes a server and adds it back later.
Done
```go
		Data:      buf[:n],
		ConnectID: connID,
	}}
	data := make([]byte, 0, n)
```
This decoupling is based on the fact that we can store/cache the data received from the remote before sending it to the server.
If the connection to the server is unhealthy or closed, it may be a good idea to block receiving more data from the remote until a healthy connection to the server is back; otherwise the cached data can never be sent, because the connection is closed.
That's a really nice idea for an improvement. However, it would require implementing resume all the way through: being able to move in-flight connections from one channel to a newer channel, which means coordinating that in both the agent and the server. While I really like this idea, I think it's better done in a subsequent change.
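For illustration, the reviewer's suggestion could be sketched roughly like this. Note this is a sketch of the idea only, not code from the PR: the `healthGate` type and its methods are invented names. The remote-read loop would consult the gate before each read, so reads pause while the server connection is down and resume once it is re-established.

```go
package main

import (
	"fmt"
	"sync"
)

// healthGate is an illustrative sketch (not part of the PR): it lets a
// remote-read loop block while the server connection is unhealthy and
// resume once the connection is back.
type healthGate struct {
	mu      sync.Mutex
	cond    *sync.Cond
	healthy bool
}

func newHealthGate() *healthGate {
	g := &healthGate{healthy: true}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Wait blocks until the server connection is marked healthy.
func (g *healthGate) Wait() {
	g.mu.Lock()
	for !g.healthy {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// Set records the current health state and wakes any blocked readers.
func (g *healthGate) Set(healthy bool) {
	g.mu.Lock()
	g.healthy = healthy
	g.mu.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := newHealthGate()
	g.Set(false) // server connection lost: readers should pause

	done := make(chan struct{})
	go func() {
		g.Wait() // a remote-read loop would call this before each read
		close(done)
	}()

	g.Set(true) // connection re-established: readers resume
	<-done
	fmt.Println("reader resumed")
}
```

As the author notes, making this safe end-to-end would still require a resume protocol coordinated between agent and server, which is why it was deferred.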
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: cheftako, mainred.
```go
	a.conn = conn
	a.stream = stream
	a.serverChannel = make(chan *client.Packet, xfrChannelSize)
	a.serverError = nil
```
serverError could deserve some comments about why it doesn't need lock protection, and about how, once it is set, the client is effectively dead.
Done.
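The kind of field comment being requested might look like this. The field name comes from the diff, but the surrounding struct is trimmed and the comment wording is our assumption about the invariants discussed above, not the PR's actual text:

```go
package main

// Client mirrors the agent's Client struct from the diff, with all other
// fields trimmed; the comment text is illustrative only.
type Client struct {
	// serverError records a fatal error on the stream to the server. It
	// needs no lock protection because it is written only from the single
	// writer goroutine, and once it is set the client is effectively dead:
	// it has already been removed from the ClientSet, so nothing else
	// races with the write.
	serverError error
}

func main() {}
```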
/hold (to allow addressing my single comment) /lgtm
New changes are detected. LGTM label has been removed.
pkg/agent/client.go (outdated diff):
```go
	return serverCount, nil
}

func (a *Client) writeToKonnServer() {
```
I think the naming in the code is already hard to understand (proxy, server, agent, client, frontend, backend, remote, etc.), which is why I think writeToKonnServer is not the best function name; so far the Konn name does not appear anywhere in the code. I would suggest another name like writeToProxyServer or something like that. What do you think? Thanks! Adam
Done
Currently we read from the remote and, on the same routine, write to the konn server. This means that subsequent reads from the remote are effectively blocked on the write to the konn server. This PR decouples the read and write: it funnels all the writes onto a channel, and the writes happen on their own routine, which reads from that channel. Factored in suggested changes from mainred. Factored in comments from jkh52, andrewsykim, and mihivagyok. Renamed writeToKonnServer to writeToProxyServer.
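The decoupling described above can be sketched as follows. This is an illustrative stand-in, not the PR's code: `packet` stands in for the proxy's `*client.Packet`, and `pump` is an invented name; only `xfrChannelSize` and `serverChannel` are names borrowed from the diff. The read loop only enqueues packets onto a buffered channel, and a single writer goroutine drains the channel and performs the potentially slow server write.

```go
package main

import "fmt"

// packet is an illustrative stand-in for the proxy's *client.Packet.
type packet struct {
	connID int
	data   []byte
}

// pump models the decoupling: the remote-read loop only enqueues packets
// onto a buffered channel, while one dedicated goroutine drains the channel
// and performs the (potentially slow) server write. It returns how many
// packets the writer goroutine "sent".
func pump(pkts []packet) int {
	const xfrChannelSize = 10 // buffer size; name borrowed from the diff
	serverChannel := make(chan packet, xfrChannelSize)
	done := make(chan int)

	// Writer goroutine: the only place that would talk to the server stream.
	go func() {
		sent := 0
		for range serverChannel { // stand-in for a.stream.Send(pkt)
			sent++
		}
		done <- sent
	}()

	// Remote-read loop: never blocks on the server write itself, only on
	// channel capacity, which also provides natural backpressure.
	for _, p := range pkts {
		serverChannel <- p
	}
	close(serverChannel)
	return <-done
}

func main() {
	pkts := []packet{
		{connID: 7, data: []byte("a")},
		{connID: 7, data: []byte("b")},
		{connID: 8, data: []byte("c")},
	}
	fmt.Println("packets sent:", pump(pkts)) // prints "packets sent: 3"
}
```

Because the channel is buffered, short stalls in the server write no longer block reads from the remote; a sustained stall fills the buffer and the read loop then blocks on the channel send, bounding memory use.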
/hold cancel
```go
	// It just means we do not know yet if it will fail.
	// Slight back-flips here to ensure the write is closing the channel.
	a.cleanChannel.Do(func() {
		klog.V(2).InfoS("Data channel to server has errored out", "serverID", a.serverID)
```
Should this be klog.Error including a.serverError as one of the keys?
```go
	if panicInfo := recover(); panicInfo != nil {
		klog.V(2).InfoS("Exiting writeToProxyServer with recovery", "panicInfo", panicInfo, "serverID", a.serverID)
	} else {
		klog.V(2).InfoS("Exiting writeToProxyServer", "serverID", a.serverID)
```
v=2 seems a little low for this, doesn't it? I would expect this to be v=4 at least.
```go
	for pkt := range a.serverChannel {
		klog.V(5).InfoS("writeToProxyServer recevied packet to send to KonnServer", "serverID", a.serverID)
		err := a.stream.Send(pkt)
		if err != nil && err != io.EOF {
```
Should we be cleaning up this goroutine if err == io.EOF?
The Kubernetes project currently lacks enough contributors to adequately respond to all issues and PRs. /lifecycle stale
/lifecycle rotten
/close
@k8s-triage-robot: Closed this PR.
/remove-lifecycle rotten
@cheftako: PR needs rebase.
@cheftako: The following tests failed.
/lifecycle stale
/lifecycle rotten
/close
@k8s-triage-robot: Closed this PR.