From f95670b1f9b435e647bf5335d3d105f39437c766 Mon Sep 17 00:00:00 2001 From: Martin Englund Date: Wed, 25 Oct 2023 09:04:15 -0700 Subject: [PATCH] clean up lint and add it to the pre-commit hook (#167) --- .golangci.yaml | 7 +------ .pre-commit-config.yaml | 30 +++++++++++++++++++++++++++++ aliases_test.go | 2 +- collections.go | 6 ++++-- collections_kafka_test.go | 3 +++ errors/rockset.go | 3 ++- errors/rockset_test.go | 2 +- ha/ha.go | 3 ++- internal/test/string.go | 5 ++++- kafka_test.go | 28 +++++++++++++++++++-------- option/actions_test.go | 6 ++++-- organizations_test.go | 2 +- queries_test.go | 4 ++-- retry/exponential.go | 2 ++ retry/exponential_test.go | 1 - rockset.go | 14 +++++++------- rockset_test.go | 6 +++--- roles_test.go | 3 ++- virtual_instances.go | 3 ++- virtual_instances_test.go | 6 ------ wait/alias_test.go | 4 ++-- wait/collections_test.go | 14 +++++++++----- wait/query_lambda.go | 3 ++- wait/query_lambda_test.go | 13 +++++++------ wait/query_test.go | 2 +- wait/virtual_instance.go | 7 ++++--- wait/virtual_instance_test.go | 36 ++++++++++++++++++++--------------- wait/wait.go | 3 +-- wait/wait_test.go | 11 +++++++++-- writer/writer.go | 7 ++++--- 30 files changed, 151 insertions(+), 85 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.golangci.yaml b/.golangci.yaml index 06dab93b..97d0a503 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -2,8 +2,7 @@ run: concurrency: 4 timeout: 1m skip-dirs: - - client - - models + - openapi linters-settings: gci: @@ -16,8 +15,6 @@ linters-settings: linters: enable: - - deadcode - - depguard - dogsled - errcheck - exportloopref @@ -42,11 +39,9 @@ linters: - nolintlint - rowserrcheck - staticcheck - - structcheck - stylecheck - typecheck - unconvert - unparam - unused - - varcheck - whitespace \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..13083f74 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,30 @@ +repos: +- repo: local + hooks: + - id: gofmt + name: "go fmt" + entry: gofmt + language: system + 'types_or': [go] + args: ["-w", "-s"] + require_serial: false + additional_dependencies: [] + minimum_pre_commit_version: 2.9.2 + - id: govet + name: "go vet" + entry: go + language: system + 'types_or': [go] + args: ["vet", "./..."] + require_serial: false + pass_filenames: false + additional_dependencies: [] + minimum_pre_commit_version: 2.9.2 + - id: golangci-lint + name: golangci-lint + description: Fast linters runner for Go. Note that only modified files are linted, so linters like 'unused' that need to scan all files won't work as expected. + entry: golangci-lint run --new-from-rev HEAD --fix + types: [go] + language: golang + require_serial: true + pass_filenames: false diff --git a/aliases_test.go b/aliases_test.go index 6e9809a1..9921bb63 100644 --- a/aliases_test.go +++ b/aliases_test.go @@ -1,7 +1,6 @@ package rockset_test import ( - "github.com/rockset/rockset-go-client/internal/test" "testing" "github.com/stretchr/testify/assert" @@ -9,6 +8,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/rockset/rockset-go-client" + "github.com/rockset/rockset-go-client/internal/test" "github.com/rockset/rockset-go-client/option" ) diff --git a/collections.go b/collections.go index d1385725..4c2f824e 100644 --- a/collections.go +++ b/collections.go @@ -85,7 +85,8 @@ func (rc *RockClient) DeleteCollection(ctx context.Context, workspace, name stri // UpdateCollection updates a collection. Only the option.WithCollectionDescription and // option.WithIngestTransformation can be used, and any other option will return an error. -func (rc *RockClient) UpdateCollection(ctx context.Context, workspace, name string, options ...option.CollectionOption) (openapi.Collection, error) { +func (rc *RockClient) UpdateCollection(ctx context.Context, workspace, name string, + options ...option.CollectionOption) (openapi.Collection, error) { var err error var httpResp *http.Response var resp *openapi.GetCollectionResponse @@ -167,7 +168,8 @@ func (rc *RockClient) UpdateCollection(ctx context.Context, workspace, name stri // ), // option.WithS3Prefix("cities.csv"), // ), -// option.WithKafkaSource("kafka-integration-name", "topic", option.KafkaStartingOffsetEarliest, option.WithJSONFormat(), +// option.WithKafkaSource("kafka-integration-name", "topic", +// option.KafkaStartingOffsetEarliest, option.WithJSONFormat(), // option.WithKafkaSourceV3(), // ), // option.WithCollectionRetention(time.Hour), diff --git a/collections_kafka_test.go b/collections_kafka_test.go index 39b40522..13043f9a 100644 --- a/collections_kafka_test.go +++ b/collections_kafka_test.go @@ -59,11 +59,13 @@ func (s *KafkaIntegrationSuite) TestKafka() { testKafka(ctx, s.T(), s.rc, s.kc) } +//nolint:funlen func (s *KafkaIntegrationSuite) SetupSuite() { var err error ctx := test.Context() _, err = s.rc.CreateWorkspace(ctx, s.kc.workspace) + s.Require().NoError(err) s.dockerPool, err = dockertest.NewPool("") s.Require().NoError(err) @@ -86,6 +88,7 @@ func (s *KafkaIntegrationSuite) SetupSuite() { s.Require().NoError(err, "could not connect zookeeper") defer conn.Close() + //nolint:exhaustive retryFn := func() error { switch conn.State() { case zk.StateHasSession, zk.StateConnected: diff --git a/errors/rockset.go b/errors/rockset.go index f76e6f43..9d2c1fe1 100644 --- a/errors/rockset.go +++ b/errors/rockset.go @@ -32,7 +32,8 @@ func New(err error) Error { } // NewWithStatusCode wraps err in an Error that provides better error messages than the openapi.GenericOpenAPIError, -// and can be retried if the HTTP response StatusCode is in RetryableErrors. If err is nil, NewWithStatusCode() returns nil. +// and can be retried if the HTTP response StatusCode is in RetryableErrors. If err is nil, +// NewWithStatusCode() returns nil. func NewWithStatusCode(err error, response *http.Response) error { if err == nil { return nil diff --git a/errors/rockset_test.go b/errors/rockset_test.go index 1043e0aa..d0faeefe 100644 --- a/errors/rockset_test.go +++ b/errors/rockset_test.go @@ -2,7 +2,6 @@ package errors_test import ( "errors" - "github.com/rockset/rockset-go-client/openapi" "net/http" "testing" @@ -11,6 +10,7 @@ import ( rockerr "github.com/rockset/rockset-go-client/errors" "github.com/rockset/rockset-go-client/internal/test" + "github.com/rockset/rockset-go-client/openapi" ) func TestError_IsNotFoundError(t *testing.T) { diff --git a/ha/ha.go b/ha/ha.go index 3902a8d4..f3344258 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -28,7 +28,8 @@ func New(client ...Querier) *Client { } // Query executes a SQL query in parallel, and returns the first response. -func (ha *Client) Query(ctx context.Context, query string, options ...option.QueryOption) (openapi.QueryResponse, []error) { +func (ha *Client) Query(ctx context.Context, query string, + options ...option.QueryOption) (openapi.QueryResponse, []error) { log := zerolog.Ctx(ctx) var wg sync.WaitGroup diff --git a/internal/test/string.go b/internal/test/string.go index deeaa46c..de067991 100644 --- a/internal/test/string.go +++ b/internal/test/string.go @@ -16,6 +16,7 @@ const PersistentAlias = "getalias" const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +//nolint:gosec var seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) // StringWithCharset creates a random string with length and charset @@ -32,6 +33,8 @@ func RandomString(length int) string { return stringWithCharset(length, charset) } +const randomNameLength = 6 + func RandomName(prefix string) string { num, found := os.LookupEnv(buildNum) if !found { @@ -42,7 +45,7 @@ func RandomName(prefix string) string { } } - return fmt.Sprintf("go_%s_%s_%s", num, prefix, RandomString(6)) + return fmt.Sprintf("go_%s_%s_%s", num, prefix, RandomString(randomNameLength)) } func Description() string { diff --git a/kafka_test.go b/kafka_test.go index a96ce951..74405c82 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -7,15 +7,17 @@ import ( "context" "encoding/json" "fmt" - "github.com/rockset/rockset-go-client" - "github.com/rockset/rockset-go-client/internal/test" - "github.com/rockset/rockset-go-client/option" - "github.com/stretchr/testify/require" "io" "log" "net/http" "os" "testing" + + "github.com/stretchr/testify/require" + + "github.com/rockset/rockset-go-client" + "github.com/rockset/rockset-go-client/internal/test" + "github.com/rockset/rockset-go-client/option" ) type kafkaConfig struct { @@ -103,7 +105,12 @@ func createConnector(url, name string, cfg ConnectorConfig) error { } c := http.Client{} - resp, err := c.Post(url, "application/json", bytes.NewReader(body)) + req, err := http.NewRequestWithContext(context.TODO(), http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return err + } + + resp, err := c.Do(req) if err != nil { return err } @@ -121,7 +128,7 @@ func createConnector(url, name string, cfg ConnectorConfig) error { } func deleteConnector(url, name string) error { c := http.Client{} - r, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%s", url, name), nil) + r, err := http.NewRequestWithContext(context.TODO(), http.MethodDelete, fmt.Sprintf("%s/%s", url, name), nil) if err != nil { return err } @@ -146,7 +153,11 @@ func deleteConnector(url, name string) error { func waitForKafkaConnect(t *testing.T, url string) func() error { return func() error { c := http.Client{} - resp, err := c.Get(url) + req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url, nil) + if err != nil { + return err + } + resp, err := c.Do(req) if err != nil { return err } @@ -224,7 +235,8 @@ func connectParams(prefix, bootstrapServers, username, password string) []string fmt.Sprintf("CONNECT_%sSSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https", prefix), fmt.Sprintf("CONNECT_%sSECURITY_PROTOCOL=SASL_SSL", prefix), fmt.Sprintf("CONNECT_%sSASL_MECHANISM=PLAIN", prefix), - fmt.Sprintf(`CONNECT_%sSASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";`, prefix, username, password), + fmt.Sprintf(`CONNECT_%sSASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule `+ + `required username="%s" password="%s";`, prefix, username, password), fmt.Sprintf("CONNECT_%sREQUEST_TIMEOUT_MS=20000", prefix), fmt.Sprintf("CONNECT_%sRETRY_BACKOFF_MS=500", prefix), } diff --git a/option/actions_test.go b/option/actions_test.go index 296de1e9..0864d045 100644 --- a/option/actions_test.go +++ b/option/actions_test.go @@ -1,9 +1,11 @@ package option_test import ( - "github.com/rockset/rockset-go-client/option" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" + + "github.com/rockset/rockset-go-client/option" ) func TestGetGlobalAction(t *testing.T) { diff --git a/organizations_test.go b/organizations_test.go index 29f519de..127af753 100644 --- a/organizations_test.go +++ b/organizations_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - + "github.com/rockset/rockset-go-client/internal/test" ) diff --git a/queries_test.go b/queries_test.go index 98ec7ff6..ff1f43ba 100644 --- a/queries_test.go +++ b/queries_test.go @@ -15,7 +15,8 @@ import ( // for anyone poking around in the code, rockset.sleep() only works for this test org as no sane person would want // to add a sleep in their query -const slowQuery = `script {{{ import * as rockset from "/rockset"; export function delay(x) { rockset.sleep(x); return x; } }}} select _script.delay(2000, q) from unnest([1] as q)` +const slowQuery = `script {{{ import * as rockset from "/rockset"; export function delay(x) ` + + `{ rockset.sleep(x); return x; } }}} select _script.delay(2000, q) from unnest([1] as q)` type QueryIntegrationSuite struct { suite.Suite @@ -23,7 +24,6 @@ type QueryIntegrationSuite struct { } func TestQueryIntegration(t *testing.T) { - s := QueryIntegrationSuite{} suite.Run(t, &s) } diff --git a/retry/exponential.go b/retry/exponential.go index 041f8da6..dbb7cb97 100644 --- a/retry/exponential.go +++ b/retry/exponential.go @@ -80,6 +80,7 @@ func (r Exponential) Retry(ctx context.Context, retryFn Func) error { return ctx.Err() case t := <-t.C: log.Trace().Str("t", t.String()).Msg("wait time") + //nolint:gosec var jitter = time.Duration(jitterFraction*rand.Float64()) * waitInterval waitInterval *= 2 waitInterval += jitter @@ -135,6 +136,7 @@ func (r Exponential) RetryWithCheck(ctx context.Context, checkFn CheckFn) error return ctx.Err() case t := <-t.C: log.Trace().Str("t", t.String()).Msg("wait time") + //nolint:gosec var jitter = time.Duration(jitterFraction*rand.Float64()) * waitInterval waitInterval *= 2 waitInterval += jitter diff --git a/retry/exponential_test.go b/retry/exponential_test.go index 1e18b832..7d12e060 100644 --- a/retry/exponential_test.go +++ b/retry/exponential_test.go @@ -31,7 +31,6 @@ type ExponentialRetrySuite struct { } func TestExponentialRetrySuite(t *testing.T) { - suite.Run(t, &ExponentialRetrySuite{}) } diff --git a/rockset.go b/rockset.go index b74b7d1b..2f49c01a 100644 --- a/rockset.go +++ b/rockset.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/rockset/rockset-go-client/wait" "io" "net/http" "net/http/httputil" @@ -17,6 +16,7 @@ import ( "github.com/rockset/rockset-go-client/openapi" "github.com/rockset/rockset-go-client/retry" + "github.com/rockset/rockset-go-client/wait" ) const ( @@ -82,7 +82,7 @@ func NewClient(options ...RockOption) (*RockClient, error) { } if rc.APIServer == "" { - return nil, NoAPIServerErr + return nil, ErrNoAPIServer } u, err := url.Parse(rc.APIServer) @@ -102,9 +102,9 @@ func NewClient(options ...RockOption) (*RockClient, error) { cfg.Scheme = "https" // we do not allow setting the scheme from the URL as we only support HTTPS if rc.APIKey == "" && rc.Token == "" { - return nil, NoAPICredentialsErr + return nil, ErrNoAPICredentials } else if rc.APIKey != "" && rc.Token != "" { - return nil, DuplicateCredentialsErr + return nil, ErrDuplicateCredentials } else if rc.APIKey != "" { cfg.AddDefaultHeader("Authorization", "apikey "+rc.APIKey) } else { @@ -122,9 +122,9 @@ func NewClient(options ...RockOption) (*RockClient, error) { } var ( - NoAPICredentialsErr = errors.New("no API credentials provided") - DuplicateCredentialsErr = errors.New("duplicate API credentials provided") - NoAPIServerErr = errors.New("no API server provided") + ErrNoAPICredentials = errors.New("no API credentials provided") + ErrDuplicateCredentials = errors.New("duplicate API credentials provided") + ErrNoAPIServer = errors.New("no API server provided") ) // RockOption is the type for RockClient options. diff --git a/rockset_test.go b/rockset_test.go index 0f8a256f..2cc0906a 100644 --- a/rockset_test.go +++ b/rockset_test.go @@ -1,13 +1,13 @@ package rockset_test import ( - "github.com/stretchr/testify/suite" "os" "testing" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/rockset/rockset-go-client" "github.com/rockset/rockset-go-client/internal/test" @@ -58,12 +58,12 @@ func (s *RockOptionSuite) SetupTest() { func (s *RockOptionSuite) TestMissingCreds() { _, err := rockset.NewClient(rockset.WithAPIServer("server")) - s.ErrorIs(err, rockset.NoAPICredentialsErr) + s.ErrorIs(err, rockset.ErrNoAPICredentials) } func (s *RockOptionSuite) TestMissingServer() { _, err := rockset.NewClient(rockset.WithAPIKey("key")) - s.ErrorIs(err, rockset.NoAPIServerErr) + s.ErrorIs(err, rockset.ErrNoAPIServer) } func (s *RockOptionSuite) TestLastCredWins() { diff --git a/roles_test.go b/roles_test.go index 5933d189..88e92648 100644 --- a/roles_test.go +++ b/roles_test.go @@ -82,7 +82,8 @@ func (s *RoleIntegrationSuite) TestUpdate() { option.WithIntegrationPrivilege(option.CreateCollectionIntegration, "test"), option.WithWorkspacePrivilege(option.QueryDataWs, "commons"), option.WithWorkspacePrivilege(option.CreateViewWs, "commons", option.WithCluster("usw2a1")), - option.WithVirtualInstancePrivilege(option.QueryVirtualInstanceAction, rocksetCircleCIMainVI, option.WithCluster("usw2a1")), + option.WithVirtualInstancePrivilege(option.QueryVirtualInstanceAction, rocksetCircleCIMainVI, + option.WithCluster("usw2a1")), ) s.NoError(err) s.Equal(s.name, role.GetRoleName()) diff --git a/virtual_instances.go b/virtual_instances.go index 054b61dc..dd9ed1fd 100644 --- a/virtual_instances.go +++ b/virtual_instances.go @@ -2,9 +2,10 @@ package rockset import ( "context" - "github.com/rs/zerolog" "net/http" + "github.com/rs/zerolog" + rockerr "github.com/rockset/rockset-go-client/errors" "github.com/rockset/rockset-go-client/openapi" "github.com/rockset/rockset-go-client/option" diff --git a/virtual_instances_test.go b/virtual_instances_test.go index fde87364..7725bb49 100644 --- a/virtual_instances_test.go +++ b/virtual_instances_test.go @@ -137,7 +137,6 @@ func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_0_Create() { err = s.rc.Wait.UntilVirtualInstanceActive(ctx, s.vID) s.Require().NoError(err) s.T().Logf("vi %s is active (%s)", s.vID, time.Since(t0)) - t0 = time.Now() } func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_1_Collection() { @@ -165,8 +164,6 @@ func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_1_Collection() { err = s.rc.Wait.UntilCollectionHasDocuments(ctx, s.workspace, s.collection, 2_830) s.Require().NoError(err) s.T().Log("collection has documents", time.Since(t0)) - t0 = time.Now() - } func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_2_Mount() { @@ -187,7 +184,6 @@ func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_2_Mount() { err = s.rc.Wait.UntilMountActive(ctx, s.vID, s.workspace, s.collection) s.Require().NoError(err) s.T().Logf("mount is active: %s", time.Since(t0)) - t0 = time.Now() } func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_3_Query() { @@ -205,7 +201,6 @@ func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_3_Query() { s.Require().NoError(err) s.T().Logf("listed queries (%s)", time.Since(t0)) s.NotEmpty(queries) - t0 = time.Now() } func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_4_Unmount() { @@ -222,7 +217,6 @@ func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_4_Unmount() { _, err = rc.UnmountCollection(ctx, s.vID, s.path) s.Require().NoError(err) s.T().Logf("unmounted (%s)", time.Since(t0)) - t0 = time.Now() } func (s *VirtualInstanceIntegrationSuite) TestVirtualInstance_5_Suspend() { diff --git a/wait/alias_test.go b/wait/alias_test.go index 5bd6ca2f..7a54b790 100644 --- a/wait/alias_test.go +++ b/wait/alias_test.go @@ -14,7 +14,7 @@ func TestWait_untilAliasReady(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetAliasReturnsOnCall(0, openapi.Alias{}, NotFoundErr) + rs.GetAliasReturnsOnCall(0, openapi.Alias{}, ErrNotFound) rs.GetAliasReturnsOnCall(1, openapi.Alias{}, nil) err := wait.New(&rs).UntilAliasAvailable(ctx, "workspace", "alias") @@ -27,7 +27,7 @@ func TestWait_untilAliasGone(t *testing.T) { rs := fakeRocksetClient() rs.GetAliasReturnsOnCall(0, openapi.Alias{}, nil) - rs.GetAliasReturnsOnCall(1, openapi.Alias{}, NotFoundErr) + rs.GetAliasReturnsOnCall(1, openapi.Alias{}, ErrNotFound) err := wait.New(&rs).UntilAliasGone(ctx, "workspace", "alias") assert.NoError(t, err) diff --git a/wait/collections_test.go b/wait/collections_test.go index 1afb401e..ae2db45b 100644 --- a/wait/collections_test.go +++ b/wait/collections_test.go @@ -15,9 +15,12 @@ func TestWait_untilCollectionReady(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusInitialized.String())}, nil) - rs.GetCollectionReturnsOnCall(1, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusCreated.String())}, nil) - rs.GetCollectionReturnsOnCall(2, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady.String())}, nil) + rs.GetCollectionReturnsOnCall(0, openapi.Collection{ + Status: stringPtr(option.CollectionStatusInitialized)}, nil) + rs.GetCollectionReturnsOnCall(1, openapi.Collection{ + Status: stringPtr(option.CollectionStatusCreated)}, nil) + rs.GetCollectionReturnsOnCall(2, openapi.Collection{ + Status: stringPtr(option.CollectionStatusReady)}, nil) err := wait.New(&rs).UntilCollectionReady(ctx, "workspace", "collection") assert.NoError(t, err) @@ -28,8 +31,9 @@ func TestWait_untilCollectionGone(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetCollectionReturnsOnCall(0, openapi.Collection{Status: openapi.PtrString(option.CollectionStatusReady.String())}, nil) - rs.GetCollectionReturnsOnCall(1, openapi.Collection{}, NotFoundErr) + rs.GetCollectionReturnsOnCall(0, openapi.Collection{ + Status: stringPtr(option.CollectionStatusReady)}, nil) + rs.GetCollectionReturnsOnCall(1, openapi.Collection{}, ErrNotFound) err := wait.New(&rs).UntilCollectionGone(ctx, "workspace", "collection") assert.NoError(t, err) diff --git a/wait/query_lambda.go b/wait/query_lambda.go index 70ea8781..fec4a8f7 100644 --- a/wait/query_lambda.go +++ b/wait/query_lambda.go @@ -17,7 +17,8 @@ func (w *Waiter) UntilQueryLambdaVersionGone(ctx context.Context, workspace, nam // UntilQueryLambdaVersionActive waits until the Virtual Instance is active. func (w *Waiter) UntilQueryLambdaVersionActive(ctx context.Context, workspace, name, version string) error { return w.rc.RetryWithCheck(ctx, - ResourceHasState(ctx, []option.QueryLambdaState{option.QueryLambdaActive}, []option.QueryLambdaState{option.QueryLambdaInvalid}, + ResourceHasState(ctx, + []option.QueryLambdaState{option.QueryLambdaActive}, []option.QueryLambdaState{option.QueryLambdaInvalid}, func(ctx context.Context) (option.QueryLambdaState, error) { ql, err := w.rc.GetQueryLambdaVersion(ctx, workspace, name, version) return option.QueryLambdaState(ql.GetState()), err diff --git a/wait/query_lambda_test.go b/wait/query_lambda_test.go index 5c6219da..b06d5101 100644 --- a/wait/query_lambda_test.go +++ b/wait/query_lambda_test.go @@ -2,21 +2,22 @@ package wait_test import ( "context" - rockerr "github.com/rockset/rockset-go-client/errors" "testing" + "github.com/stretchr/testify/assert" + + rockerr "github.com/rockset/rockset-go-client/errors" "github.com/rockset/rockset-go-client/openapi" "github.com/rockset/rockset-go-client/option" "github.com/rockset/rockset-go-client/wait" - "github.com/stretchr/testify/assert" ) func TestWait_untilQueryLambdaActive(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil) - rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaActive.String())}, nil) + rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: &emptyString}, nil) + rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: stringPtr(option.QueryLambdaActive)}, nil) err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v") assert.NoError(t, err) @@ -27,8 +28,8 @@ func TestWait_untilQueryLambdaActive_invalid(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: openapi.PtrString("")}, nil) - rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: openapi.PtrString(option.QueryLambdaInvalid.String())}, nil) + rs.GetQueryLambdaVersionReturnsOnCall(0, openapi.QueryLambdaVersion{State: &emptyString}, nil) + rs.GetQueryLambdaVersionReturnsOnCall(1, openapi.QueryLambdaVersion{State: stringPtr(option.QueryLambdaInvalid)}, nil) err := wait.New(&rs).UntilQueryLambdaVersionActive(ctx, "ws", "ql", "v") assert.ErrorIs(t, err, rockerr.ErrBadWaitState) diff --git a/wait/query_test.go b/wait/query_test.go index 6b1ae6fb..c264b120 100644 --- a/wait/query_test.go +++ b/wait/query_test.go @@ -2,12 +2,12 @@ package wait_test import ( "context" - "github.com/rockset/rockset-go-client/option" "testing" "github.com/stretchr/testify/assert" "github.com/rockset/rockset-go-client/openapi" + "github.com/rockset/rockset-go-client/option" "github.com/rockset/rockset-go-client/wait" ) diff --git a/wait/virtual_instance.go b/wait/virtual_instance.go index 84da320a..5d1c7f1f 100644 --- a/wait/virtual_instance.go +++ b/wait/virtual_instance.go @@ -3,11 +3,11 @@ package wait import ( "context" "errors" - rockerr "github.com/rockset/rockset-go-client/errors" - "github.com/rockset/rockset-go-client/retry" "regexp" + rockerr "github.com/rockset/rockset-go-client/errors" "github.com/rockset/rockset-go-client/option" + "github.com/rockset/rockset-go-client/retry" ) // UntilVirtualInstanceActive waits until the Virtual Instance is active. @@ -29,7 +29,8 @@ func (w *Waiter) UntilVirtualInstanceGone(ctx context.Context, id string) error // UntilVirtualInstanceSuspended waits until the Virtual Instance is suspended. func (w *Waiter) UntilVirtualInstanceSuspended(ctx context.Context, id string) error { - return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, []option.VirtualInstanceState{option.VirtualInstanceSuspended}, nil, + return w.rc.RetryWithCheck(ctx, ResourceHasState(ctx, + []option.VirtualInstanceState{option.VirtualInstanceSuspended}, nil, func(ctx context.Context) (option.VirtualInstanceState, error) { vi, err := w.rc.GetVirtualInstance(ctx, id) return option.VirtualInstanceState(vi.GetState()), err diff --git a/wait/virtual_instance_test.go b/wait/virtual_instance_test.go index 5d613fd9..91e202c7 100644 --- a/wait/virtual_instance_test.go +++ b/wait/virtual_instance_test.go @@ -2,12 +2,12 @@ package wait_test import ( "context" - rockerr "github.com/rockset/rockset-go-client/errors" "net/http" "testing" "github.com/stretchr/testify/assert" + rockerr "github.com/rockset/rockset-go-client/errors" "github.com/rockset/rockset-go-client/openapi" "github.com/rockset/rockset-go-client/option" "github.com/rockset/rockset-go-client/wait" @@ -17,8 +17,10 @@ func TestWait_untilVirtualInstanceActive(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceInitializing.String())}, nil) - rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive.String())}, nil) + rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{ + State: stringPtr(option.VirtualInstanceInitializing)}, nil) + rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{ + State: stringPtr(option.VirtualInstanceActive)}, nil) err := wait.New(&rs).UntilVirtualInstanceActive(ctx, "id") assert.NoError(t, err) @@ -29,9 +31,12 @@ func TestWait_untilVirtualInstanceSuspended(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive.String())}, nil) - rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceSuspending.String())}, nil) - rs.GetVirtualInstanceReturnsOnCall(2, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceSuspended.String())}, nil) + rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{ + State: stringPtr(option.VirtualInstanceActive)}, nil) + rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{ + State: stringPtr(option.VirtualInstanceSuspending)}, nil) + rs.GetVirtualInstanceReturnsOnCall(2, openapi.VirtualInstance{ + State: stringPtr(option.VirtualInstanceSuspended)}, nil) err := wait.New(&rs).UntilVirtualInstanceSuspended(ctx, "id") assert.NoError(t, err) @@ -42,8 +47,9 @@ func TestWait_untilVirtualInstanceGone(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{State: openapi.PtrString(option.VirtualInstanceActive.String())}, nil) - rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{}, NotFoundErr) + rs.GetVirtualInstanceReturnsOnCall(0, openapi.VirtualInstance{ + State: stringPtr(option.VirtualInstanceActive)}, nil) + rs.GetVirtualInstanceReturnsOnCall(1, openapi.VirtualInstance{}, ErrNotFound) err := wait.New(&rs).UntilVirtualInstanceGone(ctx, "id") assert.NoError(t, err) @@ -54,8 +60,8 @@ func TestWait_untilMountActive(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetCollectionMountReturnsOnCall(0, openapi.CollectionMount{State: openapi.PtrString(option.MountCreating.String())}, nil) - rs.GetCollectionMountReturnsOnCall(1, openapi.CollectionMount{State: openapi.PtrString(option.MountActive.String())}, nil) + rs.GetCollectionMountReturnsOnCall(0, openapi.CollectionMount{State: stringPtr(option.MountCreating)}, nil) + rs.GetCollectionMountReturnsOnCall(1, openapi.CollectionMount{State: stringPtr(option.MountActive)}, nil) err := wait.New(&rs).UntilMountActive(ctx, "id", "workspace", "collection") assert.NoError(t, err) @@ -66,9 +72,9 @@ func TestWait_untilMountGone404(t *testing.T) { ctx := context.TODO() rs := fakeRocksetClient() - rs.GetCollectionMountReturnsOnCall(0, openapi.CollectionMount{State: openapi.PtrString(option.MountActive.String())}, nil) - rs.GetCollectionMountReturnsOnCall(1, openapi.CollectionMount{State: openapi.PtrString(option.MountDeleting.String())}, nil) - rs.GetCollectionMountReturnsOnCall(2, openapi.CollectionMount{}, NotFoundErr) + rs.GetCollectionMountReturnsOnCall(0, openapi.CollectionMount{State: stringPtr(option.MountActive)}, nil) + rs.GetCollectionMountReturnsOnCall(1, openapi.CollectionMount{State: stringPtr(option.MountDeleting)}, nil) + rs.GetCollectionMountReturnsOnCall(2, openapi.CollectionMount{}, ErrNotFound) err := wait.New(&rs).UntilMountGone(ctx, "id", "workspace", "collection") assert.NoError(t, err) @@ -84,8 +90,8 @@ func TestWait_untilMountGone400(t *testing.T) { e404.ErrorModel.Message = &msg rs := fakeRocksetClient() - rs.GetCollectionMountReturnsOnCall(0, openapi.CollectionMount{State: openapi.PtrString(option.MountActive.String())}, nil) - rs.GetCollectionMountReturnsOnCall(1, openapi.CollectionMount{State: openapi.PtrString(option.MountDeleting.String())}, nil) + rs.GetCollectionMountReturnsOnCall(0, openapi.CollectionMount{State: stringPtr(option.MountActive)}, nil) + rs.GetCollectionMountReturnsOnCall(1, openapi.CollectionMount{State: stringPtr(option.MountDeleting)}, nil) rs.GetCollectionMountReturnsOnCall(2, openapi.CollectionMount{}, e404) err := wait.New(&rs).UntilMountGone(ctx, "id", "workspace", "collection") diff --git a/wait/wait.go b/wait/wait.go index 78c7d20b..cbf10813 100644 --- a/wait/wait.go +++ b/wait/wait.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/rs/zerolog" rockerr "github.com/rockset/rockset-go-client/errors" @@ -35,8 +36,6 @@ func New(rs ResourceGetter) *Waiter { return &Waiter{rs} } -var ErrBadWaitState = errors.New("encountered bad state while waiting") - // ResourceHasState implements RetryFn to wait until the resource has the desired state, and if a bad state is // encountered it will return ErrBadWaitState func ResourceHasState[T fmt.Stringer](ctx context.Context, validStates, badStates []T, diff --git a/wait/wait_test.go b/wait/wait_test.go index 88621fbb..700404ca 100644 --- a/wait/wait_test.go +++ b/wait/wait_test.go @@ -3,7 +3,6 @@ package wait_test import ( "context" "fmt" - "github.com/rockset/rockset-go-client/wait" "net/http" "testing" "time" @@ -14,14 +13,22 @@ import ( rockerr "github.com/rockset/rockset-go-client/errors" "github.com/rockset/rockset-go-client/openapi" "github.com/rockset/rockset-go-client/retry" + "github.com/rockset/rockset-go-client/wait" "github.com/rockset/rockset-go-client/wait/fake" ) -var NotFoundErr = rockerr.Error{ +var ErrNotFound = rockerr.Error{ ErrorModel: openapi.NewErrorModel(), StatusCode: http.StatusNotFound, } +func stringPtr[T fmt.Stringer](status T) *string { + s := status.String() + return &s +} + +var emptyString = "" + // return a fake Rockset client with an ExponentialRetry that doesn't back off func fakeRocksetClient() fake.FakeResourceGetter { return fake.FakeResourceGetter{ diff --git a/writer/writer.go b/writer/writer.go index 6bc1b4c1..580f0873 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -140,6 +140,8 @@ func (w *Writer) Stop() { // ... // w.Stop() // w.Wait() +// +//nolint:funlen func (w *Writer) Run(ctx context.Context) { // TODO: should this panic if started more than once? w.wg.Add(1) @@ -291,10 +293,9 @@ func (w *Writer) Workers() int { // buffer adds Request into a per workspace and collection buffer func (w *Writer) buffer(r Request) { - wsBuffer, found := w.buffers[r.Workspace] + _, found := w.buffers[r.Workspace] if !found { - wsBuffer = make(map[string][]interface{}) - w.buffers[r.Workspace] = wsBuffer + w.buffers[r.Workspace] = make(map[string][]interface{}) } w.buffers[r.Workspace][r.Collection] = append(w.buffers[r.Workspace][r.Collection], structs.Map(r.Data))