Skip to content

Commit

Permalink
fix race condition due to pruning in results watcher
Browse files Browse the repository at this point in the history
Signed-off-by: Satyam Bhardwaj <sabhardw@redhat.com>

rh-pre-commit.version: 2.1.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
ramessesii2 committed Feb 6, 2024
1 parent e1cc982 commit 13a37ed
Showing 1 changed file with 116 additions and 17 deletions.
133 changes: 116 additions & 17 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dynamic

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -48,6 +49,8 @@ var (
clock = clockwork.NewRealClock()
)

const LogFinalizer = "results.tekton.dev/streaming-logs"

// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
Expand Down Expand Up @@ -81,8 +84,10 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
resultsClient: results.NewClient(rc, lc),
objectClient: oc,
cfg: cfg,
// Always true predicate.
IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) {
if LogsFinalizerExist(object, LogFinalizer) {
return false, nil
}
return true, nil
},
}
Expand Down Expand Up @@ -114,6 +119,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

// Update logs if enabled.
if r.resultsClient.LogsClient != nil {
// Add finalizer for new object if the object has never been reconciled before
// if the object has the results log annotation then the log has been sent/streaming
annotations := o.GetAnnotations()
if _, exists := annotations[annotation.Log]; !exists {
err = r.AddFinalizer(ctx, o)
if err != nil {
return err
}
}
if err := r.sendLog(ctx, o); err != nil {
logger.Errorw("Error sending log",
zap.String("namespace", o.GetNamespace()),
Expand Down Expand Up @@ -342,11 +356,6 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
zap.Error(err),
)
}
logger.Debugw("Streaming log completed",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
}()
}

Expand Down Expand Up @@ -389,18 +398,18 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
}

errChanRepeater := make(chan error, 100) //some stuff on the internet says buffered channels are better for GC
go func(echan <-chan error, o metav1.Object) {

// logctx is derived from ctx. Therefore, if ctx is cancelled (either explicitly through a call to its cancel
// function or when it reaches its deadline), logctx will be cancelled automatically.
logctx, _ := context.WithTimeout(ctx, 10*time.Minute)

go func(ctx context.Context, echan <-chan error, o metav1.Object) {
select {
case <-ctx.Done():
logger.Warnw("Context done streaming log",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
case <-time.After(10 * time.Minute): //TODO could make this time configurable, but let's see how useful it is first
logger.Warnw("10 minute timer expired streaming log",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
case writeErr := <-echan:
errChanRepeater <- writeErr
var gofuncerr error
Expand All @@ -420,7 +429,33 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
}(errChan, o)

err = r.RemoveFinalizer(ctx, o)
if err != nil {
logger.Errorw("Error removing finalizer",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
zap.Error(err),
)
return
}

logger.Debugw("Finalizer removed successfully",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)

if o.GetDeletionTimestamp() != nil {
err = controller.NewRequeueImmediately()
logger.Errorw("Error requing object for deletion",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
zap.Error(err),
)

}

}(logctx, errChan, o)

// errChanRepeater receives stderr from the TaskRun containers.
// This will be forwarded as combined output (stdout and stderr)
Expand All @@ -430,10 +465,74 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
Err: writer,
}, logChan, errChanRepeater)

logger.Debugw("Exiting streamLogs",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
return nil
}

func (r *Reconciler) AddFinalizer(ctx context.Context, o metav1.Object) error {
finalizers := o.GetFinalizers()
for _, f := range finalizers {
if f == LogFinalizer {
return nil
}
}
finalizers = append(finalizers, LogFinalizer)

patch, err := finalizerPatch(o, finalizers)
if err != nil {
return fmt.Errorf("error adding results log finalizer: %w", err)
}

if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("error patching object: %w", err)
}
return nil
}

func (r *Reconciler) RemoveFinalizer(ctx context.Context, o metav1.Object) error {
finalizers := o.GetFinalizers()
for i, f := range finalizers {
if f == LogFinalizer {
finalizers = append(finalizers[:i], finalizers[i+1:]...)
patch, err := finalizerPatch(o, finalizers)
if err != nil {
return fmt.Errorf("error removing results log finalizer: %w", err)
}

if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("error patching object: %w", err)
}
return nil
}
}
return nil

}

func LogsFinalizerExist(o metav1.Object, finalizer string) bool {
finalizers := o.GetFinalizers()
for _, f := range finalizers {
if f == finalizer {
return true
}
}
return false
}

type mergePatch struct {
Metadata metadata `json:"metadata"`
}

type metadata struct {
Finalizer []string `json:"finalizers"`
}

func finalizerPatch(object metav1.Object, finalizers []string) ([]byte, error) {
data := mergePatch{
Metadata: metadata{
Finalizer: []string{},
},
}
data.Metadata.Finalizer = finalizers

return json.Marshal(data)
}

0 comments on commit 13a37ed

Please sign in to comment.