Commit

Address feedback
lblackstone committed May 9, 2019
1 parent 268b523 commit f97986d
Showing 10 changed files with 50 additions and 93 deletions.
18 changes: 9 additions & 9 deletions pkg/await/apps_deployment.go
@@ -279,7 +279,7 @@ func (dia *deploymentInitAwaiter) await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface,
timeout, aggregateErrorTicker <-chan time.Time,
) error {
-dia.config.logger.LogMessage(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")
+dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")

for {
if dia.checkAndLogStatus() {
@@ -301,11 +301,11 @@ func (dia *deploymentInitAwaiter) await(
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := dia.aggregatePodErrors()
for _, message := range scheduleErrors {
-dia.config.logger.LogMessage(diag.Warning, message)
+dia.config.logStatus(diag.Warning, message)
}

for _, message := range containerErrors {
-dia.config.logger.LogMessage(diag.Warning, message)
+dia.config.logStatus(diag.Warning, message)
}
case event := <-deploymentWatcher.ResultChan():
dia.processDeploymentEvent(event)
@@ -344,7 +344,7 @@ func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
return false
}

-dia.config.logger.LogMessage(diag.Info, "✅ Deployment initialization complete")
+dia.config.logStatus(diag.Info, "✅ Deployment initialization complete")
return true
}
} else {
@@ -353,7 +353,7 @@ func (dia *deploymentInitAwaiter) checkAndLogStatus() bool {
return false
}

-dia.config.logger.LogMessage(diag.Info, "✅ Deployment initialization complete")
+dia.config.logStatus(diag.Info, "✅ Deployment initialization complete")
return true
}
}
@@ -432,7 +432,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
}
message = fmt.Sprintf("[%s] %s", reason, message)
dia.deploymentErrors[reason] = message
-dia.config.logger.LogMessage(diag.Warning, message)
+dia.config.logStatus(diag.Warning, message)
}

dia.replicaSetAvailable = condition["reason"] == "NewReplicaSetAvailable" && isProgressing
@@ -453,7 +453,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
}
message = fmt.Sprintf("[%s] %s", reason, message)
dia.deploymentErrors[reason] = message
-dia.config.logger.LogMessage(diag.Warning, message)
+dia.config.logStatus(diag.Warning, message)
}
}
}
@@ -546,7 +546,7 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() {
}

if !dia.updatedReplicaSetReady {
-dia.config.logger.LogMessage(diag.Info,
+dia.config.logStatus(diag.Info,
fmt.Sprintf("[1/2] Waiting for app ReplicaSet be marked available (%d/%d Pods available)",
readyReplicas, int64(specReplicas)))
}
@@ -589,7 +589,7 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {
allPVCsReady = false
message := fmt.Sprintf(
"PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
-dia.config.logger.LogMessage(diag.Warning, message)
+dia.config.logStatus(diag.Warning, message)
}
}

12 changes: 6 additions & 6 deletions pkg/await/apps_statefulset.go
@@ -242,11 +242,11 @@ func (sia *statefulsetInitAwaiter) await(
case <-aggregateErrorTicker:
scheduleErrors, containerErrors := sia.aggregatePodErrors()
for _, message := range scheduleErrors {
-sia.config.logger.LogMessage(diag.Warning, message)
+sia.config.logStatus(diag.Warning, message)
}

for _, message := range containerErrors {
-sia.config.logger.LogMessage(diag.Warning, message)
+sia.config.logStatus(diag.Warning, message)
}
case event := <-statefulsetWatcher.ResultChan():
sia.processStatefulSetEvent(event)
@@ -260,25 +260,25 @@ func (sia *statefulsetInitAwaiter) await(
// the provider.
func (sia *statefulsetInitAwaiter) checkAndLogStatus() bool {
if sia.replicasReady && sia.revisionReady {
-sia.config.logger.LogMessage(diag.Info, "✅ StatefulSet initialization complete")
+sia.config.logStatus(diag.Info, "✅ StatefulSet initialization complete")
return true
}

isInitialDeployment := sia.currentGeneration <= 1

// For initial generation, the revision doesn't need to be updated, so skip that step in the log.
if isInitialDeployment {
-sia.config.logger.LogMessage(diag.Info,
+sia.config.logStatus(diag.Info,
fmt.Sprintf("[1/2] Waiting for StatefulSet to create Pods (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
} else {
switch {
case !sia.replicasReady:
-sia.config.logger.LogMessage(diag.Info,
+sia.config.logStatus(diag.Info,
fmt.Sprintf("[1/3] Waiting for StatefulSet update to roll out (%d/%d Pods ready)",
sia.currentReplicas, sia.targetReplicas))
case !sia.revisionReady:
-sia.config.logger.LogMessage(diag.Info,
+sia.config.logStatus(diag.Info,
"[2/3] Waiting for StatefulSet to update .status.currentRevision")
}
}
2 changes: 1 addition & 1 deletion pkg/await/await.go
@@ -54,7 +54,7 @@ type ProviderConfig struct {
URN resource.URN

ClientSet *clients.DynamicClientSet
-Logger *logging.Logger
+Logger *logging.DedupLogger
}

type CreateConfig struct {
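The only change here is the field's type, but it implies the construction site changes with it. A hypothetical wiring sketch, not part of this commit — the struct literal is abridged to the fields this hunk shows, and ctx, hostClient, clientSet, and urn are assumed to exist in the provider's setup code:

// Sketch: supplying the deduplicating logger when building a ProviderConfig.
cfg := await.ProviderConfig{
	URN:       urn,
	ClientSet: clientSet,
	Logger:    logging.NewLogger(ctx, hostClient, urn),
}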
6 changes: 2 additions & 4 deletions pkg/await/awaiters.go
@@ -45,16 +45,14 @@ type createAwaitConfig struct {
host *provider.HostClient
ctx context.Context
urn resource.URN
-logger *logging.Logger
+logger *logging.DedupLogger
clientSet *clients.DynamicClientSet
currentInputs *unstructured.Unstructured
currentOutputs *unstructured.Unstructured
}

func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) {
-if cac.host != nil {
-_ = cac.host.LogStatus(cac.ctx, sev, cac.urn, message)
-}
+cac.logger.LogMessage(sev, message)
}

// updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated",
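With host reporting moved into the logger (DedupLogger.LogMessage enqueues and then flushes new messages to the host itself), logStatus reduces to a plain delegation — which is why every call site in this commit switched from config.logger.LogMessage to config.logStatus. Below is a self-contained toy model of the resulting behavior; all names are illustrative stand-ins, and the real implementation keys on (message, severity) pairs and preserves arrival order rather than using a bare string set:

package main

import "fmt"

type severity string

// dedupLogger stands in for logging.DedupLogger: it drops messages it
// has already seen and forwards only new ones to the host.
type dedupLogger struct {
	seen map[string]bool
}

func (l *dedupLogger) LogMessage(sev severity, s string) {
	if l.seen[s] {
		return // duplicate status update: already reported
	}
	l.seen[s] = true
	fmt.Printf("host.LogStatus(%s): %s\n", sev, s) // stand-in for the engine call
}

// awaitConfig stands in for createAwaitConfig after this commit:
// logStatus simply delegates, since the logger owns host reporting.
type awaitConfig struct {
	logger *dedupLogger
}

func (c *awaitConfig) logStatus(sev severity, s string) {
	c.logger.LogMessage(sev, s)
}

func main() {
	cfg := &awaitConfig{logger: &dedupLogger{seen: map[string]bool{}}}
	// A watch loop tends to report the same status many times; only the
	// first occurrence reaches the host.
	cfg.logStatus("info", "[1/2] Waiting for app ReplicaSet be marked available")
	cfg.logStatus("info", "[1/2] Waiting for app ReplicaSet be marked available")
	cfg.logStatus("info", "✅ Deployment initialization complete")
}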
14 changes: 7 additions & 7 deletions pkg/await/core_pod.go
@@ -282,28 +282,28 @@ func (pc *podChecker) errorMessages() []string {

func (pia *podInitAwaiter) logErrors() {
for reason, message := range pia.podScheduledErrors {
-pia.config.logger.LogMessage(diag.Warning, podSchedulerError(reason, message))
+pia.config.logStatus(diag.Warning, podSchedulerError(reason, message))
}

for reason, message := range pia.podInitErrors {
// Ignore non-useful status messages.
if reason == "ContainersNotInitialized" {
continue
}
-pia.config.logger.LogMessage(diag.Warning, podUninitializedError(reason, message))
+pia.config.logStatus(diag.Warning, podUninitializedError(reason, message))
}

for reason, message := range pia.podReadyErrors {
// Ignore non-useful status messages.
if reason == "ContainersNotReady" {
continue
}
-pia.config.logger.LogMessage(diag.Warning, podNotReadyError(reason, message))
+pia.config.logStatus(diag.Warning, podNotReadyError(reason, message))
}

for reason, errs := range pia.containerErrors {
for _, message := range errs {
-pia.config.logger.LogMessage(diag.Warning, containerError(reason, message))
+pia.config.logStatus(diag.Warning, containerError(reason, message))
}
}
}
@@ -401,7 +401,7 @@ func (pia *podInitAwaiter) read(pod *unstructured.Unstructured) error {

// await is a helper companion to `Await` designed to make it easy to test this module.
func (pia *podInitAwaiter) await(podWatcher watch.Interface, timeout <-chan time.Time) error {
-pia.config.logger.LogMessage(diag.Info, "[1/2] Pulling container images")
+pia.config.logStatus(diag.Info, "[1/2] Pulling container images")

for {
if pia.checkAndLogStatus() {
@@ -457,9 +457,9 @@ func (pia *podInitAwaiter) processPodEvent(event watch.Event) {

func (pia *podInitAwaiter) checkAndLogStatus() bool {
if pia.podReady {
-pia.config.logger.LogMessage(diag.Info, "✅ Pod reported ready status")
+pia.config.logStatus(diag.Info, "✅ Pod reported ready status")
} else if pia.podSuccess {
-pia.config.logger.LogMessage(diag.Info, "✅ Pod completed successfully")
+pia.config.logStatus(diag.Info, "✅ Pod completed successfully")
}

return pia.podReady || pia.podSuccess
6 changes: 3 additions & 3 deletions pkg/await/core_service.go
@@ -212,7 +212,7 @@ func (sia *serviceInitAwaiter) await(
settled chan struct{},
version serverVersion,
) error {
-sia.config.logger.LogMessage(diag.Info, "[1/3] Finding Pods to direct traffic to")
+sia.config.logStatus(diag.Info, "[1/3] Finding Pods to direct traffic to")

for {
// Check whether we've succeeded.
@@ -427,9 +427,9 @@ func (sia *serviceInitAwaiter) checkAndLogStatus(version serverVersion) bool {

success := sia.serviceReady && sia.endpointsSettled && sia.endpointsReady
if success {
-sia.config.logger.LogMessage(diag.Info, "✅ Service initialization complete")
+sia.config.logStatus(diag.Info, "✅ Service initialization complete")
} else if sia.endpointsSettled && sia.endpointsReady {
-sia.config.logger.LogMessage(diag.Info, "[2/3] Attempting to allocate IP address to Service")
+sia.config.logStatus(diag.Info, "[2/3] Attempting to allocate IP address to Service")
}

return success
8 changes: 4 additions & 4 deletions pkg/await/extensions_ingress.go
@@ -193,7 +193,7 @@ func (iia *ingressInitAwaiter) await(
settled chan struct{},
timeout <-chan time.Time,
) error {
-iia.config.logger.LogMessage(diag.Info, "[1/3] Finding a matching service for each Ingress path")
+iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path")

for {
// Check whether we've succeeded.
@@ -322,7 +322,7 @@ func (iia *ingressInitAwaiter) checkIfEndpointsReady() bool {
}

if !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
-iia.config.logger.LogMessage(diag.Warning,
+iia.config.logStatus(diag.Warning,
fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))

@@ -405,9 +405,9 @@ func (iia *ingressInitAwaiter) errorMessages() []string {
func (iia *ingressInitAwaiter) checkAndLogStatus() bool {
success := iia.ingressReady && iia.checkIfEndpointsReady()
if success {
-iia.config.logger.LogMessage(diag.Info, "✅ Ingress initialization complete")
+iia.config.logStatus(diag.Info, "✅ Ingress initialization complete")
} else if iia.checkIfEndpointsReady() {
-iia.config.logger.LogMessage(diag.Info, "[2/3] Waiting for update of .status.loadBalancer with hostname/IP")
+iia.config.logStatus(diag.Info, "[2/3] Waiting for update of .status.loadBalancer with hostname/IP")
}

return success
31 changes: 18 additions & 13 deletions pkg/logging/logger.go → pkg/logging/dedup_logger.go
@@ -16,24 +16,27 @@ package logging

import (
"context"
+"sync"

"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/provider"
)

-// Logger wraps a time-ordered log set to allow batched logging of unique messages.
-type Logger struct {
+// DedupLogger wraps a time-ordered log set to allow batched logging of unique messages.
+// Operations on DedupLogger are safe to use concurrently.
+type DedupLogger struct {
messages *TimeOrderedLogSet
index int
ctx context.Context
host *provider.HostClient
urn resource.URN
+mux sync.Mutex
}

-// NewLogger returns an initialized Logger.
-func NewLogger(ctx context.Context, host *provider.HostClient, urn resource.URN) *Logger {
-return &Logger{
+// NewLogger returns an initialized DedupLogger.
+func NewLogger(ctx context.Context, host *provider.HostClient, urn resource.URN) *DedupLogger {
+return &DedupLogger{
messages: &TimeOrderedLogSet{},
ctx: ctx,
host: host,
@@ -42,29 +45,31 @@ func NewLogger(ctx context.Context, host *provider.HostClient, urn resource.URN)
}

// LogMessage adds a message to the log set and flushes the queue to the host.
-func (l *Logger) LogMessage(severity diag.Severity, s string) {
+func (l *DedupLogger) LogMessage(severity diag.Severity, s string) {
l.EnqueueMessage(severity, s)
l.LogNewMessages()
}

// EnqueueMessage adds a message to the log set but does not log it to the host.
-func (l *Logger) EnqueueMessage(severity diag.Severity, s string) {
+func (l *DedupLogger) EnqueueMessage(severity diag.Severity, s string) {
+l.mux.Lock()
+defer l.mux.Unlock()
+
l.messages.Add(Message{s, severity})
}

// GetNewMessages returns the list of new messages since last calling GetNewMessages.
-// Note: this method is not thread-safe, but is currently only called from a single process
-// because a new Logger is spawned for each resource.
-func (l *Logger) GetNewMessages() []Message {
+func (l *DedupLogger) GetNewMessages() []Message {
+l.mux.Lock()
+defer l.mux.Unlock()
+
idx := l.index
l.index = len(l.messages.Messages)
return l.messages.Messages[idx:]
}

// LogNewMessages logs any new messages to the host.
-// Note: this method is not thread-safe, but is currently only called from a single process
-// because a new Logger is spawned for each resource.
-func (l *Logger) LogNewMessages() {
+func (l *DedupLogger) LogNewMessages() {
if l.host != nil {
for _, msg := range l.GetNewMessages() {
_ = l.host.LogStatus(l.ctx, msg.severity, l.urn, msg.s)
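A short usage sketch of the renamed type, exercising the new mutex: several goroutines enqueue the same status line concurrently, and the set keeps a single copy. This assumes the repository's import path for pkg/logging, and passes a nil host so nothing is forwarded to an engine:

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/pulumi/pulumi-kubernetes/pkg/logging"
	"github.com/pulumi/pulumi/pkg/diag"
)

func main() {
	// With a nil host, messages are queued and deduplicated, but
	// LogNewMessages has no engine to forward them to.
	logger := logging.NewLogger(context.Background(), nil,
		"urn:pulumi:dev::demo::kubernetes:apps/v1:Deployment::app")

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Safe under concurrency now that DedupLogger holds a mutex.
			logger.EnqueueMessage(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available")
		}()
	}
	wg.Wait()

	// The duplicate enqueues collapse to one queued message.
	fmt.Println(len(logger.GetNewMessages())) // prints 1
}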
6 changes: 0 additions & 6 deletions pkg/logging/time_ordered_log_set.go
@@ -51,9 +51,3 @@ func (o *TimeOrderedLogSet) Add(msg Message) {
o.exists[msg] = true
}
}

-// Reset clears the time-ordered set.
-func (o *TimeOrderedLogSet) Reset() {
-o.exists = make(map[Message]bool)
-o.Messages = []Message{}
-}
40 changes: 0 additions & 40 deletions pkg/logging/time_ordered_log_set_test.go
@@ -81,43 +81,3 @@ func TestOrderedStringSet_Add(t *testing.T) {
})
}
}

-func TestOrderedStringSet_Reset(t *testing.T) {
-status1 := Message{"foo", diag.Info}
-
-type fields struct {
-exists map[Message]bool
-Messages []Message
-}
-tests := []struct {
-name string
-fields fields
-expect []Message
-}{
-{
-"reset populated list",
-fields{map[Message]bool{status1: true}, []Message{status1}},
-[]Message{},
-},
-{
-"reset empty list",
-fields{map[Message]bool{}, []Message{}},
-[]Message{},
-},
-{
-"reset uninitialized list",
-fields{},
-[]Message{},
-},
-}
-for _, tt := range tests {
-t.Run(tt.name, func(t *testing.T) {
-o := &TimeOrderedLogSet{
-exists: tt.fields.exists,
-Messages: tt.fields.Messages,
-}
-o.Reset()
-assert.ObjectsAreEqual(o.Messages, tt.expect)
-})
-}
-}
