Skip to content

Commit

Permalink
make it possible to wait for a query lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund committed Oct 18, 2023
1 parent e6ee5a7 commit 3c803fc
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 58 deletions.
25 changes: 25 additions & 0 deletions errors/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package errors

import (
"errors"
"fmt"
)

var ErrBadWaitState = errors.New("encountered bad state while waiting for resource")

type BadWaitState struct {
State string
}

func (e BadWaitState) Error() string {
return fmt.Sprintf("%s: %s", ErrBadWaitState.Error(), e.State)
}
func (e BadWaitState) Unwrap() error {
return ErrBadWaitState
}

func NewBadWaitStateError(state string) BadWaitState {
return BadWaitState{
State: state,
}
}
10 changes: 7 additions & 3 deletions option/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"github.com/rockset/rockset-go-client/openapi"
)

type CollectionStatus string

func (c CollectionStatus) String() string { return string(c) }

const (
CollectionStatusCreated = "CREATED"
CollectionStatusInitialized = "INITIALIZED"
CollectionStatusReady = "READY"
CollectionStatusCreated CollectionStatus = "CREATED"
CollectionStatusInitialized CollectionStatus = "INITIALIZED"
CollectionStatusReady CollectionStatus = "READY"
)

type ListCollectionOptions struct {
Expand Down
2 changes: 2 additions & 0 deletions option/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "github.com/rockset/rockset-go-client/openapi"

type QueryState string

func (q QueryState) String() string { return string(q) }

const (
QueryQueued QueryState = "QUEUED"
QueryRunning QueryState = "RUNNING"
Expand Down
9 changes: 9 additions & 0 deletions option/query_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ package option

import "github.com/rockset/rockset-go-client/openapi"

type QueryLambdaState string

func (q QueryLambdaState) String() string { return string(q) }

const (
QueryLambdaActive QueryLambdaState = "ACTIVE"
QueryLambdaInvalid QueryLambdaState = "INVALID"
)

type ExecuteQueryLambdaRequest struct {
openapi.ExecuteQueryLambdaRequest
Tag string
Expand Down
20 changes: 12 additions & 8 deletions option/virtual_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package option

import "time"

type VirtualInstanceState string

func (v VirtualInstanceState) String() string { return string(v) }

const (
VirtualInstanceInitializing = "INITIALIZING"
VirtualInstanceProvisioningResources = "PROVISIONING_RESOURCES"
VirtualInstanceRebalancingCollections = "REBALANCING_COLLECTIONS"
VirtualInstanceActive = "ACTIVE"
VirtualInstanceSuspending = "SUSPENDING"
VirtualInstanceSuspended = "SUSPENDED"
VirtualInstanceResuming = "RESUMING"
VirtualInstanceDeleted = "DELETED"
VirtualInstanceInitializing VirtualInstanceState = "INITIALIZING"
VirtualInstanceProvisioningResources VirtualInstanceState = "PROVISIONING_RESOURCES"
VirtualInstanceRebalancingCollections VirtualInstanceState = "REBALANCING_COLLECTIONS"
VirtualInstanceActive VirtualInstanceState = "ACTIVE"
VirtualInstanceSuspending VirtualInstanceState = "SUSPENDING"
VirtualInstanceSuspended VirtualInstanceState = "SUSPENDED"
VirtualInstanceResuming VirtualInstanceState = "RESUMING"
VirtualInstanceDeleted VirtualInstanceState = "DELETED"

MountCreating = "CREATING"
MountActive = "ACTIVE"
Expand Down
6 changes: 3 additions & 3 deletions wait/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

// UntilCollectionReady waits until the collection is ready.
func (w *Waiter) UntilCollectionReady(ctx context.Context, workspace, name string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.CollectionStatusReady},
func(ctx context.Context) (string, error) {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.CollectionStatus{option.CollectionStatusReady}, nil,
func(ctx context.Context) (option.CollectionStatus, error) {
c, err := w.rc.GetCollection(ctx, workspace, name)
return c.GetStatus(), err
return option.CollectionStatus(c.GetStatus()), err
}))
}

Expand Down
8 changes: 4 additions & 4 deletions wait/collections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ func TestWait_untilCollectionReady(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusInitialized)}, nil)
rs.GetCollectionReturnsOnCall(1, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusCreated)}, nil)
rs.GetCollectionReturnsOnCall(2, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady)}, nil)
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusInitialized.String())}, nil)
rs.GetCollectionReturnsOnCall(1, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusCreated.String())}, nil)
rs.GetCollectionReturnsOnCall(2, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady.String())}, nil)

err := wait.New(&rs).UntilCollectionReady(ctx, "workspace", "collection")
assert.NoError(t, err)
Expand All @@ -28,7 +28,7 @@ func TestWait_untilCollectionGone(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady)}, nil)
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady.String())}, nil)
rs.GetCollectionReturnsOnCall(1, openapi.Collection{}, NotFoundErr)

err := wait.New(&rs).UntilCollectionGone(ctx, "workspace", "collection")
Expand Down
85 changes: 85 additions & 0 deletions wait/fake/fake_resource_getter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions wait/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"github.com/rockset/rockset-go-client/option"
)

