Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] Add client batching integration tests #1555

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 62 additions & 6 deletions src/clients/go/tb_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package tigerbeetle_go
import (
"bytes"
"fmt"
"math/big"
"os"
"os/exec"
"runtime"
"sync"
"testing"
"unsafe"

Expand All @@ -14,10 +16,11 @@ import (
)

const (
TIGERBEETLE_PORT = "3000"
TIGERBEETLE_CLUSTER_ID uint64 = 0
TIGERBEETLE_REPLICA_ID uint32 = 0
TIGERBEETLE_REPLICA_COUNT uint32 = 1
TIGERBEETLE_PORT = "3000"
TIGERBEETLE_CLUSTER_ID uint64 = 0
TIGERBEETLE_REPLICA_ID uint32 = 0
TIGERBEETLE_REPLICA_COUNT uint32 = 1
TIGERBEETLE_CONCURRENCY_MAX uint = 4096
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we bump this to 8192 to reflect the value in tb_client/context.zig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the way we handle TooManyOutstanding isn't quite right. This test exposes a bug when running with concurrency_max higher than the size of the DemuxPool (which is 4096).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. tb_client wasn't adjusted properly after DemuxPool was introduced. It should be addressed by the intrusive packets PR at least.

)

func HexStringToUint128(value string) types.Uint128 {
Expand Down Expand Up @@ -70,8 +73,7 @@ func WithClient(s testing.TB, withClient func(Client)) {
})

addresses := []string{"127.0.0.1:" + TIGERBEETLE_PORT}
concurrencyMax := uint(32)
client, err := NewClient(types.ToUint128(TIGERBEETLE_CLUSTER_ID), addresses, concurrencyMax)
client, err := NewClient(types.ToUint128(TIGERBEETLE_CLUSTER_ID), addresses, TIGERBEETLE_CONCURRENCY_MAX)
if err != nil {
s.Fatal(err)
}
Expand Down Expand Up @@ -230,6 +232,60 @@ func doTestClient(s *testing.T, client Client) {
assert.Equal(t, types.ToUint128(0), accountB.DebitsPending)
})

s.Run("can create concurrent transfers", func(t *testing.T) {
const TRANSFERS_MAX = 1_000_000
concurrencyMax := make(chan struct{}, TIGERBEETLE_CONCURRENCY_MAX)

accounts, err := client.LookupAccounts([]types.Uint128{accountA.ID, accountB.ID})
if err != nil {
t.Fatal(err)
}
assert.Len(t, accounts, 2)
accountACredits := accounts[0].CreditsPosted.BigInt()
accountBDebits := accounts[1].DebitsPosted.BigInt()

var waitGroup sync.WaitGroup
for i := 0; i < TRANSFERS_MAX; i++ {
waitGroup.Add(1)

go func(i int) {
defer waitGroup.Done()

concurrencyMax <- struct{}{}
results, err := client.CreateTransfers([]types.Transfer{
{
ID: types.ToUint128(uint64(TRANSFERS_MAX + i)),
CreditAccountID: accountA.ID,
DebitAccountID: accountB.ID,
Amount: types.ToUint128(1),
Ledger: 1,
Code: 1,
},
})
<-concurrencyMax
if err != nil {
t.Fatal(err)
}

assert.Empty(t, results)
}(i)
}
waitGroup.Wait()

accounts, err = client.LookupAccounts([]types.Uint128{accountA.ID, accountB.ID})
if err != nil {
t.Fatal(err)
}
assert.Len(t, accounts, 2)
accountACreditsAfter := accounts[0].CreditsPosted.BigInt()
accountBDebitsAfter := accounts[1].DebitsPosted.BigInt()

// Each transfer moves ONE unit,
// so the credit/debit must differ from TRANSFERS_MAX units:
assert.Equal(t, TRANSFERS_MAX, big.NewInt(0).Sub(&accountACreditsAfter, &accountACredits).Int64())
assert.Equal(t, TRANSFERS_MAX, big.NewInt(0).Sub(&accountBDebitsAfter, &accountBDebits).Int64())
})

s.Run("can query transfers for an account", func(t *testing.T) {
// Create a new account:
accountC := types.Account{
Expand Down