Skip to content

Commit

Permalink
Gracefully handle unreachable k8s cluster (#946)
Browse files Browse the repository at this point in the history
Previously, the provider erroneously expected that the default provider pointed to a functioning Kubernetes cluster. This led to unexpected failures in cases where this wasn't true, such as the user manually setting the kubeconfig value for the stack to an invalid value. This change explicitly checks for a valid configuration, and falls back to a degraded state if this check fails. This still allows invoke logic to run during previews without requiring an active k8s cluster.
  • Loading branch information
lblackstone committed Jan 16, 2020
1 parent cb8c50b commit 56ef622
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

### Bug fixes

- Gracefully handle unreachable k8s cluster. (https://github.com/pulumi/pulumi-kubernetes/pull/946).
- Fix deprecation notice for CSINode. (https://github.com/pulumi/pulumi-kubernetes/pull/944).

## 1.4.3 (January 8, 2020)
Expand Down
2 changes: 1 addition & 1 deletion pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func Deletion(c DeleteConfig) error {
return nilIfGVKDeleted(err)
}

err = deleteResource(c.Name, client, cluster.GetServerVersion(c.ClientSet.DiscoveryClientCached))
err = deleteResource(c.Name, client, cluster.TryGetServerVersion(c.ClientSet.DiscoveryClientCached))
if err != nil {
return nilIfGVKDeleted(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/await/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (sia *serviceInitAwaiter) Await() error {
}
defer endpointWatcher.Stop()

version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached)
version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached)

timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultServiceTimeoutMins*60)
return sia.await(serviceWatcher, endpointWatcher, time.After(timeout), make(chan struct{}), version)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (sia *serviceInitAwaiter) Read() error {
endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached)
version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached)

return sia.read(service, endpointList, version)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func (v ServerVersion) Compare(version ServerVersion) int {
return res
}

// GetServerVersion attempts to retrieve the server version from k8s.
// TryGetServerVersion attempts to retrieve the server version from k8s.
// Returns the configured default version in case this fails.
func GetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion {
func TryGetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion {
defaultSV := ServerVersion{
Major: 1,
Minor: 14,
Expand Down
139 changes: 98 additions & 41 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ type kubeProvider struct {
suppressDeprecationWarnings bool
enableSecrets bool

config *rest.Config // Cluster config, e.g., through $KUBECONFIG file.
clusterUnreachable bool // Kubernetes cluster is unreachable
config *rest.Config // Cluster config, e.g., through $KUBECONFIG file.

clientSet *clients.DynamicClientSet
logClient *clients.LogClient
Expand Down Expand Up @@ -320,14 +321,19 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ
if configJSON, ok := vars["kubernetes:config:kubeconfig"]; ok {
config, err := clientcmd.Load([]byte(configJSON))
if err != nil {
return nil, pkgerrors.Wrap(err, "failed to parse kubeconfig data in "+
// Rather than erroring out here, mark the cluster as unreachable and conditionally bail out on
// operations that require a valid cluster. This will allow us to perform invoke operations
// using the default provider.
k.clusterUnreachable = true
glog.V(3).Infof(fmt.Sprintf("failed to parse kubeconfig data in "+
"`kubernetes:config:kubeconfig`; this must be a YAML literal string and not "+
"a filename or path")
}
kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides)
configurationNamespace, _, err := kubeconfig.Namespace()
if err == nil {
k.defaultNamespace = configurationNamespace
"a filename or path - %v", err))
} else {
kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides)
configurationNamespace, _, err := kubeconfig.Namespace()
if err == nil {
k.defaultNamespace = configurationNamespace
}
}
} else {
// Use client-go to resolve the final configuration values for the client. Typically these
Expand All @@ -343,28 +349,31 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ
k.defaultNamespace = defaultNamespace
}

config, err := kubeconfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err)
}
k.config = config
if !k.clusterUnreachable {
config, err := kubeconfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err)
}
k.config = config

cs, err := clients.NewDynamicClientSet(k.config)
if err != nil {
return nil, err
}
k.clientSet = cs
cs, err := clients.NewDynamicClientSet(k.config)
if err != nil {
return nil, err
}
k.clientSet = cs

lc, err := clients.NewLogClient(k.config)
if err != nil {
return nil, err
}
k.logClient = lc
lc, err := clients.NewLogClient(k.config)
if err != nil {
return nil, err
}
k.logClient = lc

k.k8sVersion = cluster.GetServerVersion(cs.DiscoveryClientCached)
k.k8sVersion = cluster.TryGetServerVersion(cs.DiscoveryClientCached)

if _, err = k.getResources(); err != nil {
return nil, fmt.Errorf("unable to load schema information from the API server: %v", err)
if _, err = k.getResources(); err != nil {
k.clusterUnreachable = true
glog.V(3).Infof("unable to load schema information from the API server: %v", err)
}
}

