Skip to content

Commit

Permalink
make it possible to wait for a query lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund committed Oct 18, 2023
1 parent e6ee5a7 commit 743cd9d
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 27 deletions.
5 changes: 5 additions & 0 deletions option/query_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package option

import "github.com/rockset/rockset-go-client/openapi"

const (
QueryLambdaActive = "ACTIVE"
QueryLambdaInvalid = "INVALID"
)

type ExecuteQueryLambdaRequest struct {
openapi.ExecuteQueryLambdaRequest
Tag string
Expand Down
2 changes: 1 addition & 1 deletion wait/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// UntilCollectionReady waits until the collection is ready.
func (w *Waiter) UntilCollectionReady(ctx context.Context, workspace, name string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.CollectionStatusReady},
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.CollectionStatusReady}, nil,
func(ctx context.Context) (string, error) {
c, err := w.rc.GetCollection(ctx, workspace, name)
return c.GetStatus(), err
Expand Down
85 changes: 85 additions & 0 deletions wait/fake/fake_resource_getter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions wait/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"github.com/rockset/rockset-go-client/option"
)

// UntilQueryDone waits until queryID has either completed, errored, or been cancelled.
// UntilQueryDone waits until queryID has either completed.
// Returns ErrBadWaitState if the query failed or was cancelled.
func (w *Waiter) UntilQueryDone(ctx context.Context, queryID string) error {
// TODO should this only wait for COMPLETED and return an error for ERROR and CANCELLED?
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.QueryState{option.QueryCompleted, option.QueryError, option.QueryCancelled},
func(ctx context.Context) (option.QueryState, error) {
q, err := w.rc.GetQueryInfo(ctx, queryID)
return option.QueryState(q.GetStatus()), err
}))
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx, []option.QueryState{option.QueryCompleted},
[]option.QueryState{option.QueryError, option.QueryCancelled},
func(ctx context.Context) (option.QueryState, error) {
q, err := w.rc.GetQueryInfo(ctx, queryID)
return option.QueryState(q.GetStatus()), err
}))
}
25 changes: 25 additions & 0 deletions wait/query_lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package wait

import (
"context"

"github.com/rockset/rockset-go-client/option"
)

// UntilQueryLambdaVersionGone waits until a query lambda is deleted, i.e. GetQueryLambda() returns "not found".
func (w *Waiter) UntilQueryLambdaVersionGone(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx, ResourceIsGone(ctx, func(ctx context.Context) error {
_, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version)
return err
}))
}

// UntilQueryLambdaVersionActive waits until the Virtual Instance is active.
func (w *Waiter) UntilQueryLambdaVersionActive(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx, []string{option.QueryLambdaActive}, []string{option.QueryLambdaInvalid},
func(ctx context.Context) (string, error) {
ql, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version)
return ql.GetState(), err
}))
}
35 changes: 35 additions & 0 deletions wait/query_lambda_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package wait_test

import (
"context"
"testing"

"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
"github.com/rockset/rockset-go-client/wait"
"github.com/stretchr/testify/assert"
)

func TestWait_untilQueryLambdaActive(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil)
rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaActive)}, nil)

err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v")
assert.NoError(t, err)
assert.Equal(t, 2, rs.GetQueryLambdaVersionCallCount())
}

func TestWait_untilQueryLambdaActive_invalid(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil)
rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaInvalid)}, nil)

