Skip to content

Commit

Permalink
Deduplicate provider logs
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone committed May 9, 2019
1 parent f56730d commit b755a3c
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
### Improvements

- Update to use client-go v11.0.0 (https://github.com/pulumi/pulumi-kubernetes/pull/549)
- Deduplicate provider logs (https://github.com/pulumi/pulumi-kubernetes/pull/558)

### Bug fixes

Expand Down
15 changes: 9 additions & 6 deletions pkg/await/apps_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ func (dia *deploymentInitAwaiter) Await() error {
}
defer pvcWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()
aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

timeout := time.Duration(metadata.TimeoutSeconds(dia.config.currentInputs, 5*60)) * time.Second
return dia.await(deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), period.C)
return dia.await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), aggregateErrorTicker.C)
}

func (dia *deploymentInitAwaiter) Read() error {
Expand Down Expand Up @@ -275,7 +276,8 @@ func (dia *deploymentInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (dia *deploymentInitAwaiter) await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, timeout, period <-chan time.Time,
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface,
timeout, aggregateErrorTicker <-chan time.Time,
) error {
dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")

Expand All @@ -296,7 +298,7 @@ func (dia *deploymentInitAwaiter) await(
object: dia.deployment,
subErrors: dia.errorMessages(),
}
case <-period:
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := dia.aggregatePodErrors()
for _, message := range scheduleErrors {
dia.config.logStatus(diag.Warning, message)
Expand Down Expand Up @@ -586,7 +588,8 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {
// defined, or when all PVCs have a status of 'Bound'
if phase != statusBound {
allPVCsReady = false
message := fmt.Sprintf("PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
message := fmt.Sprintf(
"PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
dia.config.logStatus(diag.Warning, message)
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/await/apps_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,9 @@ func Test_Apps_Deployment(t *testing.T) {
period := make(chan time.Time)
go test.do(deployments, replicaSets, pods, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
err := awaiter.await(
&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets}, &chanWatcher{results: pods},
&chanWatcher{}, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/await/apps_statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ func (sia *statefulsetInitAwaiter) Await() error {
}
defer podWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()
aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

timeout := time.Duration(metadata.TimeoutSeconds(sia.config.currentInputs, 5*60)) * time.Second
return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), period.C)
return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), aggregateErrorTicker.C)
}

func (sia *statefulsetInitAwaiter) Read() error {
Expand Down Expand Up @@ -219,7 +219,8 @@ func (sia *statefulsetInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (sia *statefulsetInitAwaiter) await(
statefulsetWatcher, podWatcher watch.Interface, timeout, period <-chan time.Time,
statefulsetWatcher, podWatcher watch.Interface,
timeout, aggregateErrorTicker <-chan time.Time,
) error {
for {
if sia.checkAndLogStatus() {
Expand All @@ -238,7 +239,7 @@ func (sia *statefulsetInitAwaiter) await(
object: sia.statefulset,
subErrors: sia.errorMessages(),
}
case <-period:
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := sia.aggregatePodErrors()
for _, message := range scheduleErrors {
sia.config.logStatus(diag.Warning, message)
Expand Down Expand Up @@ -275,7 +276,8 @@ func (sia *statefulsetInitAwaiter) checkAndLogStatus() bool {
sia.config.logStatus(diag.Info, fmt.Sprintf("[1/3] Waiting for StatefulSet update to roll out (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
case !sia.revisionReady:
sia.config.logStatus(diag.Info, "[2/3] Waiting for StatefulSet to update .status.currentRevision")
sia.config.logStatus(diag.Info,
"[2/3] Waiting for StatefulSet to update .status.currentRevision")
}
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/await/apps_statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func Test_Apps_StatefulSet(t *testing.T) {
period := make(chan time.Time)
go test.do(statefulsets, pods, timeout)

err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err := awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down Expand Up @@ -262,7 +263,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) {
awaiter.config.lastInputs = obj
})

err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err := awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
assert.Equal(t, test.firstExpectedError, err, test.description)

statefulsets = make(chan watch.Event)
Expand All @@ -272,7 +274,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) {
period = make(chan time.Time)
go test.secondUpdate(statefulsets, pods, timeout)

err = awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err = awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
assert.Equal(t, test.secondExpectedError, err, test.description)
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi-kubernetes/pkg/retry"
Expand Down Expand Up @@ -52,7 +53,8 @@ type ProviderConfig struct {
Host *pulumiprovider.HostClient
URN resource.URN

ClientSet *clients.DynamicClientSet
ClientSet *clients.DynamicClientSet
DedupLogger *logging.DedupLogger
}

type CreateConfig struct {
Expand Down Expand Up @@ -146,6 +148,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.DedupLogger,
}
waitErr := awaiter.awaitCreation(conf)
if waitErr != nil {
Expand Down Expand Up @@ -191,6 +194,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.DedupLogger,
}
waitErr := awaiter.awaitRead(conf)
if waitErr != nil {
Expand Down Expand Up @@ -307,6 +311,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: currentOutputs,
logger: c.DedupLogger,
},
lastInputs: c.Previous,
lastOutputs: liveOldObj,
Expand Down
6 changes: 3 additions & 3 deletions pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi-kubernetes/pkg/watcher"
"github.com/pulumi/pulumi/pkg/diag"
Expand All @@ -44,15 +45,14 @@ type createAwaitConfig struct {
host *provider.HostClient
ctx context.Context
urn resource.URN
logger *logging.DedupLogger
clientSet *clients.DynamicClientSet
currentInputs *unstructured.Unstructured
currentOutputs *unstructured.Unstructured
}

func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) {
if cac.host != nil {
_ = cac.host.LogStatus(cac.ctx, sev, cac.urn, message)
}
cac.logger.LogMessage(sev, message)
}

// updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated",
Expand Down
6 changes: 4 additions & 2 deletions pkg/await/core_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ func (sia *serviceInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (sia *serviceInitAwaiter) await(
serviceWatcher, endpointWatcher watch.Interface, timeout <-chan time.Time,
settled chan struct{}, version serverVersion,
serviceWatcher, endpointWatcher watch.Interface,
timeout <-chan time.Time,
settled chan struct{},
version serverVersion,
) error {
sia.config.logStatus(diag.Info, "[1/3] Finding Pods to direct traffic to")

Expand Down
3 changes: 2 additions & 1 deletion pkg/await/core_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ func Test_Core_Service(t *testing.T) {
timeout := make(chan time.Time)
go test.do(services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: services}, &chanWatcher{results: endpoints},
err := awaiter.await(
&chanWatcher{results: services}, &chanWatcher{results: endpoints},
timeout, settled, test.version)
assert.Equal(t, test.expectedError, err, test.description)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/await/extensions_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,11 @@ func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoint
}

// await is a helper companion to `Await` designed to make it easy to test this module.
func (iia *ingressInitAwaiter) await(ingressWatcher, serviceWatcher, endpointWatcher watch.Interface,
settled chan struct{}, timeout <-chan time.Time) error {
func (iia *ingressInitAwaiter) await(
ingressWatcher, serviceWatcher, endpointWatcher watch.Interface,
settled chan struct{},
timeout <-chan time.Time,
) error {
iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")

for {
Expand Down
4 changes: 3 additions & 1 deletion pkg/await/extensions_ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ func Test_Extensions_Ingress(t *testing.T) {
timeout := make(chan time.Time)
go test.do(ingresses, services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints}, settled, timeout)
err := awaiter.await(
&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints},
settled, timeout)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/await/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand All @@ -13,6 +14,7 @@ func mockAwaitConfig(inputs *unstructured.Unstructured) createAwaitConfig {
//TODO: complete this mock if needed
currentInputs: inputs,
currentOutputs: inputs,
logger: logging.NewLogger(context.Background(), nil, ""),
}
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/logging/dedup_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2016-2019, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logging

import (
"context"
"sync"

"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/provider"
)

// DedupLogger wraps a time-ordered log set to allow batched logging of unique messages.
// Operations on DedupLogger are safe to use concurrently.
type DedupLogger struct {
	messages *TimeOrderedLogSet // deduplicated, time-ordered messages accumulated so far
	index    int                // count of messages already flushed to the host via LogNewMessages
	ctx      context.Context    // context passed through to host.LogStatus calls
	host     *provider.HostClient // destination for log messages; may be nil (e.g. in tests), in which case nothing is flushed
	urn      resource.URN       // resource URN attached to every logged message
	mux      sync.Mutex         // guards messages and index
}

// NewLogger returns an initialized DedupLogger that accumulates messages in a
// TimeOrderedLogSet and reports them to host under the given URN.
func NewLogger(ctx context.Context, host *provider.HostClient, urn resource.URN) *DedupLogger {
	logger := &DedupLogger{
		ctx:      ctx,
		host:     host,
		urn:      urn,
		messages: &TimeOrderedLogSet{},
	}
	return logger
}

// LogMessage adds a message to the log set and flushes the queue to the host.
func (l *DedupLogger) LogMessage(severity diag.Severity, s string) {
	// Record the message first, then push everything not yet reported.
	l.EnqueueMessage(severity, s)
	l.LogNewMessages()
}

// EnqueueMessage adds a message to the log set but does not log it to the host.
func (l *DedupLogger) EnqueueMessage(severity diag.Severity, s string) {
	l.mux.Lock()
	defer l.mux.Unlock()

	msg := Message{s, severity}
	l.messages.Add(msg)
}

// GetNewMessages returns the list of new messages since last calling GetNewMessages.
func (l *DedupLogger) GetNewMessages() []Message {
	l.mux.Lock()
	defer l.mux.Unlock()

	// Advance the high-water mark and hand back only the unseen tail.
	start := l.index
	l.index = len(l.messages.Messages)
	return l.messages.Messages[start:]
}

// LogNewMessages logs any new messages to the host.
func (l *DedupLogger) LogNewMessages() {
	// With no host there is nowhere to report; leave the queue untouched,
	// exactly as the nil check did before.
	if l.host == nil {
		return
	}
	for _, msg := range l.GetNewMessages() {
		_ = l.host.LogStatus(l.ctx, msg.severity, l.urn, msg.s)
	}
}
Loading

0 comments on commit b755a3c

Please sign in to comment.