Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query/tests] Use grpc.NewClient #5391

Merged
merged 24 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci-unit-tests.yml
Expand Up @@ -32,6 +32,10 @@ jobs:
go-version: 1.22.x
cache-dependency-path: ./go.sum

# download dependencies separately to keep unit test step's output cleaner
- name: go mod download
run: go mod download

- name: Install test deps
# even though the same target runs from test-ci, running it separately makes for cleaner log in GH workflow
run: make install-test-tools
Expand Down
5 changes: 1 addition & 4 deletions cmd/query/app/grpc_handler_test.go
Expand Up @@ -174,10 +174,7 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQu
}

func newGRPCClient(t *testing.T, addr string) *grpcClient {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

return &grpcClient{
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/server.go
Expand Up @@ -308,7 +308,7 @@ func (s *Server) Start() error {
if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) {
s.logger.Error("Could not start HTTP server", zap.Error(err))
}

s.logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort))
s.healthCheck.Set(healthcheck.Unavailable)
s.bgFinished.Done()
}()
Expand All @@ -321,6 +321,7 @@ func (s *Server) Start() error {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort))
s.healthCheck.Set(healthcheck.Unavailable)
s.bgFinished.Done()
}()
Expand Down
170 changes: 90 additions & 80 deletions cmd/query/app/server_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
}

_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg},
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.Error(t, err)
Expand All @@ -80,7 +81,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg},
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.Error(t, err)
Expand All @@ -94,7 +95,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg},
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.Error(t, err)
Expand Down Expand Up @@ -283,6 +284,27 @@ var testCases = []struct {
},
}

type fakeQueryService struct {
qs *querysvc.QueryService
spanReader *spanstoremocks.Reader
dependencyReader *depsmocks.Reader
expectedServices []string
}

func makeQuerySvc() *fakeQueryService {
spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
qs := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
return &fakeQueryService{
qs: qs,
spanReader: spanReader,
dependencyReader: dependencyReader,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the dependencyReader used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably not

expectedServices: expectedServices,
}
}

func TestServerHTTPTLS(t *testing.T) {
testlen := len(testCases)

Expand Down Expand Up @@ -331,29 +353,25 @@ func TestServerHTTPTLS(t *testing.T) {
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
flagsSvc.Logger = zaptest.NewLogger(t)

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs,
nil, serverOptions, tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

var clientError error
var clientClose func() error
var clientTLSCfg *tls.Config

if serverOptions.TLSHTTP.Enabled {

var err0 error

clientTLSCfg, err0 = test.clientTLS.Config(zap.NewNop())
clientTLSCfg, err0 = test.clientTLS.Config(flagsSvc.Logger)
defer test.clientTLS.Close()

require.NoError(t, err0)
Expand Down Expand Up @@ -390,8 +408,7 @@ func TestServerHTTPTLS(t *testing.T) {
TLSClientConfig: clientTLSCfg,
},
}
readMock := spanReader
readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).Return([]*model.Trace{mockTrace}, nil).Once()
querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once()
queryString := "/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms"
req, err := http.NewRequest(http.MethodGet, "https://localhost:"+fmt.Sprintf("%d", ports.QueryHTTP)+queryString, nil)
require.NoError(t, err)
Expand All @@ -408,24 +425,18 @@ func TestServerHTTPTLS(t *testing.T) {
require.NoError(t, err2)
}
}
server.Close()
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
})
}
}

func newGRPCClientWithTLS(t *testing.T, addr string, creds credentials.TransportCredentials) *grpcClient {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
var conn *grpc.ClientConn
var err error

if creds != nil {
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(creds))
conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
} else {
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

require.NoError(t, err)
Expand Down Expand Up @@ -481,69 +492,67 @@ func TestServerGRPCTLS(t *testing.T) {
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
flagsSvc.Logger = zaptest.NewLogger(t)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs,
nil, serverOptions, tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

var clientError error
var client *grpcClient

if serverOptions.TLSGRPC.Enabled {
clientTLSCfg, err0 := test.clientTLS.Config(zap.NewNop())
clientTLSCfg, err0 := test.clientTLS.Config(flagsSvc.Logger)
require.NoError(t, err0)
defer test.clientTLS.Close()
creds := credentials.NewTLS(clientTLSCfg)
client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), creds)

} else {
client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), nil)
}
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

flagsSvc.Logger.Info("calling client.GetServices()")
res, clientError := client.GetServices(ctx, &api_v2.GetServicesRequest{})
flagsSvc.Logger.Info("returned from GetServices()")

if test.expectClientError {
require.Error(t, clientError)
} else {
require.NoError(t, clientError)
assert.Equal(t, expectedServices, res.Services)
assert.Equal(t, querySvc.expectedServices, res.Services)
}
require.NoError(t, client.conn.Close())
server.Close()
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
})
}
}

func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{
HTTPHostPort: "8080",
HTTPHostPort: "8080", // bad string, not :port
GRPCHostPort: "127.0.0.1:8081",
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())

require.Error(t, err)
_, err = NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,

_, err = NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{
HTTPHostPort: "127.0.0.1:8081",
GRPCHostPort: "9123",
GRPCHostPort: "9123", // bad string, not :port
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
Expand Down Expand Up @@ -571,7 +580,7 @@ func TestServerInUseHostPort(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server, err := NewServer(
zap.NewNop(),
zaptest.NewLogger(t),
healthcheck.New(),
&querysvc.QueryService{},
nil,
Expand All @@ -586,26 +595,18 @@ func TestServerInUseHostPort(t *testing.T) {
jtracer.NoOp(),
)
require.NoError(t, err)

err = server.Start()
require.Error(t, err)

require.Error(t, server.Start())
server.Close()
})
}
}

func TestServerSinglePort(t *testing.T) {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
flagsSvc.Logger = zaptest.NewLogger(t)
hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "")
spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, nil,
&QueryOptions{
GRPCHostPort: hostPort,
HTTPHostPort: hostPort,
Expand All @@ -617,19 +618,22 @@ func TestServerSinglePort(t *testing.T) {
jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

client := newGRPCClient(t, hostPort)
defer client.conn.Close()
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
require.NoError(t, err)
assert.Equal(t, expectedServices, res.Services)

server.Close()
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
assert.Equal(t, querySvc.expectedServices, res.Services)
}

func TestServerGracefulExit(t *testing.T) {
Expand All @@ -641,20 +645,27 @@ func TestServerGracefulExit(t *testing.T) {
flagsSvc.Logger = zap.New(zapCore)
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
tracer := jtracer.NoOp()

server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, nil,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort},
tenancy.NewManager(&tenancy.Options{}), tracer)
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())

// Wait for servers to come up before we can call .Close()
// TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests.
time.Sleep(1 * time.Second)
server.Close()
{
client := newGRPCClient(t, hostPort)
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})
// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
require.NoError(t, err)
}

server.Close()
for _, logEntry := range logs.All() {
assert.NotEqual(t, zap.ErrorLevel, logEntry.Level,
"Error log found on server exit: %v", logEntry)
Expand Down Expand Up @@ -724,15 +735,15 @@ func TestServerHTTPTenancy(t *testing.T) {
},
}
tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy)

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(zap.NewNop(), healthcheck.New(), querySvc,
querySvc := makeQuerySvc()
querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once()
server, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), querySvc.qs,
nil, serverOptions, tenancyMgr, jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -766,5 +777,4 @@ func TestServerHTTPTenancy(t *testing.T) {
}
})
}
server.Close()
}