// UntilQueryDone waits until queryID has either completed, errored, or been cancelled.
// UntilQueryDone waits until queryID has either completed.
// Returns ErrBadWaitState if the query failed or was cancelled.
func (w *Waiter) UntilQueryDone(ctx context.Context, queryID string) error {
// TODO should this only wait for COMPLETED and return an error for ERROR and CANCELLED?
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.QueryState{option.QueryCompleted, option.QueryError, option.QueryCancelled},
func(ctx context.Context) (option.QueryState, error) {
q, err := w.rc.GetQueryInfo(ctx, queryID)
return option.QueryState(q.GetStatus()), err
}))
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx, []option.QueryState{option.QueryCompleted},
[]option.QueryState{option.QueryError, option.QueryCancelled},
func(ctx context.Context) (option.QueryState, error) {
q, err := w.rc.GetQueryInfo(ctx, queryID)
return option.QueryState(q.GetStatus()), err
}))
}
25 changes: 25 additions & 0 deletions wait/query_lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package wait

import (
"context"

"github.com/rockset/rockset-go-client/option"
)

// UntilQueryLambdaVersionGone waits until a query lambda is deleted, i.e. GetQueryLambda() returns "not found".
func (w *Waiter) UntilQueryLambdaVersionGone(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx, ResourceIsGone(ctx, func(ctx context.Context) error {
_, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version)
return err
}))
}

// UntilQueryLambdaVersionActive waits until the Virtual Instance is active.
func (w *Waiter) UntilQueryLambdaVersionActive(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx, []option.QueryLambdaState{option.QueryLambdaActive}, []option.QueryLambdaState{option.QueryLambdaInvalid},
func(ctx context.Context) (option.QueryLambdaState, error) {
ql, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version)
return option.QueryLambdaState(ql.GetState()), err
}))
}
36 changes: 36 additions & 0 deletions wait/query_lambda_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package wait_test

import (
"context"
rockerr "github.com/rockset/rockset-go-client/errors"
"testing"

"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
"github.com/rockset/rockset-go-client/wait"
"github.com/stretchr/testify/assert"
)

func TestWait_untilQueryLambdaActive(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil)
rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaActive.String())}, nil)

err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v")
assert.NoError(t, err)
assert.Equal(t, 2, rs.GetQueryLambdaVersionCallCount())
}

func TestWait_untilQueryLambdaActive_invalid(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil)
rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaInvalid.String())}, nil)

err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v")
assert.ErrorIs(t, err, rockerr.ErrBadWaitState)
assert.Equal(t, 2, rs.GetQueryLambdaVersionCallCount())
}
18 changes: 9 additions & 9 deletions wait/virtual_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

// UntilVirtualInstanceActive waits until the Virtual Instance is active.
func (w *Waiter) UntilVirtualInstanceActive(ctx context.Context, id string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.VirtualInstanceActive},
func(ctx context.Context) (string, error) {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.VirtualInstanceState{option.VirtualInstanceActive}, nil,
func(ctx context.Context) (option.VirtualInstanceState, error) {
vi, err := w.rc.GetVirtualInstance(ctx, id)
return vi.GetState(), err
return option.VirtualInstanceState(vi.GetState()), err
}))
}

Expand All @@ -29,20 +29,20 @@ func (w *Waiter) UntilVirtualInstanceGone(ctx context.Context, id string) error

// UntilVirtualInstanceSuspended waits until the Virtual Instance is suspended.
func (w *Waiter) UntilVirtualInstanceSuspended(ctx context.Context, id string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.VirtualInstanceSuspended},
func(ctx context.Context) (string, error) {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.VirtualInstanceState{option.VirtualInstanceSuspended}, nil,
func(ctx context.Context) (option.VirtualInstanceState, error) {
vi, err := w.rc.GetVirtualInstance(ctx, id)
return vi.GetState(), err
return option.VirtualInstanceState(vi.GetState()), err
}))
}

// UntilMountActive waits until the collection mount is active, and queries can be issued to it on the
// virtual instance.
func (w *Waiter) UntilMountActive(ctx context.Context, vID, workspace, collection string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.MountActive},
func(ctx context.Context) (string, error) {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.VirtualInstanceState{option.MountActive}, nil,
func(ctx context.Context) (option.VirtualInstanceState, error) {
cm, err := w.rc.GetCollectionMount(ctx, vID, workspace+"."+collection)
return cm.GetState(), err
return option.VirtualInstanceState(cm.GetState()), err
}))
}

Expand Down
12 changes: 6 additions & 6 deletions wait/virtual_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func TestWait_untilVirtualInstanceActive(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceInitializing)}, nil)
rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive)}, nil)
rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceInitializing.String())}, nil)
rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive.String())}, nil)

err := wait.New(&rs).UntilVirtualInstanceActive(ctx, "id")
assert.NoError(t, err)
Expand All @@ -29,9 +29,9 @@ func TestWait_untilVirtualInstanceSuspended(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive)}, nil)
rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceSuspending)}, nil)
rs.GetVirtualInstanceReturnsOnCall(2, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceSuspended)}, nil)
rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive.String())}, nil)
rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceSuspending.String())}, nil)
rs.GetVirtualInstanceReturnsOnCall(2, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceSuspended.String())}, nil)

err := wait.New(&rs).UntilVirtualInstanceSuspended(ctx, "id")
assert.NoError(t, err)
Expand All @@ -42,7 +42,7 @@ func TestWait_untilVirtualInstanceGone(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive)}, nil)
rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive.String())}, nil)
rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{}, NotFoundErr)

err := wait.New(&rs).UntilVirtualInstanceGone(ctx, "id")
Expand Down
Loading

0 comments on commit 3c803fc

Please sign in to comment.