Skip to content

Commit

Permalink
feat: support provider paths under /var/run
Browse files Browse the repository at this point in the history
This allows the driver to check multiple paths when looking for a provider,
addressing kubernetes-sigs#823 as the semantically correct path is /var not /etc.

-additional-provider-volume-paths is added to so that providers that have not
migrated to the /var location will continue to operate.

In a future release when all supported providers are migrated to the /var path
the -additional-provider-volume-paths flag can be removed or changed to an
empty string.
  • Loading branch information
tam7t committed Feb 14, 2022
1 parent 40a6806 commit be5f7ee
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 48 deletions.
16 changes: 10 additions & 6 deletions cmd/secrets-store-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
_ "net/http/pprof" // #nosec
"os"
"strings"
"time"

secretsstorev1 "sigs.k8s.io/secrets-store-csi-driver/apis/v1"
Expand Down Expand Up @@ -51,11 +52,12 @@ import (
)

var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "secrets-store.csi.k8s.io", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
logFormatJSON = flag.Bool("log-format-json", false, "set log formatter to json")
providerVolumePath = flag.String("provider-volume", "/etc/kubernetes/secrets-store-csi-providers", "Volume path for provider")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "secrets-store.csi.k8s.io", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
logFormatJSON = flag.Bool("log-format-json", false, "set log formatter to json")
providerVolumePath = flag.String("provider-volume", "/etc/kubernetes/secrets-store-csi-providers", "Volume path for provider")
additionalProviderPaths = flag.String("additional-provider-volume-paths", "/var/run/secrets-store-csi-providers", "Comma separated list of additional paths to communicate with providers")
// this will be removed in a future release
metricsAddr = flag.String("metrics-addr", ":8095", "The address the metric endpoint binds to")
enableSecretRotation = flag.Bool("enable-secret-rotation", false, "Enable secret rotation feature [alpha]")
Expand Down Expand Up @@ -159,7 +161,9 @@ func main() {
ctx := withShutdownSignal(context.Background())

// create provider clients
providerClients := secretsstore.NewPluginClientBuilder(*providerVolumePath, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*maxCallRecvMsgSize)))
providerPaths := strings.Split(strings.TrimSpace(*additionalProviderPaths), ",")
providerPaths = append(providerPaths, *providerVolumePath)
providerClients := secretsstore.NewPluginClientBuilder(providerPaths, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*maxCallRecvMsgSize)))
defer providerClients.Cleanup()

