Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: add optional healthcheck for provider plugins
Signed-off-by: Anish Ramasekar <anish.ramasekar@gmail.com>
  • Loading branch information
aramase committed Apr 13, 2021
1 parent 73b7af5 commit 7a2321a
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cmd/secrets-store-csi-driver/main.go
Expand Up @@ -64,6 +64,10 @@ var (
// This feature flag will be enabled by default after n+2 releases giving time for users to label all their existing credential secrets.
filteredWatchSecret = flag.Bool("filtered-watch-secret", false, "enable filtered watch for NodePublishSecretRef secrets with label secrets-store.csi.k8s.io/used=true")

// Enable optional healthcheck for provider clients that exist in memory
providerHealthCheck = flag.Bool("provider-health-check", false, "Enable health check for configured providers")
providerHealthCheckInterval = flag.Duration("provider-health-check-interval", 2*time.Minute, "Provider healthcheck interval duration")

scheme = runtime.NewScheme()
)

Expand Down Expand Up @@ -147,6 +151,12 @@ func main() {
providerClients := secretsstore.NewPluginClientBuilder(*providerVolumePath)
defer providerClients.Cleanup()

// enable provider health check
if *providerHealthCheck {
klog.InfoS("provider health check enabled", "interval", *providerHealthCheckInterval)
go providerClients.HealthCheck(ctx, *providerHealthCheckInterval)
}

go func() {
klog.Infof("starting manager")
if err := mgr.Start(ctx); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions manifest_staging/charts/secrets-store-csi-driver/README.md
Expand Up @@ -83,3 +83,5 @@ The following table lists the configurable parameters of the csi-secrets-store-p
| `enableSecretRotation` | Enable secret rotation feature [alpha] | `false` |
| `rotationPollInterval` | Secret rotation poll interval duration | `"120s"` |
| `filteredWatchSecret` | Enable filtered watch for NodePublishSecretRef secrets with label `secrets-store.csi.k8s.io/used=true` | `false` |
| `providerHealthCheck` | Enable health check for configured providers | `false` |
| `providerHealthCheckInterval` | Provider healthcheck interval duration | `2m` |
Expand Up @@ -76,6 +76,12 @@ spec:
{{- if and (semverCompare ">= v0.0.21-0" .Values.windows.image.tag) .Values.filteredWatchSecret }}
- "--filtered-watch-secret={{ .Values.filteredWatchSecret }}"
{{- end }}
{{- if and (semverCompare ">= v0.0.22-0" .Values.windows.image.tag) .Values.providerHealthCheck }}
- "--provider-health-check={{ .Values.providerHealthCheck }}"
{{- end }}
{{- if and (semverCompare ">= v0.0.22-0" .Values.windows.image.tag) .Values.providerHealthCheckInterval }}
- "--provider-health-check-interval={{ .Values.providerHealthCheckInterval }}"
{{- end }}
env:
{{- with .Values.windows.env }}
{{- toYaml . | nindent 10 }}
Expand Down
Expand Up @@ -76,6 +76,12 @@ spec:
{{- if and (semverCompare ">= v0.0.21-0" .Values.linux.image.tag) .Values.filteredWatchSecret }}
- "--filtered-watch-secret={{ .Values.filteredWatchSecret }}"
{{- end }}
{{- if and (semverCompare ">= v0.0.22-0" .Values.linux.image.tag) .Values.providerHealthCheck }}
- "--provider-health-check={{ .Values.providerHealthCheck }}"
{{- end }}
{{- if and (semverCompare ">= v0.0.22-0" .Values.linux.image.tag) .Values.providerHealthCheckInterval }}
- "--provider-health-check-interval={{ .Values.providerHealthCheckInterval }}"
{{- end }}
env:
{{- with .Values.linux.env }}
{{- toYaml . | nindent 10 }}
Expand Down
6 changes: 6 additions & 0 deletions manifest_staging/charts/secrets-store-csi-driver/values.yaml
Expand Up @@ -152,3 +152,9 @@ rotationPollInterval:

## Filtered watch nodePublishSecretRef secrets
filteredWatchSecret: false

## Provider HealthCheck
providerHealthCheck: false

## Provider HealthCheck interval
providerHealthCheckInterval: 2m
2 changes: 2 additions & 0 deletions manifest_staging/deploy/secrets-store-csi-driver-windows.yaml
Expand Up @@ -51,6 +51,8 @@ spec:
- "--enable-secret-rotation=false"
- "--rotation-poll-interval=2m"
- "--filtered-watch-secret=false"
- "--provider-health-check=false"
- "--provider-health-check-interval=2m"
env:
- name: CSI_ENDPOINT
value: unix://C:\\csi\\csi.sock
Expand Down
2 changes: 2 additions & 0 deletions manifest_staging/deploy/secrets-store-csi-driver.yaml
Expand Up @@ -51,6 +51,8 @@ spec:
- "--enable-secret-rotation=false"
- "--rotation-poll-interval=2m"
- "--filtered-watch-secret=false"
- "--provider-health-check=false"
- "--provider-health-check-interval=2m"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
Expand Down
47 changes: 47 additions & 0 deletions pkg/secrets-store/provider_client.go
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"regexp"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -154,6 +155,38 @@ func (p *PluginClientBuilder) Cleanup() {
p.conns = make(map[string]*grpc.ClientConn)
}

// HealthCheck enables periodic healthcheck for configured provider clients by making
// a Version() RPC call. If the provider healthcheck fails, we log an error.
//
// A check of every currently known provider client is run once per interval.
//
// This method blocks until the parent context is cancelled during termination.
func (p *PluginClientBuilder) HealthCheck(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			p.healthCheckOnce(ctx)
		}
	}
}

