Skip to content

Commit

Permalink
Deduplicate and batch provider logs
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone committed May 8, 2019
1 parent f56730d commit 2aecd32
Show file tree
Hide file tree
Showing 18 changed files with 373 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
### Improvements

- Update to use client-go v11.0.0 (https://github.com/pulumi/pulumi-kubernetes/pull/549)
- Deduplicate and batch provider logs (https://github.com/pulumi/pulumi-kubernetes/pull/558)

### Bug fixes

Expand Down
42 changes: 24 additions & 18 deletions pkg/await/apps_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,17 @@ func (dia *deploymentInitAwaiter) Await() error {
}
defer pvcWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()
aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

logTicker := time.NewTicker(1 * time.Second)
defer logTicker.Stop()
defer dia.config.logger.LogNewMessages(dia.config.ctx, dia.config.host, dia.config.urn)

timeout := time.Duration(metadata.TimeoutSeconds(dia.config.currentInputs, 5*60)) * time.Second
return dia.await(deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), period.C)
return dia.await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), logTicker.C,
aggregateErrorTicker.C)
}

func (dia *deploymentInitAwaiter) Read() error {
Expand Down Expand Up @@ -274,10 +280,8 @@ func (dia *deploymentInitAwaiter) read(
}

// await is a helper companion to `Await` designed to make it easy to test this module.
func (dia *deploymentInitAwaiter) await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, timeout, period <-chan time.Time,
) error {
dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")
func (dia *deploymentInitAwaiter) await(deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, timeout, logTicker, aggregateErrorTicker <-chan time.Time) error {
dia.config.logger.EnqueueMessage(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")

for {
if dia.checkAndLogStatus() {
Expand All @@ -296,14 +300,16 @@ func (dia *deploymentInitAwaiter) await(
object: dia.deployment,
subErrors: dia.errorMessages(),
}
case <-period:
case <-logTicker:
dia.config.logger.LogNewMessages(dia.config.ctx, dia.config.host, dia.config.urn)
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := dia.aggregatePodErrors()
for _, message := range scheduleErrors {
dia.config.logStatus(diag.Warning, message)
dia.config.logger.EnqueueMessage(diag.Warning, message)
}

for _, message := range containerErrors {
dia.config.logStatus(diag.Warning, message)
dia.config.logger.EnqueueMessage(diag.Warning, message)
}
case event := <-deploymentWatcher.ResultChan():
dia.processDeploymentEvent(event)
Expand Down Expand Up @@ -342,7 +348,7 @@ func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
return false
}

dia.config.logStatus(diag.Info, "✅ Deployment initialization complete")
dia.config.logger.EnqueueMessage(diag.Info, "✅ Deployment initialization complete")
return true
}
} else {
Expand All @@ -351,7 +357,7 @@ func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
return false
}

dia.config.logStatus(diag.Info, "✅ Deployment initialization complete")
dia.config.logger.EnqueueMessage(diag.Info, "✅ Deployment initialization complete")
return true
}
}
Expand Down Expand Up @@ -430,7 +436,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
}
message = fmt.Sprintf("[%s] %s", reason, message)
dia.deploymentErrors[reason] = message
dia.config.logStatus(diag.Warning, message)
dia.config.logger.EnqueueMessage(diag.Warning, message)
}

dia.replicaSetAvailable = condition["reason"] == "NewReplicaSetAvailable" && isProgressing
Expand All @@ -451,7 +457,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
}
message = fmt.Sprintf("[%s] %s", reason, message)
dia.deploymentErrors[reason] = message
dia.config.logStatus(diag.Warning, message)
dia.config.logger.EnqueueMessage(diag.Warning, message)
}
}
}
Expand Down Expand Up @@ -544,8 +550,7 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() {
}

