Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS support for gRPC Query server #2297

Merged
merged 6 commits into from Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/all-in-one/main.go
Expand Up @@ -224,7 +224,10 @@ func startQuery(
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
Expand Down
10 changes: 10 additions & 0 deletions cmd/query/app/flags.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage"
)
Expand All @@ -47,6 +48,12 @@ const (
queryMaxClockSkewAdjust = "query.max-clock-skew-adjustment"
)

var tlsFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "query.grpc",
ShowEnabled: true,
ShowClientCA: true,
}

// QueryOptions holds configuration for query service
type QueryOptions struct {
// HostPort is the host:port address that the query service listens o n
Expand All @@ -59,6 +66,8 @@ type QueryOptions struct {
UIConfig string
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
// TLS configures secure transport
TLS tlscfg.Options
// AdditionalHeaders
AdditionalHeaders http.Header
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span
Expand All @@ -84,6 +93,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *Qu
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
qOpts.TLS = tlsFlagsConfig.InitFromViper(v)
qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust)

stringSlice := v.GetStringSlice(queryAdditionalHeaders)
Expand Down
35 changes: 28 additions & 7 deletions cmd/query/app/server.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
Expand All @@ -47,28 +48,48 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) *Server {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
if err != nil {
return nil, err
}

return &Server{
logger: logger,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: createGRPCServer(querySvc, logger, tracer),
grpcServer: grpcServer,
httpServer: createHTTPServer(querySvc, options, tracer, logger),
unavailableChannel: make(chan healthcheck.Status),
}
}, nil
}

// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *grpc.Server {
srv := grpc.NewServer()
func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
var server *grpc.Server

if options.TLS.Enabled {
// user requested a server with TLS, setup creds
jan25 marked this conversation as resolved.
Show resolved Hide resolved
tlsCfg, err := options.TLS.Config()
if err != nil {
return nil, err
}

creds := credentials.NewTLS(tlsCfg)
server = grpc.NewServer(grpc.Creds(creds))
jan25 marked this conversation as resolved.
Show resolved Hide resolved
} else {
// server without TLS
server = grpc.NewServer()
}

handler := NewGRPCHandler(querySvc, logger, tracer)
api_v2.RegisterQueryServiceServer(srv, handler)
return srv
api_v2.RegisterQueryServiceServer(server, handler)
return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) *http.Server {
Expand Down
9 changes: 6 additions & 3 deletions cmd/query/app/server_test.go
Expand Up @@ -55,9 +55,10 @@ func TestServer(t *testing.T) {

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server := NewServer(flagsSvc.Logger, querySvc,
server, err := NewServer(flagsSvc.Logger, querySvc,
&QueryOptions{HostPort: hostPort, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
for s := range server.HealthCheckStatus() {
Expand Down Expand Up @@ -95,7 +96,8 @@ func TestServerGracefulExit(t *testing.T) {

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}
server := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ports.PortToHostPort(ports.QueryAdminHTTP)}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ports.PortToHostPort(ports.QueryAdminHTTP)}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
for s := range server.HealthCheckStatus() {
Expand All @@ -121,7 +123,8 @@ func TestServerHandlesPortZero(t *testing.T) {

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}
server := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ":0"}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()

Expand Down
6 changes: 5 additions & 1 deletion cmd/query/main.go
Expand Up @@ -106,7 +106,11 @@ func main() {
dependencyReader,
*queryServiceOptions)

server := app.NewServer(svc.Logger, queryService, queryOpts, tracer)
server, err := app.NewServer(svc.Logger, queryService, queryOpts, tracer)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))
}

go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
Expand Down