Skip to content

Commit

Permalink
make it possible to wait for a query lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund committed Oct 18, 2023
1 parent e6ee5a7 commit 4c76be4
Show file tree
Hide file tree
Showing 16 changed files with 336 additions and 73 deletions.
25 changes: 25 additions & 0 deletions errors/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package errors

import (
"errors"
"fmt"
)

var ErrBadWaitState = errors.New("encountered bad state while waiting for resource")

type BadWaitState struct {
State string
}

func (e BadWaitState) Error() string {
return fmt.Sprintf("%s: %s", ErrBadWaitState.Error(), e.State)
}
func (e BadWaitState) Unwrap() error {
return ErrBadWaitState
}

func NewBadWaitStateError(state string) BadWaitState {
return BadWaitState{
State: state,
}
}
10 changes: 7 additions & 3 deletions option/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"github.com/rockset/rockset-go-client/openapi"
)

type CollectionStatus string

func (c CollectionStatus) String() string { return string(c) }

const (
CollectionStatusCreated = "CREATED"
CollectionStatusInitialized = "INITIALIZED"
CollectionStatusReady = "READY"
CollectionStatusCreated CollectionStatus = "CREATED"
CollectionStatusInitialized CollectionStatus = "INITIALIZED"
CollectionStatusReady CollectionStatus = "READY"
)

type ListCollectionOptions struct {
Expand Down
2 changes: 2 additions & 0 deletions option/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "github.com/rockset/rockset-go-client/openapi"

type QueryState string

func (q QueryState) String() string { return string(q) }

const (
QueryQueued QueryState = "QUEUED"
QueryRunning QueryState = "RUNNING"
Expand Down
9 changes: 9 additions & 0 deletions option/query_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ package option

import "github.com/rockset/rockset-go-client/openapi"

type QueryLambdaState string

func (q QueryLambdaState) String() string { return string(q) }

const (
QueryLambdaActive QueryLambdaState = "ACTIVE"
QueryLambdaInvalid QueryLambdaState = "INVALID"
)

type ExecuteQueryLambdaRequest struct {
openapi.ExecuteQueryLambdaRequest
Tag string
Expand Down
42 changes: 25 additions & 17 deletions option/virtual_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,32 @@ package option

import "time"

type VirtualInstanceState string

func (v VirtualInstanceState) String() string { return string(v) }

type MountState string

func (m MountState) String() string { return string(m) }

const (
VirtualInstanceInitializing = "INITIALIZING"
VirtualInstanceProvisioningResources = "PROVISIONING_RESOURCES"
VirtualInstanceRebalancingCollections = "REBALANCING_COLLECTIONS"
VirtualInstanceActive = "ACTIVE"
VirtualInstanceSuspending = "SUSPENDING"
VirtualInstanceSuspended = "SUSPENDED"
VirtualInstanceResuming = "RESUMING"
VirtualInstanceDeleted = "DELETED"

MountCreating = "CREATING"
MountActive = "ACTIVE"
MountRefreshing = "REFRESHING"
MountExpired = "EXPIRED"
MountDeleting = "DELETING"
MountSwitchingRefreshType = "SWITCHING_REFRESH_TYPE"
MountSuspended = "SUSPENDED"
MountSuspending = "SUSPENDING"
VirtualInstanceInitializing VirtualInstanceState = "INITIALIZING"
VirtualInstanceProvisioningResources VirtualInstanceState = "PROVISIONING_RESOURCES"
VirtualInstanceRebalancingCollections VirtualInstanceState = "REBALANCING_COLLECTIONS"
VirtualInstanceActive VirtualInstanceState = "ACTIVE"
VirtualInstanceSuspending VirtualInstanceState = "SUSPENDING"
VirtualInstanceSuspended VirtualInstanceState = "SUSPENDED"
VirtualInstanceResuming VirtualInstanceState = "RESUMING"
VirtualInstanceDeleted VirtualInstanceState = "DELETED"

MountCreating MountState = "CREATING"
MountActive MountState = "ACTIVE"
MountRefreshing MountState = "REFRESHING"
MountExpired MountState = "EXPIRED"
MountDeleting MountState = "DELETING"
MountSwitchingRefreshType MountState = "SWITCHING_REFRESH_TYPE"
MountSuspended MountState = "SUSPENDED"
MountSuspending MountState = "SUSPENDING"
)

// VirtualInstanceOptions contains the optional settings for a virtual instance.
Expand Down
6 changes: 3 additions & 3 deletions wait/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

// UntilCollectionReady waits until the collection is ready.
func (w *Waiter) UntilCollectionReady(ctx context.Context, workspace, name string) error {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []string{option.CollectionStatusReady},
func(ctx context.Context) (string, error) {
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.CollectionStatus{option.CollectionStatusReady}, nil,
func(ctx context.Context) (option.CollectionStatus, error) {
c, err := w.rc.GetCollection(ctx, workspace, name)
return c.GetStatus(), err
return option.CollectionStatus(c.GetStatus()), err
}))
}

Expand Down
8 changes: 4 additions & 4 deletions wait/collections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ func TestWait_untilCollectionReady(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusInitialized)}, nil)
rs.GetCollectionReturnsOnCall(1, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusCreated)}, nil)
rs.GetCollectionReturnsOnCall(2, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady)}, nil)
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusInitialized.String())}, nil)
rs.GetCollectionReturnsOnCall(1, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusCreated.String())}, nil)
rs.GetCollectionReturnsOnCall(2, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady.String())}, nil)