// enable provider health check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ spec:
{{- end }}
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(KUBE_NODE_NAME)"
- "--provider-volume=C:\\k\\secrets-store-csi-providers"
- "--provider-volume={{ .Values.windows.providersDir }}"
- "--additional-provider-volume-paths={{ join "," .Values.linux.additionalProvidersDirs }}"
{{- if and (semverCompare ">= v0.0.9-0" .Values.windows.image.tag) .Values.minimumProviderVersions }}
- "--min-provider-version={{ .Values.minimumProviderVersions }}"
{{- end }}
Expand Down Expand Up @@ -131,7 +132,7 @@ spec:
- name: mountpoint-dir
mountPath: {{ .Values.windows.kubeletRootDir }}\pods
- name: providers-dir
mountPath: C:\k\secrets-store-csi-providers
mountPath: {{ .Values.windows.providersDir }}
{{- if .Values.windows.volumeMounts }}
{{- toYaml .Values.windows.volumeMounts | nindent 12}}
{{- end }}
Expand Down Expand Up @@ -176,6 +177,11 @@ spec:
hostPath:
path: {{ .Values.windows.providersDir }}
type: DirectoryOrCreate
{{- range $i, $path := .Values.windows.additionalProvidersDirs }}
- name: provider-dir-{{ .$i }}
hostPath: {{ .$path }}
type: DirectoryOrCreate
{{- end }}
{{- if .Values.windows.volumes }}
{{- toYaml .Values.windows.volumes | nindent 8}}
{{- end }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ spec:
{{- end }}
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(KUBE_NODE_NAME)"
- "--provider-volume=/etc/kubernetes/secrets-store-csi-providers"
- "--provider-volume={{ .Values.linux.providersDir }}"
- "--additional-provider-volume-paths={{ range "," .Values.linux.additionalProvidersDirs }}"
{{- if and (semverCompare ">= v0.0.8-0" .Values.linux.image.tag) .Values.minimumProviderVersions }}
- "--min-provider-version={{ .Values.minimumProviderVersions }}"
{{- end }}
Expand Down Expand Up @@ -134,7 +135,12 @@ spec:
mountPath: {{ .Values.linux.kubeletRootDir }}/pods
mountPropagation: Bidirectional
- name: providers-dir
mountPath: /etc/kubernetes/secrets-store-csi-providers
mountPath: {{ .Values.linux.providersDir }}
{{- range $i, $path := .Values.linux.additionalProvidersDirs }}
- name: provider-dir-{{ .$i }}
hostPath: {{ .$path }}
type: DirectoryOrCreate
{{- end }}
{{- if .Values.linux.volumeMounts }}
{{- toYaml .Values.linux.volumeMounts | nindent 12}}
{{- end }}
Expand Down
32 changes: 18 additions & 14 deletions manifest_staging/charts/secrets-store-csi-driver/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ linux:
repository: k8s.gcr.io/csi-secrets-store/driver
tag: v1.0.1
pullPolicy: IfNotPresent

crds:
image:
repository: k8s.gcr.io/csi-secrets-store/driver-crds
Expand All @@ -13,15 +13,15 @@ linux:
annotations: {}

## Prevent the CSI driver from being scheduled on virtual-kubelet nodes
affinity:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: type
operator: NotIn
values:
- virtual-kubelet
- matchExpressions:
- key: type
operator: NotIn
values:
- virtual-kubelet

driver:
resources:
Expand Down Expand Up @@ -61,14 +61,15 @@ linux:
cpu: 10m
memory: 20Mi


updateStrategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1

kubeletRootDir: /var/lib/kubelet
providersDir: /etc/kubernetes/secrets-store-csi-providers
additionalProvidersDirs:
- /var/run/secrets-store-csi-providers
nodeSelector: {}
tolerations: []
metricsAddr: ":8095"
Expand Down Expand Up @@ -97,15 +98,15 @@ windows:
pullPolicy: IfNotPresent

## Prevent the CSI driver from being scheduled on virtual-kubelet nodes
affinity:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: type
operator: NotIn
values:
- virtual-kubelet
- matchExpressions:
- key: type
operator: NotIn
values:
- virtual-kubelet

driver:
resources:
Expand Down Expand Up @@ -152,6 +153,9 @@ windows:

kubeletRootDir: C:\var\lib\kubelet
providersDir: C:\k\secrets-store-csi-providers
# TODO: keep this empy on windows?
additionalProvidersDirs:
- C:\k\secrets-store-csi-providers
nodeSelector: {}
tolerations: []
metricsAddr: ":8095"
Expand Down
2 changes: 1 addition & 1 deletion pkg/rotation/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func newTestReconciler(client client.Reader, s *runtime.Scheme, kubeClient kuber
return &Reconciler{
providerVolumePath: socketPath,
rotationPollInterval: rotationPollInterval,
providerClients: secretsstore.NewPluginClientBuilder(socketPath),
providerClients: secretsstore.NewPluginClientBuilder([]string{socketPath}),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
reporter: newStatsReporter(),
eventRecorder: fakeRecorder,
Expand Down
2 changes: 1 addition & 1 deletion pkg/secrets-store/nodeserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (

func testNodeServer(t *testing.T, tmpDir string, mountPoints []mount.MountPoint, client client.Client, reporter StatsReporter) (*nodeServer, error) {
t.Helper()
providerClients := NewPluginClientBuilder(tmpDir)
providerClients := NewPluginClientBuilder([]string{tmpDir})
return newNodeServer(tmpDir, "testnode", mount.NewFakeMounter(mountPoints), providerClients, client, client, reporter, k8s.NewTokenClient(fakeclient.NewSimpleClientset(), "test-driver", 1*time.Second))
}

Expand Down
35 changes: 23 additions & 12 deletions pkg/secrets-store/provider_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -72,11 +73,11 @@ var (
// PluginClientBuilder builds and stores grpc clients for communicating with
// provider plugins.
type PluginClientBuilder struct {
clients map[string]v1alpha1.CSIDriverProviderClient
conns map[string]*grpc.ClientConn
socketPath string
lock sync.RWMutex
opts []grpc.DialOption
clients map[string]v1alpha1.CSIDriverProviderClient
conns map[string]*grpc.ClientConn
socketPaths []string
lock sync.RWMutex
opts []grpc.DialOption
}

// NewPluginClientBuilder creates a PluginClientBuilder that will connect to
Expand All @@ -89,12 +90,12 @@ type PluginClientBuilder struct {
//
// Additional grpc dial options can also be set through opts and will be used
// when creating all clients.
func NewPluginClientBuilder(path string, opts ...grpc.DialOption) *PluginClientBuilder {
func NewPluginClientBuilder(paths []string, opts ...grpc.DialOption) *PluginClientBuilder {
return &PluginClientBuilder{
clients: make(map[string]v1alpha1.CSIDriverProviderClient),
conns: make(map[string]*grpc.ClientConn),
socketPath: path,
lock: sync.RWMutex{},
clients: make(map[string]v1alpha1.CSIDriverProviderClient),
conns: make(map[string]*grpc.ClientConn),
socketPaths: paths,
lock: sync.RWMutex{},
opts: append(opts, []grpc.DialOption{
grpc.WithInsecure(), // the interface is only secured through filesystem ACLs
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
Expand Down Expand Up @@ -124,12 +125,22 @@ func (p *PluginClientBuilder) Get(ctx context.Context, provider string) (v1alpha
return nil, fmt.Errorf("%w: provider %q", ErrInvalidProvider, provider)
}

if _, err := os.Stat(fmt.Sprintf("%s/%s.sock", p.socketPath, provider)); os.IsNotExist(err) {
// check all paths
socketPath := ""
for k := range p.socketPaths {
tryPath := filepath.Join(p.socketPaths[k], provider+".sock")
if _, err := os.Stat(tryPath); err == nil {
socketPath = tryPath
break
}
}

if socketPath == "" {
return nil, fmt.Errorf("%w: provider %q", ErrProviderNotFound, provider)
}

conn, err := grpc.Dial(
fmt.Sprintf("%s/%s.sock", p.socketPath, provider),
socketPath,
p.opts...,
)
if err != nil {
Expand Down
51 changes: 42 additions & 9 deletions pkg/secrets-store/provider_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestMountContent(t *testing.T) {
socketPath := tmpdir.New(t, "", "ut")
targetPath := tmpdir.New(t, "", "ut")

pool := NewPluginClientBuilder(socketPath)
pool := NewPluginClientBuilder([]string{socketPath})
defer pool.Cleanup()

server, cleanup := fakeServer(t, socketPath, "provider1")
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestMountContent_TooLarge(t *testing.T) {
targetPath := tmpdir.New(t, "", "ut")

// set a very small max message size
pool := NewPluginClientBuilder(socketPath, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(5)))
pool := NewPluginClientBuilder([]string{socketPath}, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(5)))
defer pool.Cleanup()

server, cleanup := fakeServer(t, socketPath, "provider1")
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestMountContentError(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
socketPath := tmpdir.New(t, "", "ut")

pool := NewPluginClientBuilder(socketPath)
pool := NewPluginClientBuilder([]string{socketPath})
defer pool.Cleanup()

providerName := "provider1"
Expand Down Expand Up @@ -365,7 +365,40 @@ func TestMountContentError(t *testing.T) {
func TestPluginClientBuilder(t *testing.T) {
path := tmpdir.New(t, "", "ut")

cb := NewPluginClientBuilder(path)
cb := NewPluginClientBuilder([]string{path})
ctx := context.Background()

for i := 0; i < 10; i++ {
server, cleanup := fakeServer(t, path, fmt.Sprintf("server-%d", i))
defer cleanup()
if err := server.Start(); err != nil {
t.Fatalf("expected err to be nil, got: %+v", err)
}
}

var wg sync.WaitGroup

for i := 0; i < 10; i++ {
wg.Add(1)
provider := fmt.Sprintf("server-%d", i)
go func() {
defer wg.Done()
if _, err := cb.Get(ctx, provider); err != nil {
t.Errorf("Get(%q) = %v, want nil", provider, err)
}
}()
}

wg.Wait()
}

func TestPluginClientBuilderMultiPath(t *testing.T) {
emptyPath := tmpdir.New(t, "", "ut")
path := tmpdir.New(t, "", "ut")

// Ensure that if the path containing the listening socket is not the first
// path checked that the operation still succeeds.
cb := NewPluginClientBuilder([]string{emptyPath, path})
ctx := context.Background()

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -395,7 +428,7 @@ func TestPluginClientBuilder(t *testing.T) {
func TestPluginClientBuilder_ConcurrentGet(t *testing.T) {
path := tmpdir.New(t, "", "ut")

cb := NewPluginClientBuilder(path)
cb := NewPluginClientBuilder([]string{path})
ctx := context.Background()

provider := "server"
Expand Down Expand Up @@ -423,7 +456,7 @@ func TestPluginClientBuilder_ConcurrentGet(t *testing.T) {
func TestPluginClientBuilderErrorNotFound(t *testing.T) {
path := tmpdir.New(t, "", "ut")

cb := NewPluginClientBuilder(path)
cb := NewPluginClientBuilder([]string{path})
ctx := context.Background()

if _, err := cb.Get(ctx, "notfoundprovider"); !errors.Is(err, ErrProviderNotFound) {
Expand All @@ -445,7 +478,7 @@ func TestPluginClientBuilderErrorNotFound(t *testing.T) {
func TestPluginClientBuilderErrorInvalid(t *testing.T) {
path := tmpdir.New(t, "", "ut")

cb := NewPluginClientBuilder(path)
cb := NewPluginClientBuilder([]string{path})
ctx := context.Background()

if _, err := cb.Get(ctx, "bad/provider/name"); !errors.Is(err, ErrInvalidProvider) {
Expand All @@ -468,7 +501,7 @@ func TestVersion(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
socketPath := tmpdir.New(t, "", "ut")

pool := NewPluginClientBuilder(socketPath)
pool := NewPluginClientBuilder([]string{socketPath})
defer pool.Cleanup()

server, cleanup := fakeServer(t, socketPath, "provider1")
Expand Down Expand Up @@ -499,7 +532,7 @@ func TestPluginClientBuilder_HealthCheck(t *testing.T) {
// HealthCheck() method work as expected
path := tmpdir.New(t, "", "ut")

cb := NewPluginClientBuilder(path)
cb := NewPluginClientBuilder([]string{path})
ctx := context.Background()
healthCheckInterval := 1 * time.Millisecond

Expand Down
2 changes: 1 addition & 1 deletion test/e2eprovider/e2e-provider-installer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ spec:
volumes:
- name: providervol
hostPath:
path: "/etc/kubernetes/secrets-store-csi-providers"
path: "/var/run/secrets-store-csi-providers"
nodeSelector:
kubernetes.io/os: linux

0 comments on commit be5f7ee

Please sign in to comment.