From 6933086e596e5ed9b4160541b5ba9da6fddbc679 Mon Sep 17 00:00:00 2001 From: Joseph Anttila Hall Date: Fri, 13 May 2022 10:37:40 -0700 Subject: [PATCH] Bugfix: send missing dial identifier when no agents are connected. This allows the client to notice immediately, instead of timing out. I forked this from a slightly larger https://github.com/kubernetes-sigs/apiserver-network-proxy/pull/307 and will try to annotate PR so that mainred@'s contribution is recognized. --- pkg/server/backend_manager.go | 2 +- pkg/server/server.go | 15 ++-- pkg/server/server_test.go | 146 ++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 6 deletions(-) diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go index 62954acf3..6fd92026b 100644 --- a/pkg/server/backend_manager.go +++ b/pkg/server/backend_manager.go @@ -272,7 +272,7 @@ type ErrNotFound struct{} // Error returns the error message. func (e *ErrNotFound) Error() string { - return "No backend available" + return "No agent available" } type ErrWrongIDType struct { diff --git a/pkg/server/server.go b/pkg/server/server.go index 44623efe9..6dd4846db 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -416,25 +416,30 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, switch pkt.Type { case client.PacketType_DIAL_REQ: klog.V(5).Infoln("Received DIAL_REQ") + random := pkt.GetDialRequest().Random // TODO: if we track what agent has historically served // the address, then we can send the Dial_REQ to the // same agent. That way we save the agent from creating // a new connection to the address. backend, err = s.getBackend(pkt.GetDialRequest().Address) if err != nil { - klog.ErrorS(err, "Failed to get a backend", "serverID", s.serverID) + klog.ErrorS(err, "Failed to get a backend", "serverID", s.serverID, "dialID", random) resp := &client.Packet{ - Type: client.PacketType_DIAL_RSP, - Payload: &client.Packet_DialResponse{DialResponse: &client.DialResponse{Error: err.Error()}}, + Type: client.PacketType_DIAL_RSP, + Payload: &client.Packet_DialResponse{ + DialResponse: &client.DialResponse{ + Random: random, + Error: err.Error(), + }, + }, } if err := stream.Send(resp); err != nil { - klog.V(5).InfoS("Failed to send DIAL_RSP for no backend", "error", err, "serverID", s.serverID) + klog.V(5).InfoS("Failed to send DIAL_RSP for no backend", "error", err, "serverID", s.serverID, "dialID", random) } // The Dial is failing; no reason to keep this goroutine. return } - random := pkt.GetDialRequest().Random s.PendingDial.Add( random, &ProxyClientConnection{ diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 401572a91..5ea985d82 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -22,8 +22,10 @@ import ( "io" "reflect" "testing" + "time" "github.com/golang/mock/gomock" + "github.com/google/uuid" "google.golang.org/grpc/metadata" authv1 "k8s.io/api/authentication/v1" @@ -32,6 +34,7 @@ import ( fakeauthenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1/fake" k8stesting "k8s.io/client-go/testing" + client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" agentmock "sigs.k8s.io/apiserver-network-proxy/proto/agent/mocks" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) @@ -218,3 +221,146 @@ func TestAddRemoveFrontends(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } } + +func prepareFrontendConn(ctrl *gomock.Controller) *agentmock.MockAgentService_ConnectServer { + // prepare the connection to fontend of proxy-server + frontendConn := agentmock.NewMockAgentService_ConnectServer(ctrl) + frontendConnMD := metadata.MD{ + ":authority": []string{"127.0.0.1:8090"}, + "content-type": []string{"application/grpc"}, + "user-agent": []string{"grpc-go/1.42.0"}, + } + frontendConnCtx := metadata.NewIncomingContext(context.Background(), frontendConnMD) + frontendConn.EXPECT().Context().Return(frontendConnCtx).AnyTimes() + return frontendConn +} + +func prepareAgentConnMD(ctrl *gomock.Controller, proxyServer *ProxyServer) *agentmock.MockAgentService_ConnectServer { + // prepare the the connection to agent of proxy-server + agentConn := agentmock.NewMockAgentService_ConnectServer(ctrl) + agentConnMD := metadata.MD{ + ":authority": []string{"127.0.0.1:8091"}, + "agentid": []string{uuid.New().String()}, + "agentidentifiers": []string{}, + "content-type": []string{"application/grpc"}, + "user-agent": []string{"grpc-go/1.42.0"}, + } + agentConnCtx := metadata.NewIncomingContext(context.Background(), agentConnMD) + agentConn.EXPECT().Context().Return(agentConnCtx).AnyTimes() + + _ = proxyServer.addBackend(uuid.New().String(), agentConn) + return agentConn +} + +func baseServerProxyTestWithoutBackend(t *testing.T, validate func(*agentmock.MockAgentService_ConnectServer)) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + frontendConn := prepareFrontendConn(ctrl) + proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, true) + + validate(frontendConn) + + proxyServer.Proxy(frontendConn) + + // add a sleep to make sure `serveRecvFrontend` ends after `Proxy` finished. + time.Sleep(1 * time.Second) +} + +func baseServerProxyTestWithBackend(t *testing.T, validate func(*agentmock.MockAgentService_ConnectServer, *agentmock.MockAgentService_ConnectServer)) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + frontendConn := prepareFrontendConn(ctrl) + + // prepare proxy server + proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, true) + + agentConn := prepareAgentConnMD(ctrl, proxyServer) + + validate(frontendConn, agentConn) + + proxyServer.Proxy(frontendConn) + + // add a sleep to make sure `serveRecvFrontend` ends after `Proxy` finished. + time.Sleep(1 * time.Second) +} + +func TestServerProxyNoBackend(t *testing.T) { + validate := func(frontendConn *agentmock.MockAgentService_ConnectServer) { + // receive DIAL_REQ from frontend and proxy to backend + dialReq := &client.Packet{ + Type: client.PacketType_DIAL_REQ, + Payload: &client.Packet_DialRequest{ + DialRequest: &client.DialRequest{ + Protocol: "tcp", + Address: "127.0.0.1:8080", + Random: 111, + }, + }, + } + + dialResp := &client.Packet{ + Type: client.PacketType_DIAL_RSP, + Payload: &client.Packet_DialResponse{ + DialResponse: &client.DialResponse{ + Random: 111, + Error: (&ErrNotFound{}).Error(), + }}, + } + + gomock.InOrder( + frontendConn.EXPECT().Recv().Return(dialReq, nil).Times(1), + frontendConn.EXPECT().Recv().Return(nil, io.EOF).Times(1), + // NOTE(mainred): `Send` should come before `Recv` io.EOF, but we cannot add wait between + // two Recvs, thus `Recv`` comes before `Send` + frontendConn.EXPECT().Send(dialResp).Return(nil).Times(1), + ) + + } + baseServerProxyTestWithoutBackend(t, validate) +} + +func TestServerProxyNormalClose(t *testing.T) { + validate := func(frontendConn, agentConn *agentmock.MockAgentService_ConnectServer) { + // receive DIAL_REQ from frontend and proxy to backend + dialReq := &client.Packet{ + Type: client.PacketType_DIAL_REQ, + Payload: &client.Packet_DialRequest{ + DialRequest: &client.DialRequest{ + Protocol: "tcp", + Address: "127.0.0.1:8080", + Random: 111, + }, + }, + } + + // recevie CLOSE_REQ from frontend and proxy to backend + closeReq := &client.Packet{ + Type: client.PacketType_CLOSE_REQ, + Payload: &client.Packet_CloseRequest{ + CloseRequest: &client.CloseRequest{ + ConnectID: 1, + }}, + } + // This extra close is unwanted and should be removed; see + // https://github.com/kubernetes-sigs/apiserver-network-proxy/pull/307 + extraCloseReq := &client.Packet{ + Type: client.PacketType_CLOSE_REQ, + Payload: &client.Packet_CloseRequest{ + CloseRequest: &client.CloseRequest{}}, + } + + gomock.InOrder( + frontendConn.EXPECT().Recv().Return(dialReq, nil).Times(1), + frontendConn.EXPECT().Recv().Return(closeReq, nil).Times(1), + frontendConn.EXPECT().Recv().Return(nil, io.EOF).Times(1), + ) + gomock.InOrder( + agentConn.EXPECT().Send(dialReq).Return(nil).Times(1), + agentConn.EXPECT().Send(closeReq).Return(nil).Times(1), + agentConn.EXPECT().Send(extraCloseReq).Return(nil).Times(1), + ) + } + baseServerProxyTestWithBackend(t, validate) +}