return &pulumirpc.ConfigureResponse{
Expand All @@ -376,16 +385,24 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ
func (k *kubeProvider) Invoke(ctx context.Context,
req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {

// Important: Some invoke logic is intended to run during preview, and the Kubernetes provider
// inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check
// k.clusterUnreachable and handle that condition appropriately.

// Always fail.
tok := req.GetTok()
return nil, fmt.Errorf("Unknown Invoke type '%s'", tok)
return nil, fmt.Errorf("unknown Invoke type '%s'", tok)
}

// StreamInvoke dynamically executes a built-in function in the provider. The result is streamed
// back as a series of messages.
func (k *kubeProvider) StreamInvoke(
req *pulumirpc.InvokeRequest, server pulumirpc.ResourceProvider_StreamInvokeServer) error {

// Important: Some invoke logic is intended to run during preview, and the Kubernetes provider
// inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check
// k.clusterUnreachable and handle that condition appropriately.

// Unmarshal arguments.
tok := req.GetTok()
label := fmt.Sprintf("%s.StreamInvoke(%s)", k.label(), tok)
Expand Down Expand Up @@ -414,13 +431,17 @@ func (k *kubeProvider) StreamInvoke(
// expected to never terminate, and users of the various SDKs need a way to tell the
// provider to stop streaming and reclaim the resources associated with the stream.
//
// Still, we implement this cancellation also for `list`, primarily for coompleteness. We'd
// Still, we implement this cancellation also for `list`, primarily for completeness. We'd
// like to avoid an unpleasant and non-actionable error that would appear on a `Send` on a
// client that is no longer accepting requests. This also helps to guard against the
// possibility that some dark corner of gRPC signals cancellation by accident, e.g., during
// shutdown.
//

if k.clusterUnreachable {
return fmt.Errorf("configured Kubernetes cluster is unreachable")
}

namespace := ""
if args["namespace"].HasValue() {
namespace = args["namespace"].StringValue()
Expand Down Expand Up @@ -511,6 +532,10 @@ func (k *kubeProvider) StreamInvoke(
// Set up resource watcher.
//

if k.clusterUnreachable {
return fmt.Errorf("configured Kubernetes cluster is unreachable")
}

namespace := ""
if args["namespace"].HasValue() {
namespace = args["namespace"].StringValue()
Expand Down Expand Up @@ -591,6 +616,10 @@ func (k *kubeProvider) StreamInvoke(
// Set up log stream for Pod.
//

if k.clusterUnreachable {
return fmt.Errorf("configured Kubernetes cluster is unreachable")
}

namespace := "default"
if args["namespace"].HasValue() {
namespace = args["namespace"].StringValue()
Expand Down Expand Up @@ -683,7 +712,7 @@ func (k *kubeProvider) StreamInvoke(
}
}
default:
return fmt.Errorf("Unknown Invoke type '%s'", tok)
return fmt.Errorf("unknown Invoke type '%s'", tok)
}
}

Expand Down Expand Up @@ -813,11 +842,14 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
return nil, err
}

if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed {
return nil, &kinds.RemovedApiError{GVK: gvk, Version: version}
}
if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) {
_ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk))
// Skip the API version check if the cluster is unreachable.
if !k.clusterUnreachable {
if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed {
return nil, &kinds.RemovedApiError{GVK: gvk, Version: version}
}
if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) {
_ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk))
}
}

// If a default namespace is set on the provider for this resource, check if the resource has Namespaced
Expand All @@ -842,7 +874,7 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (

// HACK: Do not validate against OpenAPI spec if there is a computed value. The OpenAPI spec
// does not know how to deal with the placeholder values for computed values.
if !hasComputedValue(newInputs) {
if !hasComputedValue(newInputs) && !k.clusterUnreachable {
resources, err := k.getResources()
if err != nil {
return nil, pkgerrors.Wrapf(err, "Failed to fetch OpenAPI schema from the API server")
Expand Down Expand Up @@ -972,11 +1004,14 @@ func (k *kubeProvider) Diff(
oldInputs.SetGroupVersionKind(gvk)
}

supportsDryRun, err := openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk)
if err != nil {
return nil, pkgerrors.Wrapf(err,
"Failed to check for changes in resource %s because of an error communicating with the API server",
fqObjName(newInputs))
supportsDryRun := false
if !k.clusterUnreachable {
supportsDryRun, err = openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk)
if err != nil {
return nil, pkgerrors.Wrapf(err,
"Failed to check for changes in resource %s because of an error communicating with the API server",
fqObjName(newInputs))
}
}

var patch []byte
Expand Down Expand Up @@ -1108,6 +1143,11 @@ func (k *kubeProvider) Create(
label := fmt.Sprintf("%s.Create(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// Create requires a connection to a k8s cluster, so bail out immediately if it is unreachable.
if k.clusterUnreachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

// Parse inputs
newResInputs, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.properties", label),
Expand Down Expand Up @@ -1231,6 +1271,12 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
label := fmt.Sprintf("%s.Read(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// If the cluster is unreachable, consider the resource deleted and inform the user.
if k.clusterUnreachable {
_ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable"))
return deleteResponse, nil
}

// Obtain new properties, create a Kubernetes `unstructured.Unstructured` that we can pass to the
// validation routines.
oldState, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{
Expand Down Expand Up @@ -1431,11 +1477,15 @@ func (k *kubeProvider) Update(
// discovery client is completely dynamic.)
// - [ ] Support server-side apply, when it comes out.
//

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Update(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// Update requires a connection to a k8s cluster, so bail out immediately if it is unreachable.
if k.clusterUnreachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

// Obtain old properties, create a Kubernetes `unstructured.Unstructured`.
oldState, err := plugin.UnmarshalProperties(req.GetOlds(), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.olds", label), KeepUnknowns: true, SkipNulls: true, KeepSecrets: true,
Expand Down Expand Up @@ -1546,10 +1596,17 @@ func (k *kubeProvider) Update(
func (k *kubeProvider) Delete(
ctx context.Context, req *pulumirpc.DeleteRequest,
) (*pbempty.Empty, error) {

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Delete(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// If the cluster is unreachable, consider the resource deleted and inform the user.
if k.clusterUnreachable {
_ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable"))
return &pbempty.Empty{}, nil
}

// TODO(hausdorff): Propagate other options, like grace period through flags.

// Obtain new properties, create a Kubernetes `unstructured.Unstructured`.
Expand Down

0 comments on commit 56ef622

Please sign in to comment.