// healthCheckOnce performs a single healthcheck sweep over all provider clients
// while holding the read lock, logging the outcome for each provider.
func (p *PluginClientBuilder) healthCheckOnce(ctx context.Context) {
	p.lock.RLock()
	// Deferred so the read lock is released even if a client call panics.
	defer p.lock.RUnlock()

	for provider, client := range p.clients {
		// Bound each RPC so one unresponsive provider cannot stall the sweep.
		c, cancel := context.WithTimeout(ctx, 5*time.Second)
		runtimeVersion, err := Version(c, client)
		// Release the timeout context right away. A defer here would only run
		// when the enclosing function returns, so deferring inside the
		// long-running HealthCheck loop would accumulate one pending cancel
		// per provider per tick for the lifetime of the process.
		cancel()

		if err != nil {
			klog.V(4).ErrorS(err, "provider healthcheck failed", "provider", provider)
			continue
		}
		klog.V(4).InfoS("provider healthcheck successful", "provider", provider, "runtimeVersion", runtimeVersion)
	}
}

// MountContent calls the client's Mount() RPC with helpers to format the
// request and interpret the response.
func MountContent(ctx context.Context, client v1alpha1.CSIDriverProviderClient, attributes, secrets, targetPath, permission string, oldObjectVersions map[string]string) (map[string]string, string, error) {
Expand Down Expand Up @@ -208,3 +241,17 @@ func MountContent(ctx context.Context, client v1alpha1.CSIDriverProviderClient,

return objectVersions, "", nil
}

// Version issues the Version() RPC against the given provider client,
// sending "v1alpha1" as the request version. It returns the runtime version
// reported in the response, or an empty string together with the RPC error
// if the call fails.
func Version(ctx context.Context, client v1alpha1.CSIDriverProviderClient) (string, error) {
	resp, err := client.Version(ctx, &v1alpha1.VersionRequest{Version: "v1alpha1"})
	if err != nil {
		return "", err
	}
	return resp.RuntimeVersion, nil
}
72 changes: 72 additions & 0 deletions pkg/secrets-store/provider_client_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"reflect"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"

Expand Down Expand Up @@ -334,3 +335,74 @@ func TestPluginClientBuilderErrorInvalid(t *testing.T) {
t.Errorf("Get(%s) = %v, want %v", "bad/provider/name", err, ErrInvalidProvider)
}
}

// TestVersion starts a fake provider server, obtains a client for it through
// the plugin client builder, and checks that Version() returns the expected
// runtime version without an error.
func TestVersion(t *testing.T) {
	const providerName = "provider1"

	tests := []struct {
		name                   string
		expectedRuntimeVersion string
	}{
		{
			name:                   "provider version successful response",
			expectedRuntimeVersion: "0.0.10",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			dir := tmpdir.New(t, "", "ut")

			builder := NewPluginClientBuilder(dir)
			defer builder.Cleanup()

			srv, stop := fakeServer(t, dir, providerName)
			defer stop()

			srv.Start()

			// Without a client there is nothing left to test, so fail fast.
			providerClient, err := builder.Get(context.Background(), providerName)
			if err != nil {
				t.Fatalf("expected err to be nil, got: %+v", err)
			}

			got, err := Version(context.TODO(), providerClient)
			if err != nil {
				t.Errorf("expected err to be nil, got: %+v", err)
			}
			if got != tc.expectedRuntimeVersion {
				t.Errorf("expected version: %s, got: %s", tc.expectedRuntimeVersion, got)
			}
		})
	}
}

// TestPluginClientBuilder_HealthCheck runs HealthCheck() in the background
// while several goroutines call Get() concurrently, to exercise the read
// lock/unlock usage in HealthCheck() alongside Get().
func TestPluginClientBuilder_HealthCheck(t *testing.T) {
	path := tmpdir.New(t, "", "ut")

	cb := NewPluginClientBuilder(path)
	// Registered first so it runs last, after the healthcheck has stopped.
	defer cb.Cleanup()

	// Keep the interval short so healthcheck ticks have a chance to fire while
	// the Get() calls below are still in flight.
	healthCheckInterval := 5 * time.Millisecond

	provider := "server"
	server, cleanup := fakeServer(t, path, provider)
	defer cleanup()
	server.Start()

	// Use a cancellable context so the healthcheck goroutine does not outlive
	// the test.
	ctx, cancel := context.WithCancel(context.Background())

	// run the provider healthcheck
	done := make(chan struct{})
	go func() {
		defer close(done)
		cb.HealthCheck(ctx, healthCheckInterval)
	}()
	// Stop the healthcheck and wait for it to return before the fake server
	// and the client connections are torn down by the earlier defers.
	defer func() {
		cancel()
		<-done
	}()

	var wg sync.WaitGroup

	// try a concurrent get with the healthcheck running in the background
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := cb.Get(ctx, provider); err != nil {
				t.Errorf("Get(%q) = %v, want nil", provider, err)
			}
		}()
	}

	wg.Wait()
}

0 comments on commit 7a2321a

Please sign in to comment.