Skip to content

Commit

Permalink
Merge pull request #536 from tallclair/full-agent-testing
Browse files Browse the repository at this point in the history
Run integration tests against full agent & proxy-server apps
  • Loading branch information
k8s-ci-robot committed Dec 5, 2023
2 parents 6c63560 + 89dd3d3 commit ab4baf8
Show file tree
Hide file tree
Showing 20 changed files with 605 additions and 331 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ konnectivity.out
konnectivity.html
konnectivity-client/client.out
konnectivity-client/client.html

tests.test
30 changes: 21 additions & 9 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,50 @@ func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command
Use: "agent",
Long: `A gRPC agent, Connects to the proxy and then allows traffic to be forwarded to it.`,
RunE: func(cmd *cobra.Command, args []string) error {
return a.run(o)
stopCh := make(chan struct{})
return a.Run(o, stopCh)
},
}

return cmd
}

type Agent struct {
adminServer *http.Server
healthServer *http.Server

cs *agent.ClientSet
}

func (a *Agent) run(o *options.GrpcProxyAgentOptions) error {
func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error {
o.Print()
if err := o.Validate(); err != nil {
return fmt.Errorf("failed to validate agent options with %v", err)
}

stopCh := make(chan struct{})

cs, err := a.runProxyConnection(o, stopCh)
if err != nil {
return fmt.Errorf("failed to run proxy connection with %v", err)
}
a.cs = cs

if err := a.runHealthServer(o, cs); err != nil {
return fmt.Errorf("failed to run health server with %v", err)
}
defer a.healthServer.Close()

if err := a.runAdminServer(o); err != nil {
return fmt.Errorf("failed to run admin server with %v", err)
}
defer a.adminServer.Close()

<-stopCh
klog.V(1).Infoln("Shutting down agent.")

return nil
}

func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (agent.ReadinessManager, error) {
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (*agent.ClientSet, error) {
var tlsConfig *tls.Config
var err error
if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil {
Expand Down Expand Up @@ -149,7 +156,7 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions, cs agent.Readi
// "/ready" is deprecated but being maintained for backward compatibility
muxHandler.HandleFunc("/ready", readinessHandler)
muxHandler.HandleFunc("/readyz", readinessHandler)
healthServer := &http.Server{
a.healthServer = &http.Server{
Addr: net.JoinHostPort(o.HealthServerHost, strconv.Itoa(o.HealthServerPort)),
Handler: muxHandler,
MaxHeaderBytes: 1 << 20,
Expand All @@ -160,7 +167,7 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions, cs agent.Readi
"core", "healthListener",
"port", strconv.Itoa(o.HealthServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(healthServer) })
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(a.healthServer) })

return nil
}
Expand Down Expand Up @@ -197,7 +204,7 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error {
}
}

adminServer := &http.Server{
a.adminServer = &http.Server{
Addr: net.JoinHostPort(o.AdminBindAddress, strconv.Itoa(o.AdminServerPort)),
Handler: muxHandler,
MaxHeaderBytes: 1 << 20,
Expand All @@ -208,7 +215,7 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error {
"core", "adminListener",
"port", strconv.Itoa(o.AdminServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(adminServer) })
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(a.adminServer) })

return nil
}
Expand All @@ -220,3 +227,8 @@ func (a *Agent) serveAdmin(adminServer *http.Server) {
}
klog.V(0).Infoln("Admin server stopped listening")
}

