Skip to content

Commit

Permalink
Update logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone committed Jan 14, 2020
1 parent f3d5dc7 commit 512a8fe
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func Deletion(c DeleteConfig) error {
return nilIfGVKDeleted(err)
}

err = deleteResource(c.Name, client, cluster.GetServerVersion(c.ClientSet.DiscoveryClientCached))
err = deleteResource(c.Name, client, cluster.TryGetServerVersion(c.ClientSet.DiscoveryClientCached))
if err != nil {
return nilIfGVKDeleted(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/await/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (sia *serviceInitAwaiter) Await() error {
}
defer endpointWatcher.Stop()

version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached)
version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached)

timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultServiceTimeoutMins*60)
return sia.await(serviceWatcher, endpointWatcher, time.After(timeout), make(chan struct{}), version)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (sia *serviceInitAwaiter) Read() error {
endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached)
version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached)

return sia.read(service, endpointList, version)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func (v ServerVersion) Compare(version ServerVersion) int {
return res
}

// GetServerVersion attempts to retrieve the server version from k8s.
// TryGetServerVersion attempts to retrieve the server version from k8s.
// Returns the configured default version in case this fails.
func GetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion {
func TryGetServerVersion(cdi discovery.CachedDiscoveryInterface) ServerVersion {
defaultSV := ServerVersion{
Major: 1,
Minor: 14,
Expand Down
122 changes: 56 additions & 66 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ type kubeProvider struct {
suppressDeprecationWarnings bool
enableSecrets bool

clusterReachable bool // Kubernetes cluster is reachable
config *rest.Config // Cluster config, e.g., through $KUBECONFIG file.
clusterUnreachable bool // Kubernetes cluster is unreachable
config *rest.Config // Cluster config, e.g., through $KUBECONFIG file.

clientSet *clients.DynamicClientSet
logClient *clients.LogClient
Expand Down Expand Up @@ -317,17 +317,13 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ
k.suppressDeprecationWarnings = true
}

// Assume cluster is reachable. We will verify later in this function.
k.clusterReachable = true

var kubeconfig clientcmd.ClientConfig
if configJSON, ok := vars["kubernetes:config:kubeconfig"]; ok {
config, err := clientcmd.Load([]byte(configJSON))
if err != nil {
k.clusterReachable = false
glog.V(3).Infof(fmt.Sprintf("failed to parse kubeconfig data in "+
return nil, pkgerrors.Wrap(err, "failed to parse kubeconfig data in "+
"`kubernetes:config:kubeconfig`; this must be a YAML literal string and not "+
"a filename or path - %v", err))
"a filename or path")
} else {
kubeconfig = clientcmd.NewDefaultClientConfig(*config, overrides)
configurationNamespace, _, err := kubeconfig.Namespace()
Expand All @@ -349,36 +345,28 @@ func (k *kubeProvider) Configure(_ context.Context, req *pulumirpc.ConfigureRequ
k.defaultNamespace = defaultNamespace
}

if kubeconfig == nil {
k.clusterReachable = false
} else {
config, err := kubeconfig.ClientConfig()
if err != nil {
glog.V(3).Infof("unable to load Kubernetes client configuration from kubeconfig file: %v", err)
k.clusterReachable = false
} else {
k.config = config
}
config, err := kubeconfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("unable to load Kubernetes client configuration from kubeconfig file: %v", err)
}
k.config = config

if k.clusterReachable {
cs, err := clients.NewDynamicClientSet(k.config)
if err != nil {
return nil, err
}
k.clientSet = cs
cs, err := clients.NewDynamicClientSet(k.config)
if err != nil {
return nil, err
}
k.clientSet = cs

lc, err := clients.NewLogClient(k.config)
if err != nil {
return nil, err
}
k.logClient = lc
lc, err := clients.NewLogClient(k.config)
if err != nil {
return nil, err
}
k.logClient = lc

k.k8sVersion = cluster.GetServerVersion(cs.DiscoveryClientCached)
k.k8sVersion = cluster.TryGetServerVersion(cs.DiscoveryClientCached)

if _, err = k.getResources(); err != nil {
return nil, fmt.Errorf("unable to load schema information from the API server: %v", err)
}
if _, err = k.getResources(); err != nil {
return nil, fmt.Errorf("unable to load schema information from the API server: %v", err)
}

return &pulumirpc.ConfigureResponse{
Expand All @@ -393,14 +381,14 @@ func (k *kubeProvider) Invoke(ctx context.Context,
// Important: Some invoke logic is intended to run during preview, and the Kubernetes provider
// inputs may not have resolved yet. Rather than returning an error here, any invoke logic must
// not assume that a cluster is accessible.
if !k.clusterReachable {
if k.clusterUnreachable {
glog.V(3).Infof(
"configured Kubernetes cluster is unreachable. Invoke call logic may operate in a degraded state.")
}

// Always fail.
tok := req.GetTok()
return nil, fmt.Errorf("Unknown Invoke type '%s'", tok)
return nil, fmt.Errorf("unknown Invoke type '%s'", tok)
}

// StreamInvoke dynamically executes a built-in function in the provider. The result is streamed
Expand All @@ -411,7 +399,7 @@ func (k *kubeProvider) StreamInvoke(
// Important: Some invoke logic is intended to run during preview, and the Kubernetes provider
// inputs may not have resolved yet. Rather than returning an error here, any invoke logic must
// not assume that a cluster is accessible.
if !k.clusterReachable {
if k.clusterUnreachable {
glog.V(3).Infof(
"configured Kubernetes cluster is unreachable. StreamInvoke call logic may operate " +
"in a degraded state.")
Expand Down Expand Up @@ -714,7 +702,7 @@ func (k *kubeProvider) StreamInvoke(
}
}
default:
return fmt.Errorf("Unknown Invoke type '%s'", tok)
return fmt.Errorf("unknown Invoke type '%s'", tok)
}
}

Expand All @@ -736,10 +724,6 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
// is given to it if it's not already provided.
//

if !k.clusterReachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

// Utilities for determining whether a resource's GVK exists.
gvkExists := func(gvk schema.GroupVersionKind) bool {
knownGVKs := sets.NewString()
Expand Down Expand Up @@ -848,11 +832,14 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
return nil, err
}

if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed {
return nil, &kinds.RemovedApiError{GVK: gvk, Version: version}
}
if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) {
_ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk))
// Skip the API version check if the cluster is unreachable.
if !k.clusterUnreachable {
if removed, version := kinds.RemovedApiVersion(gvk, k.k8sVersion); removed {
return nil, &kinds.RemovedApiError{GVK: gvk, Version: version}
}
if !k.suppressDeprecationWarnings && kinds.DeprecatedApiVersion(gvk) {
_ = k.host.Log(ctx, diag.Warning, urn, gen.ApiVersionComment(gvk))
}
}

// If a default namespace is set on the provider for this resource, check if the resource has Namespaced
Expand All @@ -877,7 +864,7 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (

// HACK: Do not validate against OpenAPI spec if there is a computed value. The OpenAPI spec
// does not know how to deal with the placeholder values for computed values.
if !hasComputedValue(newInputs) {
if !hasComputedValue(newInputs) && !k.clusterUnreachable {
resources, err := k.getResources()
if err != nil {
return nil, pkgerrors.Wrapf(err, "Failed to fetch OpenAPI schema from the API server")
Expand Down Expand Up @@ -948,10 +935,6 @@ func (k *kubeProvider) Diff(
// (which is not true of the old computed values).
//

if !k.clusterReachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Diff(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)
Expand Down Expand Up @@ -1011,11 +994,14 @@ func (k *kubeProvider) Diff(
oldInputs.SetGroupVersionKind(gvk)
}

supportsDryRun, err := openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk)
if err != nil {
return nil, pkgerrors.Wrapf(err,
"Failed to check for changes in resource %s because of an error communicating with the API server",
fqObjName(newInputs))
supportsDryRun := false
if !k.clusterUnreachable {
supportsDryRun, err = openapi.SupportsDryRun(k.clientSet.DiscoveryClientCached, gvk)
if err != nil {
return nil, pkgerrors.Wrapf(err,
"Failed to check for changes in resource %s because of an error communicating with the API server",
fqObjName(newInputs))
}
}

var patch []byte
Expand Down Expand Up @@ -1144,7 +1130,7 @@ func (k *kubeProvider) Create(
// comments in those methods for details.
//

if !k.clusterReachable {
if k.clusterUnreachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

Expand Down Expand Up @@ -1271,14 +1257,16 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
// comments in those methods for details.
//

if !k.clusterReachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Read(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// If the cluster is unreachable, consider the resource deleted and inform the user.
if k.clusterUnreachable {
_ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable"))
return deleteResponse, nil
}

// Obtain new properties, create a Kubernetes `unstructured.Unstructured` that we can pass to the
// validation routines.
oldState, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{
Expand Down Expand Up @@ -1480,7 +1468,7 @@ func (k *kubeProvider) Update(
// - [ ] Support server-side apply, when it comes out.
//

if !k.clusterReachable {
if k.clusterUnreachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

Expand Down Expand Up @@ -1599,14 +1587,16 @@ func (k *kubeProvider) Delete(
ctx context.Context, req *pulumirpc.DeleteRequest,
) (*pbempty.Empty, error) {

if !k.clusterReachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Delete(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// If the cluster is unreachable, consider the resource deleted and inform the user.
if k.clusterUnreachable {
_ = k.host.Log(ctx, diag.Warning, urn, fmt.Sprintf("configured Kubernetes cluster is unreachable"))
return &pbempty.Empty{}, nil
}

// TODO(hausdorff): Propagate other options, like grace period through flags.

// Obtain new properties, create a Kubernetes `unstructured.Unstructured`.
Expand Down

0 comments on commit 512a8fe

Please sign in to comment.