From e2cc7faad1852c8c257c0825890850194f9a5471 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Fri, 26 Sep 2025 15:39:50 +0300 Subject: [PATCH] driver: implement tcs driver for tarantool support - New Tarantool Config Storage (TCS) driver package (driver/tcs). - Integration testing for TCS. - Updated .golangci.yml. - Modified core driver interfaces to be compatible with tcs. - Update go module dependencies. Closes #TNTP-4188 --- .../actions/prepare-ee-test-env/action.yml | 51 ++ .github/workflows/tcs.yaml | 57 ++ .golangci.yml | 14 + Makefile | 4 +- driver/driver.go | 4 +- driver/etcd/etcd.go | 2 +- driver/tcs/error.go | 85 ++ driver/tcs/error_test.go | 384 +++++++++ driver/tcs/examples_test.go | 444 ++++++++++ driver/tcs/integration_test.go | 755 ++++++++++++++++++ driver/tcs/operations.go | 104 +++ driver/tcs/operations_test.go | 131 +++ driver/tcs/predicate.go | 102 +++ driver/tcs/predicate_test.go | 225 ++++++ driver/tcs/tcs.go | 120 +-- driver/tcs/tcs_test.go | 582 ++++++++++++++ driver/tcs/txn.go | 84 ++ go.mod | 2 +- internal/mocks/doer_watcher.go | 3 + internal/mocks/doer_watcher_mock.go | 739 +++++++++++++++++ internal/mocks/driver_mock.go | 20 +- internal/mocks/tarantool_watcher.go | 3 + internal/mocks/tarantool_watcher_mock.go | 248 ++++++ internal/testing/doer.go | 85 ++ internal/testing/doerwithwatcher.go | 96 +++ internal/testing/request.go | 54 ++ internal/testing/response.go | 81 ++ internal/testing/t.go | 40 + kv/kv.go | 4 - storage_test.go | 2 +- tx/requestresponse.go | 10 +- watch/event.go | 47 +- 32 files changed, 4462 insertions(+), 120 deletions(-) create mode 100644 .github/actions/prepare-ee-test-env/action.yml create mode 100644 .github/workflows/tcs.yaml create mode 100644 driver/tcs/error.go create mode 100644 driver/tcs/error_test.go create mode 100644 driver/tcs/examples_test.go create mode 100644 driver/tcs/integration_test.go create mode 100644 driver/tcs/operations.go create mode 100644 driver/tcs/operations_test.go create mode 100644 driver/tcs/predicate.go create mode 100644 driver/tcs/predicate_test.go create mode 100644 driver/tcs/tcs_test.go create mode 100644 driver/tcs/txn.go create mode 100644 internal/mocks/doer_watcher.go create mode 100644 internal/mocks/doer_watcher_mock.go create mode 100644 internal/mocks/tarantool_watcher.go create mode 100644 internal/mocks/tarantool_watcher_mock.go create mode 100644 internal/testing/doer.go create mode 100644 internal/testing/doerwithwatcher.go create mode 100644 internal/testing/request.go create mode 100644 internal/testing/response.go create mode 100644 internal/testing/t.go diff --git a/.github/actions/prepare-ee-test-env/action.yml b/.github/actions/prepare-ee-test-env/action.yml new file mode 100644 index 0000000..5d97379 --- /dev/null +++ b/.github/actions/prepare-ee-test-env/action.yml @@ -0,0 +1,51 @@ +name: "Prepare test environment with Tarantool EE" +description: "Prepares test environment with Tarantool EE" + +inputs: + sdk-version: + required: true + type: string + sdk-build: + required: false + type: string + default: release + sdk-gc: + required: false + type: string + default: gc64 + sdk-download-token: + required: true + type: string + +runs: + using: "composite" + steps: + - name: Cache Tarantool SDK + id: cache-sdk + uses: actions/cache@v3 + with: + path: tarantool-enterprise + key: ${{ matrix.sdk-version }} + + - name: Download Tarantool SDK + run: | + ARCHIVE_NAME=tarantool-enterprise-sdk-${{ inputs.sdk-gc }}-${{ inputs.sdk-version }}.tar.gz + ARCHIVE_PATH=$(echo ${{ inputs.sdk-version }} | sed -rn \ + 's/^([0-9]+)\.([0-9]+)\.([0-9]+-){2}([a-z0-9]+-)?r[0-9]+\.([a-z]+)\.([a-z0-9_]+)$/${{ inputs.sdk-build }}\/\5\/\6\/\1\.\2/p') + curl -O -L \ + https://${{ inputs.sdk-download-token }}@download.tarantool.io/enterprise/${ARCHIVE_PATH}/${ARCHIVE_NAME} + if [ $(stat -c%s "${ARCHIVE_NAME}") -eq 0 ]; then + echo "Failed to download Tarantool EE SDK: '${ARCHIVE_PATH}/${ARCHIVE_NAME}'" + exit 1 + fi + tar -xzf ${ARCHIVE_NAME} + rm -f ${ARCHIVE_NAME} + source tarantool-enterprise/env.sh + shell: bash + + - name: Add SDK to PATH and set TARANTOOL_SDK_PATH variable + run: | + SDK_PATH="$(realpath tarantool-enterprise)" + echo "${SDK_PATH}" >> ${GITHUB_PATH} + echo "TARANTOOL_SDK_PATH=${SDK_PATH}" >> ${GITHUB_ENV} + shell: bash diff --git a/.github/workflows/tcs.yaml b/.github/workflows/tcs.yaml new file mode 100644 index 0000000..99f442f --- /dev/null +++ b/.github/workflows/tcs.yaml @@ -0,0 +1,57 @@ +name: tcs.yaml +on: + pull_request_target: + types: [ labeled ] + +env: + # Note: Use exactly match version of tool, to avoid unexpected issues with test on CI. + GO_VERSION: '1.24.9' + +jobs: + full-ci-ee: + # Tests will run only when the pull request is labeled with `full-ci`. To + # avoid security problems, the label must be reset manually for every run. + # + # We need to use `pull_request_target` because it has access to base + # repository secrets unlike `pull_request`. + if: (github.event_name == 'push') || + (github.event_name == 'pull_request_target' && + github.event.action == 'labeled' && + github.event.label.name == 'full-ci') || + (github.event_name == 'pull_request' && + github.event.action == 'synchronize' && + github.event.pull_request.head.repo.full_name == github.repository && + contains(github.event.pull_request.labels.*.name, 'full-ci')) + runs-on: ubuntu-22.04 + strategy: + matrix: + sdk-version: + - "3.5.0-0-r70.linux.x86_64" + fail-fast: false + steps: + # `ref` as merge request is needed for pull_request_target because this + # target runs in the context of the base commit of the pull request. + - uses: actions/checkout@v4 + if: github.event_name == 'pull_request_target' + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha }} + + - uses: actions/checkout@v4 + if: github.event_name != 'pull_request_target' + with: + fetch-depth: 0 + + - name: Prepare EE env + uses: ./.github/actions/prepare-ee-test-env + with: + sdk-version: '${{ matrix.sdk-version }}' + sdk-download-token: '${{ secrets.SDK_DOWNLOAD_TOKEN }}' + + - name: Integration tests + run: + go test ./... -count=1 -v + + - name: Integration tests + run: + go test ./... -count=100 -v -race diff --git a/.golangci.yml b/.golangci.yml index 1dac47c..443c8fe 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -25,8 +25,16 @@ linters: - path: _test.go linters: - wrapcheck + - err113 + - funlen settings: + varnamelen: + ignore-names: + - tt + ignore-decls: + - mc *minimock.Controller + - t T godot: scope: all lll: @@ -50,6 +58,8 @@ linters: - "go.etcd.io/etcd/client/v3" - "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-option" + - "github.com/vmihailenco/msgpack/v5" + - "github.com/tarantool/go-iproto" test: files: - "$test" @@ -57,3 +67,7 @@ linters: - $gostd - "github.com/tarantool/go-storage" - "github.com/stretchr/testify" + - "github.com/tarantool/go-tarantool/v2" + - "github.com/tarantool/go-iproto" + - "github.com/vmihailenco/msgpack/v5" + - "github.com/gojuno/minimock/v3" diff --git a/Makefile b/Makefile index 73bf4cf..0f5bf3f 100644 --- a/Makefile +++ b/Makefile @@ -10,12 +10,12 @@ codespell: .PHONY: test test: @echo "Running tests" - @go test ./... -count=1 + @go test ./... -count=1 -v .PHONY: testrace testrace: @echo "Running tests with race flag" - @go test ./... -count=100 -race + @go test ./... -count=100 -race -v -failfast .PHONY: coverage coverage: diff --git a/driver/driver.go b/driver/driver.go index dc49a03..3ed9188 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -26,5 +26,7 @@ type Driver interface { // Watch establishes a watch stream for changes to a specific key or prefix. // The returned channel will receive events as changes occur. - Watch(ctx context.Context, key []byte, opts ...watch.Option) <-chan watch.Event + // The returned cleanup function should be called to stop the watch and release resources. + // An error is returned if the watch could not be established. + Watch(ctx context.Context, key []byte, opts ...watch.Option) (<-chan watch.Event, func(), error) } diff --git a/driver/etcd/etcd.go b/driver/etcd/etcd.go index 91ed8fc..152b556 100644 --- a/driver/etcd/etcd.go +++ b/driver/etcd/etcd.go @@ -69,6 +69,6 @@ func (d Driver) Execute( // Watch monitors changes to a specific key and returns a stream of events. // It supports optional watch configuration through the opts parameter. -func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) <-chan watch.Event { +func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) { panic("implement me") } diff --git a/driver/tcs/error.go b/driver/tcs/error.go new file mode 100644 index 0000000..94b25f3 --- /dev/null +++ b/driver/tcs/error.go @@ -0,0 +1,85 @@ +package tcs + +import ( + "fmt" +) + +// DecodingError represents an error that occurs during decoding operations. +type DecodingError struct { + ObjectType string + Text string + Err error +} + +// Error returns the error message. +func (e DecodingError) Error() string { + suffix := e.ObjectType + if e.Text != "" { + suffix = fmt.Sprintf("%s, %s", suffix, e.Text) + } + + return fmt.Sprintf("failed to decode %s: %s", suffix, e.Err) +} + +func (e DecodingError) Unwrap() error { + return e.Err +} + +// NewTxnOpResponseDecodingError returns a new txnOpResponse decoding error. +func NewTxnOpResponseDecodingError(err error) error { + if err == nil { + return nil + } + + return DecodingError{ + ObjectType: "txnOpResponse", + Text: "", + Err: err, + } +} + +// EncodingError represents an error that occurs during encoding operations. +type EncodingError struct { + ObjectType string + Text string + Err error +} + +// Error returns the error message. +func (e EncodingError) Error() string { + if e.Text == "" { + return fmt.Sprintf("failed to encode %s: %s", e.ObjectType, e.Err) + } + + return fmt.Sprintf("failed to encode %s, %s: %s", e.ObjectType, e.Text, e.Err) +} + +func (e EncodingError) Unwrap() error { + return e.Err +} + +// NewOperationEncodingError returns a new operation encoding error. +func NewOperationEncodingError(text string, err error) error { + if err == nil { + return nil + } + + return EncodingError{ + ObjectType: "operation", + Text: text, + Err: err, + } +} + +// NewPredicateEncodingError returns a new predicate encoding error. +func NewPredicateEncodingError(text string, err error) error { + if err == nil { + return nil + } + + return EncodingError{ + ObjectType: "predicate", + Text: text, + Err: err, + } +} diff --git a/driver/tcs/error_test.go b/driver/tcs/error_test.go new file mode 100644 index 0000000..e0105e1 --- /dev/null +++ b/driver/tcs/error_test.go @@ -0,0 +1,384 @@ +package tcs_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tarantool/go-storage/driver/tcs" +) + +func TestEncodingError_Error(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + error tcs.EncodingError + expected string + }{ + { + name: "operation encoding error with text and error", + error: tcs.EncodingError{ + ObjectType: "operation", + Text: "test operation", + Err: errors.New("encoding failed"), + }, + expected: "failed to encode operation, test operation: encoding failed", + }, + { + name: "predicate encoding error with text and error", + error: tcs.EncodingError{ + ObjectType: "predicate", + Text: "test predicate", + Err: errors.New("marshal error"), + }, + expected: "failed to encode predicate, test predicate: marshal error", + }, + { + name: "custom object type encoding error", + error: tcs.EncodingError{ + ObjectType: "custom", + Text: "custom object", + Err: errors.New("custom error"), + }, + expected: "failed to encode custom, custom object: custom error", + }, + { + name: "encoding error with empty text", + error: tcs.EncodingError{ + ObjectType: "operation", + Text: "", + Err: errors.New("error"), + }, + expected: "failed to encode operation: error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + result := tt.error.Error() + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestEncodingError_Unwrap(t *testing.T) { + t.Parallel() + + t.Run("some", func(t *testing.T) { + t.Parallel() + + innerErr := errors.New("inner error") + decodingErr := tcs.EncodingError{ + ObjectType: "operation", + Text: "test operation", + Err: innerErr, + } + + result := decodingErr.Unwrap() + assert.Equal(t, innerErr, result) + }) + + // Shouldn't be possible, but let's test it anyway. + t.Run("nil", func(t *testing.T) { + t.Parallel() + + decodingErr := tcs.DecodingError{ + ObjectType: "operation", + Text: "test operation", + Err: nil, + } + + err := decodingErr.Unwrap() + assert.NoError(t, err) + }) +} + +func TestNewOperationEncodingError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + text string + err error + expected error + }{ + { + name: "operation encoding error with text and error", + text: "test operation", + err: errors.New("encoding failed"), + expected: tcs.EncodingError{ + ObjectType: "operation", + Text: "test operation", + Err: errors.New("encoding failed"), + }, + }, + { + name: "operation encoding error with empty text", + text: "", + err: errors.New("error"), + expected: tcs.EncodingError{ + ObjectType: "operation", + Text: "", + Err: errors.New("error"), + }, + }, + { + name: "operation encoding error with nil error", + text: "test", + err: nil, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := tcs.NewOperationEncodingError(tt.text, tt.err) + + if tt.expected == nil { + assert.NoError(t, err) + return + } + + require.Error(t, err) + + var encodingErr tcs.EncodingError + + { + ok := errors.As(err, &encodingErr) + require.True(t, ok, "Expected result to be of type tcs.EncodingError") + } + + var expectedEncodingErr tcs.EncodingError + + { + ok := errors.As(tt.expected, &expectedEncodingErr) + require.True(t, ok, "Expected tt.expected to be of type tcs.EncodingError") + } + + assert.Equal(t, expectedEncodingErr.ObjectType, encodingErr.ObjectType) + assert.Equal(t, expectedEncodingErr.Text, encodingErr.Text) + + assert.Equal(t, expectedEncodingErr.Err.Error(), encodingErr.Err.Error()) + }) + } +} + +func TestNewPredicateEncodingError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + text string + err error + expected error + }{ + { + name: "predicate encoding error with text and error", + text: "test predicate", + err: errors.New("marshal error"), + expected: tcs.EncodingError{ + ObjectType: "predicate", + Text: "test predicate", + Err: errors.New("marshal error"), + }, + }, + { + name: "predicate encoding error with empty text", + text: "", + err: errors.New("error"), + expected: tcs.EncodingError{ + ObjectType: "predicate", + Text: "", + Err: errors.New("error"), + }, + }, + { + name: "predicate encoding error with nil error", + text: "test", + err: nil, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := tcs.NewPredicateEncodingError(tt.text, tt.err) + + if tt.expected == nil { + assert.NoError(t, err) + return + } + + require.Error(t, err) + + var encodingErr tcs.EncodingError + { + ok := errors.As(err, &encodingErr) + require.True(t, ok, "Expected result to be of type tcs.EncodingError") + } + + var expectedEncodingErr tcs.EncodingError + { + ok := errors.As(tt.expected, &expectedEncodingErr) + require.True(t, ok, "Expected tt.expected to be of type tcs.EncodingError") + } + + assert.Equal(t, expectedEncodingErr.ObjectType, encodingErr.ObjectType) + assert.Equal(t, expectedEncodingErr.Text, encodingErr.Text) + + assert.Equal(t, expectedEncodingErr.Err.Error(), encodingErr.Err.Error()) + }) + } +} + +func TestDecodingError_Error(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + error tcs.DecodingError + expected string + }{ + { + name: "decoding error with object type and error", + error: tcs.DecodingError{ + ObjectType: "txnOpResponse", + Text: "", + Err: errors.New("decoding failed"), + }, + expected: "failed to decode txnOpResponse: decoding failed", + }, + { + name: "decoding error with object type, text and error", + error: tcs.DecodingError{ + ObjectType: "txnOpResponse", + Text: "response data", + Err: errors.New("invalid format"), + }, + expected: "failed to decode txnOpResponse, response data: invalid format", + }, + { + name: "decoding error with custom object type", + error: tcs.DecodingError{ + ObjectType: "customObject", + Text: "custom data", + Err: errors.New("custom error"), + }, + expected: "failed to decode customObject, custom data: custom error", + }, + { + name: "decoding error with empty text", + error: tcs.DecodingError{ + ObjectType: "txnOpResponse", + Text: "", + Err: errors.New("error"), + }, + expected: "failed to decode txnOpResponse: error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + result := tt.error.Error() + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestDecodingError_Unwrap(t *testing.T) { + t.Parallel() + + t.Run("some", func(t *testing.T) { + t.Parallel() + + innerErr := errors.New("inner error") + decodingErr := tcs.DecodingError{ + ObjectType: "txnOpResponse", + Text: "test", + Err: innerErr, + } + + result := decodingErr.Unwrap() + assert.Equal(t, innerErr, result) + }) + + // Shouldn't be possible, but let's test it anyway. + t.Run("nil", func(t *testing.T) { + t.Parallel() + + decodingErr := tcs.DecodingError{ + ObjectType: "txnOpResponse", + Text: "test", + Err: nil, + } + + assert.NoError(t, decodingErr.Unwrap()) + }) +} + +func TestNewTxnOpResponseDecodingError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + expected error + }{ + { + name: "txnOpResponse decoding error with error", + err: errors.New("decoding failed"), + expected: tcs.DecodingError{ + ObjectType: "txnOpResponse", + Text: "", + Err: errors.New("decoding failed"), + }, + }, + { + name: "txnOpResponse decoding error with nil error", + err: nil, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := tcs.NewTxnOpResponseDecodingError(tt.err) + + if tt.expected == nil { + assert.NoError(t, err) + return + } + + require.Error(t, err) + + var decodingErr tcs.DecodingError + { + ok := errors.As(err, &decodingErr) + require.True(t, ok, "Expected result to be of type tcs.DecodingError") + } + + var expectedDecodingErr tcs.DecodingError + { + ok := errors.As(tt.expected, &expectedDecodingErr) + require.True(t, ok, "Expected tt.expected to be of type tcs.DecodingError") + } + + assert.Equal(t, expectedDecodingErr.ObjectType, decodingErr.ObjectType) + assert.Equal(t, expectedDecodingErr.Text, decodingErr.Text) + assert.Equal(t, expectedDecodingErr.Err.Error(), decodingErr.Err.Error()) + }) + } +} diff --git a/driver/tcs/examples_test.go b/driver/tcs/examples_test.go new file mode 100644 index 0000000..2c93041 --- /dev/null +++ b/driver/tcs/examples_test.go @@ -0,0 +1,444 @@ +package tcs_test + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/tarantool/go-tarantool/v2" + + "github.com/tarantool/go-storage/driver/tcs" + gsTesting "github.com/tarantool/go-storage/internal/testing" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" +) + +func newResponse(success bool, data [][]interface{}) *gsTesting.MockResponse { + return gsTesting.NewMockResponse(gsTesting.NewT(), []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": data, + "is_success": success, + }, + "revision": 1000, + }, + }) +} + +func createTCSDriverExecuteSimple() *tcs.Driver { + mock := gsTesting.NewMockDoer(gsTesting.NewT(), + + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{ + { + map[string]interface{}{ + "path": []byte("/config/app/version"), + "value": []byte("1.0.0"), + "mod_revision": 1000, + }, + }, + }), + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{{}, {}}), + ) + + // createDriver is a function to create dummy driver. + return tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) +} + +// ExampleExecuteBasicOperations demonstrates basic Execute operations with the TCS driver. +// This example shows Put, Get, and Delete operations without predicates. +func ExampleDriver_Execute_simple() { + ctx := context.Background() + + driver := createTCSDriverExecuteSimple() + + // Example 1: Simple Put operation. + { + key := []byte("/config/app/version") + value := []byte("1.0.0") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + if err != nil { + log.Printf("Put operation failed: %v", err) + return + } + + fmt.Println("Key", string(key), "stored with value:", string(value)) + } + + // Example 2: Simple Get operation. + { + key := []byte("/config/app/version") + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + if err != nil { + log.Printf("Get operation failed: %v", err) + return + } + + if response.Succeeded && len(response.Results) > 0 { + if len(response.Results[0].Values) > 0 { + kv := response.Results[0].Values[0] + fmt.Printf("Retrieved key: %s, value: %s, version: %d\n", + string(kv.Key), string(kv.Value), kv.ModRevision) + } + } + } + + // Example 3: Simple Delete operation. + { + key := []byte("/config/app/version") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + if err != nil { + log.Printf("Delete operation failed: %v", err) + return + } + + fmt.Println("Successfully deleted key:", string(key)) + } + + // Example 4: Multiple operations in single transaction. + { + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put([]byte("/config/app/name"), []byte("MyApp")), + operation.Put([]byte("/config/app/environment"), []byte("production")), + }, nil) + if err != nil { + log.Printf("Multi-put operation failed: %v", err) + return + } + + fmt.Println("Successfully stored", len(response.Results), "configuration items") + } + + // Output: + // Key /config/app/version stored with value: 1.0.0 + // Retrieved key: /config/app/version, value: 1.0.0, version: 1000 + // Successfully deleted key: /config/app/version + // Successfully stored 2 configuration items +} + +func createTCSDriverExecuteWithPredicates() *tcs.Driver { + mock := gsTesting.NewMockDoer(gsTesting.NewT(), + + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{ + { + map[string]interface{}{ + "path": []byte("/config/app/feature"), + "value": []byte("enabled"), + "mod_revision": 1000, + }, + }, + }), + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{{}, {}}), + newResponse(true, [][]interface{}{{}, {}}), + newResponse(false, [][]interface{}{{}, {}}), + ) + + // createDriver is a function to create dummy driver. + return tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) +} + +// ExampleDriver_Execute_WithPredicates demonstrates conditional Execute operations using predicates. +// This example shows how to use value and version predicates for conditional execution. +func ExampleDriver_Execute_with_predicates() { + ctx := context.Background() + + driver := createTCSDriverExecuteWithPredicates() + + // Example 1: Value-based conditional update. + { + key := []byte("/config/app/settings") + currentValue := []byte("old-settings") + newValue := []byte("new-settings") + + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, currentValue), + }, nil) + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key, "old-settings"), + }, []operation.Operation{ + operation.Put(key, newValue), + }, nil) + if err != nil { + log.Printf("Conditional update failed: %v", err) + return + } + + if response.Succeeded { + fmt.Println("Conditional update succeeded - value was updated") + } else { + fmt.Println("Conditional update failed - value did not match") + } + } + + // Example 2: Version-based conditional update. + { + key := []byte("/config/app/feature") + value := []byte("enabled") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + if err != nil { + log.Printf("Initial update failed: %v", err) + return + } + + getResponse, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + if err != nil { + log.Printf("Get operation failed: %v", err) + return + } + + var currentVersion int64 + if len(getResponse.Results) > 0 && len(getResponse.Results[0].Values) > 0 { + currentVersion = getResponse.Results[0].Values[0].ModRevision + } + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.VersionEqual(key, currentVersion), + }, []operation.Operation{ + operation.Put(key, []byte("disabled")), + }, nil) + if err != nil { + log.Printf("Version-based update failed: %v", err) + return + } + + if response.Succeeded { + fmt.Println("Version-based update succeeded - no concurrent modification") + } else { + fmt.Println("Version-based update failed - version conflict detected") + } + } + + // Example 3: Multiple predicates with Else operations. + { + key1 := []byte("/config/database/host") + key2 := []byte("/config/database/port") + + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, []byte("localhost")), + operation.Put(key2, []byte("5432")), + }, nil) + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key1, "localhost"), + predicate.ValueEqual(key2, "5432"), + }, []operation.Operation{ + operation.Put(key1, []byte("new-host")), + operation.Put(key2, []byte("6432")), + }, []operation.Operation{ + operation.Delete(key1), + operation.Delete(key2), + }) + if err != nil { + log.Printf("Multi-predicate transaction failed: %v", err) + return + } + + if response.Succeeded { + fmt.Println("Multi-predicate transaction succeeded - values were updated") + } else { + fmt.Println("Multi-predicate transaction failed - cleanup operations executed") + } + } + + // Output: + // Conditional update succeeded - value was updated + // Version-based update succeeded - no concurrent modification + // Multi-predicate transaction failed - cleanup operations executed +} + +func createTCSDriverWatch() *tcs.Driver { + return tcs.New( + gsTesting.NewMockDoerWithWatcher( + gsTesting.NewMockDoer(gsTesting.NewT(), + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{{}}), + newResponse(true, [][]interface{}{{}}), + ), + map[string][]tarantool.WatchEvent{ + "config.storage:/config/app/status": { + tarantool.WatchEvent{ + Conn: nil, + Key: "/config/app/status", + Value: nil, + }, + }, + "config.storage:/config/database/": { + tarantool.WatchEvent{ + Conn: nil, + Key: "/config/database/host", + Value: nil, + }, + tarantool.WatchEvent{ + Conn: nil, + Key: "/config/database/port", + Value: nil, + }, + tarantool.WatchEvent{ + Conn: nil, + Key: "/config/database/host", + Value: nil, + }, + }, + "config.storage:/config/monitoring/metrics": {}, + }, + ), + ) +} + +// ExampleWatchOperations demonstrates how to use Watch for real-time change notifications. +// This example shows watching individual keys and handling watch events. +func ExampleDriver_Watch() { + ctx := context.Background() + + driver := createTCSDriverWatch() + + // Example 1: Basic watch on a single key. + { + key := []byte("/config/app/status") + + watchCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + if err != nil { + log.Printf("Failed to start watch: %v", err) + return + } + defer stopWatch() + + fmt.Println("Watching for changes on:", string(key)) + + go func() { + time.Sleep(100 * time.Millisecond) + + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, []byte("running")), + }, nil) + }() + + select { + case event := <-eventCh: + fmt.Printf("Received watch event for key: %s\n", string(event.Prefix)) + case <-watchCtx.Done(): + fmt.Println("Watch context expired") + } + } + + // Example 2: Watch with multiple operations. + { + key := []byte("/config/database/") + + watchCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + if err != nil { + log.Printf("Failed to start watch: %v", err) + return + } + defer stopWatch() + + fmt.Println("Watching for changes on prefix:", string(key)) + + go func() { + time.Sleep(100 * time.Millisecond) + + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put([]byte("/config/database/host"), []byte("db1")), + }, nil) + + time.Sleep(200 * time.Millisecond) + + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put([]byte("/config/database/port"), []byte("5432")), + }, nil) + + time.Sleep(300 * time.Millisecond) + + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete([]byte("/config/database/host")), + }, nil) + }() + + eventCount := 0 + for eventCount < 3 { + select { + case event := <-eventCh: + fmt.Printf("Event %d: change detected on %s\n", eventCount+1, string(event.Prefix)) + + eventCount++ + case <-watchCtx.Done(): + fmt.Println("Watch context expired") + return + } + } + } + + // Example 3: Graceful watch termination. + { + key := []byte("/config/monitoring/metrics") + + watchCtx, cancel := context.WithCancel(ctx) + + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + if err != nil { + log.Printf("Failed to start watch: %v", err) + cancel() + + return + } + + fmt.Println("Started watch with manual control") + + go func() { + time.Sleep(100 * time.Millisecond) + fmt.Println("Stopping watch gracefully...") + stopWatch() + cancel() + }() + + for { + select { + case event, ok := <-eventCh: + if !ok { + return + } + + fmt.Printf("Received event: %s\n", string(event.Prefix)) + case <-watchCtx.Done(): + return + } + } + } + + // Output: + // Watching for changes on: /config/app/status + // Received watch event for key: /config/app/status + // Watching for changes on prefix: /config/database/ + // Event 1: change detected on /config/database/ + // Event 2: change detected on /config/database/ + // Event 3: change detected on /config/database/ + // Started watch with manual control + // Stopping watch gracefully... +} diff --git a/driver/tcs/integration_test.go b/driver/tcs/integration_test.go new file mode 100644 index 0000000..742558c --- /dev/null +++ b/driver/tcs/integration_test.go @@ -0,0 +1,755 @@ +package tcs_test + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/pool" + tcshelper "github.com/tarantool/go-tarantool/v2/test_helpers/tcs" + + "github.com/tarantool/go-storage/driver/tcs" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" +) + +var ( + haveTCS bool //nolint: gochecknoglobals + tcsEndpoints []string //nolint: gochecknoglobals +) + +// createTestDriver creates a TCS driver for testing. +// It skips the test if no Tarantool instance is available. +func createTestDriver(ctx context.Context, t *testing.T) (*tcs.Driver, func()) { + t.Helper() + + if !haveTCS { + t.Skip("TCS is unsupported or Tarantool isn't found") + } + + // Create connection pool. + instances := make([]pool.Instance, 0, len(tcsEndpoints)) + for i, addr := range tcsEndpoints { + instances = append(instances, pool.Instance{ + Name: string(rune('a' + i)), + Dialer: &tarantool.NetDialer{ + Address: addr, + User: "client", + Password: "secret", + RequiredProtocolInfo: tarantool.ProtocolInfo{ + Auth: 0, + Version: 0, + Features: nil, + }, + }, + Opts: tarantool.Opts{ + Timeout: 0, + Reconnect: 0, + MaxReconnects: 0, + RateLimit: 0, + RLimitAction: 0, + Concurrency: 0, + SkipSchema: false, + Notify: nil, + Handle: nil, + Logger: nil, + }, + }) + } + + conn, err := pool.Connect(ctx, instances) + require.NoError(t, err, "Failed to connect to Tarantool pool") + + // Wrap the pool connection to implement DoerWatcher. + wrapper := pool.NewConnectorAdapter(conn, pool.RW) + + return tcs.New(wrapper), func() { _ = wrapper.Close() } +} + +// cleanupTestKey deletes a test key to ensure clean state. +func cleanupTestKey(ctx context.Context, driver *tcs.Driver, key []byte) { + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) +} + +// testKey generates a unique test key to avoid conflicts between tests. +func testKey(t *testing.T, prefix string) []byte { + t.Helper() + + return []byte("/test/" + prefix + "/" + t.Name()) +} + +func TestTCSDriver_Put(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "put") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("put-test-value") + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Put operation should not return any values in response") +} + +func TestTCSDriver_Get(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "get") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("get-test-value") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "Get operation should return one result") + assert.Len(t, response.Results[0].Values, 1, "Get operation should return one value in response") + assert.Equal(t, key, response.Results[0].Values[0].Key, "Returned key should match requested key") + assert.Equal(t, value, response.Results[0].Values[0].Value, "Returned value should match stored value") + assert.Positive(t, response.Results[0].Values[0].ModRevision, "ModRevision should be greater than 0") +} + +func TestTCSDriver_Delete(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "delete") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("delete-test-value") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Equal(t, key, response.Results[0].Values[0].Key, "Returned key should match deleted key") + assert.Equal(t, value, response.Results[0].Values[0].Value, "Returned value should match deleted value") + assert.Positive(t, response.Results[0].Values[0].ModRevision, "ModRevision should be greater than 0") +} + +func TestTCSDriver_GetAfterDelete(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "get-after-delete") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("get-after-delete-test-value") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Len(t, response.Results[0].Values, 1, "Delete operation should return one value in response") + + response, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Get operation on deleted key should return empty values") +} + +func TestTCSDriver_ValueEqualPredicate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "value-equal") + defer cleanupTestKey(ctx, driver, key) + + initialValue := []byte("initial") + updatedValue := []byte("updated") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, initialValue), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key, "initial"), + }, []operation.Operation{ + operation.Put(key, updatedValue), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when value matches") + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Put operation should not return any values in response") +} + +func TestTCSDriver_ValueNotEqualPredicate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "value-not-equal") + defer cleanupTestKey(ctx, driver, key) + + initialValue := []byte("initial") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, initialValue), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueNotEqual(key, "different"), + }, []operation.Operation{ + operation.Put(key, []byte("new-value")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when value doesn't match") + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Put operation should not return any values in response") +} + +func TestTCSDriver_VersionEqualPredicate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "version-equal") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("version-test") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + + initialRevision := getRevisionFromResponse(t, response, 0) + + response, err = driver.Execute(ctx, []predicate.Predicate{ + predicate.VersionEqual(key, initialRevision), + }, []operation.Operation{ + operation.Put(key, []byte("updated")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when version matches") +} + +func TestTCSDriver_VersionGreaterPredicate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "version-greater") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("version-test") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + + initialRevision := getRevisionFromResponse(t, response, 0) + + response, err = driver.Execute(ctx, []predicate.Predicate{ + predicate.VersionGreater(key, initialRevision-1), + }, []operation.Operation{ + operation.Put(key, []byte("new-value")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when version is greater than specified version") +} + +func TestTCSDriver_MultipleKeysPut(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key1 := testKey(t, "multi-put1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "multi-put2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 2, "TX should return two results") + + for i := range response.Results { + assert.Empty(t, response.Results[i].Values, "Put operation %d should not return any values in response", i) + } +} + +func TestTCSDriver_MultiplePredicates(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key1 := testKey(t, "multi-pred1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "multi-pred2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key1, "value1"), + predicate.ValueEqual(key2, "value2"), + }, []operation.Operation{ + operation.Put(key1, []byte("updated1")), + operation.Put(key2, []byte("updated2")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Transaction with multiple predicates should succeed") + + require.Len(t, response.Results, 2, "Transaction with multiple predicates should return two results") + + for i := range response.Results { + assert.Empty(t, response.Results[i].Values, "Operation %d should not return any values in response", i) + } +} + +func TestTCSDriver_MultipleOperations(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key1 := testKey(t, "multi-op1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "multi-op2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, []byte("updated1")), + operation.Put(key2, []byte("updated2")), + operation.Get(key1), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Transaction with multiple operations should succeed") + + require.Len(t, response.Results, 3, "Transaction with multiple operations should return three results") + assert.Equal(t, key1, response.Results[2].Values[0].Key, "Get operation should return the correct key") + assert.Equal(t, []byte("updated1"), response.Results[2].Values[0].Value, + "Get operation should return the updated value") + assert.Positive(t, response.Results[2].Values[0].ModRevision, + "Get operation should return a valid mod_revision") +} + +func TestTCSDriver_ElseOperations(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key1 := testKey(t, "else-op1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "else-op2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key1, "wrong-value"), + }, []operation.Operation{ + operation.Put(key1, []byte("should-not-execute")), + }, []operation.Operation{ + operation.Delete(key1), + operation.Delete(key2), + }) + require.NoError(t, err) + assert.False(t, response.Succeeded, "Transaction should fail when predicates don't match") + + require.Len(t, response.Results, 2, "Transaction with else operations should return two results") + assert.Equal(t, key1, response.Results[0].Values[0].Key, "Delete operation should delete key1") + assert.Equal(t, key2, response.Results[1].Values[0].Key, "Delete operation should delete key2") +} + +func TestTCSDriver_WatchPutEvent(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "watch-put") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("watch-test-value") + + watchCtx, cancel := context.WithTimeout(ctx, defaultWaitTimeout) + defer cancel() + + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + require.NoError(t, err) + + defer stopWatch() + + time.Sleep(500 * time.Millisecond) + + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + select { + case event := <-eventCh: + assert.Equal(t, key, event.Prefix) + case <-watchCtx.Done(): + t.Fatal("Timeout waiting for put event") + } +} + +func TestTCSDriver_WatchDeleteEvent(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := testKey(t, "watch-delete") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("watch-test-value") + + watchCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + require.NoError(t, err) + + defer stopWatch() + + time.Sleep(500 * time.Millisecond) + + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + select { + case <-eventCh: + case <-watchCtx.Done(): + t.Fatal("Timeout waiting for initial put event") + } + + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + require.NoError(t, err) + + select { + case event := <-eventCh: + assert.Equal(t, key, event.Prefix) + case <-watchCtx.Done(): + t.Fatal("Timeout waiting for delete event") + } +} + +func TestTCSDriver_GetByPrefix(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + basePrefix := "/test/prefix-test/" + t.Name() + "/" + + key1 := []byte(basePrefix + "key1") + key2 := []byte(basePrefix + "key2") + key3 := []byte(basePrefix + "key3") + key4 := []byte("/test/other-prefix/other-key") + + defer cleanupTestKey(ctx, driver, key1) + defer cleanupTestKey(ctx, driver, key2) + defer cleanupTestKey(ctx, driver, key3) + defer cleanupTestKey(ctx, driver, key4) + + value1 := []byte("prefix-value1") + value2 := []byte("prefix-value2") + value3 := []byte("prefix-value3") + value4 := []byte("other-value") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + operation.Put(key3, value3), + operation.Put(key4, value4), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get([]byte(basePrefix)), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "Get operation should return one result") + + assert.Len(t, response.Results[0].Values, 3, "Get by prefix should return three values matching the prefix") + + for _, kv := range response.Results[0].Values { + assert.True(t, strings.HasPrefix(string(kv.Key), basePrefix), + "Returned key %s should have prefix %s", string(kv.Key), basePrefix) + assert.Positive(t, kv.ModRevision, "ModRevision should be greater than 0") + } + + foundKeys := make(map[string][]byte) + for _, kv := range response.Results[0].Values { + foundKeys[string(kv.Key)] = kv.Value + } + + assert.Equal(t, value1, foundKeys[string(key1)], "Should find value1 for key1") + assert.Equal(t, value2, foundKeys[string(key2)], "Should find value2 for key2") + assert.Equal(t, value3, foundKeys[string(key3)], "Should find value3 for key3") + + _, exists := foundKeys[string(key4)] + assert.False(t, exists, "Key with different prefix should not be included in results") +} + +func TestTCSDriver_GetNonExistentKey(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + key := []byte("/test/nonexistent/key") + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err, "Get operation should not fail for non-existent key") + assert.True(t, response.Succeeded, "Get operation should succeed") +} + +func TestTCSDriver_ErrConnect(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + instances := []pool.Instance{ + { + Name: "a", + Dialer: &tarantool.NetDialer{ + Address: "10.0.0.1:65534", + User: "client", + Password: "secret", + RequiredProtocolInfo: tarantool.ProtocolInfo{ + Auth: 0, + Version: 0, + Features: nil, + }, + }, + Opts: tarantool.Opts{ + Timeout: 10 * time.Millisecond, + Reconnect: 1, + MaxReconnects: 1, + RateLimit: 0, + RLimitAction: 0, + Concurrency: 0, + SkipSchema: false, + Notify: nil, + Handle: nil, + Logger: nil, + }, + }, + } + + conn, err := pool.Connect(ctx, instances) + require.NoError(t, err, "failed to connect to Tarantool pool") + + wrapper := pool.NewConnectorAdapter(conn, pool.RW) + + defer func() { _ = wrapper.Close() }() + + driver := tcs.New(wrapper) + require.NotEmpty(t, driver) + + key := []byte("/test/nonexistent/key") + + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.Error(t, err, "Get operation should fail with connection failed") + assert.ErrorIs(t, err, pool.ErrNoRwInstance) +} + +func TestTCSDriver_GetRoot(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + driver, done := createTestDriver(ctx, t) + defer done() + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get([]byte("/")), + }, nil) + + require.NoError(t, err, "Get root operation should not fail") + assert.True(t, response.Succeeded, "Get root operation should succeed") +} + +// getRevisionFromResponse extracts revision from transaction response. +// This extracts the revision from the tx.Response structure. +func getRevisionFromResponse(t *testing.T, response tx.Response, position int) int64 { + t.Helper() + + require.Greater(t, len(response.Results), position, "expected at least %d results", position+1) + require.NotEmpty(t, response.Results[position].Values, "expected at least %d values", position+1) + + return response.Results[position].Values[0].ModRevision +} + +func TestMain(m *testing.M) { + flag.Parse() + + tcsInstance, err := tcshelper.Start(0) + switch { + case errors.Is(err, tcshelper.ErrNotSupported): + fmt.Println("TcS is not supported:", err) //nolint:forbidigo + case err != nil: + fmt.Println("Failed to start TCS:", err) //nolint:forbidigo + default: + haveTCS = true + tcsEndpoints = tcsInstance.Endpoints() + } + + os.Exit(func() int { + defer func() { + if haveTCS { + tcsInstance.Stop() + } + }() + + return m.Run() + }()) +} diff --git a/driver/tcs/operations.go b/driver/tcs/operations.go new file mode 100644 index 0000000..c0140fd --- /dev/null +++ b/driver/tcs/operations.go @@ -0,0 +1,104 @@ +package tcs + +import ( + "errors" + + "github.com/vmihailenco/msgpack/v5" + + goOperation "github.com/tarantool/go-storage/operation" +) + +// Error definitions for err113 compliance. +var ( + // ErrUnknownOperation is returned when the operation is unknown. + ErrUnknownOperation = errors.New("unknown operation") +) + +const ( + // putOperationArrayLen is the length of the array that is used to encode a put operation. + putOperationArrayLen = 3 + // otherOperationArrayLen is the length of the array that is used to encode a delete operation. + otherOperationArrayLen = 2 +) + +var ( + //nolint: gochecknoglobals + ops = map[goOperation.Type]string{ + goOperation.TypeGet: "get", + goOperation.TypePut: "put", + goOperation.TypeDelete: "delete", + } +) + +// getOperation returns the TCS operation string for an operation type. +func getOperation(opType goOperation.Type) (string, bool) { + result, ok := ops[opType] + return result, ok +} + +type operation struct { + goOperation.Operation +} + +// newOperations returns a slice of TCS operations from a slice of operations. +func newOperations(inOperations []goOperation.Operation) []operation { + outOperations := make([]operation, 0, len(inOperations)) + for _, o := range inOperations { + outOperations = append(outOperations, operation{o}) + } + + return outOperations +} + +func (o operation) EncodeMsgpack(encoder *msgpack.Encoder) error { + op, ok := getOperation(o.Type()) //nolint:varnamelen + if !ok { + return ErrUnknownOperation + } + + switch { + case o.Type() == goOperation.TypePut: + err := encoder.EncodeArrayLen(putOperationArrayLen) + if err != nil { + return NewOperationEncodingError("encode put operation array length", err) + } + + err = encoder.EncodeString(op) + if err != nil { + return NewOperationEncodingError("encode put operation", err) + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(o.Key())) + if err != nil { + return NewOperationEncodingError("encode put operation key", err) + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(o.Value())) + if err != nil { + return NewOperationEncodingError("encode put operation value", err) + } + default: + err := encoder.EncodeArrayLen(otherOperationArrayLen) + if err != nil { + return NewOperationEncodingError("encode operation array length", err) + } + + err = encoder.EncodeString(op) + if err != nil { + return NewOperationEncodingError("encode operation", err) + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(o.Key())) + if err != nil { + return NewOperationEncodingError("encode operation key", err) + } + } + + return nil +} diff --git a/driver/tcs/operations_test.go b/driver/tcs/operations_test.go new file mode 100644 index 0000000..b245749 --- /dev/null +++ b/driver/tcs/operations_test.go @@ -0,0 +1,131 @@ +//nolint:testpackage +package tcs + +import ( + "bytes" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tarantool/go-iproto" + "github.com/vmihailenco/msgpack/v5" + + goOperation "github.com/tarantool/go-storage/operation" +) + +func logMsgpackAsJSONConvert(t *testing.T, data []byte) { + t.Helper() + + var decodedMsgpack map[int]interface{} + + decoder := msgpack.NewDecoder(bytes.NewReader(data)) + require.NoError(t, decoder.Decode(&decodedMsgpack)) + + decodedConvertedMsgpack := map[string]interface{}{} + for k, v := range decodedMsgpack { + decodedConvertedMsgpack[fmt.Sprintf("%s[%d]", iproto.Key(k).String(), k)] = v + } + + encodedJSON, err := json.MarshalIndent(decodedConvertedMsgpack, "", " ") + require.NoError(t, err, "failed to convert msgpack to json") + + for _, line := range bytes.Split(encodedJSON, []byte("\n")) { + t.Log(string(line)) + } +} + +func compareGoldenMsgpackAndPrintDiff(t *testing.T, expected []byte, got []byte) { + t.Helper() + + if assert.Equal(t, expected, got, "golden file content is not equal to actual") { + return + } + + t.Logf("expected:\n") + logMsgpackAsJSONConvert(t, expected) + t.Logf("actual:\n") + logMsgpackAsJSONConvert(t, got) +} + +func TestOperation_EncodeMsgpack(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + operation goOperation.Operation + expectError bool + expected []byte + }{ + { + name: "put operation with key and value", + operation: goOperation.Put([]byte("test-key"), []byte("test-value")), + expectError: false, + expected: []byte{ + 0x93, 0xa3, 0x70, 0x75, 0x74, 0xa8, 0x74, 0x65, 0x73, 0x74, + 0x2d, 0x6b, 0x65, 0x79, 0xaa, 0x74, 0x65, 0x73, 0x74, 0x2d, + 0x76, 0x61, 0x6c, 0x75, 0x65, + }, + }, + { + name: "get operation with key", + operation: goOperation.Get([]byte("test-key")), + expectError: false, + expected: []byte{ + 0x92, 0xa3, 0x67, 0x65, 0x74, 0xa8, 0x74, 0x65, 0x73, 0x74, + 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "delete operation with key", + operation: goOperation.Delete([]byte("test-key")), + expectError: false, + expected: []byte{ + 0x92, 0xa6, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0xa8, 0x74, + 0x65, 0x73, 0x74, 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "put operation with empty key and value", + operation: goOperation.Put([]byte(""), []byte("")), + expectError: false, + expected: []byte{0x93, 0xa3, 0x70, 0x75, 0x74, 0xa0, 0xa0}, + }, + { + name: "get operation with empty key", + operation: goOperation.Get([]byte("")), + expectError: false, + expected: []byte{0x92, 0xa3, 0x67, 0x65, 0x74, 0xa0}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Create the TCS operation wrapper. + tcsOp := operation{Operation: tt.operation} + + // Encode the operation. + var buf bytes.Buffer + + encoder := msgpack.NewEncoder(&buf) + + err := tcsOp.EncodeMsgpack(encoder) + + if tt.expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + + // Get the encoded bytes. + encodedBytes := buf.Bytes() + + // Use the golden file comparison function. + compareGoldenMsgpackAndPrintDiff(t, tt.expected, encodedBytes) + }) + } +} diff --git a/driver/tcs/predicate.go b/driver/tcs/predicate.go new file mode 100644 index 0000000..4566e94 --- /dev/null +++ b/driver/tcs/predicate.go @@ -0,0 +1,102 @@ +package tcs + +import ( + "errors" + + "github.com/vmihailenco/msgpack/v5" + + goPredicate "github.com/tarantool/go-storage/predicate" +) + +var ( + // ErrUnknownOperator is returned when the operator is unknown. + ErrUnknownOperator = errors.New("unknown operator") + // ErrUnknownTarget is returned when the target is unknown. + ErrUnknownTarget = errors.New("unknown target") + + _ msgpack.CustomEncoder = predicate{Predicate: nil} + + //nolint: gochecknoglobals + operators = map[goPredicate.Op]string{ + goPredicate.OpEqual: "==", + goPredicate.OpNotEqual: "!=", + goPredicate.OpGreater: ">", + goPredicate.OpLess: "<", + } + + //nolint: gochecknoglobals + targets = map[goPredicate.Target]string{ + goPredicate.TargetValue: "value", + goPredicate.TargetVersion: "mod_revision", + } +) + +// getOperator returns the TCS operator string for a predicate operation. +func getOperator(op goPredicate.Op) (string, bool) { + result, ok := operators[op] + return result, ok +} + +// getTarget returns the TCS target string for a predicate target. +func getTarget(target goPredicate.Target) (string, bool) { + result, ok := targets[target] + return result, ok +} + +type predicate struct { + goPredicate.Predicate +} + +func newPredicates(inPredicates []goPredicate.Predicate) []predicate { + outPredicates := make([]predicate, 0, len(inPredicates)) + for _, p := range inPredicates { + outPredicates = append(outPredicates, predicate{p}) + } + + return outPredicates +} + +const ( + defaultPredicateArrayLen = 4 +) + +func (p predicate) EncodeMsgpack(encoder *msgpack.Encoder) error { + op, ok := getOperator(p.Operation()) //nolint:varnamelen + if !ok { + return ErrUnknownOperator + } + + target, ok := getTarget(p.Target()) + if !ok { + return ErrUnknownTarget + } + + err := encoder.EncodeArrayLen(defaultPredicateArrayLen) + if err != nil { + return NewPredicateEncodingError("encode array length", err) + } + + err = encoder.EncodeString(target) + if err != nil { + return NewPredicateEncodingError("encode target", err) + } + + err = encoder.EncodeString(op) + if err != nil { + return NewPredicateEncodingError("encode operator", err) + } + + err = encoder.Encode(p.Value()) + if err != nil { + return NewPredicateEncodingError("encode value", err) + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(p.Key())) + if err != nil { + return NewPredicateEncodingError("encode key", err) + } + + return nil +} diff --git a/driver/tcs/predicate_test.go b/driver/tcs/predicate_test.go new file mode 100644 index 0000000..e80f0d3 --- /dev/null +++ b/driver/tcs/predicate_test.go @@ -0,0 +1,225 @@ +//nolint:testpackage +package tcs + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" + + goPredicate "github.com/tarantool/go-storage/predicate" +) + +func TestPredicate_EncodeMsgpack(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + predicate goPredicate.Predicate + expectError bool + expected []byte + }{ + { + name: "value equal predicate with string value", + predicate: goPredicate.ValueEqual([]byte("test-key"), "test-value"), + expectError: false, + expected: []byte{ + 0x94, 0xa5, 0x76, 0x61, 0x6c, 0x75, 0x65, 0xa2, 0x3d, 0x3d, + 0xaa, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0xa8, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "value not equal predicate with int value", + predicate: goPredicate.ValueNotEqual([]byte("test-key"), 42), + expectError: false, + expected: []byte{ + 0x94, 0xa5, 0x76, 0x61, 0x6c, 0x75, 0x65, 0xa2, 0x21, 0x3d, + 0x2a, 0xa8, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "version equal predicate", + predicate: goPredicate.VersionEqual([]byte("test-key"), int64(123)), + expectError: false, + expected: []byte{ + 0x94, 0xac, 0x6d, 0x6f, 0x64, 0x5f, 0x72, 0x65, 0x76, 0x69, + 0x73, 0x69, 0x6f, 0x6e, 0xa2, 0x3d, 0x3d, 0xd3, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x7b, 0xa8, 0x74, 0x65, 0x73, 0x74, + 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "version not equal predicate", + predicate: goPredicate.VersionNotEqual([]byte("test-key"), int64(456)), + expectError: false, + expected: []byte{ + 0x94, 0xac, 0x6d, 0x6f, 0x64, 0x5f, 0x72, 0x65, 0x76, 0x69, + 0x73, 0x69, 0x6f, 0x6e, 0xa2, 0x21, 0x3d, 0xd3, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x1, 0xc8, 0xa8, 0x74, 0x65, 0x73, 0x74, + 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "version greater predicate", + predicate: goPredicate.VersionGreater([]byte("test-key"), int64(789)), + expectError: false, + expected: []byte{ + 0x94, 0xac, 0x6d, 0x6f, 0x64, 0x5f, 0x72, 0x65, 0x76, 0x69, + 0x73, 0x69, 0x6f, 0x6e, 0xa1, 0x3e, 0xd3, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x3, 0x15, 0xa8, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x6b, + 0x65, 0x79, + }, + }, + { + name: "version less predicate", + predicate: goPredicate.VersionLess([]byte("test-key"), int64(1000)), + expectError: false, + expected: []byte{ + 0x94, 0xac, 0x6d, 0x6f, 0x64, 0x5f, 0x72, 0x65, 0x76, 0x69, 0x73, + 0x69, 0x6f, 0x6e, 0xa1, 0x3c, 0xd3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x3, 0xe8, 0xa8, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x6b, 0x65, 0x79, + }, + }, + { + name: "value equal predicate with empty key", + predicate: goPredicate.ValueEqual([]byte(""), "empty-key-value"), + expectError: false, + expected: []byte{ + 0x94, 0xa5, 0x76, 0x61, 0x6c, 0x75, 0x65, 0xa2, 0x3d, 0x3d, 0xaf, + 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2d, 0x6b, 0x65, 0x79, 0x2d, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0xa0, + }, + }, + { + name: "value equal predicate with nil value", + predicate: goPredicate.ValueEqual([]byte("test-key"), nil), + expectError: false, + expected: []byte{ + 0x94, 0xa5, 0x76, 0x61, 0x6c, 0x75, 0x65, 0xa2, 0x3d, 0x3d, 0xc0, + 0xa8, 0x74, 0x65, 0x73, 0x74, 0x2d, 0x6b, 0x65, 0x79, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Create the TCS predicate wrapper. + tcsPredicate := predicate{Predicate: tt.predicate} + + // Encode the predicate. + var buf bytes.Buffer + + encoder := msgpack.NewEncoder(&buf) + + err := tcsPredicate.EncodeMsgpack(encoder) + + if tt.expectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + + // Get the encoded bytes. + encodedBytes := buf.Bytes() + + // Use the golden file comparison function. + compareGoldenMsgpackAndPrintDiff(t, tt.expected, encodedBytes) + }) + } +} + +func TestPredicate_EncodeMsgpack_UnknownOperator(t *testing.T) { + t.Parallel() + + // Create a predicate with an unknown operator. + // We need to create a custom predicate since the public API doesn't allow invalid operators. + unknownOpPredicate := &struct { + key []byte + op goPredicate.Op + target goPredicate.Target + value interface{} + }{ + key: []byte("test-key"), + op: goPredicate.Op(999), // Invalid operator. + target: goPredicate.TargetValue, + value: "test-value", + } + + // Override the methods to return our custom values. + // This is a bit hacky but necessary to test the error case. + var buf bytes.Buffer + + encoder := msgpack.NewEncoder(&buf) + + // We'll test the error case by directly calling the encoding function + // with a predicate that has invalid operator. + tcsPredicate := predicate{ + Predicate: &mockPredicate{ + key: unknownOpPredicate.key, + operation: unknownOpPredicate.op, + target: unknownOpPredicate.target, + value: unknownOpPredicate.value, + }, + } + + err := tcsPredicate.EncodeMsgpack(encoder) + + require.Error(t, err) + assert.ErrorIs(t, err, ErrUnknownOperator) +} + +func TestPredicate_EncodeMsgpack_UnknownTarget(t *testing.T) { + t.Parallel() + + // Create a predicate with an unknown target. + unknownTargetPredicate := &mockPredicate{ + key: []byte("test-key"), + operation: goPredicate.OpEqual, + target: goPredicate.Target(999), // Invalid target. + value: "test-value", + } + + tcsPredicate := predicate{ + Predicate: unknownTargetPredicate, + } + + var buf bytes.Buffer + + encoder := msgpack.NewEncoder(&buf) + + err := tcsPredicate.EncodeMsgpack(encoder) + + require.Error(t, err) + assert.ErrorIs(t, err, ErrUnknownTarget) +} + +// mockPredicate is a helper struct to test error cases +// by allowing us to set invalid operator and target values. +type mockPredicate struct { + key []byte + operation goPredicate.Op + target goPredicate.Target + value interface{} +} + +func (m *mockPredicate) Key() []byte { + return m.key +} + +func (m *mockPredicate) Operation() goPredicate.Op { + return m.operation +} + +func (m *mockPredicate) Target() goPredicate.Target { + return m.target +} + +func (m *mockPredicate) Value() interface{} { + return m.value +} diff --git a/driver/tcs/tcs.go b/driver/tcs/tcs.go index 2bab56c..0a213b9 100644 --- a/driver/tcs/tcs.go +++ b/driver/tcs/tcs.go @@ -1,84 +1,108 @@ -// Package tcs provides a Tarantool Cartridge storage driver implementation. +// Package tcs provides a Tarantool config storage driver implementation. // It enables using Tarantool as a distributed key-value storage backend. package tcs import ( "context" + "errors" "fmt" + "sync" "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/pool" "github.com/tarantool/go-storage/driver" - "github.com/tarantool/go-storage/operation" - "github.com/tarantool/go-storage/predicate" + goOperation "github.com/tarantool/go-storage/operation" + goPredicate "github.com/tarantool/go-storage/predicate" "github.com/tarantool/go-storage/tx" "github.com/tarantool/go-storage/watch" ) +// DoerWatcher is an interface that combines tarantool.Doer and NewWatcher method. +// tarantool.Connection and pool.ConnectionAdapter implement this interface. +type DoerWatcher interface { + Do(req tarantool.Request) (fut *tarantool.Future) + NewWatcher(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) +} + // Driver is a Tarantool implementation of the storage driver interface. // It uses TCS as the underlying key-value storage backend. type Driver struct { - conn *pool.ConnectionPool // Tarantool connection pool. + conn DoerWatcher // Tarantool connection pool. } var ( _ driver.Driver = &Driver{} //nolint:exhaustruct + + // ErrUnexpectedResponse is returned when the response from tarantool has unexpected format. + ErrUnexpectedResponse = errors.New("unexpected response from tarantool") ) // New creates a new Tarantool driver instance. // It establishes connections to Tarantool instances using the provided addresses. -func New(ctx context.Context, addrs []string) (*Driver, error) { - instances := make([]pool.Instance, 0, len(addrs)) - for i, addr := range addrs { - instances = append(instances, pool.Instance{ - Name: fmt.Sprintf("instance-%d", i), - Dialer: &tarantool.NetDialer{ - Address: addr, - User: "user", - Password: "password", - RequiredProtocolInfo: tarantool.ProtocolInfo{ - Auth: tarantool.AutoAuth, - Version: tarantool.ProtocolVersion(0), - Features: nil, - }, - }, - Opts: tarantool.Opts{ - Timeout: 0, - Reconnect: 0, - MaxReconnects: 0, - RateLimit: 0, - RLimitAction: tarantool.RLimitAction(0), - Concurrency: 0, - SkipSchema: false, - Notify: nil, - Handle: nil, - Logger: nil, - }, - }) - } - - conn, err := pool.Connect(ctx, instances) - if err != nil { - return nil, fmt.Errorf("failed to connect to tarantool pool: %w", err) - } - - return &Driver{conn: conn}, nil +func New(doer DoerWatcher) *Driver { + return &Driver{conn: doer} } // Execute executes a transactional operation with conditional logic. // It processes predicates to determine whether to execute thenOps or elseOps. func (d Driver) Execute( - _ context.Context, - _ []predicate.Predicate, - _ []operation.Operation, - _ []operation.Operation, + ctx context.Context, + predicates []goPredicate.Predicate, + thenOps []goOperation.Operation, + elseOps []goOperation.Operation, ) (tx.Response, error) { - panic("implement me") + txnArg := newTxnRequest(predicates, thenOps, elseOps) + + req := tarantool.NewCallRequest("config.storage.txn"). + Args([]any{txnArg}).Context(ctx) + + var result []txnResponse + + switch err := d.conn.Do(req).GetTyped(&result); { + case err != nil: + return tx.Response{}, fmt.Errorf("failed to execute transaction: %w", err) + case len(result) != 1: + return tx.Response{}, fmt.Errorf("%w: expected 1 response, got %d", ErrUnexpectedResponse, len(result)) + } + + return result[0].asTxnResponse(), nil } // Watch monitors changes to a specific key and returns a stream of events. // It supports optional watch configuration through the opts parameter. -func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) <-chan watch.Event { - panic("implement me") +// To watch for config storage key "config.storage:" prefix should be used. +func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) { + rvChan := make(chan watch.Event, 1) + + watcher, err := d.conn.NewWatcher("config.storage:"+string(key), func(_ tarantool.WatchEvent) { + select { + case rvChan <- watch.Event{Prefix: key}: + default: + } + }) + if err != nil { + close(rvChan) + return nil, nil, fmt.Errorf("failed to create watcher: %w", err) + } + + var ( + isStoppedOnce = sync.Once{} + isStopped = make(chan struct{}) + ) + + go func() { + defer func() { + // When watcher.Unregister() will finish it's execution - means watcher won't call any more callbacks, + // that will write messages to rvChan, so we can close it. + watcher.Unregister() + close(rvChan) + }() + + select { + case <-ctx.Done(): + case <-isStopped: + } + }() + + return rvChan, func() { isStoppedOnce.Do(func() { close(isStopped) }) }, nil } diff --git a/driver/tcs/tcs_test.go b/driver/tcs/tcs_test.go new file mode 100644 index 0000000..bc0273b --- /dev/null +++ b/driver/tcs/tcs_test.go @@ -0,0 +1,582 @@ +// Package tcs_test provides unit tests for the TCS driver implementation. +// It uses mocks to test the driver without requiring a real Tarantool connection. +package tcs_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/gojuno/minimock/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool/v2" + + "github.com/tarantool/go-storage/driver/tcs" + "github.com/tarantool/go-storage/internal/mocks" + gsTesting "github.com/tarantool/go-storage/internal/testing" + goOperation "github.com/tarantool/go-storage/operation" + goPredicate "github.com/tarantool/go-storage/predicate" +) + +const ( + defaultUnregisterTimeout = 10 * time.Second + defaultWaitTimeout = 10 * time.Second +) + +func TestNew(t *testing.T) { + t.Parallel() + + mockDoer := mocks.NewDoerWatcherMock(t) + driver := tcs.New(mockDoer) + + assert.NotNil(t, driver) +} + +func TestDriver_Watch_WatcherSuccess(t *testing.T) { + t.Parallel() + + mc := minimock.NewController(t) + + mockDoer := mocks.NewDoerWatcherMock(mc) + watcher := mocks.NewWatcherMock(mc).UnregisterMock.Expect().Times(1).Return() + + mockDoer.NewWatcherMock.Set(func(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + assert.Equal(t, "config.storage:test-key", key) + assert.NotNil(t, callback) + + return watcher, nil + }) + + driver := tcs.New(mockDoer) + + ctx := context.Background() + key := []byte("test-key") + + events, cleanup, err := driver.Watch(ctx, key) + + require.NoError(t, err) + assert.NotNil(t, events) + assert.NotNil(t, cleanup) + + cleanup() + + // We should wait for the watcher to be unregistered in separate goroutine. + // Without testing internals of this library - best way is to simply sleep a little here. + mc.Wait(defaultUnregisterTimeout) +} + +func TestDriver_Watch_WatcherError(t *testing.T) { + t.Parallel() + + var ( + mc = minimock.NewController(t) + mockDoer = mocks.NewDoerWatcherMock(mc) + watcherErr = errors.New("watcher creation failed") + ) + + mockDoer.NewWatcherMock.Set(func(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + assert.Equal(t, "config.storage:test-key", key) + assert.NotNil(t, callback) + + return nil, watcherErr + }) + + driver := tcs.New(mockDoer) + + ctx := context.Background() + key := []byte("test-key") + + events, cleanup, err := driver.Watch(ctx, key) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create watcher") + assert.Contains(t, err.Error(), "watcher creation failed") + assert.Nil(t, events) + assert.Nil(t, cleanup) +} + +func TestDriver_Watch_ContextCanceled_Before(t *testing.T) { + t.Parallel() + + mc := minimock.NewController(t) + + mockDoer := mocks.NewDoerWatcherMock(mc) + watcher := mocks.NewWatcherMock(mc).UnregisterMock.Expect().Times(1).Return() + + mockDoer.NewWatcherMock.Set(func(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + assert.Equal(t, "config.storage:test-key", key) + assert.NotNil(t, callback) + + return watcher, nil + }) + + driver := tcs.New(mockDoer) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + key := []byte("test-key") + + events, cleanup, err := driver.Watch(ctx, key) + + require.NoError(t, err) + assert.NotNil(t, events) + assert.NotNil(t, cleanup) + + mc.Wait(defaultUnregisterTimeout) + + require.NotPanics(t, func() { cleanup() }) +} + +func TestDriver_Watch_ContextCanceled_After(t *testing.T) { + t.Parallel() + + mc := minimock.NewController(t) + + mockDoer := mocks.NewDoerWatcherMock(mc) + watcher := mocks.NewWatcherMock(mc).UnregisterMock.Expect().Times(1).Return() + + mockDoer.NewWatcherMock.Set(func(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + assert.Equal(t, "config.storage:test-key", key) + assert.NotNil(t, callback) + + return watcher, nil + }) + + driver := tcs.New(mockDoer) + + ctx, cancel := context.WithCancel(context.Background()) + + key := []byte("test-key") + + events, cleanup, err := driver.Watch(ctx, key) + + require.NoError(t, err) + assert.NotNil(t, events) + assert.NotNil(t, cleanup) + + cancel() + + mc.Wait(defaultUnregisterTimeout) + + require.NotPanics(t, func() { cleanup() }) +} + +func TestDriver_Watch_CallbackOnceCalled(t *testing.T) { + t.Parallel() + + mc := minimock.NewController(t) + + mockDoer := mocks.NewDoerWatcherMock(mc) + watcher := mocks.NewWatcherMock(mc).UnregisterMock.Expect().Times(1).Return() + + mockDoer.NewWatcherMock.Set(func(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + assert.Equal(t, "config.storage:test-key", key) + assert.NotNil(t, callback) + + callback(tarantool.WatchEvent{ + Conn: nil, + Key: "config.storage:test-key", + Value: []byte("test-value"), + }) + + return watcher, nil + }) + + driver := tcs.New(mockDoer) + + ctx := context.Background() + key := []byte("test-key") + + events, cleanup, err := driver.Watch(ctx, key) + + require.NoError(t, err) + assert.NotNil(t, events) + assert.NotNil(t, cleanup) + + select { + case val, ok := <-events: + require.True(t, ok) + assert.Equal(t, []byte("test-key"), val.Prefix) + case <-time.After(defaultWaitTimeout): + assert.Fail(t, "timeout") + } + + cleanup() + mc.Wait(defaultUnregisterTimeout) +} + +func TestDriver_Watch_CallbackTwiceCalled(t *testing.T) { + t.Parallel() + + mc := minimock.NewController(t) + + mockDoer := mocks.NewDoerWatcherMock(mc) + watcher := mocks.NewWatcherMock(mc).UnregisterMock.Expect().Times(1).Return() + + mockDoer.NewWatcherMock.Set(func(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + assert.Equal(t, "config.storage:test-key", key) + assert.NotNil(t, callback) + + callback(tarantool.WatchEvent{ + Conn: nil, + Key: "config.storage:test-key", + Value: []byte("test-value-1"), + }) + + callback(tarantool.WatchEvent{ + Conn: nil, + Key: "config.storage:test-key", + Value: []byte("test-value-2"), + }) + + return watcher, nil + }) + + driver := tcs.New(mockDoer) + + ctx := context.Background() + key := []byte("test-key") + + events, cleanup, err := driver.Watch(ctx, key) + + require.NoError(t, err) + assert.NotNil(t, events) + assert.NotNil(t, cleanup) + + select { + case val, ok := <-events: + require.True(t, ok) + assert.Equal(t, []byte("test-key"), val.Prefix) + case <-time.After(defaultWaitTimeout): + assert.Fail(t, "timeout") + } + + select { + case val, ok := <-events: + require.True(t, ok) + assert.Fail(t, "must be empty", ": %v, %v", val, ok) + default: + } + + cleanup() + mc.Wait(defaultUnregisterTimeout) +} + +func TestDriver_Execute_Get_Empty(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{}}, + "is_success": true, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates []goPredicate.Predicate + elseOps []goOperation.Operation + + ifOps = []goOperation.Operation{ + goOperation.Get([]byte("/123")), + } + ) + + resp, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.NoError(t, err) + require.True(t, resp.Succeeded) + require.Len(t, resp.Results, 1) + require.Empty(t, resp.Results[0].Values) +} + +func TestDriver_Execute_Get_NonEmpty(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{ + map[string]interface{}{ + "path": []byte("/123"), + "value": []byte("123"), + "mod_revision": 1000, + }, + }}, + "is_success": true, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates []goPredicate.Predicate + elseOps []goOperation.Operation + + ifOps = []goOperation.Operation{ + goOperation.Get([]byte("/123")), + } + ) + + resp, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.NoError(t, err) + require.True(t, resp.Succeeded) + require.Len(t, resp.Results, 1) + require.Len(t, resp.Results[0].Values, 1) + require.Equal(t, []byte("/123"), resp.Results[0].Values[0].Key) + require.Equal(t, []byte("123"), resp.Results[0].Values[0].Value) + require.Equal(t, int64(1000), resp.Results[0].Values[0].ModRevision) +} + +func TestDriver_Execute_Get_PrefixMulti(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{ + {}, + { + map[string]interface{}{ + "path": []byte("/123/1"), + "value": []byte("124"), + "mod_revision": 1000, + }, + map[string]interface{}{ + "path": []byte("/123/2"), + "value": []byte("125"), + "mod_revision": 900, + }, + }, + { + map[string]interface{}{ + "path": []byte("/120"), + "value": []byte("121"), + "mod_revision": 800, + }, + }, + {}, + }, + "is_success": true, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates []goPredicate.Predicate + elseOps []goOperation.Operation + + ifOps = []goOperation.Operation{ + goOperation.Get([]byte("/122/")), + goOperation.Get([]byte("/123/")), + goOperation.Get([]byte("/120")), + goOperation.Get([]byte("/121")), + } + ) + + resp, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.NoError(t, err) + require.True(t, resp.Succeeded) + require.Len(t, resp.Results, 4) + require.Empty(t, resp.Results[0].Values) + require.Len(t, resp.Results[1].Values, 2) + require.Len(t, resp.Results[2].Values, 1) + require.Empty(t, resp.Results[3].Values) + + assert.Equal(t, []byte("/123/1"), resp.Results[1].Values[0].Key) + assert.Equal(t, []byte("124"), resp.Results[1].Values[0].Value) + assert.Equal(t, int64(1000), resp.Results[1].Values[0].ModRevision) + + assert.Equal(t, []byte("/123/2"), resp.Results[1].Values[1].Key) + assert.Equal(t, []byte("125"), resp.Results[1].Values[1].Value) + assert.Equal(t, int64(900), resp.Results[1].Values[1].ModRevision) + + assert.Equal(t, []byte("/120"), resp.Results[2].Values[0].Key) + assert.Equal(t, []byte("121"), resp.Results[2].Values[0].Value) + assert.Equal(t, int64(800), resp.Results[2].Values[0].ModRevision) +} + +func TestDriver_Execute_Delete(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{}}, + "is_success": true, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates []goPredicate.Predicate + elseOps []goOperation.Operation + + ifOps = []goOperation.Operation{ + goOperation.Delete([]byte("/123")), + } + ) + + resp, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.NoError(t, err) + require.True(t, resp.Succeeded) + require.Len(t, resp.Results, 1) + require.Empty(t, resp.Results[0].Values) +} + +func TestDriver_Execute_Put(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{}}, + "is_success": true, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates []goPredicate.Predicate + elseOps []goOperation.Operation + + ifOps = []goOperation.Operation{ + goOperation.Put([]byte("/123"), []byte("123")), + } + ) + + resp, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.NoError(t, err) + require.True(t, resp.Succeeded) + require.Len(t, resp.Results, 1) + require.Empty(t, resp.Results[0].Values) +} + +func TestDriver_Execute_InvalidBody(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{}}, + "is_success": true, + }, + "revision": 1000, + }, + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{}}, + "is_success": true, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates []goPredicate.Predicate + elseOps []goOperation.Operation + + ifOps = []goOperation.Operation{ + goOperation.Delete([]byte("/123")), + } + ) + + _, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.Error(t, err) + require.ErrorIs(t, err, tcs.ErrUnexpectedResponse) +} + +func TestDriver_Execute_WithPredicatesAndElse(t *testing.T) { + t.Parallel() + + data := []interface{}{ + map[string]interface{}{ + "data": map[string]interface{}{ + "responses": [][]interface { + }{{}}, + "is_success": false, + }, + "revision": 1000, + }, + } + mock := gsTesting.NewMockDoer(t, + gsTesting.NewMockResponse(t, data), + ) + + driver := tcs.New(gsTesting.NewMockDoerWithWatcher(mock, nil)) + + var ( + ctx = context.Background() + predicates = []goPredicate.Predicate{ + goPredicate.VersionLess([]byte("/123"), 100), + goPredicate.VersionGreater([]byte("/123"), 100), + goPredicate.VersionEqual([]byte("/123"), 100), + goPredicate.VersionNotEqual([]byte("/123"), 100), + goPredicate.ValueEqual([]byte("/123"), []byte("123")), + goPredicate.ValueNotEqual([]byte("/123"), []byte("123")), + } + ifOps = []goOperation.Operation{ + goOperation.Delete([]byte("/123")), + } + + elseOps = []goOperation.Operation{ + goOperation.Put([]byte("/123"), []byte("123")), + } + ) + + resp, err := driver.Execute(ctx, predicates, ifOps, elseOps) + require.NoError(t, err) + require.False(t, resp.Succeeded) + require.Len(t, resp.Results, 1) + require.Empty(t, resp.Results[0].Values) +} diff --git a/driver/tcs/txn.go b/driver/tcs/txn.go new file mode 100644 index 0000000..099a66c --- /dev/null +++ b/driver/tcs/txn.go @@ -0,0 +1,84 @@ +package tcs + +import ( + "github.com/vmihailenco/msgpack/v5" + + "github.com/tarantool/go-storage/kv" + goOperation "github.com/tarantool/go-storage/operation" + goPredicate "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" +) + +type txnOpResponse struct { + Response []struct { + Path []byte `msgpack:"path"` + ModRevision int64 `msgpack:"mod_revision"` + Value []byte `msgpack:"value"` + } +} + +func (t *txnOpResponse) DecodeMsgpack(decoder *msgpack.Decoder) error { + err := decoder.Decode(&t.Response) + if err != nil { + return NewTxnOpResponseDecodingError(err) + } + + return nil +} + +type txnResponse struct { + Data struct { + IsSuccess bool `msgpack:"is_success"` + Responses []txnOpResponse `msgpack:"responses"` + } `msgpack:"data"` + Revision int64 `msgpack:"revision"` +} + +func (r txnResponse) asTxnResponse() tx.Response { + results := make([]tx.RequestResponse, 0, len(r.Data.Responses)) + for _, val := range r.Data.Responses { + keyValues := make([]kv.KeyValue, 0, len(val.Response)) + for _, resp := range val.Response { + modRevision := resp.ModRevision + if modRevision == 0 && r.Revision != 0 { + modRevision = r.Revision + } + + keyValues = append(keyValues, kv.KeyValue{ + Key: resp.Path, + Value: resp.Value, + ModRevision: modRevision, + }) + } + + results = append(results, tx.RequestResponse{ + Values: keyValues, + }) + } + + return tx.Response{ + Succeeded: r.Data.IsSuccess, + Results: results, + } +} + +type txnRequest struct { + _msgpack struct{} `msgpack:",omitempty"` + + Predicates []predicate `msgpack:"predicates"` + OnSuccess []operation `msgpack:"on_success"` + OnFailure []operation `msgpack:"on_failure"` +} + +func newTxnRequest( + predicates []goPredicate.Predicate, + onSuccess []goOperation.Operation, + onFailure []goOperation.Operation, +) txnRequest { + return txnRequest{ + _msgpack: struct{}{}, + Predicates: newPredicates(predicates), + OnSuccess: newOperations(onSuccess), + OnFailure: newOperations(onFailure), + } +} diff --git a/go.mod b/go.mod index e6dacfb..490d788 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/tarantool/go-option v1.0.0 github.com/tarantool/go-tarantool/v2 v2.4.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/etcd/client/v3 v3.6.5 ) @@ -21,7 +22,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tarantool/go-iproto v1.1.0 // indirect - github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.etcd.io/etcd/api/v3 v3.6.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect diff --git a/internal/mocks/doer_watcher.go b/internal/mocks/doer_watcher.go new file mode 100644 index 0000000..c3f7a85 --- /dev/null +++ b/internal/mocks/doer_watcher.go @@ -0,0 +1,3 @@ +package mocks + +//go:generate go tool minimock -g -i github.com/tarantool/go-storage/driver/tcs.DoerWatcher -s _mock.go diff --git a/internal/mocks/doer_watcher_mock.go b/internal/mocks/doer_watcher_mock.go new file mode 100644 index 0000000..5b19e2c --- /dev/null +++ b/internal/mocks/doer_watcher_mock.go @@ -0,0 +1,739 @@ +// Code generated by http://github.com/gojuno/minimock (v3.4.7). DO NOT EDIT. + +package mocks + +import ( + "sync" + mm_atomic "sync/atomic" + mm_time "time" + + "github.com/gojuno/minimock/v3" + "github.com/tarantool/go-tarantool/v2" +) + +// DoerWatcherMock implements mm_tcs.DoerWatcher +type DoerWatcherMock struct { + t minimock.Tester + finishOnce sync.Once + + funcDo func(req tarantool.Request) (fut *tarantool.Future) + funcDoOrigin string + inspectFuncDo func(req tarantool.Request) + afterDoCounter uint64 + beforeDoCounter uint64 + DoMock mDoerWatcherMockDo + + funcNewWatcher func(key string, callback tarantool.WatchCallback) (w1 tarantool.Watcher, err error) + funcNewWatcherOrigin string + inspectFuncNewWatcher func(key string, callback tarantool.WatchCallback) + afterNewWatcherCounter uint64 + beforeNewWatcherCounter uint64 + NewWatcherMock mDoerWatcherMockNewWatcher +} + +// NewDoerWatcherMock returns a mock for mm_tcs.DoerWatcher +func NewDoerWatcherMock(t minimock.Tester) *DoerWatcherMock { + m := &DoerWatcherMock{t: t} + + if controller, ok := t.(minimock.MockController); ok { + controller.RegisterMocker(m) + } + + m.DoMock = mDoerWatcherMockDo{mock: m} + m.DoMock.callArgs = []*DoerWatcherMockDoParams{} + + m.NewWatcherMock = mDoerWatcherMockNewWatcher{mock: m} + m.NewWatcherMock.callArgs = []*DoerWatcherMockNewWatcherParams{} + + t.Cleanup(m.MinimockFinish) + + return m +} + +type mDoerWatcherMockDo struct { + optional bool + mock *DoerWatcherMock + defaultExpectation *DoerWatcherMockDoExpectation + expectations []*DoerWatcherMockDoExpectation + + callArgs []*DoerWatcherMockDoParams + mutex sync.RWMutex + + expectedInvocations uint64 + expectedInvocationsOrigin string +} + +// DoerWatcherMockDoExpectation specifies expectation struct of the DoerWatcher.Do +type DoerWatcherMockDoExpectation struct { + mock *DoerWatcherMock + params *DoerWatcherMockDoParams + paramPtrs *DoerWatcherMockDoParamPtrs + expectationOrigins DoerWatcherMockDoExpectationOrigins + results *DoerWatcherMockDoResults + returnOrigin string + Counter uint64 +} + +// DoerWatcherMockDoParams contains parameters of the DoerWatcher.Do +type DoerWatcherMockDoParams struct { + req tarantool.Request +} + +// DoerWatcherMockDoParamPtrs contains pointers to parameters of the DoerWatcher.Do +type DoerWatcherMockDoParamPtrs struct { + req *tarantool.Request +} + +// DoerWatcherMockDoResults contains results of the DoerWatcher.Do +type DoerWatcherMockDoResults struct { + fut *tarantool.Future +} + +// DoerWatcherMockDoOrigins contains origins of expectations of the DoerWatcher.Do +type DoerWatcherMockDoExpectationOrigins struct { + origin string + originReq string +} + +// Marks this method to be optional. The default behavior of any method with Return() is '1 or more', meaning +// the test will fail minimock's automatic final call check if the mocked method was not called at least once. +// Optional() makes method check to work in '0 or more' mode. +// It is NOT RECOMMENDED to use this option unless you really need it, as default behaviour helps to +// catch the problems when the expected method call is totally skipped during test run. +func (mmDo *mDoerWatcherMockDo) Optional() *mDoerWatcherMockDo { + mmDo.optional = true + return mmDo +} + +// Expect sets up expected params for DoerWatcher.Do +func (mmDo *mDoerWatcherMockDo) Expect(req tarantool.Request) *mDoerWatcherMockDo { + if mmDo.mock.funcDo != nil { + mmDo.mock.t.Fatalf("DoerWatcherMock.Do mock is already set by Set") + } + + if mmDo.defaultExpectation == nil { + mmDo.defaultExpectation = &DoerWatcherMockDoExpectation{} + } + + if mmDo.defaultExpectation.paramPtrs != nil { + mmDo.mock.t.Fatalf("DoerWatcherMock.Do mock is already set by ExpectParams functions") + } + + mmDo.defaultExpectation.params = &DoerWatcherMockDoParams{req} + mmDo.defaultExpectation.expectationOrigins.origin = minimock.CallerInfo(1) + for _, e := range mmDo.expectations { + if minimock.Equal(e.params, mmDo.defaultExpectation.params) { + mmDo.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmDo.defaultExpectation.params) + } + } + + return mmDo +} + +// ExpectReqParam1 sets up expected param req for DoerWatcher.Do +func (mmDo *mDoerWatcherMockDo) ExpectReqParam1(req tarantool.Request) *mDoerWatcherMockDo { + if mmDo.mock.funcDo != nil { + mmDo.mock.t.Fatalf("DoerWatcherMock.Do mock is already set by Set") + } + + if mmDo.defaultExpectation == nil { + mmDo.defaultExpectation = &DoerWatcherMockDoExpectation{} + } + + if mmDo.defaultExpectation.params != nil { + mmDo.mock.t.Fatalf("DoerWatcherMock.Do mock is already set by Expect") + } + + if mmDo.defaultExpectation.paramPtrs == nil { + mmDo.defaultExpectation.paramPtrs = &DoerWatcherMockDoParamPtrs{} + } + mmDo.defaultExpectation.paramPtrs.req = &req + mmDo.defaultExpectation.expectationOrigins.originReq = minimock.CallerInfo(1) + + return mmDo +} + +// Inspect accepts an inspector function that has same arguments as the DoerWatcher.Do +func (mmDo *mDoerWatcherMockDo) Inspect(f func(req tarantool.Request)) *mDoerWatcherMockDo { + if mmDo.mock.inspectFuncDo != nil { + mmDo.mock.t.Fatalf("Inspect function is already set for DoerWatcherMock.Do") + } + + mmDo.mock.inspectFuncDo = f + + return mmDo +} + +// Return sets up results that will be returned by DoerWatcher.Do +func (mmDo *mDoerWatcherMockDo) Return(fut *tarantool.Future) *DoerWatcherMock { + if mmDo.mock.funcDo != nil { + mmDo.mock.t.Fatalf("DoerWatcherMock.Do mock is already set by Set") + } + + if mmDo.defaultExpectation == nil { + mmDo.defaultExpectation = &DoerWatcherMockDoExpectation{mock: mmDo.mock} + } + mmDo.defaultExpectation.results = &DoerWatcherMockDoResults{fut} + mmDo.defaultExpectation.returnOrigin = minimock.CallerInfo(1) + return mmDo.mock +} + +// Set uses given function f to mock the DoerWatcher.Do method +func (mmDo *mDoerWatcherMockDo) Set(f func(req tarantool.Request) (fut *tarantool.Future)) *DoerWatcherMock { + if mmDo.defaultExpectation != nil { + mmDo.mock.t.Fatalf("Default expectation is already set for the DoerWatcher.Do method") + } + + if len(mmDo.expectations) > 0 { + mmDo.mock.t.Fatalf("Some expectations are already set for the DoerWatcher.Do method") + } + + mmDo.mock.funcDo = f + mmDo.mock.funcDoOrigin = minimock.CallerInfo(1) + return mmDo.mock +} + +// When sets expectation for the DoerWatcher.Do which will trigger the result defined by the following +// Then helper +func (mmDo *mDoerWatcherMockDo) When(req tarantool.Request) *DoerWatcherMockDoExpectation { + if mmDo.mock.funcDo != nil { + mmDo.mock.t.Fatalf("DoerWatcherMock.Do mock is already set by Set") + } + + expectation := &DoerWatcherMockDoExpectation{ + mock: mmDo.mock, + params: &DoerWatcherMockDoParams{req}, + expectationOrigins: DoerWatcherMockDoExpectationOrigins{origin: minimock.CallerInfo(1)}, + } + mmDo.expectations = append(mmDo.expectations, expectation) + return expectation +} + +// Then sets up DoerWatcher.Do return parameters for the expectation previously defined by the When method +func (e *DoerWatcherMockDoExpectation) Then(fut *tarantool.Future) *DoerWatcherMock { + e.results = &DoerWatcherMockDoResults{fut} + return e.mock +} + +// Times sets number of times DoerWatcher.Do should be invoked +func (mmDo *mDoerWatcherMockDo) Times(n uint64) *mDoerWatcherMockDo { + if n == 0 { + mmDo.mock.t.Fatalf("Times of DoerWatcherMock.Do mock can not be zero") + } + mm_atomic.StoreUint64(&mmDo.expectedInvocations, n) + mmDo.expectedInvocationsOrigin = minimock.CallerInfo(1) + return mmDo +} + +func (mmDo *mDoerWatcherMockDo) invocationsDone() bool { + if len(mmDo.expectations) == 0 && mmDo.defaultExpectation == nil && mmDo.mock.funcDo == nil { + return true + } + + totalInvocations := mm_atomic.LoadUint64(&mmDo.mock.afterDoCounter) + expectedInvocations := mm_atomic.LoadUint64(&mmDo.expectedInvocations) + + return totalInvocations > 0 && (expectedInvocations == 0 || expectedInvocations == totalInvocations) +} + +// Do implements mm_tcs.DoerWatcher +func (mmDo *DoerWatcherMock) Do(req tarantool.Request) (fut *tarantool.Future) { + mm_atomic.AddUint64(&mmDo.beforeDoCounter, 1) + defer mm_atomic.AddUint64(&mmDo.afterDoCounter, 1) + + mmDo.t.Helper() + + if mmDo.inspectFuncDo != nil { + mmDo.inspectFuncDo(req) + } + + mm_params := DoerWatcherMockDoParams{req} + + // Record call args + mmDo.DoMock.mutex.Lock() + mmDo.DoMock.callArgs = append(mmDo.DoMock.callArgs, &mm_params) + mmDo.DoMock.mutex.Unlock() + + for _, e := range mmDo.DoMock.expectations { + if minimock.Equal(*e.params, mm_params) { + mm_atomic.AddUint64(&e.Counter, 1) + return e.results.fut + } + } + + if mmDo.DoMock.defaultExpectation != nil { + mm_atomic.AddUint64(&mmDo.DoMock.defaultExpectation.Counter, 1) + mm_want := mmDo.DoMock.defaultExpectation.params + mm_want_ptrs := mmDo.DoMock.defaultExpectation.paramPtrs + + mm_got := DoerWatcherMockDoParams{req} + + if mm_want_ptrs != nil { + + if mm_want_ptrs.req != nil && !minimock.Equal(*mm_want_ptrs.req, mm_got.req) { + mmDo.t.Errorf("DoerWatcherMock.Do got unexpected parameter req, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmDo.DoMock.defaultExpectation.expectationOrigins.originReq, *mm_want_ptrs.req, mm_got.req, minimock.Diff(*mm_want_ptrs.req, mm_got.req)) + } + + } else if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { + mmDo.t.Errorf("DoerWatcherMock.Do got unexpected parameters, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmDo.DoMock.defaultExpectation.expectationOrigins.origin, *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) + } + + mm_results := mmDo.DoMock.defaultExpectation.results + if mm_results == nil { + mmDo.t.Fatal("No results are set for the DoerWatcherMock.Do") + } + return (*mm_results).fut + } + if mmDo.funcDo != nil { + return mmDo.funcDo(req) + } + mmDo.t.Fatalf("Unexpected call to DoerWatcherMock.Do. %v", req) + return +} + +// DoAfterCounter returns a count of finished DoerWatcherMock.Do invocations +func (mmDo *DoerWatcherMock) DoAfterCounter() uint64 { + return mm_atomic.LoadUint64(&mmDo.afterDoCounter) +} + +// DoBeforeCounter returns a count of DoerWatcherMock.Do invocations +func (mmDo *DoerWatcherMock) DoBeforeCounter() uint64 { + return mm_atomic.LoadUint64(&mmDo.beforeDoCounter) +} + +// Calls returns a list of arguments used in each call to DoerWatcherMock.Do. +// The list is in the same order as the calls were made (i.e. recent calls have a higher index) +func (mmDo *mDoerWatcherMockDo) Calls() []*DoerWatcherMockDoParams { + mmDo.mutex.RLock() + + argCopy := make([]*DoerWatcherMockDoParams, len(mmDo.callArgs)) + copy(argCopy, mmDo.callArgs) + + mmDo.mutex.RUnlock() + + return argCopy +} + +// MinimockDoDone returns true if the count of the Do invocations corresponds +// the number of defined expectations +func (m *DoerWatcherMock) MinimockDoDone() bool { + if m.DoMock.optional { + // Optional methods provide '0 or more' call count restriction. + return true + } + + for _, e := range m.DoMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + return false + } + } + + return m.DoMock.invocationsDone() +} + +// MinimockDoInspect logs each unmet expectation +func (m *DoerWatcherMock) MinimockDoInspect() { + for _, e := range m.DoMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + m.t.Errorf("Expected call to DoerWatcherMock.Do at\n%s with params: %#v", e.expectationOrigins.origin, *e.params) + } + } + + afterDoCounter := mm_atomic.LoadUint64(&m.afterDoCounter) + // if default expectation was set then invocations count should be greater than zero + if m.DoMock.defaultExpectation != nil && afterDoCounter < 1 { + if m.DoMock.defaultExpectation.params == nil { + m.t.Errorf("Expected call to DoerWatcherMock.Do at\n%s", m.DoMock.defaultExpectation.returnOrigin) + } else { + m.t.Errorf("Expected call to DoerWatcherMock.Do at\n%s with params: %#v", m.DoMock.defaultExpectation.expectationOrigins.origin, *m.DoMock.defaultExpectation.params) + } + } + // if func was set then invocations count should be greater than zero + if m.funcDo != nil && afterDoCounter < 1 { + m.t.Errorf("Expected call to DoerWatcherMock.Do at\n%s", m.funcDoOrigin) + } + + if !m.DoMock.invocationsDone() && afterDoCounter > 0 { + m.t.Errorf("Expected %d calls to DoerWatcherMock.Do at\n%s but found %d calls", + mm_atomic.LoadUint64(&m.DoMock.expectedInvocations), m.DoMock.expectedInvocationsOrigin, afterDoCounter) + } +} + +type mDoerWatcherMockNewWatcher struct { + optional bool + mock *DoerWatcherMock + defaultExpectation *DoerWatcherMockNewWatcherExpectation + expectations []*DoerWatcherMockNewWatcherExpectation + + callArgs []*DoerWatcherMockNewWatcherParams + mutex sync.RWMutex + + expectedInvocations uint64 + expectedInvocationsOrigin string +} + +// DoerWatcherMockNewWatcherExpectation specifies expectation struct of the DoerWatcher.NewWatcher +type DoerWatcherMockNewWatcherExpectation struct { + mock *DoerWatcherMock + params *DoerWatcherMockNewWatcherParams + paramPtrs *DoerWatcherMockNewWatcherParamPtrs + expectationOrigins DoerWatcherMockNewWatcherExpectationOrigins + results *DoerWatcherMockNewWatcherResults + returnOrigin string + Counter uint64 +} + +// DoerWatcherMockNewWatcherParams contains parameters of the DoerWatcher.NewWatcher +type DoerWatcherMockNewWatcherParams struct { + key string + callback tarantool.WatchCallback +} + +// DoerWatcherMockNewWatcherParamPtrs contains pointers to parameters of the DoerWatcher.NewWatcher +type DoerWatcherMockNewWatcherParamPtrs struct { + key *string + callback *tarantool.WatchCallback +} + +// DoerWatcherMockNewWatcherResults contains results of the DoerWatcher.NewWatcher +type DoerWatcherMockNewWatcherResults struct { + w1 tarantool.Watcher + err error +} + +// DoerWatcherMockNewWatcherOrigins contains origins of expectations of the DoerWatcher.NewWatcher +type DoerWatcherMockNewWatcherExpectationOrigins struct { + origin string + originKey string + originCallback string +} + +// Marks this method to be optional. The default behavior of any method with Return() is '1 or more', meaning +// the test will fail minimock's automatic final call check if the mocked method was not called at least once. +// Optional() makes method check to work in '0 or more' mode. +// It is NOT RECOMMENDED to use this option unless you really need it, as default behaviour helps to +// catch the problems when the expected method call is totally skipped during test run. +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Optional() *mDoerWatcherMockNewWatcher { + mmNewWatcher.optional = true + return mmNewWatcher +} + +// Expect sets up expected params for DoerWatcher.NewWatcher +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Expect(key string, callback tarantool.WatchCallback) *mDoerWatcherMockNewWatcher { + if mmNewWatcher.mock.funcNewWatcher != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Set") + } + + if mmNewWatcher.defaultExpectation == nil { + mmNewWatcher.defaultExpectation = &DoerWatcherMockNewWatcherExpectation{} + } + + if mmNewWatcher.defaultExpectation.paramPtrs != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by ExpectParams functions") + } + + mmNewWatcher.defaultExpectation.params = &DoerWatcherMockNewWatcherParams{key, callback} + mmNewWatcher.defaultExpectation.expectationOrigins.origin = minimock.CallerInfo(1) + for _, e := range mmNewWatcher.expectations { + if minimock.Equal(e.params, mmNewWatcher.defaultExpectation.params) { + mmNewWatcher.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmNewWatcher.defaultExpectation.params) + } + } + + return mmNewWatcher +} + +// ExpectKeyParam1 sets up expected param key for DoerWatcher.NewWatcher +func (mmNewWatcher *mDoerWatcherMockNewWatcher) ExpectKeyParam1(key string) *mDoerWatcherMockNewWatcher { + if mmNewWatcher.mock.funcNewWatcher != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Set") + } + + if mmNewWatcher.defaultExpectation == nil { + mmNewWatcher.defaultExpectation = &DoerWatcherMockNewWatcherExpectation{} + } + + if mmNewWatcher.defaultExpectation.params != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Expect") + } + + if mmNewWatcher.defaultExpectation.paramPtrs == nil { + mmNewWatcher.defaultExpectation.paramPtrs = &DoerWatcherMockNewWatcherParamPtrs{} + } + mmNewWatcher.defaultExpectation.paramPtrs.key = &key + mmNewWatcher.defaultExpectation.expectationOrigins.originKey = minimock.CallerInfo(1) + + return mmNewWatcher +} + +// ExpectCallbackParam2 sets up expected param callback for DoerWatcher.NewWatcher +func (mmNewWatcher *mDoerWatcherMockNewWatcher) ExpectCallbackParam2(callback tarantool.WatchCallback) *mDoerWatcherMockNewWatcher { + if mmNewWatcher.mock.funcNewWatcher != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Set") + } + + if mmNewWatcher.defaultExpectation == nil { + mmNewWatcher.defaultExpectation = &DoerWatcherMockNewWatcherExpectation{} + } + + if mmNewWatcher.defaultExpectation.params != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Expect") + } + + if mmNewWatcher.defaultExpectation.paramPtrs == nil { + mmNewWatcher.defaultExpectation.paramPtrs = &DoerWatcherMockNewWatcherParamPtrs{} + } + mmNewWatcher.defaultExpectation.paramPtrs.callback = &callback + mmNewWatcher.defaultExpectation.expectationOrigins.originCallback = minimock.CallerInfo(1) + + return mmNewWatcher +} + +// Inspect accepts an inspector function that has same arguments as the DoerWatcher.NewWatcher +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Inspect(f func(key string, callback tarantool.WatchCallback)) *mDoerWatcherMockNewWatcher { + if mmNewWatcher.mock.inspectFuncNewWatcher != nil { + mmNewWatcher.mock.t.Fatalf("Inspect function is already set for DoerWatcherMock.NewWatcher") + } + + mmNewWatcher.mock.inspectFuncNewWatcher = f + + return mmNewWatcher +} + +// Return sets up results that will be returned by DoerWatcher.NewWatcher +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Return(w1 tarantool.Watcher, err error) *DoerWatcherMock { + if mmNewWatcher.mock.funcNewWatcher != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Set") + } + + if mmNewWatcher.defaultExpectation == nil { + mmNewWatcher.defaultExpectation = &DoerWatcherMockNewWatcherExpectation{mock: mmNewWatcher.mock} + } + mmNewWatcher.defaultExpectation.results = &DoerWatcherMockNewWatcherResults{w1, err} + mmNewWatcher.defaultExpectation.returnOrigin = minimock.CallerInfo(1) + return mmNewWatcher.mock +} + +// Set uses given function f to mock the DoerWatcher.NewWatcher method +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Set(f func(key string, callback tarantool.WatchCallback) (w1 tarantool.Watcher, err error)) *DoerWatcherMock { + if mmNewWatcher.defaultExpectation != nil { + mmNewWatcher.mock.t.Fatalf("Default expectation is already set for the DoerWatcher.NewWatcher method") + } + + if len(mmNewWatcher.expectations) > 0 { + mmNewWatcher.mock.t.Fatalf("Some expectations are already set for the DoerWatcher.NewWatcher method") + } + + mmNewWatcher.mock.funcNewWatcher = f + mmNewWatcher.mock.funcNewWatcherOrigin = minimock.CallerInfo(1) + return mmNewWatcher.mock +} + +// When sets expectation for the DoerWatcher.NewWatcher which will trigger the result defined by the following +// Then helper +func (mmNewWatcher *mDoerWatcherMockNewWatcher) When(key string, callback tarantool.WatchCallback) *DoerWatcherMockNewWatcherExpectation { + if mmNewWatcher.mock.funcNewWatcher != nil { + mmNewWatcher.mock.t.Fatalf("DoerWatcherMock.NewWatcher mock is already set by Set") + } + + expectation := &DoerWatcherMockNewWatcherExpectation{ + mock: mmNewWatcher.mock, + params: &DoerWatcherMockNewWatcherParams{key, callback}, + expectationOrigins: DoerWatcherMockNewWatcherExpectationOrigins{origin: minimock.CallerInfo(1)}, + } + mmNewWatcher.expectations = append(mmNewWatcher.expectations, expectation) + return expectation +} + +// Then sets up DoerWatcher.NewWatcher return parameters for the expectation previously defined by the When method +func (e *DoerWatcherMockNewWatcherExpectation) Then(w1 tarantool.Watcher, err error) *DoerWatcherMock { + e.results = &DoerWatcherMockNewWatcherResults{w1, err} + return e.mock +} + +// Times sets number of times DoerWatcher.NewWatcher should be invoked +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Times(n uint64) *mDoerWatcherMockNewWatcher { + if n == 0 { + mmNewWatcher.mock.t.Fatalf("Times of DoerWatcherMock.NewWatcher mock can not be zero") + } + mm_atomic.StoreUint64(&mmNewWatcher.expectedInvocations, n) + mmNewWatcher.expectedInvocationsOrigin = minimock.CallerInfo(1) + return mmNewWatcher +} + +func (mmNewWatcher *mDoerWatcherMockNewWatcher) invocationsDone() bool { + if len(mmNewWatcher.expectations) == 0 && mmNewWatcher.defaultExpectation == nil && mmNewWatcher.mock.funcNewWatcher == nil { + return true + } + + totalInvocations := mm_atomic.LoadUint64(&mmNewWatcher.mock.afterNewWatcherCounter) + expectedInvocations := mm_atomic.LoadUint64(&mmNewWatcher.expectedInvocations) + + return totalInvocations > 0 && (expectedInvocations == 0 || expectedInvocations == totalInvocations) +} + +// NewWatcher implements mm_tcs.DoerWatcher +func (mmNewWatcher *DoerWatcherMock) NewWatcher(key string, callback tarantool.WatchCallback) (w1 tarantool.Watcher, err error) { + mm_atomic.AddUint64(&mmNewWatcher.beforeNewWatcherCounter, 1) + defer mm_atomic.AddUint64(&mmNewWatcher.afterNewWatcherCounter, 1) + + mmNewWatcher.t.Helper() + + if mmNewWatcher.inspectFuncNewWatcher != nil { + mmNewWatcher.inspectFuncNewWatcher(key, callback) + } + + mm_params := DoerWatcherMockNewWatcherParams{key, callback} + + // Record call args + mmNewWatcher.NewWatcherMock.mutex.Lock() + mmNewWatcher.NewWatcherMock.callArgs = append(mmNewWatcher.NewWatcherMock.callArgs, &mm_params) + mmNewWatcher.NewWatcherMock.mutex.Unlock() + + for _, e := range mmNewWatcher.NewWatcherMock.expectations { + if minimock.Equal(*e.params, mm_params) { + mm_atomic.AddUint64(&e.Counter, 1) + return e.results.w1, e.results.err + } + } + + if mmNewWatcher.NewWatcherMock.defaultExpectation != nil { + mm_atomic.AddUint64(&mmNewWatcher.NewWatcherMock.defaultExpectation.Counter, 1) + mm_want := mmNewWatcher.NewWatcherMock.defaultExpectation.params + mm_want_ptrs := mmNewWatcher.NewWatcherMock.defaultExpectation.paramPtrs + + mm_got := DoerWatcherMockNewWatcherParams{key, callback} + + if mm_want_ptrs != nil { + + if mm_want_ptrs.key != nil && !minimock.Equal(*mm_want_ptrs.key, mm_got.key) { + mmNewWatcher.t.Errorf("DoerWatcherMock.NewWatcher got unexpected parameter key, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmNewWatcher.NewWatcherMock.defaultExpectation.expectationOrigins.originKey, *mm_want_ptrs.key, mm_got.key, minimock.Diff(*mm_want_ptrs.key, mm_got.key)) + } + + if mm_want_ptrs.callback != nil && !minimock.Equal(*mm_want_ptrs.callback, mm_got.callback) { + mmNewWatcher.t.Errorf("DoerWatcherMock.NewWatcher got unexpected parameter callback, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmNewWatcher.NewWatcherMock.defaultExpectation.expectationOrigins.originCallback, *mm_want_ptrs.callback, mm_got.callback, minimock.Diff(*mm_want_ptrs.callback, mm_got.callback)) + } + + } else if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { + mmNewWatcher.t.Errorf("DoerWatcherMock.NewWatcher got unexpected parameters, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmNewWatcher.NewWatcherMock.defaultExpectation.expectationOrigins.origin, *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) + } + + mm_results := mmNewWatcher.NewWatcherMock.defaultExpectation.results + if mm_results == nil { + mmNewWatcher.t.Fatal("No results are set for the DoerWatcherMock.NewWatcher") + } + return (*mm_results).w1, (*mm_results).err + } + if mmNewWatcher.funcNewWatcher != nil { + return mmNewWatcher.funcNewWatcher(key, callback) + } + mmNewWatcher.t.Fatalf("Unexpected call to DoerWatcherMock.NewWatcher. %v %v", key, callback) + return +} + +// NewWatcherAfterCounter returns a count of finished DoerWatcherMock.NewWatcher invocations +func (mmNewWatcher *DoerWatcherMock) NewWatcherAfterCounter() uint64 { + return mm_atomic.LoadUint64(&mmNewWatcher.afterNewWatcherCounter) +} + +// NewWatcherBeforeCounter returns a count of DoerWatcherMock.NewWatcher invocations +func (mmNewWatcher *DoerWatcherMock) NewWatcherBeforeCounter() uint64 { + return mm_atomic.LoadUint64(&mmNewWatcher.beforeNewWatcherCounter) +} + +// Calls returns a list of arguments used in each call to DoerWatcherMock.NewWatcher. +// The list is in the same order as the calls were made (i.e. recent calls have a higher index) +func (mmNewWatcher *mDoerWatcherMockNewWatcher) Calls() []*DoerWatcherMockNewWatcherParams { + mmNewWatcher.mutex.RLock() + + argCopy := make([]*DoerWatcherMockNewWatcherParams, len(mmNewWatcher.callArgs)) + copy(argCopy, mmNewWatcher.callArgs) + + mmNewWatcher.mutex.RUnlock() + + return argCopy +} + +// MinimockNewWatcherDone returns true if the count of the NewWatcher invocations corresponds +// the number of defined expectations +func (m *DoerWatcherMock) MinimockNewWatcherDone() bool { + if m.NewWatcherMock.optional { + // Optional methods provide '0 or more' call count restriction. + return true + } + + for _, e := range m.NewWatcherMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + return false + } + } + + return m.NewWatcherMock.invocationsDone() +} + +// MinimockNewWatcherInspect logs each unmet expectation +func (m *DoerWatcherMock) MinimockNewWatcherInspect() { + for _, e := range m.NewWatcherMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + m.t.Errorf("Expected call to DoerWatcherMock.NewWatcher at\n%s with params: %#v", e.expectationOrigins.origin, *e.params) + } + } + + afterNewWatcherCounter := mm_atomic.LoadUint64(&m.afterNewWatcherCounter) + // if default expectation was set then invocations count should be greater than zero + if m.NewWatcherMock.defaultExpectation != nil && afterNewWatcherCounter < 1 { + if m.NewWatcherMock.defaultExpectation.params == nil { + m.t.Errorf("Expected call to DoerWatcherMock.NewWatcher at\n%s", m.NewWatcherMock.defaultExpectation.returnOrigin) + } else { + m.t.Errorf("Expected call to DoerWatcherMock.NewWatcher at\n%s with params: %#v", m.NewWatcherMock.defaultExpectation.expectationOrigins.origin, *m.NewWatcherMock.defaultExpectation.params) + } + } + // if func was set then invocations count should be greater than zero + if m.funcNewWatcher != nil && afterNewWatcherCounter < 1 { + m.t.Errorf("Expected call to DoerWatcherMock.NewWatcher at\n%s", m.funcNewWatcherOrigin) + } + + if !m.NewWatcherMock.invocationsDone() && afterNewWatcherCounter > 0 { + m.t.Errorf("Expected %d calls to DoerWatcherMock.NewWatcher at\n%s but found %d calls", + mm_atomic.LoadUint64(&m.NewWatcherMock.expectedInvocations), m.NewWatcherMock.expectedInvocationsOrigin, afterNewWatcherCounter) + } +} + +// MinimockFinish checks that all mocked methods have been called the expected number of times +func (m *DoerWatcherMock) MinimockFinish() { + m.finishOnce.Do(func() { + if !m.minimockDone() { + m.MinimockDoInspect() + + m.MinimockNewWatcherInspect() + } + }) +} + +// MinimockWait waits for all mocked methods to be called the expected number of times +func (m *DoerWatcherMock) MinimockWait(timeout mm_time.Duration) { + timeoutCh := mm_time.After(timeout) + for { + if m.minimockDone() { + return + } + select { + case <-timeoutCh: + m.MinimockFinish() + return + case <-mm_time.After(10 * mm_time.Millisecond): + } + } +} + +func (m *DoerWatcherMock) minimockDone() bool { + done := true + return done && + m.MinimockDoDone() && + m.MinimockNewWatcherDone() +} diff --git a/internal/mocks/driver_mock.go b/internal/mocks/driver_mock.go index bd6a2f1..4136fc6 100644 --- a/internal/mocks/driver_mock.go +++ b/internal/mocks/driver_mock.go @@ -27,7 +27,7 @@ type DriverMock struct { beforeExecuteCounter uint64 ExecuteMock mDriverMockExecute - funcWatch func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event) + funcWatch func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event, f1 func(), err error) funcWatchOrigin string inspectFuncWatch func(ctx context.Context, key []byte, opts ...watch.Option) afterWatchCounter uint64 @@ -500,6 +500,8 @@ type DriverMockWatchParamPtrs struct { // DriverMockWatchResults contains results of the Driver.Watch type DriverMockWatchResults struct { ch1 <-chan watch.Event + f1 func() + err error } // DriverMockWatchOrigins contains origins of expectations of the Driver.Watch @@ -626,7 +628,7 @@ func (mmWatch *mDriverMockWatch) Inspect(f func(ctx context.Context, key []byte, } // Return sets up results that will be returned by Driver.Watch -func (mmWatch *mDriverMockWatch) Return(ch1 <-chan watch.Event) *DriverMock { +func (mmWatch *mDriverMockWatch) Return(ch1 <-chan watch.Event, f1 func(), err error) *DriverMock { if mmWatch.mock.funcWatch != nil { mmWatch.mock.t.Fatalf("DriverMock.Watch mock is already set by Set") } @@ -634,13 +636,13 @@ func (mmWatch *mDriverMockWatch) Return(ch1 <-chan watch.Event) *DriverMock { if mmWatch.defaultExpectation == nil { mmWatch.defaultExpectation = &DriverMockWatchExpectation{mock: mmWatch.mock} } - mmWatch.defaultExpectation.results = &DriverMockWatchResults{ch1} + mmWatch.defaultExpectation.results = &DriverMockWatchResults{ch1, f1, err} mmWatch.defaultExpectation.returnOrigin = minimock.CallerInfo(1) return mmWatch.mock } // Set uses given function f to mock the Driver.Watch method -func (mmWatch *mDriverMockWatch) Set(f func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event)) *DriverMock { +func (mmWatch *mDriverMockWatch) Set(f func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event, f1 func(), err error)) *DriverMock { if mmWatch.defaultExpectation != nil { mmWatch.mock.t.Fatalf("Default expectation is already set for the Driver.Watch method") } @@ -671,8 +673,8 @@ func (mmWatch *mDriverMockWatch) When(ctx context.Context, key []byte, opts ...w } // Then sets up Driver.Watch return parameters for the expectation previously defined by the When method -func (e *DriverMockWatchExpectation) Then(ch1 <-chan watch.Event) *DriverMock { - e.results = &DriverMockWatchResults{ch1} +func (e *DriverMockWatchExpectation) Then(ch1 <-chan watch.Event, f1 func(), err error) *DriverMock { + e.results = &DriverMockWatchResults{ch1, f1, err} return e.mock } @@ -698,7 +700,7 @@ func (mmWatch *mDriverMockWatch) invocationsDone() bool { } // Watch implements mm_driver.Driver -func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event) { +func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event, f1 func(), err error) { mm_atomic.AddUint64(&mmWatch.beforeWatchCounter, 1) defer mm_atomic.AddUint64(&mmWatch.afterWatchCounter, 1) @@ -718,7 +720,7 @@ func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch. for _, e := range mmWatch.WatchMock.expectations { if minimock.Equal(*e.params, mm_params) { mm_atomic.AddUint64(&e.Counter, 1) - return e.results.ch1 + return e.results.ch1, e.results.f1, e.results.err } } @@ -755,7 +757,7 @@ func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch. if mm_results == nil { mmWatch.t.Fatal("No results are set for the DriverMock.Watch") } - return (*mm_results).ch1 + return (*mm_results).ch1, (*mm_results).f1, (*mm_results).err } if mmWatch.funcWatch != nil { return mmWatch.funcWatch(ctx, key, opts...) diff --git a/internal/mocks/tarantool_watcher.go b/internal/mocks/tarantool_watcher.go new file mode 100644 index 0000000..e3c098c --- /dev/null +++ b/internal/mocks/tarantool_watcher.go @@ -0,0 +1,3 @@ +package mocks + +//go:generate go tool minimock -g -i github.com/tarantool/go-tarantool/v2.Watcher -o tarantool_watcher_mock.go diff --git a/internal/mocks/tarantool_watcher_mock.go b/internal/mocks/tarantool_watcher_mock.go new file mode 100644 index 0000000..6047734 --- /dev/null +++ b/internal/mocks/tarantool_watcher_mock.go @@ -0,0 +1,248 @@ +// Code generated by http://github.com/gojuno/minimock (v3.4.7). DO NOT EDIT. + +package mocks + +import ( + "sync" + mm_atomic "sync/atomic" + mm_time "time" + + "github.com/gojuno/minimock/v3" +) + +// WatcherMock implements mm_tarantool.Watcher +type WatcherMock struct { + t minimock.Tester + finishOnce sync.Once + + funcUnregister func() + funcUnregisterOrigin string + inspectFuncUnregister func() + afterUnregisterCounter uint64 + beforeUnregisterCounter uint64 + UnregisterMock mWatcherMockUnregister +} + +// NewWatcherMock returns a mock for mm_tarantool.Watcher +func NewWatcherMock(t minimock.Tester) *WatcherMock { + m := &WatcherMock{t: t} + + if controller, ok := t.(minimock.MockController); ok { + controller.RegisterMocker(m) + } + + m.UnregisterMock = mWatcherMockUnregister{mock: m} + + t.Cleanup(m.MinimockFinish) + + return m +} + +type mWatcherMockUnregister struct { + optional bool + mock *WatcherMock + defaultExpectation *WatcherMockUnregisterExpectation + expectations []*WatcherMockUnregisterExpectation + + expectedInvocations uint64 + expectedInvocationsOrigin string +} + +// WatcherMockUnregisterExpectation specifies expectation struct of the Watcher.Unregister +type WatcherMockUnregisterExpectation struct { + mock *WatcherMock + + returnOrigin string + Counter uint64 +} + +// Marks this method to be optional. The default behavior of any method with Return() is '1 or more', meaning +// the test will fail minimock's automatic final call check if the mocked method was not called at least once. +// Optional() makes method check to work in '0 or more' mode. +// It is NOT RECOMMENDED to use this option unless you really need it, as default behaviour helps to +// catch the problems when the expected method call is totally skipped during test run. +func (mmUnregister *mWatcherMockUnregister) Optional() *mWatcherMockUnregister { + mmUnregister.optional = true + return mmUnregister +} + +// Expect sets up expected params for Watcher.Unregister +func (mmUnregister *mWatcherMockUnregister) Expect() *mWatcherMockUnregister { + if mmUnregister.mock.funcUnregister != nil { + mmUnregister.mock.t.Fatalf("WatcherMock.Unregister mock is already set by Set") + } + + if mmUnregister.defaultExpectation == nil { + mmUnregister.defaultExpectation = &WatcherMockUnregisterExpectation{} + } + + return mmUnregister +} + +// Inspect accepts an inspector function that has same arguments as the Watcher.Unregister +func (mmUnregister *mWatcherMockUnregister) Inspect(f func()) *mWatcherMockUnregister { + if mmUnregister.mock.inspectFuncUnregister != nil { + mmUnregister.mock.t.Fatalf("Inspect function is already set for WatcherMock.Unregister") + } + + mmUnregister.mock.inspectFuncUnregister = f + + return mmUnregister +} + +// Return sets up results that will be returned by Watcher.Unregister +func (mmUnregister *mWatcherMockUnregister) Return() *WatcherMock { + if mmUnregister.mock.funcUnregister != nil { + mmUnregister.mock.t.Fatalf("WatcherMock.Unregister mock is already set by Set") + } + + if mmUnregister.defaultExpectation == nil { + mmUnregister.defaultExpectation = &WatcherMockUnregisterExpectation{mock: mmUnregister.mock} + } + + mmUnregister.defaultExpectation.returnOrigin = minimock.CallerInfo(1) + return mmUnregister.mock +} + +// Set uses given function f to mock the Watcher.Unregister method +func (mmUnregister *mWatcherMockUnregister) Set(f func()) *WatcherMock { + if mmUnregister.defaultExpectation != nil { + mmUnregister.mock.t.Fatalf("Default expectation is already set for the Watcher.Unregister method") + } + + if len(mmUnregister.expectations) > 0 { + mmUnregister.mock.t.Fatalf("Some expectations are already set for the Watcher.Unregister method") + } + + mmUnregister.mock.funcUnregister = f + mmUnregister.mock.funcUnregisterOrigin = minimock.CallerInfo(1) + return mmUnregister.mock +} + +// Times sets number of times Watcher.Unregister should be invoked +func (mmUnregister *mWatcherMockUnregister) Times(n uint64) *mWatcherMockUnregister { + if n == 0 { + mmUnregister.mock.t.Fatalf("Times of WatcherMock.Unregister mock can not be zero") + } + mm_atomic.StoreUint64(&mmUnregister.expectedInvocations, n) + mmUnregister.expectedInvocationsOrigin = minimock.CallerInfo(1) + return mmUnregister +} + +func (mmUnregister *mWatcherMockUnregister) invocationsDone() bool { + if len(mmUnregister.expectations) == 0 && mmUnregister.defaultExpectation == nil && mmUnregister.mock.funcUnregister == nil { + return true + } + + totalInvocations := mm_atomic.LoadUint64(&mmUnregister.mock.afterUnregisterCounter) + expectedInvocations := mm_atomic.LoadUint64(&mmUnregister.expectedInvocations) + + return totalInvocations > 0 && (expectedInvocations == 0 || expectedInvocations == totalInvocations) +} + +// Unregister implements mm_tarantool.Watcher +func (mmUnregister *WatcherMock) Unregister() { + mm_atomic.AddUint64(&mmUnregister.beforeUnregisterCounter, 1) + defer mm_atomic.AddUint64(&mmUnregister.afterUnregisterCounter, 1) + + mmUnregister.t.Helper() + + if mmUnregister.inspectFuncUnregister != nil { + mmUnregister.inspectFuncUnregister() + } + + if mmUnregister.UnregisterMock.defaultExpectation != nil { + mm_atomic.AddUint64(&mmUnregister.UnregisterMock.defaultExpectation.Counter, 1) + + return + + } + if mmUnregister.funcUnregister != nil { + mmUnregister.funcUnregister() + return + } + mmUnregister.t.Fatalf("Unexpected call to WatcherMock.Unregister.") + +} + +// UnregisterAfterCounter returns a count of finished WatcherMock.Unregister invocations +func (mmUnregister *WatcherMock) UnregisterAfterCounter() uint64 { + return mm_atomic.LoadUint64(&mmUnregister.afterUnregisterCounter) +} + +// UnregisterBeforeCounter returns a count of WatcherMock.Unregister invocations +func (mmUnregister *WatcherMock) UnregisterBeforeCounter() uint64 { + return mm_atomic.LoadUint64(&mmUnregister.beforeUnregisterCounter) +} + +// MinimockUnregisterDone returns true if the count of the Unregister invocations corresponds +// the number of defined expectations +func (m *WatcherMock) MinimockUnregisterDone() bool { + if m.UnregisterMock.optional { + // Optional methods provide '0 or more' call count restriction. + return true + } + + for _, e := range m.UnregisterMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + return false + } + } + + return m.UnregisterMock.invocationsDone() +} + +// MinimockUnregisterInspect logs each unmet expectation +func (m *WatcherMock) MinimockUnregisterInspect() { + for _, e := range m.UnregisterMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + m.t.Error("Expected call to WatcherMock.Unregister") + } + } + + afterUnregisterCounter := mm_atomic.LoadUint64(&m.afterUnregisterCounter) + // if default expectation was set then invocations count should be greater than zero + if m.UnregisterMock.defaultExpectation != nil && afterUnregisterCounter < 1 { + m.t.Errorf("Expected call to WatcherMock.Unregister at\n%s", m.UnregisterMock.defaultExpectation.returnOrigin) + } + // if func was set then invocations count should be greater than zero + if m.funcUnregister != nil && afterUnregisterCounter < 1 { + m.t.Errorf("Expected call to WatcherMock.Unregister at\n%s", m.funcUnregisterOrigin) + } + + if !m.UnregisterMock.invocationsDone() && afterUnregisterCounter > 0 { + m.t.Errorf("Expected %d calls to WatcherMock.Unregister at\n%s but found %d calls", + mm_atomic.LoadUint64(&m.UnregisterMock.expectedInvocations), m.UnregisterMock.expectedInvocationsOrigin, afterUnregisterCounter) + } +} + +// MinimockFinish checks that all mocked methods have been called the expected number of times +func (m *WatcherMock) MinimockFinish() { + m.finishOnce.Do(func() { + if !m.minimockDone() { + m.MinimockUnregisterInspect() + } + }) +} + +// MinimockWait waits for all mocked methods to be called the expected number of times +func (m *WatcherMock) MinimockWait(timeout mm_time.Duration) { + timeoutCh := mm_time.After(timeout) + for { + if m.minimockDone() { + return + } + select { + case <-timeoutCh: + m.MinimockFinish() + return + case <-mm_time.After(10 * mm_time.Millisecond): + } + } +} + +func (m *WatcherMock) minimockDone() bool { + done := true + return done && + m.MinimockUnregisterDone() +} diff --git a/internal/testing/doer.go b/internal/testing/doer.go new file mode 100644 index 0000000..230d66b --- /dev/null +++ b/internal/testing/doer.go @@ -0,0 +1,85 @@ +package testing + +import ( + "bytes" + "sync" + + "github.com/tarantool/go-tarantool/v2" +) + +type doerResponse struct { + resp *MockResponse + err error +} + +// MockDoer is an implementation of the Doer interface +// used for testing purposes. +type MockDoer struct { + mu sync.Mutex + // Requests is a slice of received requests. + // It could be used to compare incoming requests with expected. + Requests []tarantool.Request + responses []doerResponse + t T +} + +// NewMockDoer creates a MockDoer by given responses. +// Each response could be one of two types: MockResponse or error. +func NewMockDoer(t T, responses ...interface{}) *MockDoer { + t.Helper() + + mockDoer := &MockDoer{ + mu: sync.Mutex{}, + t: t, + Requests: []tarantool.Request{}, + responses: []doerResponse{}, + } + + for _, response := range responses { + doerResp := doerResponse{ + resp: nil, + err: nil, + } + + switch resp := response.(type) { + case *MockResponse: + doerResp.resp = resp + case error: + doerResp.err = resp + default: + t.Fatalf("unsupported type: %T", response) + } + + mockDoer.responses = append(mockDoer.responses, doerResp) + } + + return mockDoer +} + +// Do returns a future with the current response or an error. +// It saves the current request into MockDoer.Requests. +func (d *MockDoer) Do(req tarantool.Request) *tarantool.Future { + d.mu.Lock() + defer d.mu.Unlock() + + d.Requests = append(d.Requests, req) + + mockReq := NewMockRequest() + fut := tarantool.NewFuture(mockReq) + + if len(d.responses) == 0 { + d.t.Fatalf("list of responses is empty") + } + + response := d.responses[0] + + if response.err != nil { + fut.SetError(response.err) + } else { + _ = fut.SetResponse(response.resp.header, bytes.NewBuffer(response.resp.data)) + } + + d.responses = d.responses[1:] + + return fut +} diff --git a/internal/testing/doerwithwatcher.go b/internal/testing/doerwithwatcher.go new file mode 100644 index 0000000..c2231e5 --- /dev/null +++ b/internal/testing/doerwithwatcher.go @@ -0,0 +1,96 @@ +// Package testing provides a mock implementation of the tarantool.Doer and other interfaces. +// It is used for testing purposes. +package testing + +import ( + "fmt" + "time" + + "github.com/tarantool/go-tarantool/v2" +) + +// MockDoerWithWatcher is a mock implementation of the tarantool.DoerWithWatcher interface. +type MockDoerWithWatcher struct { + mockDoer *MockDoer + + events map[string][]tarantool.WatchEvent +} + +// NewMockDoerWithWatcher returns a new mock doer with watcher. +func NewMockDoerWithWatcher(doer *MockDoer, events map[string][]tarantool.WatchEvent) *MockDoerWithWatcher { + return &MockDoerWithWatcher{ + mockDoer: doer, + events: events, + } +} + +const ( + delayBeforeFirstEvent = 100 * time.Millisecond +) + +// Do returns a new future. +func (m *MockDoerWithWatcher) Do(req tarantool.Request) *tarantool.Future { + return m.mockDoer.Do(req) +} + +// NewWatcher returns a new watcher. +func (m *MockDoerWithWatcher) NewWatcher(path string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + eventList, ok := m.events[path] + if !ok { + panic(fmt.Sprintf("event list %s not found", path)) + } + + dummy := &dummyWatcher{ + cb: callback, + isStopped: make(chan struct{}), + + eventListPos: 0, + eventList: eventList, + } + + go func() { + time.Sleep(delayBeforeFirstEvent) + dummy.Start() + }() + + return dummy, nil +} + +type dummyWatcher struct { + cb tarantool.WatchCallback + isStopped chan struct{} + + eventListPos int + eventList []tarantool.WatchEvent +} + +const ( + delayBetweenEvents = 10 * time.Millisecond +) + +func (w *dummyWatcher) Start() { + go func() { + for { + select { + case <-w.isStopped: + return + default: + } + + if w.eventListPos >= len(w.eventList) { + return + } + + w.cb(w.eventList[w.eventListPos]) + + w.eventListPos++ + + time.Sleep(delayBetweenEvents) + } + }() +} + +// Unregister stops the watcher. +func (w *dummyWatcher) Unregister() { + close(w.isStopped) +} diff --git a/internal/testing/request.go b/internal/testing/request.go new file mode 100644 index 0000000..c8c7779 --- /dev/null +++ b/internal/testing/request.go @@ -0,0 +1,54 @@ +package testing + +import ( + "context" + "io" + + "github.com/tarantool/go-iproto" + "github.com/tarantool/go-tarantool/v2" + "github.com/vmihailenco/msgpack/v5" +) + +// MockRequest is an empty mock request used for testing purposes. +type MockRequest struct { +} + +// NewMockRequest creates an empty MockRequest. +func NewMockRequest() *MockRequest { + return &MockRequest{} +} + +// Type returns an iproto type for MockRequest. +func (req *MockRequest) Type() iproto.Type { + return iproto.Type(0) +} + +// Async returns if MockRequest expects a response. +func (req *MockRequest) Async() bool { + return false +} + +// Body fills an msgpack.Encoder with the watch request body. +func (req *MockRequest) Body(_ tarantool.SchemaResolver, _ *msgpack.Encoder) error { + return nil +} + +// Conn returns the Connection object the request belongs to. +func (req *MockRequest) Conn() *tarantool.Connection { + return &tarantool.Connection{ + Greeting: nil, + } +} + +// Ctx returns a context of the MockRequest. +func (req *MockRequest) Ctx() context.Context { + return nil +} + +// Response creates a response for the MockRequest. +func (req *MockRequest) Response( + header tarantool.Header, body io.Reader, +) (tarantool.Response, error) { + resp, err := CreateMockResponse(header, body) + return resp, err +} diff --git a/internal/testing/response.go b/internal/testing/response.go new file mode 100644 index 0000000..56a1bb9 --- /dev/null +++ b/internal/testing/response.go @@ -0,0 +1,81 @@ +package testing + +import ( + "bytes" + "io" + + "github.com/vmihailenco/msgpack/v5" + + "github.com/tarantool/go-tarantool/v2" +) + +// MockResponse is a mock response used for testing purposes. +type MockResponse struct { + // header contains response header. + header tarantool.Header + // data contains data inside a response. + data []byte +} + +// NewMockResponse creates a new MockResponse with an empty header and the given data. +// body should be passed as a structure to be encoded. +// The encoded body is served as response data and will be decoded once the +// response is decoded. +func NewMockResponse(t T, body interface{}) *MockResponse { + t.Helper() + + buf := bytes.NewBuffer([]byte{}) + enc := msgpack.NewEncoder(buf) + + err := enc.Encode(body) + if err != nil { + t.Errorf("unexpected error while encoding: %s", err) + } + + return &MockResponse{ + header: tarantool.Header{RequestId: 0, Error: 0}, + data: buf.Bytes(), + } +} + +// CreateMockResponse creates a MockResponse from the header and a data, +// packed inside an io.Reader. +func CreateMockResponse(header tarantool.Header, body io.Reader) (*MockResponse, error) { + if body == nil { + return &MockResponse{header: header, data: nil}, nil + } + + data, err := io.ReadAll(body) + if err != nil { + return nil, err //nolint:wrapcheck + } + + return &MockResponse{header: header, data: data}, nil +} + +// Header returns a header for the MockResponse. +func (resp *MockResponse) Header() tarantool.Header { + return resp.header +} + +// Decode returns the result of decoding the response data as slice. +func (resp *MockResponse) Decode() ([]interface{}, error) { + if resp.data == nil { + return nil, nil + } + + dec := msgpack.NewDecoder(bytes.NewBuffer(resp.data)) + + return dec.DecodeSlice() //nolint:wrapcheck +} + +// DecodeTyped returns the result of decoding the response data. +func (resp *MockResponse) DecodeTyped(res interface{}) error { + if resp.data == nil { + return nil + } + + dec := msgpack.NewDecoder(bytes.NewBuffer(resp.data)) + + return dec.Decode(res) //nolint:wrapcheck +} diff --git a/internal/testing/t.go b/internal/testing/t.go new file mode 100644 index 0000000..4ed73cb --- /dev/null +++ b/internal/testing/t.go @@ -0,0 +1,40 @@ +package testing + +import ( + "fmt" + "os" +) + +// T is a dummy implementation of the testing.T interface to use in examples. +type T interface { + Helper() + Log(args ...interface{}) + Logf(format string, args ...interface{}) + Fatalf(format string, args ...interface{}) + Errorf(format string, args ...interface{}) +} + +type dummyT struct{} + +func (t *dummyT) Helper() {} + +func (t *dummyT) Log(args ...interface{}) { + _, _ = fmt.Fprintln(os.Stderr, args...) +} + +func (t *dummyT) Logf(format string, args ...interface{}) { + _, _ = fmt.Fprintf(os.Stderr, format, args...) +} + +func (t *dummyT) Fatalf(format string, args ...interface{}) { + panic("fatal error: " + fmt.Sprintf(format, args...)) +} + +func (t *dummyT) Errorf(format string, args ...interface{}) { + panic("error: " + fmt.Sprintf(format, args...)) +} + +// NewT returns a new dummy T instance. +func NewT() T { + return &dummyT{} +} diff --git a/kv/kv.go b/kv/kv.go index 85f6ed9..178d9a2 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -10,10 +10,6 @@ type KeyValue struct { // Value is the serialized representation of the value. Value []byte - // CreateRevision is the revision number when this key was created. - CreateRevision int64 // ModRevision is the revision number of the last modification to this key. ModRevision int64 - // Version is the version counter for key modifications. - Version int64 } diff --git a/storage_test.go b/storage_test.go index ec37fed..6e7ac73 100644 --- a/storage_test.go +++ b/storage_test.go @@ -131,7 +131,7 @@ func TestTx_Commit_Error(t *testing.T) { txInstance.If(pred).Then(thenOp) - expectedError := errors.New("driver execution failed") //nolint:err113 // Test error is fine + expectedError := errors.New("driver execution failed") mockDriver.ExecuteMock.Expect(ctx, []predicate.Predicate{pred}, diff --git a/tx/requestresponse.go b/tx/requestresponse.go index 3c25f0b..ee3c04d 100644 --- a/tx/requestresponse.go +++ b/tx/requestresponse.go @@ -4,12 +4,6 @@ import "github.com/tarantool/go-storage/kv" // RequestResponse represents the response for an individual transaction operation. type RequestResponse struct { - // KeyValue contains the result data for Get operations. - KeyValue *kv.KeyValue - - // Success indicates whether the operation was successful. - Success bool - - // Error contains any error that occurred during the operation. - Error error + // Values contains the result data for Get operations. + Values []kv.KeyValue } diff --git a/watch/event.go b/watch/event.go index 97a50a6..762fb3b 100644 --- a/watch/event.go +++ b/watch/event.go @@ -2,51 +2,8 @@ // It enables real-time monitoring of key changes through event streams. package watch -import ( - "github.com/tarantool/go-storage/kv" -) - -// EventType represents the type of watch event. -type EventType int - -const ( - // EventPut indicates a key was created or updated. - EventPut EventType = iota - // EventDelete indicates a key was deleted. - EventDelete -) - -func (t EventType) String() string { - switch t { - case EventPut: - return "Put" - case EventDelete: - return "Delete" - default: - return "Unknown" - } -} - // Event represents a change notification from the watch stream. type Event struct { - // Type indicates whether this is a put or delete event. - Type EventType - // Key is the key that was changed. - Key []byte - // Value contains the new value for put events, nil for delete events. - Value []byte - // Rev is the revision number of the event. - Rev int64 -} - -// AsKeyValue converts the Event to a KeyValue structure. -// For delete events, the Value field will be nil. -func (e *Event) AsKeyValue() kv.KeyValue { - return kv.KeyValue{ - Key: e.Key, - Value: e.Value, - CreateRevision: 0, - ModRevision: 0, - Version: 0, - } + // Prefix indicates key/prefix of what was changed. + Prefix []byte }