Skip to content

Commit

Permalink
Revert #1596 (#1636)
Browse files Browse the repository at this point in the history
* Revert #1596

* Add changelog entry for 3.4.1
  • Loading branch information
Vivek Lakshmanan committed Jun 25, 2021
1 parent 0fa6631 commit 9e7f108
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 143 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
- Update pulumi dependencies v3.5.1 (https://github.com/pulumi/pulumi-kubernetes/pull/1623)
- Skip cluster connectivity check in yamlRenderMode (https://github.com/pulumi/pulumi-kubernetes/pull/1629)
- Handle different namespaces for server-side diff (https://github.com/pulumi/pulumi-kubernetes/pull/1631)
- *Revert* Fix hanging updates for deployment await logic (https://github.com/pulumi/pulumi-kubernetes/pull/1596)

## 3.4.1 (June 24, 2021)
- *Revert* Fix hanging updates for deployment await logic (https://github.com/pulumi/pulumi-kubernetes/pull/1596)

## 3.4.0 (June 17, 2021)

Expand Down
158 changes: 24 additions & 134 deletions provider/pkg/await/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package await
import (
"context"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -145,90 +144,40 @@ func (dia *deploymentInitAwaiter) Await() error {
// because it doesn't do a rollout (i.e., it simply creates the Deployment and
// corresponding ReplicaSet), and therefore there is no rollout to mark as "Progressing".
//

deploymentClient, replicaSetClient, podClient, pvcClient, err := dia.makeClients()
if err != nil {
return err
}

// Create Deployment watcher starting from the deployment's resource version, if set.
deploymentWatcher, err := deploymentClient.Watch(context.TODO(), metav1.ListOptions{
ResourceVersion: dia.deployment.GetResourceVersion(),
})
// If the required resource version is no longer available, reset tracked deployment state to the latest.
if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) {
var deployment *unstructured.Unstructured
// Get live versions of Deployment again.
deployment, err = deploymentClient.Get(context.TODO(),
dia.config.currentInputs.GetName(),
metav1.GetOptions{})
if err != nil {
return err
}
dia.processDeploymentEvent(watchAddedEvent(deployment))
deploymentWatcher, err = deploymentClient.Watch(context.TODO(), metav1.ListOptions{
ResourceVersion: dia.deployment.GetResourceVersion(), // dia.deployment was reset in processDeploymentEvent.
})
}
// Create Deployment watcher.
deploymentWatcher, err := deploymentClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "could not set up watch for Deployment object %q",
dia.config.currentInputs.GetName())
}
defer deploymentWatcher.Stop()

var rsVersion string
for _, rs := range dia.replicaSets {
if rsVersion == "" || rsVersion < rs.GetResourceVersion() {
rsVersion = rs.GetResourceVersion()
}
}

// Create ReplicaSet watcher starting from the most recent version for known replicasets.
var rsListOptions metav1.ListOptions
if rsVersion != "" {
rsListOptions.ResourceVersion = rsVersion
}

replicaSetWatcher, err := replicaSetClient.Watch(context.TODO(), rsListOptions)
// Create ReplicaSet watcher.
replicaSetWatcher, err := replicaSetClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for ReplicaSet objects associated with Deployment %q",
dia.config.currentInputs.GetName())
}
defer replicaSetWatcher.Stop()

var podVersion string
for _, pod := range dia.pods {
if podVersion == "" || podVersion < pod.GetResourceVersion() {
podVersion = pod.GetResourceVersion()
}
}

// Create Pod watcher starting from the most recent version for known pods if we have any.
var podListOptions metav1.ListOptions
if podVersion != "" {
podListOptions.ResourceVersion = podVersion
}
podWatcher, err := podClient.Watch(context.TODO(), podListOptions)
// Create Pod watcher.
podWatcher, err := podClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for Pods objects associated with Deployment %q",
dia.config.currentInputs.GetName())
}
defer podWatcher.Stop()

var pvcVersion string
for _, pvc := range dia.pvcs {
if pvcVersion == "" || pvcVersion < pvc.GetResourceVersion() {
pvcVersion = pvc.GetResourceVersion()
}
}

