Skip to content

Commit

Permalink
Bugfix: send missing dial identifier when no agents are connected.
Browse files Browse the repository at this point in the history
This allows the client to notice immediately, instead of timing out. I
forked this from a slightly larger
#307
and will try to annotate PR so that mainred@'s contribution is recognized.
  • Loading branch information
jkh52 committed May 16, 2022
1 parent 5308cea commit 6933086
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/server/backend_manager.go
Expand Up @@ -272,7 +272,7 @@ type ErrNotFound struct{}

// Error returns the error message.
func (e *ErrNotFound) Error() string {
return "No backend available"
return "No agent available"
}

type ErrWrongIDType struct {
Expand Down
15 changes: 10 additions & 5 deletions pkg/server/server.go
Expand Up @@ -416,25 +416,30 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
switch pkt.Type {
case client.PacketType_DIAL_REQ:
klog.V(5).Infoln("Received DIAL_REQ")
random := pkt.GetDialRequest().Random
// TODO: if we track what agent has historically served
// the address, then we can send the Dial_REQ to the
// same agent. That way we save the agent from creating
// a new connection to the address.
backend, err = s.getBackend(pkt.GetDialRequest().Address)
if err != nil {
klog.ErrorS(err, "Failed to get a backend", "serverID", s.serverID)
klog.ErrorS(err, "Failed to get a backend", "serverID", s.serverID, "dialID", random)

resp := &client.Packet{
Type: client.PacketType_DIAL_RSP,
Payload: &client.Packet_DialResponse{DialResponse: &client.DialResponse{Error: err.Error()}},
Type: client.PacketType_DIAL_RSP,
Payload: &client.Packet_DialResponse{
DialResponse: &client.DialResponse{
Random: random,
Error: err.Error(),
},
},
}
if err := stream.Send(resp); err != nil {
klog.V(5).InfoS("Failed to send DIAL_RSP for no backend", "error", err, "serverID", s.serverID)
klog.V(5).InfoS("Failed to send DIAL_RSP for no backend", "error", err, "serverID", s.serverID, "dialID", random)
}
// The Dial is failing; no reason to keep this goroutine.
return
}
random := pkt.GetDialRequest().Random
s.PendingDial.Add(
random,
&ProxyClientConnection{
Expand Down
146 changes: 146 additions & 0 deletions pkg/server/server_test.go
Expand Up @@ -22,8 +22,10 @@ import (
"io"
"reflect"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"google.golang.org/grpc/metadata"

authv1 "k8s.io/api/authentication/v1"
Expand All @@ -32,6 +34,7 @@ import (
fakeauthenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1/fake"
k8stesting "k8s.io/client-go/testing"

client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
agentmock "sigs.k8s.io/apiserver-network-proxy/proto/agent/mocks"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
)
Expand Down Expand Up @@ -218,3 +221,146 @@ func TestAddRemoveFrontends(t *testing.T) {
t.Errorf("expected %v, got %v", e, a)
}
}

func prepareFrontendConn(ctrl *gomock.Controller) *agentmock.MockAgentService_ConnectServer {
// prepare the connection to fontend of proxy-server
frontendConn := agentmock.NewMockAgentService_ConnectServer(ctrl)
frontendConnMD := metadata.MD{
":authority": []string{"127.0.0.1:8090"},
"content-type": []string{"application/grpc"},
"user-agent": []string{"grpc-go/1.42.0"},
}
frontendConnCtx := metadata.NewIncomingContext(context.Background(), frontendConnMD)
frontendConn.EXPECT().Context().Return(frontendConnCtx).AnyTimes()
return frontendConn
}

func prepareAgentConnMD(ctrl *gomock.Controller, proxyServer *ProxyServer) *agentmock.MockAgentService_ConnectServer {
// prepare the the connection to agent of proxy-server
agentConn := agentmock.NewMockAgentService_ConnectServer(ctrl)
agentConnMD := metadata.MD{
":authority": []string{"127.0.0.1:8091"},
"agentid": []string{uuid.New().String()},
"agentidentifiers": []string{},
"content-type": []string{"application/grpc"},
"user-agent": []string{"grpc-go/1.42.0"},
}
agentConnCtx := metadata.NewIncomingContext(context.Background(), agentConnMD)
agentConn.EXPECT().Context().Return(agentConnCtx).AnyTimes()

_ = proxyServer.addBackend(uuid.New().String(), agentConn)
return agentConn
}

func baseServerProxyTestWithoutBackend(t *testing.T, validate func(*agentmock.MockAgentService_ConnectServer)) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

frontendConn := prepareFrontendConn(ctrl)
proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, true)

validate(frontendConn)

proxyServer.Proxy(frontendConn)

// add a sleep to make sure `serveRecvFrontend` ends after `Proxy` finished.
time.Sleep(1 * time.Second)
}

func baseServerProxyTestWithBackend(t *testing.T, validate func(*agentmock.MockAgentService_ConnectServer, *agentmock.MockAgentService_ConnectServer)) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

frontendConn := prepareFrontendConn(ctrl)

// prepare proxy server
proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, true)

agentConn := prepareAgentConnMD(ctrl, proxyServer)

validate(frontendConn, agentConn)

proxyServer.Proxy(frontendConn)

// add a sleep to make sure `serveRecvFrontend` ends after `Proxy` finished.
time.Sleep(1 * time.Second)
}