// ClientSet exposes internal state for testing.
func (a *Agent) ClientSet() *agent.ClientSet {
return a.cs
}
2 changes: 1 addition & 1 deletion cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (o *ProxyRunOptions) Validate() error {
return fmt.Errorf("error checking cluster CA cert %s, got %v", o.ClusterCaCert, err)
}
}
if o.Mode != "grpc" && o.Mode != "http-connect" {
if o.Mode != server.ModeGRPC && o.Mode != server.ModeHTTPConnect {
return fmt.Errorf("mode must be set to either 'grpc' or 'http-connect' not %q", o.Mode)
}
if o.UdsName != "" {
Expand Down
47 changes: 31 additions & 16 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
Use: "proxy",
Long: `A gRPC proxy server, receives requests from the API server and forwards to the agent.`,
RunE: func(cmd *cobra.Command, args []string) error {
return p.run(o)
stopCh := SetupSignalHandler()
return p.Run(o, stopCh)
},
}

Expand All @@ -81,11 +82,16 @@ func tlsCipherSuites(cipherNames []string) []uint16 {
}

type Proxy struct {
agentServer *grpc.Server
adminServer *http.Server
healthServer *http.Server

server *server.ProxyServer
}

type StopFunc func()

func (p *Proxy) run(o *options.ProxyRunOptions) error {
func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
o.Print()
if err := o.Validate(); err != nil {
return fmt.Errorf("failed to validate server options with %v", err)
Expand Down Expand Up @@ -126,37 +132,40 @@ func (p *Proxy) run(o *options.ProxyRunOptions) error {
if err != nil {
return err
}
server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)

frontendStop, err := p.runFrontendServer(ctx, o, server)
frontendStop, err := p.runFrontendServer(ctx, o, p.server)
if err != nil {
return fmt.Errorf("failed to run the frontend server: %v", err)
}
if frontendStop != nil {
defer frontendStop()
}

klog.V(1).Infoln("Starting agent server for tunnel connections.")
err = p.runAgentServer(o, server)
err = p.runAgentServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the agent server: %v", err)
}
defer p.agentServer.Stop()

klog.V(1).Infoln("Starting admin server for debug connections.")
err = p.runAdminServer(o, server)
err = p.runAdminServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the admin server: %v", err)
}
defer p.adminServer.Close()

klog.V(1).Infoln("Starting health server for healthchecks.")
err = p.runHealthServer(o, server)
err = p.runHealthServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the health server: %v", err)
}
defer p.healthServer.Close()

stopCh := SetupSignalHandler()
<-stopCh
klog.V(1).Infoln("Shutting down server.")

if frontendStop != nil {
frontendStop()
}

return nil
}