var pvcListOptions metav1.ListOptions
if pvcVersion != "" {
pvcListOptions.ResourceVersion = pvcVersion
}
// Create PersistentVolumeClaims watcher starting from the most recent version for known PVCs.
pvcWatcher, err := pvcClient.Watch(context.TODO(), pvcListOptions)
// Create PersistentVolumeClaims watcher.
pvcWatcher, err := pvcClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for PersistentVolumeClaims objects associated with Deployment %q",
Expand Down Expand Up @@ -374,14 +323,6 @@ func (dia *deploymentInitAwaiter) await(
}
}

func (dia *deploymentInitAwaiter) isEveryPVCReady() bool {
if len(dia.pvcs) == 0 || (len(dia.pvcs) > 0 && dia.pvcsAvailable) {
return true
}

return false
}

// Check whether we've succeeded, log the result as a status message to the provider. There are two
// cases:
//
Expand All @@ -392,31 +333,18 @@ func (dia *deploymentInitAwaiter) isEveryPVCReady() bool {
// rolled out. This means there is no rollout to be marked as "progressing", so we need only
// check that the Deployment was created, and the corresponding ReplicaSet needs to be marked
// available.
func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
rs, updatedReplicaSetCreated := dia.replicaSets[dia.replicaSetGeneration]
if updatedReplicaSetCreated {
logger.V(9).Infof("Replicaset %s for deployment: %s generation: %s check - [RS available: %v], [RS ready: %v], [deployment available: %v]",
rs.GetName(),
dia.deployment.GetName(),
dia.replicaSetGeneration,
dia.replicaSetAvailable,
dia.updatedReplicaSetReady,
dia.deploymentAvailable)
} else {
logger.V(9).Infof("No replicaset at generation %s for deployment: %s. [RS available: %v], [RS ready: %v], [deployment available: %v]",
dia.replicaSetGeneration,
dia.deployment.GetName(),
dia.replicaSetAvailable,
dia.updatedReplicaSetReady,
dia.deploymentAvailable)
func (dia *deploymentInitAwaiter) isEveryPVCReady() bool {
if len(dia.pvcs) == 0 || (len(dia.pvcs) > 0 && dia.pvcsAvailable) {
return true
}

return false
}

func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
if dia.replicaSetGeneration == "1" {
if dia.deploymentAvailable && dia.updatedReplicaSetReady {
if !dia.isEveryPVCReady() {
logger.V(9).Infof("Replicaset %s for deployment: %s generation: %s PVC check failed",
rs.GetName(),
dia.deployment.GetName(),
dia.replicaSetGeneration)
return false
}

Expand All @@ -435,11 +363,11 @@ func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
return true
}
}

return false
}

func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
logger.V(9).Infof("Processing event for deployment: %s", dia.deployment.GetName())
inputDeploymentName := dia.config.currentInputs.GetName()

deployment, isUnstructured := event.Object.(*unstructured.Unstructured)
Expand All @@ -454,38 +382,14 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {

// Do nothing if this is not the Deployment we're waiting for.
if deployment.GetName() != inputDeploymentName {
logger.V(9).Infof("Watch event NOT actually for deployment: %s", dia.deployment.GetName())
return
}

logger.V(9).Infof("Watch event for deployment: %s", dia.deployment.GetName())
// Mark the rollout as incomplete if it's deleted.
if event.Type == watch.Deleted {
logger.V(9).Infof("Watch event for deployment: %s - DELETED", dia.deployment.GetName())
return
}

newGeneration := deployment.GetAnnotations()[revision]
if dia.deployment != nil {
if dia.deployment.GetResourceVersion() > deployment.GetResourceVersion() {
logger.V(3).Infof("Deployment %s received an older event. Known version: %q, received: %q",
dia.deployment.GetName(),
dia.deployment.GetResourceVersion(),
deployment.GetResourceVersion())
return
}

currentGeneration := dia.deployment.GetAnnotations()[revision]
if dia.deployment.GetResourceVersion() != deployment.GetResourceVersion() {
logger.V(9).Infof("Deployment %s updated. Known version: %q, received: %q. Existing generation: %q, New generation: %q",
dia.deployment.GetName(),
dia.deployment.GetResourceVersion(),
deployment.GetResourceVersion(),
currentGeneration,
newGeneration)
}
}

dia.deployment = deployment

// extensions/v1beta1 does not include the "Progressing" status for rollouts.
Expand All @@ -495,7 +399,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
extensionsV1Beta1API := dia.config.initialAPIVersion == extensionsv1b1ApiVersion

// Get generation of the Deployment's ReplicaSet.
dia.replicaSetGeneration = newGeneration
dia.replicaSetGeneration = deployment.GetAnnotations()[revision]
if dia.replicaSetGeneration == "" {
// No current generation, Deployment controller has not yet created a ReplicaSet. Do
// nothing.
Expand Down Expand Up @@ -592,19 +496,13 @@ func (dia *deploymentInitAwaiter) processReplicaSetEvent(event watch.Event) {

// Check whether this ReplicaSet was created by our Deployment.
if !isOwnedBy(rs, dia.config.currentInputs) {
logger.V(9).Infof("ReplicaSet event %q is NOT for %q", rs.GetName(), dia.config.currentInputs.GetName())
return
}

logger.V(3).Infof("ReplicaSet %q is owned by %q", rs.GetName(), dia.config.currentInputs.GetName())

generation := rs.GetAnnotations()[revision]
if generation == "" {
// This should be unlikely but this is not a RS we want to process anyway if no revision is specified.
return
}

// If Pod was deleted, remove it from our aggregated checkers.
generation := rs.GetAnnotations()[revision]
if event.Type == watch.Deleted {
delete(dia.replicaSets, generation)
return
Expand Down Expand Up @@ -786,20 +684,12 @@ func (dia *deploymentInitAwaiter) processPodEvent(event watch.Event) {
return
}

logger.V(9).Infof("Received POD watch event for %q", pod.GetName())

// Check whether this Pod was created by a replicaset associated with our Deployment.
var podName string
for _, replicaSet := range dia.replicaSets {
if isOwnedBy(pod, replicaSet) {
podName = pod.GetName()
break
}
}
if podName == "" {
logger.V(9).Infof("Pod event %q is NOT for %q", pod.GetName(), dia.config.currentInputs.GetName())
// Check whether this Pod was created by our Deployment.
currentReplicaSet := dia.replicaSets[dia.replicaSetGeneration]
if !isOwnedBy(pod, currentReplicaSet) {
return
}
podName := pod.GetName()

// If Pod was deleted, remove it from our aggregated checkers.
if event.Type == watch.Deleted {
Expand Down
7 changes: 1 addition & 6 deletions tests/sdk/go/kustomize/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,11 @@ import (
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
ns, err := corev1.NewNamespace(ctx, "test", &corev1.NamespaceArgs{})
if err != nil {
return err
}
provider, err := k8s.NewProvider(ctx, "k8s", &k8s.ProviderArgs{
Kubeconfig: pulumi.String("~/.kube/config"),
Namespace: ns.Metadata.Name(),
})
if err != nil {
return err
}

_, err = kustomize.NewDirectory(ctx, "helloWorld",
kustomize.DirectoryArgs{Directory: pulumi.String("helloWorld")},
pulumi.Provider(provider),
Expand Down
3 changes: 1 addition & 2 deletions tests/sdk/nodejs/istio/step1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,5 @@ export const frontendIp = pulumi
])
.apply(([status, spec]) => {
const port = spec.ports.filter(p => p.name == "http2")[0].port;
const ingress = status.loadBalancer.ingress[0];
return `${ingress.hostname ?? ingress.ip}:${port}/productpage`;
return `${status.loadBalancer.ingress[0].ip}:${port}/productpage`;
});
2 changes: 1 addition & 1 deletion tests/sdk/nodejs/nodejs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ func TestSecrets(t *testing.T) {
"message": secretMessage,
},
ExpectRefreshChanges: true,
Quick: true,
Quick: true,
ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) {
assert.NotNil(t, stackInfo.Deployment)
state, err := json.Marshal(stackInfo.Deployment)
Expand Down

0 comments on commit 9e7f108

Please sign in to comment.