if !dia.updatedReplicaSetReady {
dia.config.logStatus(
diag.Info,
dia.config.logger.EnqueueMessage(diag.Info,
fmt.Sprintf("[1/2] Waiting for app ReplicaSet be marked available (%d/%d Pods available)",
readyReplicas, int64(specReplicas)))
}
Expand Down Expand Up @@ -586,8 +591,9 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {
// defined, or when all PVCs have a status of 'Bound'
if phase != statusBound {
allPVCsReady = false
message := fmt.Sprintf("PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
dia.config.logStatus(diag.Warning, message)
message := fmt.Sprintf(
"PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
dia.config.logger.EnqueueMessage(diag.Warning, message)
}
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/await/apps_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,9 @@ func Test_Apps_Deployment(t *testing.T) {
period := make(chan time.Time)
go test.do(deployments, replicaSets, pods, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
err := awaiter.await(
&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets}, &chanWatcher{results: pods},
&chanWatcher{}, timeout, nil, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down Expand Up @@ -614,7 +615,7 @@ func Test_Apps_Deployment_With_PersistentVolumeClaims(t *testing.T) {
go test.do(deployments, replicaSets, pods, pvcs, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{results: pvcs}, timeout, period)
&chanWatcher{results: pods}, &chanWatcher{results: pvcs}, timeout, nil, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down Expand Up @@ -668,7 +669,7 @@ func Test_Apps_Deployment_Without_PersistentVolumeClaims(t *testing.T) {
go test.do(deployments, replicaSets, pods, pvcs, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{results: pvcs}, timeout, period)
&chanWatcher{results: pods}, &chanWatcher{results: pvcs}, timeout, nil, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down Expand Up @@ -726,7 +727,7 @@ func Test_Apps_Deployment_MultipleUpdates(t *testing.T) {
})

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
&chanWatcher{results: pods}, &chanWatcher{}, timeout, nil, period)
assert.Nil(t, err, test.description)

deployments = make(chan watch.Event)
Expand All @@ -738,7 +739,7 @@ func Test_Apps_Deployment_MultipleUpdates(t *testing.T) {
go test.secondUpdate(deployments, replicaSets, pods, timeout)

err = awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
&chanWatcher{results: pods}, &chanWatcher{}, timeout, nil, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down
36 changes: 23 additions & 13 deletions pkg/await/apps_statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,15 @@ func (sia *statefulsetInitAwaiter) Await() error {
}
defer podWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()
aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

logTicker := time.NewTicker(1 * time.Second)
defer logTicker.Stop()
defer sia.config.logger.LogNewMessages(sia.config.ctx, sia.config.host, sia.config.urn)

timeout := time.Duration(metadata.TimeoutSeconds(sia.config.currentInputs, 5*60)) * time.Second
return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), period.C)
return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), logTicker.C, aggregateErrorTicker.C)
}

func (sia *statefulsetInitAwaiter) Read() error {
Expand Down Expand Up @@ -219,7 +223,8 @@ func (sia *statefulsetInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (sia *statefulsetInitAwaiter) await(
statefulsetWatcher, podWatcher watch.Interface, timeout, period <-chan time.Time,
statefulsetWatcher, podWatcher watch.Interface,
timeout, logTicker, aggregateErrorTicker <-chan time.Time,
) error {
for {
if sia.checkAndLogStatus() {
Expand All @@ -238,14 +243,16 @@ func (sia *statefulsetInitAwaiter) await(
object: sia.statefulset,
subErrors: sia.errorMessages(),
}
case <-period:
case <-logTicker:
sia.config.logger.LogNewMessages(sia.config.ctx, sia.config.host, sia.config.urn)
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := sia.aggregatePodErrors()
for _, message := range scheduleErrors {
sia.config.logStatus(diag.Warning, message)
sia.config.logger.EnqueueMessage(diag.Warning, message)
}

for _, message := range containerErrors {
sia.config.logStatus(diag.Warning, message)
sia.config.logger.EnqueueMessage(diag.Warning, message)
}
case event := <-statefulsetWatcher.ResultChan():
sia.processStatefulSetEvent(event)
Expand All @@ -259,23 +266,26 @@ func (sia *statefulsetInitAwaiter) await(
// the provider.
func (sia *statefulsetInitAwaiter) checkAndLogStatus() bool {
if sia.replicasReady && sia.revisionReady {
sia.config.logStatus(diag.Info, "✅ StatefulSet initialization complete")
sia.config.logger.EnqueueMessage(diag.Info, "✅ StatefulSet initialization complete")
return true
}

isInitialDeployment := sia.currentGeneration <= 1

// For initial generation, the revision doesn't need to be updated, so skip that step in the log.
if isInitialDeployment {
sia.config.logStatus(diag.Info, fmt.Sprintf("[1/2] Waiting for StatefulSet to create Pods (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
sia.config.logger.EnqueueMessage(diag.Info,
fmt.Sprintf("[1/2] Waiting for StatefulSet to create Pods (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
} else {
switch {
case !sia.replicasReady:
sia.config.logStatus(diag.Info, fmt.Sprintf("[1/3] Waiting for StatefulSet update to roll out (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
sia.config.logger.EnqueueMessage(diag.Info,
fmt.Sprintf("[1/3] Waiting for StatefulSet update to roll out (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
case !sia.revisionReady:
sia.config.logStatus(diag.Info, "[2/3] Waiting for StatefulSet to update .status.currentRevision")
sia.config.logger.EnqueueMessage(diag.Info,
"[2/3] Waiting for StatefulSet to update .status.currentRevision")
}
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/await/apps_statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func Test_Apps_StatefulSet(t *testing.T) {
period := make(chan time.Time)
go test.do(statefulsets, pods, timeout)

err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err := awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, nil, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down Expand Up @@ -262,7 +263,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) {
awaiter.config.lastInputs = obj
})

err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err := awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, nil, period)
assert.Equal(t, test.firstExpectedError, err, test.description)

statefulsets = make(chan watch.Event)
Expand All @@ -272,7 +274,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) {
period = make(chan time.Time)
go test.secondUpdate(statefulsets, pods, timeout)

err = awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err = awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, nil, period)
assert.Equal(t, test.secondExpectedError, err, test.description)
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi-kubernetes/pkg/retry"
Expand Down Expand Up @@ -53,6 +54,7 @@ type ProviderConfig struct {
URN resource.URN

ClientSet *clients.DynamicClientSet
Logger *logging.Logger
}

type CreateConfig struct {
Expand Down Expand Up @@ -146,6 +148,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.Logger,
}
waitErr := awaiter.awaitCreation(conf)
if waitErr != nil {
Expand Down Expand Up @@ -191,6 +194,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.Logger,
}
waitErr := awaiter.awaitRead(conf)
if waitErr != nil {
Expand Down Expand Up @@ -307,6 +311,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: currentOutputs,
logger: c.Logger,
},
lastInputs: c.Previous,
lastOutputs: liveOldObj,
Expand Down
2 changes: 2 additions & 0 deletions pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi-kubernetes/pkg/watcher"
"github.com/pulumi/pulumi/pkg/diag"
Expand All @@ -44,6 +45,7 @@ type createAwaitConfig struct {
host *provider.HostClient
ctx context.Context
urn resource.URN
logger *logging.Logger
clientSet *clients.DynamicClientSet
currentInputs *unstructured.Unstructured
currentOutputs *unstructured.Unstructured
Expand Down
Loading

0 comments on commit 2aecd32

Please sign in to comment.