Skip to content

Commit

Permalink
feat(core): Service Level Child Loggers (#740)
Browse files Browse the repository at this point in the history
- Adds child loggers to services
- Closes #710

---------

Co-authored-by: Sean Trantalis <strantalis@virtru.com>
  • Loading branch information
pbacon-blaber and strantalis committed May 7, 2024
1 parent 90d8fe5 commit aa0f210
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 78 deletions.
24 changes: 13 additions & 11 deletions service/authorization/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
otdf "github.com/opentdf/platform/sdk"
"github.com/opentdf/platform/service/internal/access"
"github.com/opentdf/platform/service/internal/entitlements"
"github.com/opentdf/platform/service/internal/logger"
"github.com/opentdf/platform/service/internal/opa"
"github.com/opentdf/platform/service/pkg/db"
"github.com/opentdf/platform/service/pkg/serviceregistry"
Expand All @@ -31,6 +32,7 @@ type AuthorizationService struct { //nolint:revive // AuthorizationService is a
eng *opa.Engine
sdk *otdf.SDK
ersURL string
logger *logger.Logger
tokenSource *oauth2.TokenSource
}

Expand Down Expand Up @@ -108,15 +110,15 @@ var retrieveEntitlements = func(ctx context.Context, req *authorization.GetEntit
}

func (as *AuthorizationService) GetDecisions(ctx context.Context, req *authorization.GetDecisionsRequest) (*authorization.GetDecisionsResponse, error) {
slog.DebugContext(ctx, "getting decisions")
as.logger.DebugContext(ctx, "getting decisions")

// Temporary canned echo response with permit decision for all requested decision/entity/ra combos
rsp := &authorization.GetDecisionsResponse{
DecisionResponses: make([]*authorization.DecisionResponse, 0),
}
for _, dr := range req.GetDecisionRequests() {
for _, ra := range dr.GetResourceAttributes() {
slog.DebugContext(ctx, "getting resource attributes", slog.String("FQNs", strings.Join(ra.GetAttributeValueFqns(), ", ")))
as.logger.DebugContext(ctx, "getting resource attributes", slog.String("FQNs", strings.Join(ra.GetAttributeValueFqns(), ", ")))

// get attribute definition/value combinations
dataAttrDefsAndVals, err := retrieveAttributeDefinitions(ctx, ra, as.sdk)
Expand Down Expand Up @@ -207,11 +209,11 @@ func (as *AuthorizationService) GetDecisions(ctx context.Context, req *authoriza
}

func (as *AuthorizationService) GetEntitlements(ctx context.Context, req *authorization.GetEntitlementsRequest) (*authorization.GetEntitlementsResponse, error) {
slog.DebugContext(ctx, "getting entitlements")
as.logger.DebugContext(ctx, "getting entitlements")
// Scope is required for because of performance. Remove and handle 360 no scope
// https://github.com/opentdf/platform/issues/365
if req.GetScope() == nil {
slog.ErrorContext(ctx, "requires scope")
as.logger.ErrorContext(ctx, "requires scope")
return nil, errors.New(db.ErrTextFqnMissingValue)
}
// get subject mappings
Expand All @@ -226,9 +228,9 @@ func (as *AuthorizationService) GetEntitlements(ctx context.Context, req *author
return nil, err
}
subjectMappings := avf.GetFqnAttributeValues()
slog.DebugContext(ctx, "retrieved from subject mappings service", slog.Any("subject_mappings: ", subjectMappings))
as.logger.DebugContext(ctx, "retrieved from subject mappings service", slog.Any("subject_mappings: ", subjectMappings))
if req.Entities == nil {
slog.ErrorContext(ctx, "requires entities")
as.logger.ErrorContext(ctx, "requires entities")
return nil, errors.New("entity chain is required")
}
rsp := &authorization.GetEntitlementsResponse{
Expand All @@ -245,7 +247,7 @@ func (as *AuthorizationService) GetEntitlements(ctx context.Context, req *author
if err != nil {
return nil, err
}
slog.DebugContext(ctx, "entitlements", "entity_id", entity.GetId(), "input", fmt.Sprintf("%+v", in))
as.logger.DebugContext(ctx, "entitlements", "entity_id", entity.GetId(), "input", fmt.Sprintf("%+v", in))
// uncomment for debugging
// if slog.Default().Enabled(ctx, slog.LevelDebug) {
// _ = json.NewEncoder(os.Stdout).Encode(in)
Expand All @@ -272,15 +274,15 @@ func (as *AuthorizationService) GetEntitlements(ctx context.Context, req *author
// }
results, ok := decision.Result.([]interface{})
if !ok {
slog.DebugContext(ctx, "not ok", "entity_id", entity.GetId(), "decision.Result", fmt.Sprintf("%+v", decision.Result))
as.logger.DebugContext(ctx, "not ok", "entity_id", entity.GetId(), "decision.Result", fmt.Sprintf("%+v", decision.Result))
return nil, err
}
slog.DebugContext(ctx, "opa results", "entity_id", entity.GetId(), "results", fmt.Sprintf("%+v", results))
as.logger.DebugContext(ctx, "opa results", "entity_id", entity.GetId(), "results", fmt.Sprintf("%+v", results))
saa := make([]string, len(results))
for k, v := range results {
str, okk := v.(string)
if !okk {
slog.DebugContext(ctx, "not ok", slog.String("entity_id", entity.GetId()), slog.String(strconv.Itoa(k), fmt.Sprintf("%+v", v)))
as.logger.DebugContext(ctx, "not ok", slog.String("entity_id", entity.GetId()), slog.String(strconv.Itoa(k), fmt.Sprintf("%+v", v)))
}
saa[k] = str
}
Expand All @@ -289,7 +291,7 @@ func (as *AuthorizationService) GetEntitlements(ctx context.Context, req *author
AttributeValueFqns: saa,
}
}
slog.DebugContext(ctx, "opa", "rsp", fmt.Sprintf("%+v", rsp))
as.logger.DebugContext(ctx, "opa", "rsp", fmt.Sprintf("%+v", rsp))
return rsp, nil
}

Expand Down
26 changes: 12 additions & 14 deletions service/authorization/authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"
"log/slog"
"os"
"testing"

"github.com/opentdf/platform/protocol/go/authorization"
"github.com/opentdf/platform/protocol/go/policy"
attr "github.com/opentdf/platform/protocol/go/policy/attributes"
otdf "github.com/opentdf/platform/sdk"
"github.com/opentdf/platform/service/internal/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -36,20 +36,17 @@ func mockRetrieveEntitlements(ctx context.Context, _ *authorization.GetEntitleme
return &entitlementsResponse, nil
}

func showLogsInTest() {
logLevel := &slog.LevelVar{} // INFO
logLevel.Set(slog.LevelDebug)

opts := &slog.HandlerOptions{
Level: logLevel,
func createTestLogger() (*logger.Logger, error) {
logger, err := logger.NewLogger(logger.Config{Level: "debug", Output: "stdout", Type: "json"})
if err != nil {
return nil, err
}
logger := slog.New(slog.NewJSONHandler(os.Stdout, opts))

slog.SetDefault(logger)
return logger, nil
}

func Test_GetDecisionsAllOf_Pass(t *testing.T) {
showLogsInTest()
logger, err := createTestLogger()
require.NoError(t, err)

retrieveAttributeDefinitions = mockRetrieveAttributeDefinitions
retrieveEntitlements = mockRetrieveEntitlements
Expand Down Expand Up @@ -104,7 +101,7 @@ func Test_GetDecisionsAllOf_Pass(t *testing.T) {
},
}}

as := AuthorizationService{}
as := AuthorizationService{logger: logger}
retrieveEntitlements = mockRetrieveEntitlements
ctxb := context.Background()

Expand Down Expand Up @@ -169,7 +166,8 @@ func Test_GetDecisionsAllOf_Pass(t *testing.T) {
}

func Test_GetDecisions_AllOf_Fail(t *testing.T) {
showLogsInTest()
logger, err := createTestLogger()
require.NoError(t, err)

retrieveAttributeDefinitions = mockRetrieveAttributeDefinitions
retrieveEntitlements = mockRetrieveEntitlements
Expand Down Expand Up @@ -233,7 +231,7 @@ func Test_GetDecisions_AllOf_Fail(t *testing.T) {
},
}}

as := AuthorizationService{}
as := AuthorizationService{logger: logger}
ctxb := context.Background()

resp, err := as.GetDecisions(ctxb, &req)
Expand Down
8 changes: 5 additions & 3 deletions service/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/opentdf/platform/service/internal/logger"
"github.com/opentdf/platform/service/pkg/db"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"google.golang.org/grpc/codes"
Expand All @@ -14,7 +15,8 @@ import (

type HealthService struct { //nolint:revive // HealthService is a valid name for this struct
healthpb.UnimplementedHealthServer
db *db.Client
db *db.Client
logger *logger.Logger
}

func NewRegistration() serviceregistry.Registration {
Expand All @@ -31,7 +33,7 @@ func NewRegistration() serviceregistry.Registration {
if err != nil {
panic(err)
}
return &HealthService{db: srp.DBClient}, func(_ context.Context, _ *runtime.ServeMux, _ any) error {
return &HealthService{db: srp.DBClient, logger: srp.Logger}, func(_ context.Context, _ *runtime.ServeMux, _ any) error {
return nil
}
},
Expand All @@ -49,7 +51,7 @@ func (s HealthService) Check(ctx context.Context, req *healthpb.HealthCheckReque
if req.GetService() == "readiness" {
// Check the database connection
if err := s.db.Pgx.Ping(ctx); err != nil {
slog.Error("database connection is not ready", slog.String("error", err.Error()))
s.logger.Error("database connection is not ready", slog.String("error", err.Error()))
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_NOT_SERVING,
}, nil
Expand Down
6 changes: 6 additions & 0 deletions service/internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ type Config struct {
Type string `yaml:"type" default:"json"`
}

func (l *Logger) With(key string, value string) *Logger {
return &Logger{
Logger: l.Logger.With(key, value),
}
}

func NewLogger(config Config) (*Logger, error) {
var logger *slog.Logger

Expand Down
28 changes: 15 additions & 13 deletions service/pkg/server/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/opentdf/platform/service/entityresolution"
"github.com/opentdf/platform/service/health"
"github.com/opentdf/platform/service/internal/config"
"github.com/opentdf/platform/service/internal/logger"
"github.com/opentdf/platform/service/internal/opa"
"github.com/opentdf/platform/service/internal/server"
"github.com/opentdf/platform/service/kas"
Expand Down Expand Up @@ -38,17 +39,17 @@ func registerServices() error {
return nil
}

func startServices(ctx context.Context, cfg config.Config, otdf *server.OpenTDFServer, eng *opa.Engine, client *sdk.SDK) (func(), []serviceregistry.Service, error) {
func startServices(ctx context.Context, cfg config.Config, otdf *server.OpenTDFServer, eng *opa.Engine, client *sdk.SDK, logger *logger.Logger) (func(), []serviceregistry.Service, error) {
// CloseServices is a function that will close all registered services
closeServices := func() {
slog.Info("stopping services")
logger.Info("stopping services")
for ns, registers := range serviceregistry.RegisteredServices {
for _, r := range registers {
// Only report on started services
if !r.Started {
continue
}
slog.Info("stopping service", slog.String("namespace", ns), slog.String("service", r.ServiceDesc.ServiceName))
logger.Info("stopping service", slog.String("namespace", ns), slog.String("service", r.ServiceDesc.ServiceName))
if r.Close != nil {
r.Close()
}
Expand All @@ -62,15 +63,15 @@ func startServices(ctx context.Context, cfg config.Config, otdf *server.OpenTDFS
for ns, registers := range serviceregistry.RegisteredServices {
// Check if the service is enabled
if !cfg.Services[ns].Enabled {
slog.Debug("start service skipped", slog.String("namespace", ns))
logger.Debug("start service skipped", slog.String("namespace", ns))
continue
}

// Use a single database client per namespace
var d *db.Client

for _, r := range registers {
s, err := startService(ctx, cfg, r, otdf, eng, client, d)
s, err := startService(ctx, cfg, r, otdf, eng, client, d, logger)
if err != nil {
return closeServices, services, err
}
Expand All @@ -81,7 +82,7 @@ func startServices(ctx context.Context, cfg config.Config, otdf *server.OpenTDFS
return closeServices, services, nil
}

func startService(ctx context.Context, cfg config.Config, s serviceregistry.Service, otdf *server.OpenTDFServer, eng *opa.Engine, client *sdk.SDK, d *db.Client) (serviceregistry.Service, error) {
func startService(ctx context.Context, cfg config.Config, s serviceregistry.Service, otdf *server.OpenTDFServer, eng *opa.Engine, client *sdk.SDK, d *db.Client, logger *logger.Logger) (serviceregistry.Service, error) {
// Create the database client if required
if s.DB.Required && d == nil {
var err error
Expand All @@ -91,7 +92,7 @@ func startService(ctx context.Context, cfg config.Config, s serviceregistry.Serv
// config at the NS layer. This poses a problem where services under a NS want to share a
// database connection.
// TODO: this should be reassessed with how we handle registering a single namespace
slog.Info("creating database client", slog.String("namespace", s.Namespace))
logger.Info("creating database client", slog.String("namespace", s.Namespace))
// Make sure we only create a single db client per namespace
d, err = db.New(ctx, cfg.DB,
db.WithService(s.Namespace),
Expand All @@ -108,16 +109,16 @@ func startService(ctx context.Context, cfg config.Config, s serviceregistry.Serv
return s, fmt.Errorf("migrations FS is required when runMigrations is enabled")
}

slog.Info("running database migrations")
logger.Info("running database migrations")
appliedMigrations, err := d.RunMigrations(ctx, s.DB.Migrations)
if err != nil {
return s, fmt.Errorf("issue running database migrations: %w", err)
}
slog.Info("database migrations complete",
logger.Info("database migrations complete",
slog.Int("applied", appliedMigrations),
)
} else {
slog.Info("skipping migrations",
logger.Info("skipping migrations",
slog.String("namespace", s.Namespace),
slog.String("reason", "runMigrations is false"),
slog.Bool("runMigrations", false),
Expand All @@ -132,6 +133,7 @@ func startService(ctx context.Context, cfg config.Config, s serviceregistry.Serv
Engine: eng,
SDK: client,
WellKnownConfig: wellknown.RegisterConfiguration,
Logger: logger.With("namespace", s.Namespace),
})

// Register the service with the gRPC server
Expand All @@ -142,15 +144,15 @@ func startService(ctx context.Context, cfg config.Config, s serviceregistry.Serv

// Register the service with the gRPC gateway
if err := handler(ctx, otdf.Mux, impl); err != nil {
slog.Error("failed to start service", slog.String("namespace", s.Namespace), slog.String("error", err.Error()))
logger.Error("failed to start service", slog.String("namespace", s.Namespace), slog.String("error", err.Error()))
return s, err
}

slog.Info("started service", slog.String("namespace", s.Namespace), slog.String("service", s.ServiceDesc.ServiceName))
logger.Info("started service", slog.String("namespace", s.Namespace), slog.String("service", s.ServiceDesc.ServiceName))
s.Started = true
s.Close = func() {
if d != nil {
slog.Info("closing database client", slog.String("namespace", s.Namespace), slog.String("service", s.ServiceDesc.ServiceName))
logger.Info("closing database client", slog.String("namespace", s.Namespace), slog.String("service", s.ServiceDesc.ServiceName))
// TODO: this might be a problem if we can't call close on the db client multiple times
d.Close()
}
Expand Down
6 changes: 5 additions & 1 deletion service/pkg/server/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/opentdf/platform/service/internal/config"
"github.com/opentdf/platform/service/internal/logger"
"github.com/opentdf/platform/service/pkg/db"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -123,6 +124,9 @@ func (suite *ServiceTestSuite) TestStartServicesWithVariousCases() {
otdf, err := mockOpenTDFServer()
require.NoError(t, err)

logger, err := logger.NewLogger(logger.Config{Output: "stdout", Level: "info", Type: "json"})
require.NoError(t, err)

cF, services, err := startServices(ctx, config.Config{
DB: db.Config{
Host: "localhost",
Expand All @@ -143,7 +147,7 @@ func (suite *ServiceTestSuite) TestStartServicesWithVariousCases() {
Enabled: false,
},
},
}, otdf, nil, nil)
}, otdf, nil, nil, logger)
require.NoError(t, err)
require.NotNil(t, cF)
assert.Lenf(t, services, 2, "expected 2 services enabled, got %d", len(services))
Expand Down
2 changes: 1 addition & 1 deletion service/pkg/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Start(f ...StartOptions) error {
defer client.Close()

slog.Info("starting services")
closeServices, services, err := startServices(ctx, *conf, otdf, eng, client)
closeServices, services, err := startServices(ctx, *conf, otdf, eng, client, logger)
if err != nil {
slog.Error("issue starting services", slog.String("error", err.Error()))
return fmt.Errorf("issue starting services: %w", err)
Expand Down
6 changes: 5 additions & 1 deletion service/pkg/server/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/opentdf/platform/service/internal/auth"
"github.com/opentdf/platform/service/internal/config"
"github.com/opentdf/platform/service/internal/logger"
"github.com/opentdf/platform/service/internal/server"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -93,6 +94,9 @@ func (suite *StartTestSuite) Test_Start_When_Extra_Service_Registered_Expect_Res
s, err := mockOpenTDFServer()
require.NoError(t, err)

logger, err := logger.NewLogger(logger.Config{Output: "stdout", Level: "info", Type: "json"})
require.NoError(t, err)

// Register Test Service
registerTestService, _ := mockTestServiceRegistry(mockTestServiceOptions{
serviceObject: &TestService{},
Expand All @@ -114,7 +118,7 @@ func (suite *StartTestSuite) Test_Start_When_Extra_Service_Registered_Expect_Res
Enabled: true,
},
},
}, s, nil, nil)
}, s, nil, nil, logger)
require.NoError(t, err)

s.Start()
Expand Down

0 comments on commit aa0f210

Please sign in to comment.