diff --git a/Makefile b/Makefile index 32bb1b4e0..6fac510a0 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ test: # call with TESTCONTAINERS_RYUK_DISABLED="true" to avoid problems with podman on Macs e2e: - go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=2m -tags=e2e {} + go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=3m -tags=e2e {} lint: go install -v github.com/golangci/golangci-lint/v2/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION) diff --git a/providers/flagd/e2e/inprocess_test.go b/providers/flagd/e2e/inprocess_test.go index d4767e4e3..19eb40e60 100644 --- a/providers/flagd/e2e/inprocess_test.go +++ b/providers/flagd/e2e/inprocess_test.go @@ -3,6 +3,7 @@ package e2e import ( + flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" "testing" "github.com/open-feature/go-sdk-contrib/tests/flagd/testframework" @@ -17,6 +18,9 @@ func TestInProcessProviderE2E(t *testing.T) { runner := testframework.NewTestbedRunner(testframework.TestbedConfig{ ResolverType: testframework.InProcess, TestbedConfig: "default", + ExtraOptions: []flagd.ProviderOption{ + flagd.WithRetryBackoffMaxMs(5000), + }, }) defer runner.Cleanup() diff --git a/providers/flagd/e2e/rpc_test.go b/providers/flagd/e2e/rpc_test.go index f3c58b9ab..994c3627c 100644 --- a/providers/flagd/e2e/rpc_test.go +++ b/providers/flagd/e2e/rpc_test.go @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) { } // Run tests with RPC-specific tags - exclude unimplemented scenarios - tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching" + tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/flagd-testbed b/providers/flagd/flagd-testbed index b62f5dbe8..1759a9491 160000 --- a/providers/flagd/flagd-testbed +++ b/providers/flagd/flagd-testbed @@ -1 +1 @@ -Subproject commit b62f5dbe860ecf4f36ec757dfdc0b38f7b3dec6e +Subproject commit 1759a9491388e6c9714d5c048916120f77c8d892 diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index ca758d38d..2b652e696 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -26,6 +26,9 @@ const ( defaultHost = "localhost" defaultResolver = rpc defaultGracePeriod = 5 + defaultRetryBackoffMs = 1000 + defaultRetryBackoffMaxMs = 120000 + defaultFatalStatusCodes = "UNAUTHENTICATED,PERMISSION_DENIED" rpc ResolverType = "rpc" inProcess ResolverType = "in-process" @@ -45,6 +48,9 @@ const ( flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI" flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD" + flagdRetryBackoffMsVariableName = "FLAGD_RETRY_BACKOFF_MS" + flagdRetryBackoffMaxMsVariableName = "FLAGD_RETRY_BACKOFF_MAX_MS" + flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES" ) type ProviderConfiguration struct { @@ -66,6 +72,9 @@ type ProviderConfiguration struct { CustomSyncProviderUri string GrpcDialOptionsOverride []grpc.DialOption RetryGracePeriod int + RetryBackoffMs int + RetryBackoffMaxMs int + FatalStatusCodes []string log logr.Logger } @@ -80,6 +89,9 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration { Resolver: defaultResolver, Tls: defaultTLS, RetryGracePeriod: defaultGracePeriod, + RetryBackoffMs: defaultRetryBackoffMs, + RetryBackoffMaxMs: defaultRetryBackoffMaxMs, + FatalStatusCodes: strings.Split(defaultFatalStatusCodes, ","), } p.updateFromEnvVar() @@ -130,6 +142,7 @@ func validateProviderConfiguration(p *ProviderConfiguration) error { // updateFromEnvVar is a utility to update configurations based on current environment variables func (cfg *ProviderConfiguration) updateFromEnvVar() { + portS := os.Getenv(flagdPortEnvironmentVariableName) if portS != "" { port, err := strconv.Atoi(portS) @@ -159,17 +172,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { cfg.CertPath = certificatePath } - if maxCacheSizeS := os.Getenv(flagdMaxCacheSizeEnvironmentVariableName); maxCacheSizeS != "" { - maxCacheSizeFromEnv, err := strconv.Atoi(maxCacheSizeS) - if err != nil { - cfg.log.Error(err, - fmt.Sprintf("invalid env config for %s provided, using default value: %d", - flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, - )) - } else { - cfg.MaxCacheSize = maxCacheSizeFromEnv - } - } + cfg.MaxCacheSize = getIntFromEnvVarOrDefault(flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, cfg.log) if cacheValue := os.Getenv(flagdCacheEnvironmentVariableName); cacheValue != "" { switch cache.Type(cacheValue) { @@ -185,18 +188,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { } } - if maxEventStreamRetriesS := os.Getenv( - flagdMaxEventStreamRetriesEnvironmentVariableName); maxEventStreamRetriesS != "" { - - maxEventStreamRetries, err := strconv.Atoi(maxEventStreamRetriesS) - if err != nil { - cfg.log.Error(err, - fmt.Sprintf("invalid env config for %s provided, using default value: %d", - flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries)) - } else { - cfg.EventStreamConnectionMaxAttempts = maxEventStreamRetries - } - } + cfg.EventStreamConnectionMaxAttempts = getIntFromEnvVarOrDefault( + flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries, cfg.log) if resolver := os.Getenv(flagdResolverEnvironmentVariableName); resolver != "" { switch strings.ToLower(resolver) { @@ -227,17 +220,34 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" { cfg.TargetUri = targetUri } - if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" { - if seconds, err := strconv.Atoi(gracePeriod); err == nil { - cfg.RetryGracePeriod = seconds + + cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log) + cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, defaultRetryBackoffMs, cfg.log) + cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, defaultRetryBackoffMaxMs, cfg.log) + + if fatalStatusCodes := os.Getenv(flagdFatalStatusCodesVariableName); fatalStatusCodes != "" { + cfg.FatalStatusCodes = strings.Split(fatalStatusCodes, ",") + } +} + +// Helper + +func getIntFromEnvVarOrDefault(envVarName string, defaultValue int, log logr.Logger) int { + if valueFromEnv := os.Getenv(envVarName); valueFromEnv != "" { + intValue, err := strconv.Atoi(valueFromEnv) + if err != nil { + log.Error(err, + fmt.Sprintf("invalid env config for %s provided, using default value: %d", + envVarName, defaultValue, + )) } else { - // Handle parsing error - cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod)) + return intValue } } - + return defaultValue } + // ProviderOptions type ProviderOption func(*ProviderConfiguration) @@ -415,3 +425,25 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption { p.RetryGracePeriod = gracePeriod } } + +// WithRetryBackoffMs sets the initial backoff duration (in milliseconds) for retrying failed connections +func WithRetryBackoffMs(retryBackoffMs int) ProviderOption { + return func(p *ProviderConfiguration) { + p.RetryBackoffMs = retryBackoffMs + } +} + +// WithRetryBackoffMaxMs sets the maximum backoff duration (in milliseconds) for retrying failed connections +func WithRetryBackoffMaxMs(retryBackoffMaxMs int) ProviderOption { + return func(p *ProviderConfiguration) { + p.RetryBackoffMaxMs = retryBackoffMaxMs + } +} + +// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up +// and put the provider in a PROVIDER_FATAL state +func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption { + return func(p *ProviderConfiguration) { + p.FatalStatusCodes = fatalStatusCodes + } +} \ No newline at end of file diff --git a/providers/flagd/pkg/provider.go b/providers/flagd/pkg/provider.go index 1742f2f15..f2a199184 100644 --- a/providers/flagd/pkg/provider.go +++ b/providers/flagd/pkg/provider.go @@ -74,6 +74,9 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri, GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride, RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod, + RetryBackOffMs: provider.providerConfiguration.RetryBackoffMs, + RetryBackOffMaxMs: provider.providerConfiguration.RetryBackoffMaxMs, + FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes, }) default: service = process.NewInProcessService(process.Configuration{ diff --git a/providers/flagd/pkg/service/in_process/grpc_config.go b/providers/flagd/pkg/service/in_process/grpc_config.go new file mode 100644 index 000000000..6ab6e6137 --- /dev/null +++ b/providers/flagd/pkg/service/in_process/grpc_config.go @@ -0,0 +1,67 @@ +package process + +import ( + "encoding/json" + "strings" + "time" +) + +const ( + // Default timeouts and retry intervals + defaultKeepaliveTime = 30 * time.Second + defaultKeepaliveTimeout = 5 * time.Second +) + +type RetryPolicy struct { + MaxAttempts int `json:"MaxAttempts"` + InitialBackoff string `json:"InitialBackoff"` + MaxBackoff string `json:"MaxBackoff"` + BackoffMultiplier float64 `json:"BackoffMultiplier"` + RetryableStatusCodes []string `json:"RetryableStatusCodes"` +} + +func (g *Sync) buildRetryPolicy() string { + var policy = map[string]interface{}{ + "methodConfig": []map[string]interface{}{ + { + "name": []map[string]string{ + {"service": "flagd.sync.v1.FlagSyncService"}, + }, + "retryPolicy": RetryPolicy{ + MaxAttempts: 3, + InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(), + MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(), + BackoffMultiplier: 2.0, + RetryableStatusCodes: []string{"UNKNOWN","UNAVAILABLE"}, + }, + }, + }, + } + retryPolicyBytes, _ := json.Marshal(policy) + retryPolicy := string(retryPolicyBytes) + + return retryPolicy +} + +// Set of non-retryable gRPC status codes for faster lookup +var nonRetryableCodes map[string]struct{} + +// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup +func (g *Sync) initNonRetryableStatusCodesSet() { + nonRetryableCodes = make(map[string]struct{}) + for _, code := range g.FatalStatusCodes { + normalized := toCamelCase(code) + nonRetryableCodes[normalized] = struct{}{} + } +} + +// toCamelCase converts a SNAKE_CASE string to CamelCase +func toCamelCase(s string) string { + parts := strings.Split(strings.ToLower(s), "_") + for i, part := range parts { + if len(part) > 0 { + parts[i] = strings.ToUpper(part[:1]) + part[1:] + } + } + return strings.Join(parts, "") +} \ No newline at end of file diff --git a/providers/flagd/pkg/service/in_process/grpc_config_test.go b/providers/flagd/pkg/service/in_process/grpc_config_test.go new file mode 100644 index 000000000..b49e069af --- /dev/null +++ b/providers/flagd/pkg/service/in_process/grpc_config_test.go @@ -0,0 +1,125 @@ +package process + +import ( + "encoding/json" + "reflect" + "strings" + "testing" +) + + +func TestBuildRetryPolicy(t *testing.T) { + g := &Sync{ + RetryBackOffMs: 100, + RetryBackOffMaxMs: 500, + } + + result := g.buildRetryPolicy() + + // Unmarshal to check structure + var policy map[string]interface{} + if err := json.Unmarshal([]byte(result), &policy); err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + + methodConfig, ok := policy["methodConfig"].([]interface{}) + if !ok || len(methodConfig) == 0 { + t.Fatalf("methodConfig missing or empty") + } + + config := methodConfig[0].(map[string]interface{}) + retryPolicy, ok := config["retryPolicy"].(map[string]interface{}) + if !ok { + t.Fatalf("retryPolicy missing") + } + + if retryPolicy["MaxAttempts"].(float64) != 3 { + t.Errorf("MaxAttempts = %v; want 3", retryPolicy["MaxAttempts"]) + } + if retryPolicy["InitialBackoff"].(string) != "100ms" { + t.Errorf("InitialBackoff = %v; want 100ms", retryPolicy["InitialBackoff"]) + } + if retryPolicy["MaxBackoff"].(string) != "500ms" { + t.Errorf("MaxBackoff = %v; want 500ms", retryPolicy["MaxBackoff"]) + } + if retryPolicy["BackoffMultiplier"].(float64) != 2.0 { + t.Errorf("BackoffMultiplier = %v; want 2.0", retryPolicy["BackoffMultiplier"]) + } + codes := retryPolicy["RetryableStatusCodes"].([]interface{}) + expectedCodes := []string{"UNKNOWN", "UNAVAILABLE"} + for i, code := range expectedCodes { + if codes[i].(string) != code { + t.Errorf("RetryableStatusCodes[%d] = %v; want %v", i, codes[i], code) + } + } + + // Also check that the result is valid JSON and contains expected substrings + if !strings.Contains(result, `"MaxAttempts":3`) { + t.Error("Result does not contain MaxAttempts") + } + if !strings.Contains(result, `"InitialBackoff":"100ms"`) { + t.Error("Result does not contain InitialBackoff") + } + if !strings.Contains(result, `"MaxBackoff":"500ms"`) { + t.Error("Result does not contain MaxBackoff") + } + if !strings.Contains(result, `"RetryableStatusCodes":["UNKNOWN","UNAVAILABLE"]`) { + t.Error("Result does not contain RetryableStatusCodes") + } +} + +type syncTestCase struct { + input []string + expected map[string]struct{} +} + +func TestInitNonRetryableStatusCodesSet(t *testing.T) { + testCases := []syncTestCase{ + { + input: []string{"PERMISSION_DENIED", "UNKNOWN"}, + expected: map[string]struct{}{"PermissionDenied": {}, "Unknown": {}}, + }, + { + input: []string{"ALREADY_EXISTS"}, + expected: map[string]struct{}{"AlreadyExists": {}}, + }, + { + input: []string{}, + expected: map[string]struct{}{}, + }, + } + + for _, tc := range testCases { + g := &Sync{FatalStatusCodes: tc.input} + nonRetryableCodes = nil // reset global + g.initNonRetryableStatusCodesSet() + if !reflect.DeepEqual(nonRetryableCodes, tc.expected) { + t.Errorf("input: %v, got: %v, want: %v", tc.input, nonRetryableCodes, tc.expected) + } + } +} + +func TestToCamelCase(t *testing.T) { + testCases := []struct { + input string + expected string + }{ + {"INVALID_ARGUMENT", "InvalidArgument"}, + {"NOT_FOUND", "NotFound"}, + {"ALREADY_EXISTS", "AlreadyExists"}, + {"UNKNOWN", "Unknown"}, + {"", ""}, + {"SINGLE", "Single"}, + {"MULTI_WORD_EXAMPLE", "MultiWordExample"}, + {"_LEADING_UNDERSCORE", "LeadingUnderscore"}, + {"TRAILING_UNDERSCORE_", "TrailingUnderscore"}, + {"__DOUBLE__UNDERSCORES__", "DoubleUnderscores"}, + } + + for _, tc := range testCases { + got := toCamelCase(tc.input) + if got != tc.expected { + t.Errorf("toCamelCase(%q) = %q; want %q", tc.input, got, tc.expected) + } + } +} \ No newline at end of file diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go index 9b6b93caa..413e1e67a 100644 --- a/providers/flagd/pkg/service/in_process/grpc_sync.go +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -12,52 +12,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" msync "sync" "time" ) -const ( - // Default timeouts and retry intervals - defaultKeepaliveTime = 30 * time.Second - defaultKeepaliveTimeout = 5 * time.Second - - retryPolicy = `{ - "methodConfig": [ - { - "name": [ - { - "service": "flagd.sync.v1.FlagSyncService" - } - ], - "retryPolicy": { - "MaxAttempts": 3, - "InitialBackoff": "1s", - "MaxBackoff": "5s", - "BackoffMultiplier": 2.0, - "RetryableStatusCodes": [ - "CANCELLED", - "UNKNOWN", - "INVALID_ARGUMENT", - "NOT_FOUND", - "ALREADY_EXISTS", - "PERMISSION_DENIED", - "RESOURCE_EXHAUSTED", - "FAILED_PRECONDITION", - "ABORTED", - "OUT_OF_RANGE", - "UNIMPLEMENTED", - "INTERNAL", - "UNAVAILABLE", - "DATA_LOSS", - "UNAUTHENTICATED" - ] - } - } - ] - }` -) - -// Type aliases for interfaces required by this component - needed for mock generation with gomock +// FlagSyncServiceClient Type aliases for interfaces required by this component - needed for mock generation with gomock type FlagSyncServiceClient interface { syncv1grpc.FlagSyncServiceClient } @@ -78,6 +38,10 @@ type Sync struct { Selector string URI string MaxMsgSize int + RetryGracePeriod int + RetryBackOffMs int + RetryBackOffMaxMs int + FatalStatusCodes []string // Runtime state client FlagSyncServiceClient @@ -92,6 +56,7 @@ type Sync struct { // Init initializes the gRPC connection and starts background monitoring func (g *Sync) Init(ctx context.Context) error { g.Logger.Info(fmt.Sprintf("initializing gRPC client for %s", g.URI)) + g.initNonRetryableStatusCodesSet() // Initialize channels g.shutdownComplete = make(chan struct{}) @@ -155,11 +120,13 @@ func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) { } dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) - dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(retryPolicy)) + dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(g.buildRetryPolicy())) return dialOptions, nil } + + // ReSync performs a one-time fetch of all flags func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { g.Logger.Debug("performing ReSync - fetching all flags") @@ -193,6 +160,8 @@ func (g *Sync) IsReady() bool { func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { g.Logger.Info("starting continuous flag synchronization") + time.Sleep(500 * time.Millisecond) + // Ensure shutdown completion is signaled when THIS method exits defer g.markShutdownComplete() @@ -207,12 +176,31 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } // Attempt to create sync stream - if err := g.performSyncCycle(ctx, dataSync); err != nil { + err := g.performSyncCycle(ctx, dataSync) + if err != nil { if ctx.Err() != nil { g.Logger.Info("sync cycle failed due to context cancellation") return ctx.Err() } + // Check if error is a gRPC status error and if code is retryable + st, ok := status.FromError(err) + if ok { + codeStr := st.Code().String() + if _, found := nonRetryableCodes[codeStr]; found { + errStr := fmt.Sprintf("sync cycle failed with non-retryable status: %v, " + + "returning provider fatal.", codeStr) + g.Logger.Error(errStr) + return &of.ProviderInitError{ + ErrorCode: of.ProviderFatalCode, + Message: errStr, + } + } + } + + // Backoff before retrying + time.Sleep(time.Duration(g.RetryBackOffMs) * time.Millisecond) + g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err)) g.sendEvent(ctx, SyncEvent{event: of.ProviderError}) diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index 6cb6d0e1b..4412f887e 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -112,6 +112,9 @@ type Configuration struct { GrpcDialOptionsOverride []googlegrpc.DialOption CertificatePath string RetryGracePeriod int + RetryBackOffMs int + RetryBackOffMaxMs int + FatalStatusCodes []string } // EventSync interface for sync providers that support events @@ -569,6 +572,10 @@ func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, str ProviderID: cfg.ProviderID, Selector: cfg.Selector, URI: uri, + RetryGracePeriod: cfg.RetryGracePeriod, + RetryBackOffMs: cfg.RetryBackOffMs, + RetryBackOffMaxMs: cfg.RetryBackOffMaxMs, + FatalStatusCodes: cfg.FatalStatusCodes, }, uri } diff --git a/providers/flagd/pkg/service/in_process/service_grpc_test.go b/providers/flagd/pkg/service/in_process/service_grpc_test.go index a1e7c1c1b..66f33046b 100644 --- a/providers/flagd/pkg/service/in_process/service_grpc_test.go +++ b/providers/flagd/pkg/service/in_process/service_grpc_test.go @@ -54,6 +54,8 @@ func TestInProcessProviderEvaluation(t *testing.T) { Port: port, Selector: scope, TLSEnabled: false, + RetryBackOffMaxMs: 5000, + RetryBackOffMs: 1000, }) // when @@ -141,6 +143,8 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { TargetUri: "envoy://localhost:9211/foo.service", Selector: scope, TLSEnabled: false, + RetryBackOffMaxMs: 5000, + RetryBackOffMs: 1000, }) // when diff --git a/providers/multi-provider/pkg/strategies/strategies.go b/providers/multi-provider/pkg/strategies/strategies.go index f396cd279..86083989f 100644 --- a/providers/multi-provider/pkg/strategies/strategies.go +++ b/providers/multi-provider/pkg/strategies/strategies.go @@ -112,7 +112,7 @@ func setFlagMetadata(strategyUsed EvaluationStrategy, successProviderName string func cleanErrorMessage(msg string) string { codeRegex := strings.Join([]string{ string(of.ProviderNotReadyCode), - // string(of.ProviderFatalCode), // TODO: not available until go-sdk 14 + string(of.ProviderFatalCode), string(of.FlagNotFoundCode), string(of.ParseErrorCode), string(of.TypeMismatchCode), diff --git a/tests/flagd/testframework/config_steps.go b/tests/flagd/testframework/config_steps.go index ddf7ac492..0bed8f1c1 100644 --- a/tests/flagd/testframework/config_steps.go +++ b/tests/flagd/testframework/config_steps.go @@ -17,8 +17,6 @@ var ignoredOptions = []string{ "deadlineMs", "streamDeadlineMs", "keepAliveTime", - "retryBackoffMs", - "retryBackoffMaxMs", "offlinePollIntervalMs", } diff --git a/tests/flagd/testframework/provider_steps.go b/tests/flagd/testframework/provider_steps.go index 165853b2e..bd91fe1c7 100644 --- a/tests/flagd/testframework/provider_steps.go +++ b/tests/flagd/testframework/provider_steps.go @@ -27,6 +27,10 @@ func InitializeProviderSteps(ctx *godog.ScenarioContext) { // Generic provider step definition - accepts any provider type including "stable" ctx.Step(`^a (\w+) flagd provider$`, withState1Arg((*TestState).createSpecializedFlagdProvider)) + + // TODO: deprecate 'is' variant after flagd-testbed/pull/#311 is merged + ctx.Step(`^the client (?:is|should be) in (\w+) state$`, + withState1Arg((*TestState).assertClientState)) } // State methods - these now expect context as first parameter after state @@ -111,6 +115,13 @@ func (s *TestState) simulateConnectionLoss(ctx context.Context, seconds int) err return s.Container.Restart(seconds) } +func (s *TestState) assertClientState(ctx context.Context, state string) error { + if string(s.Client.State()) == strings.ToUpper(state) { + return nil + } + return fmt.Errorf("expected client state %s but got %s", state, s.Client.State()) +} + // createSpecializedFlagdProvider creates specialized flagd providers based on type func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, providerType string) error { // Apply specialized configuration based on provider type @@ -128,7 +139,7 @@ func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, provider return fmt.Errorf("failed to create instance for %s provider: %w", providerType, err) } - if providerType != "unavailable" { + if providerType != "unavailable" && providerType != "forbidden" { if s.ProviderType == RPC { // Small delay to allow flagd server to fully load flags after connection time.Sleep(50 * time.Millisecond) @@ -150,6 +161,7 @@ func (s *TestState) applySpecializedConfig(providerType string) error { return nil case "unavailable": return s.configureUnavailableProvider() + case "forbidden": return s.configureForbiddenProvider() case "socket": return s.configureSocketProvider() case "ssl", "tls": @@ -173,6 +185,12 @@ func (s *TestState) configureUnavailableProvider() error { return nil } +func (s *TestState) configureForbiddenProvider() error { + // Set an Envoy port which always responds with forbidden + s.addProviderOption("port", "Integer", "9212") + return nil +} + func (s *TestState) configureSocketProvider() error { // Configure for unix socket connection s.addProviderOption("socketPath", "String", "/tmp/flagd.sock") diff --git a/tests/flagd/testframework/step_definitions.go b/tests/flagd/testframework/step_definitions.go index 3a76e088e..54c5c86ef 100644 --- a/tests/flagd/testframework/step_definitions.go +++ b/tests/flagd/testframework/step_definitions.go @@ -1,105 +1,105 @@ -package testframework - -import ( - "context" - "github.com/cucumber/godog" - "os" - "sync" -) - -// All type definitions have been moved to types.go for better organization -var scenarioMutex sync.Mutex - -// InitializeScenario registers all step definitions for gherkin scenarios -func InitializeScenario(ctx *godog.ScenarioContext) { - - // Configuration steps (existing config_steps.go steps work fine with TestState via context) - InitializeConfigScenario(ctx) - - // Provider lifecycle steps - InitializeProviderSteps(ctx) - - // Flag evaluation steps - InitializeFlagSteps(ctx) - - // Context management steps - InitializeContextSteps(ctx) - - // Event handling steps - InitializeEventSteps(ctx) - - // Setup scenario hooks - ctx.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { - scenarioMutex.Lock() - defer scenarioMutex.Unlock() - state := &TestState{ - EnvVars: make(map[string]string), - EvalContext: make(map[string]interface{}), - EventChannel: make(chan EventRecord, 100), - } - state.ProviderType = ctx.Value("resolver").(ProviderType) - state.FlagDir = ctx.Value("flagDir").(string) - - return context.WithValue(ctx, TestStateKey{}, state), nil - }) - - ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { - scenarioMutex.Lock() - defer scenarioMutex.Unlock() - if state, ok := ctx.Value(TestStateKey{}).(*TestState); ok { - state.clearEvents() - state.CleanupEnvironmentVariables() - state.cleanupProvider() - } - return ctx, nil - }) -} - -// Type conversion utilities are now centralized in utils.go -// Legacy compatibility wrappers -func convertValueForSteps(value string, valueType string) (interface{}, error) { - return DefaultConverter.ConvertForSteps(value, valueType) -} - -// applyDefaults sets default values for TestState fields -func (s *TestState) applyDefaults() { - if s.EnvVars == nil { - s.EnvVars = make(map[string]string) - } - if s.EvalContext == nil { - s.EvalContext = make(map[string]interface{}) - } - if s.EventChannel == nil { - s.EventChannel = make(chan EventRecord, 100) // Buffered channel to prevent blocking - } - if s.ProviderType == 0 { - s.ProviderType = RPC // Default to RPC - } -} - -// cleanupEnvironmentVariables restores original environment variables -func (s *TestState) CleanupEnvironmentVariables() { - for envVar, originalValue := range s.EnvVars { - if originalValue == "" { - os.Unsetenv(envVar) - } else { - os.Setenv(envVar, originalValue) - } - } - s.EnvVars = make(map[string]string) -} - -// cleanupProvider properly shuts down the provider and client to prevent event contamination -func (s *TestState) cleanupProvider() { - // Shutdown the provider if it has a shutdown method - if s.Provider != nil { - // Try to cast to common provider interfaces that might have shutdown methods - // This is defensive - not all providers will have explicit shutdown - - if shutdownable, ok := s.Provider.(interface{ Shutdown() }); ok { - shutdownable.Shutdown() - } - - s.Provider = nil - } -} +package testframework + +import ( + "context" + "github.com/cucumber/godog" + "os" + "sync" +) + +// All type definitions have been moved to types.go for better organization +var scenarioMutex sync.Mutex + +// InitializeScenario registers all step definitions for gherkin scenarios +func InitializeScenario(ctx *godog.ScenarioContext) { + + // Configuration steps (existing config_steps.go steps work fine with TestState via context) + InitializeConfigScenario(ctx) + + // Provider lifecycle steps + InitializeProviderSteps(ctx) + + // Flag evaluation steps + InitializeFlagSteps(ctx) + + // Context management steps + InitializeContextSteps(ctx) + + // Event handling steps + InitializeEventSteps(ctx) + + // Setup scenario hooks + ctx.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { + scenarioMutex.Lock() + defer scenarioMutex.Unlock() + state := &TestState{ + EnvVars: make(map[string]string), + EvalContext: make(map[string]interface{}), + EventChannel: make(chan EventRecord, 100), + } + state.ProviderType = ctx.Value("resolver").(ProviderType) + state.FlagDir = ctx.Value("flagDir").(string) + + return context.WithValue(ctx, TestStateKey{}, state), nil + }) + + ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { + scenarioMutex.Lock() + defer scenarioMutex.Unlock() + if state, ok := ctx.Value(TestStateKey{}).(*TestState); ok { + state.clearEvents() + state.CleanupEnvironmentVariables() + state.cleanupProvider() + } + return ctx, nil + }) +} + +// Type conversion utilities are now centralized in utils.go +// Legacy compatibility wrappers +func convertValueForSteps(value string, valueType string) (interface{}, error) { + return DefaultConverter.ConvertForSteps(value, valueType) +} + +// applyDefaults sets default values for TestState fields +func (s *TestState) applyDefaults() { + if s.EnvVars == nil { + s.EnvVars = make(map[string]string) + } + if s.EvalContext == nil { + s.EvalContext = make(map[string]interface{}) + } + if s.EventChannel == nil { + s.EventChannel = make(chan EventRecord, 100) // Buffered channel to prevent blocking + } + if s.ProviderType == 0 { + s.ProviderType = RPC // Default to RPC + } +} + +// cleanupEnvironmentVariables restores original environment variables +func (s *TestState) CleanupEnvironmentVariables() { + for envVar, originalValue := range s.EnvVars { + if originalValue == "" { + os.Unsetenv(envVar) + } else { + os.Setenv(envVar, originalValue) + } + } + s.EnvVars = make(map[string]string) +} + +// cleanupProvider properly shuts down the provider and client to prevent event contamination +func (s *TestState) cleanupProvider() { + // Shutdown the provider if it has a shutdown method + if s.Provider != nil { + // Try to cast to common provider interfaces that might have shutdown methods + // This is defensive - not all providers will have explicit shutdown + + if shutdownable, ok := s.Provider.(interface{ Shutdown() }); ok { + shutdownable.Shutdown() + } + + s.Provider = nil + } +} diff --git a/tests/flagd/testframework/testbed_runner.go b/tests/flagd/testframework/testbed_runner.go index 2db1006e1..f069bed3e 100644 --- a/tests/flagd/testframework/testbed_runner.go +++ b/tests/flagd/testframework/testbed_runner.go @@ -278,7 +278,12 @@ func (tr *TestbedRunner) buildProviderOptions(state TestState, resolverType Prov }) break } - + } + if option.Option == "port" { + if option.Value == "9212" { + option.Value = strconv.Itoa(tr.container.forbiddenPort) + state.ProviderOptions[i] = option + } } } opts = append(opts, state.GenerateOpts()...) diff --git a/tests/flagd/testframework/testcontainer.go b/tests/flagd/testframework/testcontainer.go index 6f393e0c5..627c3f4bc 100644 --- a/tests/flagd/testframework/testcontainer.go +++ b/tests/flagd/testframework/testcontainer.go @@ -23,6 +23,7 @@ type FlagdTestContainer struct { launchpadPort int healthPort int envoyPort int + forbiddenPort int } // Container config type moved to types.go @@ -87,6 +88,10 @@ func NewFlagdContainer(ctx context.Context, config FlagdContainerConfig) (*Flagd if err != nil { return nil, err } + forbiddenPort, err := getMappedPort(ctx, composeStack, envoy, "9212") + if err != nil { + return nil, err + } flagdContainer := &FlagdTestContainer{ container: composeStack, @@ -96,6 +101,7 @@ func NewFlagdContainer(ctx context.Context, config FlagdContainerConfig) (*Flagd healthPort: healthPort, envoyPort: envoyPort, launchpadURL: fmt.Sprintf("http://%s:%d", host, launchpadPort), + forbiddenPort: forbiddenPort, } // Additional wait time if configured diff --git a/tests/flagd/testframework/utils.go b/tests/flagd/testframework/utils.go index 1b5a7bfa6..1bcea7cd1 100644 --- a/tests/flagd/testframework/utils.go +++ b/tests/flagd/testframework/utils.go @@ -78,6 +78,9 @@ func (vc *ValueConverter) ConvertToReflectValue(valueType, value string, fieldTy panic(fmt.Errorf("failed to convert %s to long: %w", value, err)) } return reflect.ValueOf(longVal).Convert(fieldType) + case "StringList": + arrayVal := strings.Split(value, ",") + return reflect.ValueOf(arrayVal).Convert(fieldType) default: return reflect.ValueOf(value).Convert(fieldType) }