Expand Down Expand Up @@ -379,6 +388,7 @@ func (p *Proxy) runAgentServer(o *options.ProxyRunOptions, server *server.ProxyS
"port", strconv.FormatUint(uint64(o.AgentPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
p.agentServer = grpcServer

return nil
}
Expand All @@ -396,7 +406,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS
runtime.SetBlockProfileRate(1)
}
}
adminServer := &http.Server{
p.adminServer = &http.Server{
Addr: net.JoinHostPort(o.AdminBindAddress, strconv.Itoa(o.AdminPort)),
Handler: muxHandler,
MaxHeaderBytes: 1 << 20,
Expand All @@ -408,7 +418,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS
"port", strconv.FormatUint(uint64(o.AdminPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
err := adminServer.ListenAndServe()
err := p.adminServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "admin server could not listen")
}
Expand Down Expand Up @@ -438,7 +448,7 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy
// "/ready" is deprecated but being maintained for backward compatibility
muxHandler.HandleFunc("/ready", readinessHandler)
muxHandler.HandleFunc("/readyz", readinessHandler)
healthServer := &http.Server{
p.healthServer = &http.Server{
Addr: net.JoinHostPort(o.HealthBindAddress, strconv.Itoa(o.HealthPort)),
Handler: muxHandler,
MaxHeaderBytes: 1 << 20,
Expand All @@ -450,7 +460,7 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy
"port", strconv.FormatUint(uint64(o.HealthPort), 10),
)
go runpprof.Do(context.Background(), labels, func(context.Context) {
err := healthServer.ListenAndServe()
err := p.healthServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "health server could not listen")
}
Expand All @@ -459,3 +469,8 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy

return nil
}

// ProxyServer exposes internal state for testing.
func (p *Proxy) ProxyServer() *server.ProxyServer {
return p.server
}
1 change: 0 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
)

func main() {
// flag.CommandLine.Parse(os.Args[1:])
proxy := &app.Proxy{}
o := options.NewProxyRunOptions()
command := app.NewProxyCommand(proxy, o)
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func (g *GrpcFrontend) Recv() (*client.Packet, error) {
return pkt, nil
}

const (
ModeGRPC = "grpc"
ModeHTTPConnect = "http-connect"
)

type ProxyClientConnection struct {
Mode string
HTTP io.ReadWriter
Expand All @@ -107,10 +112,10 @@ const (

func (c *ProxyClientConnection) send(pkt *client.Packet) error {
defer func(start time.Time) { metrics.Metrics.ObserveFrontendWriteLatency(time.Since(start)) }(time.Now())
if c.Mode == "grpc" {
if c.Mode == ModeGRPC {
return c.frontend.Send(pkt)
}
if c.Mode == "http-connect" {
if c.Mode == ModeHTTPConnect {
if pkt.Type == client.PacketType_CLOSE_RSP {
return c.CloseHTTP()
} else if pkt.Type == client.PacketType_DIAL_CLS {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
closed := make(chan struct{})
connected := make(chan struct{})
connection := &ProxyClientConnection{
Mode: "http-connect",
Mode: ModeHTTPConnect,
HTTP: io.ReadWriter(conn), // pass as ReadWriter so the caller must close with CloseHTTP
CloseHTTP: func() error {
closeOnce.Do(func() { conn.Close() })
Expand Down
8 changes: 3 additions & 5 deletions tests/agent_disconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"testing"
"time"

"google.golang.org/grpc"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
"sigs.k8s.io/apiserver-network-proxy/tests/framework"
)

Expand Down Expand Up @@ -92,7 +90,7 @@ func TestProxy_Agent_Disconnect_Persistent_Connection(t *testing.T) {
}
}

func TestProxy_Agent_Reconnect(t *testing.T) {
func TestAgentRestartReconnect(t *testing.T) {
testcases := []struct {
name string
proxyServerFunction func(testing.TB) framework.ProxyServer
Expand Down Expand Up @@ -176,7 +174,7 @@ func clientRequest(c *http.Client, addr string) ([]byte, error) {
}

func createGrpcTunnelClient(ctx context.Context, proxyAddr, addr string) (*http.Client, error) {
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, proxyAddr, grpc.WithInsecure())
tunnel, err := createSingleUseGrpcTunnel(ctx, proxyAddr)
if err != nil {
return nil, err
}
Expand All @@ -192,7 +190,7 @@ func createGrpcTunnelClient(ctx context.Context, proxyAddr, addr string) (*http.
}

func createHTTPConnectClient(ctx context.Context, proxyAddr, addr string) (*http.Client, error) {
conn, err := net.Dial("tcp", proxyAddr)
conn, err := net.Dial("unix", proxyAddr)
if err != nil {
return nil, err
}
Expand Down
7 changes: 2 additions & 5 deletions tests/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"net/http"
"net/http/httptest"
"testing"

"google.golang.org/grpc"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
)

func BenchmarkLargeResponse_GRPC(b *testing.B) {
Expand Down Expand Up @@ -53,7 +50,7 @@ func BenchmarkLargeResponse_GRPC(b *testing.B) {

for n := 0; n < b.N; n++ {
// run test client
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -122,7 +119,7 @@ func BenchmarkLargeRequest_GRPC(b *testing.B) {
req.Close = true
for n := 0; n < b.N; n++ {
// run test client
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 1 addition & 3 deletions tests/concurrent_client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"testing"
"time"

"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
)

type simpleServer struct {
Expand All @@ -52,7 +50,7 @@ func (s *simpleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// TODO: test http-connect as well.
func getTestClient(front string, t *testing.T) *http.Client {
ctx := context.Background()
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, front, grpc.WithInsecure())
tunnel, err := createSingleUseGrpcTunnel(ctx, front)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 1 addition & 3 deletions tests/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync"
"testing"

"google.golang.org/grpc"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
"sigs.k8s.io/apiserver-network-proxy/tests/framework"
)

Expand All @@ -48,7 +46,7 @@ func TestProxy_ConcurrencyGRPC(t *testing.T) {
defer wg.Done()

// run test client
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
if err != nil {
t.Error(err)
return
Expand Down

0 comments on commit ab4baf8

Please sign in to comment.