diff --git a/.release-please-manifest.json b/.release-please-manifest.json index 7a5082b7b..d1f9791b6 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -2,7 +2,7 @@ "hooks/open-telemetry": "0.3.6", "hooks/validator": "0.1.7", "providers/configcat": "0.2.3", - "providers/flagd": "0.3.0", + "providers/flagd": "0.3.1", "providers/flipt": "0.1.5", "providers/from-env": "0.1.6", "providers/go-feature-flag": "0.2.6", @@ -14,7 +14,7 @@ "providers/statsig": "0.0.4", "providers/ofrep": "0.1.7", "providers/prefab": "0.0.4", - "tests/flagd": "1.5.1", + "tests/flagd": "1.6.0", "providers/go-feature-flag-in-process": "0.1.2", "providers/multi-provider": "0.0.5", "tools/flagd-http-connector": "0.0.2", diff --git a/Makefile b/Makefile index 32bb1b4e0..6fac510a0 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ test: # call with TESTCONTAINERS_RYUK_DISABLED="true" to avoid problems with podman on Macs e2e: - go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=2m -tags=e2e {} + go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=3m -tags=e2e {} lint: go install -v github.com/golangci/golangci-lint/v2/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION) diff --git a/providers/flagd/CHANGELOG.md b/providers/flagd/CHANGELOG.md index 13c2fee9b..4b93a8447 100644 --- a/providers/flagd/CHANGELOG.md +++ b/providers/flagd/CHANGELOG.md @@ -1,5 +1,33 @@ # Changelog +## [0.3.1](https://github.com/open-feature/go-sdk-contrib/compare/providers/flagd/v0.3.0...providers/flagd/v0.3.1) (2025-11-25) + + +### 🐛 Bug Fixes + +* **deps:** bump open-feature/go-sdk from v1.11 to v1.15 ([#686](https://github.com/open-feature/go-sdk-contrib/issues/686)) ([ce87102](https://github.com/open-feature/go-sdk-contrib/commit/ce871021d0c45d3c992bb00b33c8b7a8e337e9a3)) +* **deps:** update golang.org/x/exp digest to b7579e2 ([#679](https://github.com/open-feature/go-sdk-contrib/issues/679)) ([a6372f9](https://github.com/open-feature/go-sdk-contrib/commit/a6372f91b262d2f81b90bfa9e76d722ad480378b)) +* **deps:** update jsonlogic module to fix race detection ([#691](https://github.com/open-feature/go-sdk-contrib/issues/691)) ([21f3de0](https://github.com/open-feature/go-sdk-contrib/commit/21f3de0d39a6d23000957bd6f278df466af385e4)) +* **deps:** update module buf.build/gen/go/open-feature/flagd/connectrpc/go to v1.18.1-20250529171031-ebdc14163473.1 ([#699](https://github.com/open-feature/go-sdk-contrib/issues/699)) ([6c7044d](https://github.com/open-feature/go-sdk-contrib/commit/6c7044de8bf10d12ed07f4c66335e297c444a6fe)) +* **deps:** update module buf.build/gen/go/open-feature/flagd/grpc/go to v1.5.1-20250529171031-ebdc14163473.2 ([#700](https://github.com/open-feature/go-sdk-contrib/issues/700)) ([4747395](https://github.com/open-feature/go-sdk-contrib/commit/474739580f3c7f72e031929b99ab0b86ba4812bb)) +* **deps:** update module buf.build/gen/go/open-feature/flagd/protocolbuffers/go to v1.36.6-20250529171031-ebdc14163473.1 ([#706](https://github.com/open-feature/go-sdk-contrib/issues/706)) ([902021b](https://github.com/open-feature/go-sdk-contrib/commit/902021be1083336d9a53c1fd8388cbaaa8dc7959)) +* **deps:** update module github.com/open-feature/flagd/core to v0.11.5 ([#666](https://github.com/open-feature/go-sdk-contrib/issues/666)) ([94b44c4](https://github.com/open-feature/go-sdk-contrib/commit/94b44c4aed982ac54b91bd82a2cf8400c1b622c0)) +* **deps:** update module github.com/open-feature/go-sdk to v1.15.1 ([#681](https://github.com/open-feature/go-sdk-contrib/issues/681)) ([8fd544f](https://github.com/open-feature/go-sdk-contrib/commit/8fd544ff81fd25eed655a214aa1ae1906a436f0d)) +* fix goroutine leaks around shutdown ([#716](https://github.com/open-feature/go-sdk-contrib/issues/716)) ([c3ea532](https://github.com/open-feature/go-sdk-contrib/commit/c3ea53271ed91d20c9a9afd762ea5e2c4c3c488a)) +* **flagd:** missed error events, add e2e tests ([#760](https://github.com/open-feature/go-sdk-contrib/issues/760)) ([3750972](https://github.com/open-feature/go-sdk-contrib/commit/3750972d25d847ea56f6b9b5a7640407db67ab11)) +* **security:** update module github.com/containerd/containerd/v2 to v2.1.5 [security] ([#797](https://github.com/open-feature/go-sdk-contrib/issues/797)) ([f74c0c3](https://github.com/open-feature/go-sdk-contrib/commit/f74c0c306759914c48364320f2f3a2db252f3d35)) +* **security:** update module github.com/docker/compose/v2 to v2.40.2 [security] ([#785](https://github.com/open-feature/go-sdk-contrib/issues/785)) ([805823f](https://github.com/open-feature/go-sdk-contrib/commit/805823f5ded2d81359fd7663804beb50f30d52f7)) +* **security:** update module golang.org/x/crypto to v0.45.0 [security] ([#803](https://github.com/open-feature/go-sdk-contrib/issues/803)) ([20b0ccd](https://github.com/open-feature/go-sdk-contrib/commit/20b0ccdf1261cacde5273f61882194b92dbd6650)) +* **security:** update vulnerability-updates [security] ([#724](https://github.com/open-feature/go-sdk-contrib/issues/724)) ([629a535](https://github.com/open-feature/go-sdk-contrib/commit/629a5351c2c4b8fed00522f7453d5545920ceaaf)) +* **security:** update vulnerability-updates [security] ([#773](https://github.com/open-feature/go-sdk-contrib/issues/773)) ([21628dc](https://github.com/open-feature/go-sdk-contrib/commit/21628dc0bc058c042f14c1afa45df2dfc3d93c72)) + + +### ✨ New Features + +* comprehensive flagd e2e testing framework with testcontainers integration ([#732](https://github.com/open-feature/go-sdk-contrib/issues/732)) ([e3ec17b](https://github.com/open-feature/go-sdk-contrib/commit/e3ec17bdc7140582582a5df1154b6044cbf5b640)) +* **flagd:** add eventing with graceperiod for inprocess resolver ([#744](https://github.com/open-feature/go-sdk-contrib/issues/744)) ([a9fabb6](https://github.com/open-feature/go-sdk-contrib/commit/a9fabb623d22b6a1ef888722ffe68686031309b8)) +* upgrade flagd dependencies to 0.12.1 ([#731](https://github.com/open-feature/go-sdk-contrib/issues/731)) ([8e8d888](https://github.com/open-feature/go-sdk-contrib/commit/8e8d888dea080a03ea2a709b79598c7de6a9eed8)) + ## [0.3.0](https://github.com/open-feature/go-sdk-contrib/compare/providers/flagd/v0.2.6...providers/flagd/v0.3.0) (2025-06-07) diff --git a/providers/flagd/e2e/config_test.go b/providers/flagd/e2e/config_test.go index c68262eb3..9c8886713 100644 --- a/providers/flagd/e2e/config_test.go +++ b/providers/flagd/e2e/config_test.go @@ -20,19 +20,19 @@ func TestConfiguration(t *testing.T) { testCases := []configTestCase{ { name: "All", - tags: "", + tags: "~@sync-port", }, { name: "RPC", - tags: "@rpc", + tags: "@rpc && ~@sync-port", }, { name: "InProcess", - tags: "@in-process", + tags: "@in-process && ~@sync-port", }, { name: "File", - tags: "@file", + tags: "@file && ~@sync-port", }, } diff --git a/providers/flagd/e2e/inprocess_test.go b/providers/flagd/e2e/inprocess_test.go index d4767e4e3..eb7743626 100644 --- a/providers/flagd/e2e/inprocess_test.go +++ b/providers/flagd/e2e/inprocess_test.go @@ -5,6 +5,8 @@ package e2e import ( "testing" + flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" + "github.com/open-feature/go-sdk-contrib/tests/flagd/testframework" ) @@ -17,6 +19,9 @@ func TestInProcessProviderE2E(t *testing.T) { runner := testframework.NewTestbedRunner(testframework.TestbedConfig{ ResolverType: testframework.InProcess, TestbedConfig: "default", + ExtraOptions: []flagd.ProviderOption{ + flagd.WithRetryBackoffMaxMs(3000), + }, }) defer runner.Cleanup() @@ -26,7 +31,7 @@ func TestInProcessProviderE2E(t *testing.T) { } // Run tests with in-process specific tags - tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload" + tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload && ~@sync-port" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/e2e/rpc_test.go b/providers/flagd/e2e/rpc_test.go index f3c58b9ab..fe8451339 100644 --- a/providers/flagd/e2e/rpc_test.go +++ b/providers/flagd/e2e/rpc_test.go @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) { } // Run tests with RPC-specific tags - exclude unimplemented scenarios - tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching" + tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden && ~@sync-port" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/flagd-testbed b/providers/flagd/flagd-testbed index b62f5dbe8..9b73b3a95 160000 --- a/providers/flagd/flagd-testbed +++ b/providers/flagd/flagd-testbed @@ -1 +1 @@ -Subproject commit b62f5dbe860ecf4f36ec757dfdc0b38f7b3dec6e +Subproject commit 9b73b3a95cd9e0885937d244b118713b26374b1d diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index ca758d38d..f28ffcc6f 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -3,14 +3,15 @@ package flagd import ( "errors" "fmt" + "os" + "strconv" + "strings" + "github.com/go-logr/logr" "github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache" "github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger" "google.golang.org/grpc" - "os" - "strconv" - "strings" ) type ResolverType string @@ -26,6 +27,9 @@ const ( defaultHost = "localhost" defaultResolver = rpc defaultGracePeriod = 5 + defaultRetryBackoffMs = 1000 + defaultRetryBackoffMaxMs = 120000 + defaultFatalStatusCodes = "" rpc ResolverType = "rpc" inProcess ResolverType = "in-process" @@ -45,6 +49,9 @@ const ( flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI" flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD" + flagdRetryBackoffMsVariableName = "FLAGD_RETRY_BACKOFF_MS" + flagdRetryBackoffMaxMsVariableName = "FLAGD_RETRY_BACKOFF_MAX_MS" + flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES" ) type ProviderConfiguration struct { @@ -66,6 +73,9 @@ type ProviderConfiguration struct { CustomSyncProviderUri string GrpcDialOptionsOverride []grpc.DialOption RetryGracePeriod int + RetryBackoffMs int + RetryBackoffMaxMs int + FatalStatusCodes []string log logr.Logger } @@ -80,6 +90,8 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration { Resolver: defaultResolver, Tls: defaultTLS, RetryGracePeriod: defaultGracePeriod, + RetryBackoffMs: defaultRetryBackoffMs, + RetryBackoffMaxMs: defaultRetryBackoffMaxMs, } p.updateFromEnvVar() @@ -130,6 +142,7 @@ func validateProviderConfiguration(p *ProviderConfiguration) error { // updateFromEnvVar is a utility to update configurations based on current environment variables func (cfg *ProviderConfiguration) updateFromEnvVar() { + portS := os.Getenv(flagdPortEnvironmentVariableName) if portS != "" { port, err := strconv.Atoi(portS) @@ -159,17 +172,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { cfg.CertPath = certificatePath } - if maxCacheSizeS := os.Getenv(flagdMaxCacheSizeEnvironmentVariableName); maxCacheSizeS != "" { - maxCacheSizeFromEnv, err := strconv.Atoi(maxCacheSizeS) - if err != nil { - cfg.log.Error(err, - fmt.Sprintf("invalid env config for %s provided, using default value: %d", - flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, - )) - } else { - cfg.MaxCacheSize = maxCacheSizeFromEnv - } - } + cfg.MaxCacheSize = getIntFromEnvVarOrDefault(flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, cfg.log) if cacheValue := os.Getenv(flagdCacheEnvironmentVariableName); cacheValue != "" { switch cache.Type(cacheValue) { @@ -185,18 +188,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { } } - if maxEventStreamRetriesS := os.Getenv( - flagdMaxEventStreamRetriesEnvironmentVariableName); maxEventStreamRetriesS != "" { - - maxEventStreamRetries, err := strconv.Atoi(maxEventStreamRetriesS) - if err != nil { - cfg.log.Error(err, - fmt.Sprintf("invalid env config for %s provided, using default value: %d", - flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries)) - } else { - cfg.EventStreamConnectionMaxAttempts = maxEventStreamRetries - } - } + cfg.EventStreamConnectionMaxAttempts = getIntFromEnvVarOrDefault( + flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries, cfg.log) if resolver := os.Getenv(flagdResolverEnvironmentVariableName); resolver != "" { switch strings.ToLower(resolver) { @@ -227,15 +220,43 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" { cfg.TargetUri = targetUri } - if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" { - if seconds, err := strconv.Atoi(gracePeriod); err == nil { - cfg.RetryGracePeriod = seconds - } else { - // Handle parsing error - cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod)) + + cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log) + cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, defaultRetryBackoffMs, cfg.log) + cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, defaultRetryBackoffMaxMs, cfg.log) + + var fatalStatusCodes string + if envVal := os.Getenv(flagdFatalStatusCodesVariableName); envVal != "" { + fatalStatusCodes = envVal + } else { + fatalStatusCodes = defaultFatalStatusCodes + } + if fatalStatusCodes == "" { + cfg.FatalStatusCodes = []string{} + } else { + fatalStatusCodesArr := strings.Split(fatalStatusCodes, ",") + for i, fatalStatusCode := range fatalStatusCodesArr { + fatalStatusCodesArr[i] = strings.TrimSpace(fatalStatusCode) } + cfg.FatalStatusCodes = fatalStatusCodesArr } +} + +// Helper +func getIntFromEnvVarOrDefault(envVarName string, defaultValue int, log logr.Logger) int { + if valueFromEnv := os.Getenv(envVarName); valueFromEnv != "" { + intValue, err := strconv.Atoi(valueFromEnv) + if err != nil { + log.Error(err, + fmt.Sprintf("invalid env config for %s provided, using default value: %d", + envVarName, defaultValue, + )) + } else { + return intValue + } + } + return defaultValue } // ProviderOptions @@ -415,3 +436,25 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption { p.RetryGracePeriod = gracePeriod } } + +// WithRetryBackoffMs sets the initial backoff duration (in milliseconds) for retrying failed connections +func WithRetryBackoffMs(retryBackoffMs int) ProviderOption { + return func(p *ProviderConfiguration) { + p.RetryBackoffMs = retryBackoffMs + } +} + +// WithRetryBackoffMaxMs sets the maximum backoff duration (in milliseconds) for retrying failed connections +func WithRetryBackoffMaxMs(retryBackoffMaxMs int) ProviderOption { + return func(p *ProviderConfiguration) { + p.RetryBackoffMaxMs = retryBackoffMaxMs + } +} + +// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up +// and put the provider in a PROVIDER_FATAL state +func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption { + return func(p *ProviderConfiguration) { + p.FatalStatusCodes = fatalStatusCodes + } +} diff --git a/providers/flagd/pkg/provider.go b/providers/flagd/pkg/provider.go index 1742f2f15..f2a199184 100644 --- a/providers/flagd/pkg/provider.go +++ b/providers/flagd/pkg/provider.go @@ -74,6 +74,9 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri, GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride, RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod, + RetryBackOffMs: provider.providerConfiguration.RetryBackoffMs, + RetryBackOffMaxMs: provider.providerConfiguration.RetryBackoffMaxMs, + FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes, }) default: service = process.NewInProcessService(process.Configuration{ diff --git a/providers/flagd/pkg/service/in_process/grpc_config.go b/providers/flagd/pkg/service/in_process/grpc_config.go new file mode 100644 index 000000000..aaf314303 --- /dev/null +++ b/providers/flagd/pkg/service/in_process/grpc_config.go @@ -0,0 +1,66 @@ +package process + +import ( + "encoding/json" + "fmt" + "google.golang.org/grpc/codes" + "time" +) + +const ( + // Default timeouts and retry intervals + defaultKeepaliveTime = 30 * time.Second + defaultKeepaliveTimeout = 5 * time.Second +) + +type RetryPolicy struct { + MaxAttempts int `json:"MaxAttempts"` + InitialBackoff string `json:"InitialBackoff"` + MaxBackoff string `json:"MaxBackoff"` + BackoffMultiplier float64 `json:"BackoffMultiplier"` + RetryableStatusCodes []string `json:"RetryableStatusCodes"` +} + +func (g *Sync) buildRetryPolicy() string { + var policy = map[string]interface{}{ + "methodConfig": []map[string]interface{}{ + { + "name": []map[string]string{ + {"service": "flagd.sync.v1.FlagSyncService"}, + }, + "retryPolicy": RetryPolicy{ + MaxAttempts: 3, + InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(), + MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(), + BackoffMultiplier: 2.0, + RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"}, + }, + }, + }, + } + retryPolicyBytes, _ := json.Marshal(policy) + retryPolicy := string(retryPolicyBytes) + + return retryPolicy +} + +// Set of non-retryable gRPC status codes for faster lookup +var nonRetryableCodes map[codes.Code]struct{} + +// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup +func (g *Sync) initNonRetryableStatusCodesSet() { + nonRetryableCodes = make(map[codes.Code]struct{}) + + for _, codeStr := range g.FatalStatusCodes { + // Wrap the string in quotes to match the expected JSON format + jsonStr := fmt.Sprintf(`"%s"`, codeStr) + + var code codes.Code + if err := code.UnmarshalJSON([]byte(jsonStr)); err != nil { + g.Logger.Warn(fmt.Sprintf("unknown status code: %s, error: %v", codeStr, err)) + continue + } + + nonRetryableCodes[code] = struct{}{} + } +} diff --git a/providers/flagd/pkg/service/in_process/grpc_config_test.go b/providers/flagd/pkg/service/in_process/grpc_config_test.go new file mode 100644 index 000000000..5c5a586b7 --- /dev/null +++ b/providers/flagd/pkg/service/in_process/grpc_config_test.go @@ -0,0 +1,139 @@ +package process + +import ( + "encoding/json" + "github.com/open-feature/flagd/core/pkg/logger" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "strings" + "testing" +) + +func TestBuildRetryPolicy(t *testing.T) { + g := &Sync{ + RetryBackOffMs: 100, + RetryBackOffMaxMs: 500, + } + + result := g.buildRetryPolicy() + + // Unmarshal to check structure + var policy map[string]interface{} + if err := json.Unmarshal([]byte(result), &policy); err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + + methodConfig, ok := policy["methodConfig"].([]interface{}) + if !ok || len(methodConfig) == 0 { + t.Fatalf("methodConfig missing or empty") + } + + config := methodConfig[0].(map[string]interface{}) + retryPolicy, ok := config["retryPolicy"].(map[string]interface{}) + if !ok { + t.Fatalf("retryPolicy missing") + } + + if retryPolicy["MaxAttempts"].(float64) != 3 { + t.Errorf("MaxAttempts = %v; want 3", retryPolicy["MaxAttempts"]) + } + if retryPolicy["InitialBackoff"].(string) != "100ms" { + t.Errorf("InitialBackoff = %v; want 100ms", retryPolicy["InitialBackoff"]) + } + if retryPolicy["MaxBackoff"].(string) != "500ms" { + t.Errorf("MaxBackoff = %v; want 500ms", retryPolicy["MaxBackoff"]) + } + if retryPolicy["BackoffMultiplier"].(float64) != 2.0 { + t.Errorf("BackoffMultiplier = %v; want 2.0", retryPolicy["BackoffMultiplier"]) + } + codes := retryPolicy["RetryableStatusCodes"].([]interface{}) + expectedCodes := []string{"UNKNOWN", "UNAVAILABLE"} + for i, code := range expectedCodes { + if codes[i].(string) != code { + t.Errorf("RetryableStatusCodes[%d] = %v; want %v", i, codes[i], code) + } + } + + // Also check that the result is valid JSON and contains expected substrings + if !strings.Contains(result, `"MaxAttempts":3`) { + t.Error("Result does not contain MaxAttempts") + } + if !strings.Contains(result, `"InitialBackoff":"100ms"`) { + t.Error("Result does not contain InitialBackoff") + } + if !strings.Contains(result, `"MaxBackoff":"500ms"`) { + t.Error("Result does not contain MaxBackoff") + } + if !strings.Contains(result, `"RetryableStatusCodes":["UNKNOWN","UNAVAILABLE"]`) { + t.Error("Result does not contain RetryableStatusCodes") + } +} + +func TestSync_initNonRetryableStatusCodesSet(t *testing.T) { + tests := []struct { + name string + fatalStatusCodes []string + expectedCodes []codes.Code + notExpectedCodes []codes.Code + }{ + { + name: "valid status codes", + fatalStatusCodes: []string{"UNAVAILABLE", "INTERNAL", "DEADLINE_EXCEEDED"}, + expectedCodes: []codes.Code{codes.Unavailable, codes.Internal, codes.DeadlineExceeded}, + notExpectedCodes: []codes.Code{codes.OK, codes.Unknown}, + }, + { + name: "empty array", + fatalStatusCodes: []string{}, + expectedCodes: []codes.Code{}, + notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal}, + }, + { + name: "invalid status codes", + fatalStatusCodes: []string{"INVALID_CODE", "UNKNOWN_STATUS"}, + expectedCodes: []codes.Code{}, + notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal}, + }, + { + name: "mixed valid and invalid codes", + fatalStatusCodes: []string{"UNAVAILABLE", "INVALID_CODE", "INTERNAL"}, + expectedCodes: []codes.Code{codes.Unavailable, codes.Internal}, + notExpectedCodes: []codes.Code{codes.OK, codes.Unknown}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Reset the global map before each test + nonRetryableCodes = nil + + s := &Sync{ + FatalStatusCodes: tt.fatalStatusCodes, + Logger: &logger.Logger{ + Logger: zap.NewNop(), + }, + } + + s.initNonRetryableStatusCodesSet() + + // Verify expected codes are present + for _, code := range tt.expectedCodes { + if _, exists := nonRetryableCodes[code]; !exists { + t.Errorf("expected code %v to be in nonRetryableCodes, but it was not found", code) + } + } + + // Verify not expected codes are absent + for _, code := range tt.notExpectedCodes { + if _, exists := nonRetryableCodes[code]; exists { + t.Errorf("did not expect code %v to be in nonRetryableCodes, but it was found", code) + } + } + + // Verify the map size matches expected + if len(nonRetryableCodes) != len(tt.expectedCodes) { + t.Errorf("expected map size %d, got %d", len(tt.expectedCodes), len(nonRetryableCodes)) + } + }) + } +} diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go index 9b6b93caa..66bd5403f 100644 --- a/providers/flagd/pkg/service/in_process/grpc_sync.go +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -1,10 +1,13 @@ package process import ( - "buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc" - v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1" "context" "fmt" + msync "sync" + "time" + + "buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1" "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/sync" grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" @@ -12,52 +15,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/keepalive" - msync "sync" - "time" + "google.golang.org/grpc/status" ) -const ( - // Default timeouts and retry intervals - defaultKeepaliveTime = 30 * time.Second - defaultKeepaliveTimeout = 5 * time.Second - - retryPolicy = `{ - "methodConfig": [ - { - "name": [ - { - "service": "flagd.sync.v1.FlagSyncService" - } - ], - "retryPolicy": { - "MaxAttempts": 3, - "InitialBackoff": "1s", - "MaxBackoff": "5s", - "BackoffMultiplier": 2.0, - "RetryableStatusCodes": [ - "CANCELLED", - "UNKNOWN", - "INVALID_ARGUMENT", - "NOT_FOUND", - "ALREADY_EXISTS", - "PERMISSION_DENIED", - "RESOURCE_EXHAUSTED", - "FAILED_PRECONDITION", - "ABORTED", - "OUT_OF_RANGE", - "UNIMPLEMENTED", - "INTERNAL", - "UNAVAILABLE", - "DATA_LOSS", - "UNAUTHENTICATED" - ] - } - } - ] - }` -) - -// Type aliases for interfaces required by this component - needed for mock generation with gomock +// FlagSyncServiceClient Type aliases for interfaces required by this component - needed for mock generation with gomock type FlagSyncServiceClient interface { syncv1grpc.FlagSyncServiceClient } @@ -78,6 +39,10 @@ type Sync struct { Selector string URI string MaxMsgSize int + RetryGracePeriod int + RetryBackOffMs int + RetryBackOffMaxMs int + FatalStatusCodes []string // Runtime state client FlagSyncServiceClient @@ -92,6 +57,7 @@ type Sync struct { // Init initializes the gRPC connection and starts background monitoring func (g *Sync) Init(ctx context.Context) error { g.Logger.Info(fmt.Sprintf("initializing gRPC client for %s", g.URI)) + g.initNonRetryableStatusCodesSet() // Initialize channels g.shutdownComplete = make(chan struct{}) @@ -155,7 +121,7 @@ func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) { } dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) - dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(retryPolicy)) + dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(g.buildRetryPolicy())) return dialOptions, nil } @@ -213,12 +179,30 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return ctx.Err() } - g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err)) + // check for non-retryable errors during initialization, if found return with FATAL + if !g.IsReady() { + st, ok := status.FromError(err) + if ok { + if _, found := nonRetryableCodes[st.Code()]; found { + errStr := fmt.Sprintf("first sync cycle failed with non-retryable status: %v, "+ + "returning provider fatal.", st.Code().String()) + g.Logger.Error(errStr) + return &of.ProviderInitError{ + ErrorCode: of.ProviderFatalCode, + Message: errStr, + } + } + } + } g.sendEvent(ctx, SyncEvent{event: of.ProviderError}) if ctx.Err() != nil { return ctx.Err() } + + // Backoff before retrying + g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying after %d backoff...", err, g.RetryBackOffMaxMs)) + time.Sleep(time.Duration(g.RetryBackOffMaxMs) * time.Millisecond) } } } @@ -248,12 +232,6 @@ func (g *Sync) performSyncCycle(ctx context.Context, dataSync chan<- sync.DataSy // handleFlagSync processes messages from the sync stream with proper context handling func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { - // Mark as ready on first successful stream - g.initializer.Do(func() { - g.ready = true - g.Logger.Info("sync service is now ready") - }) - // Create channels for stream communication streamChan := make(chan *v1.SyncFlagsResponse, 1) errChan := make(chan error, 1) @@ -293,6 +271,11 @@ func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncSer return err } + // Mark as ready on first successful stream + g.initializer.Do(func() { + g.ready = true + g.Logger.Info("sync service is now ready") + }) case err := <-errChan: return fmt.Errorf("stream error: %w", err) diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index 6cb6d0e1b..65c11bb0b 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -112,6 +112,9 @@ type Configuration struct { GrpcDialOptionsOverride []googlegrpc.DialOption CertificatePath string RetryGracePeriod int + RetryBackOffMs int + RetryBackOffMaxMs int + FatalStatusCodes []string } // EventSync interface for sync providers that support events @@ -569,6 +572,9 @@ func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, str ProviderID: cfg.ProviderID, Selector: cfg.Selector, URI: uri, + FatalStatusCodes: cfg.FatalStatusCodes, + RetryBackOffMaxMs: cfg.RetryBackOffMaxMs, + RetryBackOffMs: cfg.RetryBackOffMs, }, uri } diff --git a/providers/flagd/pkg/service/in_process/service_grpc_test.go b/providers/flagd/pkg/service/in_process/service_grpc_test.go index a1e7c1c1b..4ecf8b2af 100644 --- a/providers/flagd/pkg/service/in_process/service_grpc_test.go +++ b/providers/flagd/pkg/service/in_process/service_grpc_test.go @@ -50,10 +50,12 @@ func TestInProcessProviderEvaluation(t *testing.T) { } inProcessService := NewInProcessService(Configuration{ - Host: host, - Port: port, - Selector: scope, - TLSEnabled: false, + Host: host, + Port: port, + Selector: scope, + TLSEnabled: false, + RetryBackOffMaxMs: 5000, + RetryBackOffMs: 1000, }) // when @@ -138,9 +140,11 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { } inProcessService := NewInProcessService(Configuration{ - TargetUri: "envoy://localhost:9211/foo.service", - Selector: scope, - TLSEnabled: false, + TargetUri: "envoy://localhost:9211/foo.service", + Selector: scope, + TLSEnabled: false, + RetryBackOffMaxMs: 5000, + RetryBackOffMs: 1000, }) // when @@ -201,7 +205,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) { } } - // bufferedServer - a mock grpc service backed by buffered connection type bufferedServer struct { listener net.Listener diff --git a/tests/flagd/CHANGELOG.md b/tests/flagd/CHANGELOG.md index 1e07f6f2d..3f1c680ea 100644 --- a/tests/flagd/CHANGELOG.md +++ b/tests/flagd/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## [1.6.0](https://github.com/open-feature/go-sdk-contrib/compare/tests/flagd/v1.5.1...tests/flagd/v1.6.0) (2025-11-27) + + +### 🐛 Bug Fixes + +* **security:** update module github.com/docker/compose/v2 to v2.40.2 [security] ([#785](https://github.com/open-feature/go-sdk-contrib/issues/785)) ([805823f](https://github.com/open-feature/go-sdk-contrib/commit/805823f5ded2d81359fd7663804beb50f30d52f7)) + + +### ✨ New Features + +* **flagd:** add eventing with graceperiod for inprocess resolver ([#744](https://github.com/open-feature/go-sdk-contrib/issues/744)) ([a9fabb6](https://github.com/open-feature/go-sdk-contrib/commit/a9fabb623d22b6a1ef888722ffe68686031309b8)) + ## [1.5.1](https://github.com/open-feature/go-sdk-contrib/compare/tests/flagd/v1.5.0...tests/flagd/v1.5.1) (2025-10-30) diff --git a/tests/flagd/testframework/config_steps.go b/tests/flagd/testframework/config_steps.go index ddf7ac492..0bed8f1c1 100644 --- a/tests/flagd/testframework/config_steps.go +++ b/tests/flagd/testframework/config_steps.go @@ -17,8 +17,6 @@ var ignoredOptions = []string{ "deadlineMs", "streamDeadlineMs", "keepAliveTime", - "retryBackoffMs", - "retryBackoffMaxMs", "offlinePollIntervalMs", } diff --git a/tests/flagd/testframework/provider_steps.go b/tests/flagd/testframework/provider_steps.go index 165853b2e..8f2d5ed18 100644 --- a/tests/flagd/testframework/provider_steps.go +++ b/tests/flagd/testframework/provider_steps.go @@ -27,6 +27,9 @@ func InitializeProviderSteps(ctx *godog.ScenarioContext) { // Generic provider step definition - accepts any provider type including "stable" ctx.Step(`^a (\w+) flagd provider$`, withState1Arg((*TestState).createSpecializedFlagdProvider)) + + ctx.Step(`^the client should be in (\w+) state$`, + withState1Arg((*TestState).assertClientState)) } // State methods - these now expect context as first parameter after state @@ -111,6 +114,13 @@ func (s *TestState) simulateConnectionLoss(ctx context.Context, seconds int) err return s.Container.Restart(seconds) } +func (s *TestState) assertClientState(ctx context.Context, state string) error { + if string(s.Client.State()) == strings.ToUpper(state) { + return nil + } + return fmt.Errorf("expected client state %s but got %s", state, s.Client.State()) +} + // createSpecializedFlagdProvider creates specialized flagd providers based on type func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, providerType string) error { // Apply specialized configuration based on provider type @@ -128,7 +138,7 @@ func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, provider return fmt.Errorf("failed to create instance for %s provider: %w", providerType, err) } - if providerType != "unavailable" { + if providerType != "unavailable" && providerType != "forbidden" { if s.ProviderType == RPC { // Small delay to allow flagd server to fully load flags after connection time.Sleep(50 * time.Millisecond) @@ -150,6 +160,8 @@ func (s *TestState) applySpecializedConfig(providerType string) error { return nil case "unavailable": return s.configureUnavailableProvider() + case "forbidden": + return s.configureForbiddenProvider() case "socket": return s.configureSocketProvider() case "ssl", "tls": @@ -173,6 +185,12 @@ func (s *TestState) configureUnavailableProvider() error { return nil } +func (s *TestState) configureForbiddenProvider() error { + // Set an Envoy port which always responds with forbidden + s.addProviderOption("port", "Integer", "9212") + return nil +} + func (s *TestState) configureSocketProvider() error { // Configure for unix socket connection s.addProviderOption("socketPath", "String", "/tmp/flagd.sock") diff --git a/tests/flagd/testframework/testbed_runner.go b/tests/flagd/testframework/testbed_runner.go index 2db1006e1..f069bed3e 100644 --- a/tests/flagd/testframework/testbed_runner.go +++ b/tests/flagd/testframework/testbed_runner.go @@ -278,7 +278,12 @@ func (tr *TestbedRunner) buildProviderOptions(state TestState, resolverType Prov }) break } - + } + if option.Option == "port" { + if option.Value == "9212" { + option.Value = strconv.Itoa(tr.container.forbiddenPort) + state.ProviderOptions[i] = option + } } } opts = append(opts, state.GenerateOpts()...) diff --git a/tests/flagd/testframework/testcontainer.go b/tests/flagd/testframework/testcontainer.go index 6f393e0c5..627c3f4bc 100644 --- a/tests/flagd/testframework/testcontainer.go +++ b/tests/flagd/testframework/testcontainer.go @@ -23,6 +23,7 @@ type FlagdTestContainer struct { launchpadPort int healthPort int envoyPort int + forbiddenPort int } // Container config type moved to types.go @@ -87,6 +88,10 @@ func NewFlagdContainer(ctx context.Context, config FlagdContainerConfig) (*Flagd if err != nil { return nil, err } + forbiddenPort, err := getMappedPort(ctx, composeStack, envoy, "9212") + if err != nil { + return nil, err + } flagdContainer := &FlagdTestContainer{ container: composeStack, @@ -96,6 +101,7 @@ func NewFlagdContainer(ctx context.Context, config FlagdContainerConfig) (*Flagd healthPort: healthPort, envoyPort: envoyPort, launchpadURL: fmt.Sprintf("http://%s:%d", host, launchpadPort), + forbiddenPort: forbiddenPort, } // Additional wait time if configured diff --git a/tests/flagd/testframework/utils.go b/tests/flagd/testframework/utils.go index 1b5a7bfa6..7fb0dda28 100644 --- a/tests/flagd/testframework/utils.go +++ b/tests/flagd/testframework/utils.go @@ -78,6 +78,12 @@ func (vc *ValueConverter) ConvertToReflectValue(valueType, value string, fieldTy panic(fmt.Errorf("failed to convert %s to long: %w", value, err)) } return reflect.ValueOf(longVal).Convert(fieldType) + case "StringList": + splitVal := strings.Split(value, ",") + for i, v := range splitVal { + splitVal[i] = strings.TrimSpace(v) + } + return reflect.ValueOf(splitVal).Convert(fieldType) default: return reflect.ValueOf(value).Convert(fieldType) }