Skip to content

Commit

Permalink
Merge pull request #166 from linuxboot/bugfix/propagate_context_values
Browse files Browse the repository at this point in the history
fix(api): Propagate context values
  • Loading branch information
xaionaro committed Jun 21, 2023
2 parents eca9e1e + fb97c4d commit fa98f00
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

"github.com/facebookincubator/go-belt/beltctx"
"github.com/linuxboot/contest/pkg/signaling"
"github.com/linuxboot/contest/pkg/signals"
"github.com/linuxboot/contest/pkg/storage"
"github.com/linuxboot/contest/pkg/storage/limits"
"github.com/linuxboot/contest/pkg/types"
Expand Down Expand Up @@ -170,10 +172,9 @@ func (a *API) Start(ctx context.Context, requestor EventRequestor, jobDescriptor
// signals to the job's context. Therefore we use a fresh context
// (without any cancels and signalings) and just passthrough its
// observability belt.
//
// It also loose context values, but there are no any values
// we care about here.
ctx = beltctx.WithField(beltctx.WithBelt(context.Background(), beltctx.Belt(ctx)), "api_method", "start")
ctx = newValuesProxyContext(ctx) // ignore the cancel and deadline signals
ctx, _ = signaling.WithSignal(ctx, signals.Paused) // ignore the pause signal
ctx = beltctx.WithField(ctx, "api_method", "start")

ev := &Event{
Context: ctx,
Expand Down
80 changes: 80 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ package api

import (
"context"
"fmt"
"runtime"
"testing"
"time"

"github.com/linuxboot/contest/pkg/job"
"github.com/linuxboot/contest/pkg/signaling"
"github.com/linuxboot/contest/pkg/signals"

"github.com/facebookincubator/go-belt/tool/logger"

Expand All @@ -38,6 +41,83 @@ func (d dummyEventMsg) Requestor() EventRequestor {
return "unit-test"
}

func TestStartContext(t *testing.T) {
t.Run("NoCancel", func(t *testing.T) {
apiInstance, err := New(OptionServerID("unit-test"))
require.NoError(t, err)

ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()

go func() {
ev := <-apiInstance.Events
var err error
select {
case <-ev.Context.Done():
err = fmt.Errorf("cancel signal was propagated")
default:
}
ev.RespCh <- &EventResponse{
Err: err,
}
}()

r, err := apiInstance.Start(ctx, "unit-test-requestor", "unit-test-job")
require.NoError(t, err)
require.NoError(t, r.Err)
})

t.Run("NoPause", func(t *testing.T) {
apiInstance, err := New(OptionServerID("unit-test"))
require.NoError(t, err)

ctx, pauseFunc := signaling.WithSignal(context.Background(), signals.Paused)
pauseFunc()

go func() {
ev := <-apiInstance.Events
var err error
select {
case <-ev.Context.Done():
err = fmt.Errorf("pause signal was propagated")
default:
}
ev.RespCh <- &EventResponse{
Err: err,
}
}()

r, err := apiInstance.Start(ctx, "unit-test-requestor", "unit-test-job")
require.NoError(t, err)
require.NoError(t, r.Err)
})

t.Run("HaveValues", func(t *testing.T) {
apiInstance, err := New(OptionServerID("unit-test"))
require.NoError(t, err)

type privateStringType string
var ctxKey = privateStringType("unit-test-key")

ctx := context.WithValue(context.Background(), ctxKey, "unit-test-value")

go func() {
ev := <-apiInstance.Events
var err error
if ctx.Value(ctxKey) != ev.Context.Value(ctxKey) {
err = fmt.Errorf("context value was not propagated correctly: <%v> != <%v>", ev.Context.Value(ctxKey), ctx.Value(ctxKey))
}
ev.RespCh <- &EventResponse{
Err: err,
}
}()

r, err := apiInstance.Start(ctx, "unit-test-requestor", "unit-test-job")
require.NoError(t, err)
require.NoError(t, r.Err)
})
}

func TestEventTimeout(t *testing.T) {
t.Run("timeout", func(t *testing.T) {
apiInstance, err := New(OptionServerID("unit-test"), OptionEventTimeout(time.Nanosecond))
Expand Down
46 changes: 46 additions & 0 deletions pkg/api/values_proxy_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

package api

import (
"context"
"time"
)

type valuesProxyContext struct {
valueser context.Context
}

var _ context.Context = (*valuesProxyContext)(nil)

// Deadline implements interface context.Context.
func (ctx *valuesProxyContext) Deadline() (time.Time, bool) {
return time.Time{}, false
}

// Done implements interface context.Context.
func (ctx *valuesProxyContext) Done() <-chan struct{} {
return nil
}

// Err implements interface context.Context.
func (ctx *valuesProxyContext) Err() error {
return nil
}

// Value implements interface context.Context.
func (ctx *valuesProxyContext) Value(key any) any {
return ctx.valueser.Value(key)
}

// newValuesProxyContext returns a context without cancellation/deadline signals,
// but with all the values kept as is.
func newValuesProxyContext(ctx context.Context) context.Context {
ctx = &valuesProxyContext{
valueser: ctx,
}
return ctx
}

0 comments on commit fa98f00

Please sign in to comment.