Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate provider logs #558

Merged
merged 1 commit into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
### Improvements

- Update to use client-go v11.0.0 (https://github.com/pulumi/pulumi-kubernetes/pull/549)
- Deduplicate provider logs (https://github.com/pulumi/pulumi-kubernetes/pull/558)

### Bug fixes

Expand Down
15 changes: 9 additions & 6 deletions pkg/await/apps_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ func (dia *deploymentInitAwaiter) Await() error {
}
defer pvcWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()
aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

timeout := time.Duration(metadata.TimeoutSeconds(dia.config.currentInputs, 5*60)) * time.Second
return dia.await(deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), period.C)
return dia.await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), aggregateErrorTicker.C)
}

func (dia *deploymentInitAwaiter) Read() error {
Expand Down Expand Up @@ -275,7 +276,8 @@ func (dia *deploymentInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (dia *deploymentInitAwaiter) await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, timeout, period <-chan time.Time,
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface,
timeout, aggregateErrorTicker <-chan time.Time,
) error {
dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")

Expand All @@ -296,7 +298,7 @@ func (dia *deploymentInitAwaiter) await(
object: dia.deployment,
subErrors: dia.errorMessages(),
}
case <-period:
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := dia.aggregatePodErrors()
for _, message := range scheduleErrors {
dia.config.logStatus(diag.Warning, message)
Expand Down Expand Up @@ -586,7 +588,8 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {
// defined, or when all PVCs have a status of 'Bound'
if phase != statusBound {
allPVCsReady = false
message := fmt.Sprintf("PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
message := fmt.Sprintf(
"PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
dia.config.logStatus(diag.Warning, message)
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/await/apps_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,9 @@ func Test_Apps_Deployment(t *testing.T) {
period := make(chan time.Time)
go test.do(deployments, replicaSets, pods, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
err := awaiter.await(
&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets}, &chanWatcher{results: pods},
&chanWatcher{}, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/await/apps_statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ func (sia *statefulsetInitAwaiter) Await() error {
}
defer podWatcher.Stop()

period := time.NewTicker(10 * time.Second)
defer period.Stop()
aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

timeout := time.Duration(metadata.TimeoutSeconds(sia.config.currentInputs, 5*60)) * time.Second
return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), period.C)
return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), aggregateErrorTicker.C)
}

func (sia *statefulsetInitAwaiter) Read() error {
Expand Down Expand Up @@ -219,7 +219,8 @@ func (sia *statefulsetInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (sia *statefulsetInitAwaiter) await(
statefulsetWatcher, podWatcher watch.Interface, timeout, period <-chan time.Time,
statefulsetWatcher, podWatcher watch.Interface,
timeout, aggregateErrorTicker <-chan time.Time,
) error {
for {
if sia.checkAndLogStatus() {
Expand All @@ -238,7 +239,7 @@ func (sia *statefulsetInitAwaiter) await(
object: sia.statefulset,
subErrors: sia.errorMessages(),
}
case <-period:
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := sia.aggregatePodErrors()
for _, message := range scheduleErrors {
sia.config.logStatus(diag.Warning, message)
Expand Down Expand Up @@ -275,7 +276,8 @@ func (sia *statefulsetInitAwaiter) checkAndLogStatus() bool {
sia.config.logStatus(diag.Info, fmt.Sprintf("[1/3] Waiting for StatefulSet update to roll out (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
case !sia.revisionReady:
sia.config.logStatus(diag.Info, "[2/3] Waiting for StatefulSet to update .status.currentRevision")
sia.config.logStatus(diag.Info,
"[2/3] Waiting for StatefulSet to update .status.currentRevision")
}
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/await/apps_statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func Test_Apps_StatefulSet(t *testing.T) {
period := make(chan time.Time)
go test.do(statefulsets, pods, timeout)

err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err := awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down Expand Up @@ -262,7 +263,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) {
awaiter.config.lastInputs = obj
})

err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err := awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
assert.Equal(t, test.firstExpectedError, err, test.description)

statefulsets = make(chan watch.Event)
Expand All @@ -272,7 +274,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) {
period = make(chan time.Time)
go test.secondUpdate(statefulsets, pods, timeout)

err = awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
err = awaiter.await(
&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period)
assert.Equal(t, test.secondExpectedError, err, test.description)
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi-kubernetes/pkg/retry"
Expand Down Expand Up @@ -52,7 +53,8 @@ type ProviderConfig struct {
Host *pulumiprovider.HostClient
URN resource.URN

ClientSet *clients.DynamicClientSet
ClientSet *clients.DynamicClientSet
DedupLogger *logging.DedupLogger
}

type CreateConfig struct {
Expand Down Expand Up @@ -146,6 +148,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.DedupLogger,
}
waitErr := awaiter.awaitCreation(conf)
if waitErr != nil {
Expand Down Expand Up @@ -191,6 +194,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.DedupLogger,
}
waitErr := awaiter.awaitRead(conf)
if waitErr != nil {
Expand Down Expand Up @@ -307,6 +311,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: currentOutputs,
logger: c.DedupLogger,
},
lastInputs: c.Previous,
lastOutputs: liveOldObj,
Expand Down
6 changes: 3 additions & 3 deletions pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
"github.com/pulumi/pulumi-kubernetes/pkg/watcher"
"github.com/pulumi/pulumi/pkg/diag"
Expand All @@ -44,15 +45,14 @@ type createAwaitConfig struct {
host *provider.HostClient
ctx context.Context
urn resource.URN
logger *logging.DedupLogger
clientSet *clients.DynamicClientSet
currentInputs *unstructured.Unstructured
currentOutputs *unstructured.Unstructured
}

func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) {
if cac.host != nil {
_ = cac.host.LogStatus(cac.ctx, sev, cac.urn, message)
}
cac.logger.LogMessage(sev, message)
}

// updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated",
Expand Down
6 changes: 4 additions & 2 deletions pkg/await/core_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ func (sia *serviceInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (sia *serviceInitAwaiter) await(
serviceWatcher, endpointWatcher watch.Interface, timeout <-chan time.Time,
settled chan struct{}, version serverVersion,
serviceWatcher, endpointWatcher watch.Interface,
timeout <-chan time.Time,
settled chan struct{},
version serverVersion,
) error {
sia.config.logStatus(diag.Info, "[1/3] Finding Pods to direct traffic to")

Expand Down
3 changes: 2 additions & 1 deletion pkg/await/core_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ func Test_Core_Service(t *testing.T) {
timeout := make(chan time.Time)
go test.do(services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: services}, &chanWatcher{results: endpoints},
err := awaiter.await(
&chanWatcher{results: services}, &chanWatcher{results: endpoints},
timeout, settled, test.version)
assert.Equal(t, test.expectedError, err, test.description)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/await/extensions_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,11 @@ func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoint
}

// await is a helper companion to `Await` designed to make it easy to test this module.
func (iia *ingressInitAwaiter) await(ingressWatcher, serviceWatcher, endpointWatcher watch.Interface,
settled chan struct{}, timeout <-chan time.Time) error {
func (iia *ingressInitAwaiter) await(
ingressWatcher, serviceWatcher, endpointWatcher watch.Interface,
settled chan struct{},
timeout <-chan time.Time,
) error {
iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")

for {
Expand Down
4 changes: 3 additions & 1 deletion pkg/await/extensions_ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ func Test_Extensions_Ingress(t *testing.T) {
timeout := make(chan time.Time)
go test.do(ingresses, services, endpoints, settled, timeout)

err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints}, settled, timeout)
err := awaiter.await(
&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints},
settled, timeout)
assert.Equal(t, test.expectedError, err, test.description)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/await/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand All @@ -13,6 +14,7 @@ func mockAwaitConfig(inputs *unstructured.Unstructured) createAwaitConfig {
//TODO: complete this mock if needed
currentInputs: inputs,
currentOutputs: inputs,
logger: logging.NewLogger(context.Background(), nil, ""),
}
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/logging/dedup_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2016-2019, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logging

import (
"context"
"sync"

"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/provider"
)

// DedupLogger wraps a time-ordered log set to allow batched logging of unique messages.
// Operations on DedupLogger are safe to use concurrently.
type DedupLogger struct {
messages *TimeOrderedLogSet
index int
ctx context.Context
host *provider.HostClient
urn resource.URN
mux sync.Mutex
}

// NewLogger returns an initialized DedupLogger.
func NewLogger(ctx context.Context, host *provider.HostClient, urn resource.URN) *DedupLogger {
return &DedupLogger{
messages: &TimeOrderedLogSet{},
ctx: ctx,
host: host,
urn: urn,
}
}

// LogMessage adds a message to the log set and flushes the queue to the host.
func (l *DedupLogger) LogMessage(severity diag.Severity, s string) {
l.EnqueueMessage(severity, s)
l.LogNewMessages()
}

// EnqueueMessage adds a message to the log set but does not log it to the host.
func (l *DedupLogger) EnqueueMessage(severity diag.Severity, s string) {
l.mux.Lock()
defer l.mux.Unlock()

l.messages.Add(Message{s, severity})
}

// GetNewMessages returns the list of new messages since last calling GetNewMessages.
func (l *DedupLogger) GetNewMessages() []Message {
l.mux.Lock()
defer l.mux.Unlock()

idx := l.index
l.index = len(l.messages.Messages)
return l.messages.Messages[idx:]
}

// LogNewMessages logs any new messages to the host.
func (l *DedupLogger) LogNewMessages() {
if l.host != nil {
for _, msg := range l.GetNewMessages() {
_ = l.host.LogStatus(l.ctx, msg.severity, l.urn, msg.s)
}
}
}
Loading