err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v")
assert.ErrorIs(t, err, wait.ErrBadWaitState)
assert.Equal(t, 2, rs.GetQueryLambdaVersionCallCount())
}
6 changes: 3 additions & 3 deletions wait/virtual_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// UntilVirtualInstanceActive waits until the Virtual Instance is active.
func (w *Waiter) UntilVirtualInstanceActive(ctx context.Context, id string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.VirtualInstanceActive},
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.VirtualInstanceActive}, nil,
func(ctx context.Context) (string, error) {
vi, err := w.rc.GetVirtualInstance(ctx, id)
return vi.GetState(), err
Expand All @@ -29,7 +29,7 @@ func (w *Waiter) UntilVirtualInstanceGone(ctx context.Context, id string) error

// UntilVirtualInstanceSuspended waits until the Virtual Instance is suspended.
func (w *Waiter) UntilVirtualInstanceSuspended(ctx context.Context, id string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.VirtualInstanceSuspended},
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.VirtualInstanceSuspended}, nil,
func(ctx context.Context) (string, error) {
vi, err := w.rc.GetVirtualInstance(ctx, id)
return vi.GetState(), err
Expand All @@ -39,7 +39,7 @@ func (w *Waiter) UntilVirtualInstanceSuspended(ctx context.Context, id string) e
// UntilMountActive waits until the collection mount is active, and queries can be issued to it on the
// virtual instance.
func (w *Waiter) UntilMountActive(ctx context.Context, vID, workspace, collection string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.MountActive},
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.MountActive}, nil,
func(ctx context.Context) (string, error) {
cm, err := w.rc.GetCollectionMount(ctx, vID, workspace+"."+collection)
return cm.GetState(), err
Expand Down
17 changes: 14 additions & 3 deletions wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wait
import (
"context"
"errors"
"fmt"

"github.com/rs/zerolog"

Expand All @@ -25,6 +26,7 @@ type ResourceGetter interface {
GetCollectionMount(ctx context.Context, id, collectionPath string) (openapi.CollectionMount, error)
GetIntegration(ctx context.Context, name string) (openapi.Integration, error)
GetQueryInfo(ctx context.Context, queryID string) (openapi.QueryInfo, error)
GetQueryLambdaVersion(ctx context.Context, workspace, name, version string) (openapi.QueryLambdaVersion, error)
GetView(ctx context.Context, workspace, name string) (openapi.View, error)
GetVirtualInstance(ctx context.Context, id string) (openapi.VirtualInstance, error)
GetWorkspace(ctx context.Context, name string) (openapi.Workspace, error)
Expand All @@ -34,8 +36,12 @@ func New(rs ResourceGetter) *Waiter {
return &Waiter{rs}
}

// ResourceHasState implements RetryFn to wait until the resource has the desired state
func ResourceHasState[T comparable](ctx context.Context, states []T,
var ErrBadWaitState = errors.New("encountered bad state while waiting")

// ResourceHasState implements RetryFn to wait until the resource has the desired state, and if a bad state is
// encountered it will return ErrBadWaitState
func ResourceHasState[T comparable](ctx context.Context, validStates, badStates []T,
// TODO should T be Stringer instead? Then all
fn func(ctx context.Context) (T, error)) retry.CheckFn {
return func() (bool, error) {
zl := zerolog.Ctx(ctx)
Expand All @@ -44,11 +50,16 @@ func ResourceHasState[T comparable](ctx context.Context, states []T,
return false, err
}

for _, s := range states {
for _, s := range validStates {
if state == s {
return false, nil
}
}
for _, s := range badStates {
if state == s {
return false, fmt.Errorf("%w: %v", ErrBadWaitState, state)
}
}

zl.Trace().Any("current", state).Msg("waiting for resource state")

Expand Down
56 changes: 43 additions & 13 deletions wait/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func (s *WaitTestSuite) TestResourceIsGone() {
}
})

retry, err := rc()
assert.True(s.T(), retry)
r, err := rc()
assert.True(s.T(), r)
assert.NoError(s.T(), err)

retry, err = rc()
assert.False(s.T(), retry)
r, err = rc()
assert.False(s.T(), r)
assert.NoError(s.T(), err)

retry, err = rc()
assert.False(s.T(), retry)
r, err = rc()
assert.False(s.T(), r)
assert.Error(s.T(), err)
}

func (s *WaitTestSuite) TestResourceHasState() {
ctx := context.TODO()
var counter int

rc := wait.ResourceHasState(ctx, []string{"foo", "bar"}, func(ctx context.Context) (string, error) {
rc := wait.ResourceHasState(ctx, []string{"foo", "bar"}, nil, func(ctx context.Context) (string, error) {
defer func() { counter++ }()

switch counter {
Expand All @@ -126,15 +126,45 @@ func (s *WaitTestSuite) TestResourceHasState() {
}
})

retry, err := rc()
assert.True(s.T(), retry)
r, err := rc()
assert.True(s.T(), r)
assert.NoError(s.T(), err)

retry, err = rc()
assert.False(s.T(), retry)
r, err = rc()
assert.False(s.T(), r)
assert.NoError(s.T(), err)

retry, err = rc()
assert.False(s.T(), retry)
r, err = rc()
assert.False(s.T(), r)
assert.Error(s.T(), err)
}

func (s *WaitTestSuite) TestResourceHasState_badState() {
ctx := context.TODO()
var counter int

rc := wait.ResourceHasState(ctx, []string{"foo", "bar"}, []string{"err"}, func(ctx context.Context) (string, error) {
defer func() { counter++ }()

switch counter {
case 0:
return "baz", nil
case 1:
return "bar", nil
default:
return "err", nil
}
})

r, err := rc()
assert.True(s.T(), r)
assert.NoError(s.T(), err)

r, err = rc()
assert.False(s.T(), r)
assert.NoError(s.T(), err)

r, err = rc()
assert.False(s.T(), r)
assert.ErrorIs(s.T(), err, wait.ErrBadWaitState)
}

0 comments on commit 743cd9d

Please sign in to comment.