err := wait.New(&rs).UntilCollectionReady(ctx, "workspace", "collection")
assert.NoError(t, err)
Expand All @@ -28,7 +28,7 @@ func TestWait_untilCollectionGone(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady)}, nil)
rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady.String())}, nil)
rs.GetCollectionReturnsOnCall(1, openapi.Collection{}, NotFoundErr)

err := wait.New(&rs).UntilCollectionGone(ctx, "workspace", "collection")
Expand Down
85 changes: 85 additions & 0 deletions wait/fake/fake_resource_getter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions wait/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"github.com/rockset/rockset-go-client/option"
)

// UntilQueryDone waits until queryID has either completed, errored, or been cancelled.
// UntilQueryDone waits until queryID has either completed.
// Returns ErrBadWaitState if the query failed or was cancelled.
func (w *Waiter) UntilQueryDone(ctx context.Context, queryID string) error {
// TODO should this only wait for COMPLETED and return an error for ERROR and CANCELLED?
return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.QueryState{option.QueryCompleted, option.QueryError, option.QueryCancelled},
func(ctx context.Context) (option.QueryState, error) {
q, err := w.rc.GetQueryInfo(ctx, queryID)
return option.QueryState(q.GetStatus()), err
}))
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx, []option.QueryState{option.QueryCompleted},
[]option.QueryState{option.QueryError, option.QueryCancelled},
func(ctx context.Context) (option.QueryState, error) {
q, err := w.rc.GetQueryInfo(ctx, queryID)
return option.QueryState(q.GetStatus()), err
}))
}
25 changes: 25 additions & 0 deletions wait/query_lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package wait

import (
"context"

"github.com/rockset/rockset-go-client/option"
)

// UntilQueryLambdaVersionGone waits until a query lambda is deleted, i.e. GetQueryLambda() returns "not found".
func (w *Waiter) UntilQueryLambdaVersionGone(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx, ResourceIsGone(ctx, func(ctx context.Context) error {
_, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version)
return err
}))
}

// UntilQueryLambdaVersionActive waits until the Virtual Instance is active.
func (w *Waiter) UntilQueryLambdaVersionActive(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx, []option.QueryLambdaState{option.QueryLambdaActive}, []option.QueryLambdaState{option.QueryLambdaInvalid},
func(ctx context.Context) (option.QueryLambdaState, error) {
ql, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version)
return option.QueryLambdaState(ql.GetState()), err
}))
}
36 changes: 36 additions & 0 deletions wait/query_lambda_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package wait_test

import (
"context"
rockerr "github.com/rockset/rockset-go-client/errors"
"testing"

"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
"github.com/rockset/rockset-go-client/wait"
"github.com/stretchr/testify/assert"
)

func TestWait_untilQueryLambdaActive(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil)
rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaActive.String())}, nil)

err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v")
assert.NoError(t, err)
assert.Equal(t, 2, rs.GetQueryLambdaVersionCallCount())
}

func TestWait_untilQueryLambdaActive_invalid(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil)
rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaInvalid.String())}, nil)

err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v")
assert.ErrorIs(t, err, rockerr.ErrBadWaitState)
assert.Equal(t, 2, rs.GetQueryLambdaVersionCallCount())
}
24 changes: 24 additions & 0 deletions wait/query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package wait_test

import (
"context"
"github.com/rockset/rockset-go-client/option"
"testing"

"github.com/stretchr/testify/assert"

"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/wait"
)

func TestWait_untilQueryDone(t *testing.T) {
ctx := context.TODO()

rs := fakeRocksetClient()
rs.GetQueryInfoReturnsOnCall(0, openapi.QueryInfo{Status: openapi.PtrString(option.QueryRunning.String())}, nil)
rs.GetQueryInfoReturnsOnCall(1, openapi.QueryInfo{Status: openapi.PtrString(option.QueryCompleted.String())}, nil)

err := wait.New(&rs).UntilQueryDone(ctx, "id")
assert.NoError(t, err)
assert.Equal(t, 2, rs.GetQueryInfoCallCount())
}
Loading

0 comments on commit 4c76be4

Please sign in to comment.