func TestServerProxyNoBackend(t *testing.T) {
validate := func(frontendConn *agentmock.MockAgentService_ConnectServer) {
// receive DIAL_REQ from frontend and proxy to backend
dialReq := &client.Packet{
Type: client.PacketType_DIAL_REQ,
Payload: &client.Packet_DialRequest{
DialRequest: &client.DialRequest{
Protocol: "tcp",
Address: "127.0.0.1:8080",
Random: 111,
},
},
}

dialResp := &client.Packet{
Type: client.PacketType_DIAL_RSP,
Payload: &client.Packet_DialResponse{
DialResponse: &client.DialResponse{
Random: 111,
Error: (&ErrNotFound{}).Error(),
}},
}

gomock.InOrder(
frontendConn.EXPECT().Recv().Return(dialReq, nil).Times(1),
frontendConn.EXPECT().Recv().Return(nil, io.EOF).Times(1),
// NOTE(mainred): `Send` should come before `Recv` io.EOF, but we cannot add wait between
// two Recvs, thus `Recv`` comes before `Send`
frontendConn.EXPECT().Send(dialResp).Return(nil).Times(1),
)

}
baseServerProxyTestWithoutBackend(t, validate)
}

func TestServerProxyNormalClose(t *testing.T) {
validate := func(frontendConn, agentConn *agentmock.MockAgentService_ConnectServer) {
// receive DIAL_REQ from frontend and proxy to backend
dialReq := &client.Packet{
Type: client.PacketType_DIAL_REQ,
Payload: &client.Packet_DialRequest{
DialRequest: &client.DialRequest{
Protocol: "tcp",
Address: "127.0.0.1:8080",
Random: 111,
},
},
}

// recevie CLOSE_REQ from frontend and proxy to backend
closeReq := &client.Packet{
Type: client.PacketType_CLOSE_REQ,
Payload: &client.Packet_CloseRequest{
CloseRequest: &client.CloseRequest{
ConnectID: 1,
}},
}
// This extra close is unwanted and should be removed; see
// https://github.com/kubernetes-sigs/apiserver-network-proxy/pull/307
extraCloseReq := &client.Packet{
Type: client.PacketType_CLOSE_REQ,
Payload: &client.Packet_CloseRequest{
CloseRequest: &client.CloseRequest{}},
}

gomock.InOrder(
frontendConn.EXPECT().Recv().Return(dialReq, nil).Times(1),
frontendConn.EXPECT().Recv().Return(closeReq, nil).Times(1),
frontendConn.EXPECT().Recv().Return(nil, io.EOF).Times(1),
)
gomock.InOrder(
agentConn.EXPECT().Send(dialReq).Return(nil).Times(1),
agentConn.EXPECT().Send(closeReq).Return(nil).Times(1),
agentConn.EXPECT().Send(extraCloseReq).Return(nil).Times(1),
)
}
baseServerProxyTestWithBackend(t, validate)
}

0 comments on commit 6933086

Please sign in to comment.