Skip to content

Commit

Permalink
bugfix: allow multiple Future.Get*()
Browse files Browse the repository at this point in the history
After the patch, a user can call Future.GetTyped() or Future.Get()
multiple times in any order.

Needed for tarantool/tt#54
  • Loading branch information
oleg-jukovec committed Sep 21, 2022
1 parent 8b58928 commit fc20be0
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Addresses in ConnectionPool may be changed from an external code (#208)
- ConnectionPool recreates connections too often (#208)
- A connection is still opened after ConnectionPool.Close() (#208)
- Future.GetTyped() after Future.Get() does not decode response
correctly (#213)

## [1.8.0] - 2022-08-17

Expand Down
6 changes: 0 additions & 6 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,6 @@ func (fut *Future) Get() (*Response, error) {
return fut.resp, fut.err
}
err := fut.resp.decodeBody()
if err != nil {
fut.err = err
}
return fut.resp, err
}

Expand All @@ -200,9 +197,6 @@ func (fut *Future) GetTyped(result interface{}) error {
return fut.err
}
err := fut.resp.decodeBodyTyped(result)
if err != nil {
fut.err = err
}
return err
}

Expand Down
6 changes: 6 additions & 0 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (resp *Response) decodeHeader(d *decoder) (err error) {

func (resp *Response) decodeBody() (err error) {
if resp.buf.Len() > 2 {
offset := resp.buf.Offset()
defer resp.buf.Seek(offset)

var l int
var stmtID, bindCount uint64

Expand Down Expand Up @@ -211,6 +214,9 @@ func (resp *Response) decodeBody() (err error) {

func (resp *Response) decodeBodyTyped(res interface{}) (err error) {
if resp.buf.Len() > 0 {
offset := resp.buf.Offset()
defer resp.buf.Seek(offset)

var l int
d := newDecoder(&resp.buf)
if l, err = d.DecodeMapLen(); err != nil {
Expand Down
15 changes: 15 additions & 0 deletions smallbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ func (s *smallBuf) Bytes() []byte {
return nil
}

func (s *smallBuf) Offset() int {
return s.p
}

func (s *smallBuf) Seek(offset int) error {
if offset < 0 {
return errors.New("too small offset")
}
if offset > len(s.b) {
return errors.New("too big offset")
}
s.p = offset
return nil
}

type smallWBuf struct {
b []byte
sum uint
Expand Down
99 changes: 97 additions & 2 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,81 @@ func BenchmarkSQLSerial(b *testing.B) {
}
}

func TestFutureMultipleGetGetTyped(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

fut := conn.Call17Async("simple_concat", []interface{}{"1"})

for i := 0; i < 30; i++ {
// [0, 10) fut.Get()
// [10, 20) fut.GetTyped()
// [20, 30) Mix
get := false
if (i < 10) || (i >= 20 && i%2 == 0) {
get = true
}

if get {
resp, err := fut.Get()
if err != nil {
t.Errorf("Failed to call Get(): %s", err)
}
if val, ok := resp.Data[0].(string); !ok || val != "11" {
t.Errorf("Wrong Get() result: %v", resp.Data)
}
} else {
tpl := struct {
Val string
}{}
err := fut.GetTyped(&tpl)
if err != nil {
t.Errorf("Failed to call GetTyped(): %s", err)
}
if tpl.Val != "11" {
t.Errorf("Wrong GetTyped() result: %v", tpl)
}
}
}
}

func TestFutureMultipleGetWithError(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

fut := conn.Call17Async("non_exist", []interface{}{"1"})

for i := 0; i < 2; i++ {
if _, err := fut.Get(); err == nil {
t.Fatalf("An error expected")
}
}
}

func TestFutureMultipleGetTypedWithError(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

fut := conn.Call17Async("simple_concat", []interface{}{"1"})

wrongTpl := struct {
Val int
}{}
goodTpl := struct {
Val string
}{}

if err := fut.GetTyped(&wrongTpl); err == nil {
t.Fatalf("An error expected")
}
if err := fut.GetTyped(&goodTpl); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if goodTpl.Val != "11" {
t.Fatalf("Wrong result: %s", goodTpl.Val)
}
}

///////////////////

func TestClient(t *testing.T) {
Expand Down Expand Up @@ -1069,7 +1144,7 @@ func TestClientSessionPush(t *testing.T) {
} else if len(resp.Data) < 1 {
t.Errorf("Response.Data is empty after Call17Async")
} else if val, err := convertUint64(resp.Data[0]); err != nil || val != pushMax {
t.Errorf("result is not {{1}} : %v", resp.Data)
t.Errorf("Result is not %d: %v", pushMax, resp.Data)
}

// It will will be iterated with a timeout.
Expand Down Expand Up @@ -1103,7 +1178,7 @@ func TestClientSessionPush(t *testing.T) {
} else {
respCnt += 1
if val, err := convertUint64(resp.Data[0]); err != nil || val != pushMax {
t.Errorf("result is not {{1}} : %v", resp.Data)
t.Errorf("Result is not %d: %v", pushMax, resp.Data)
}
}
}
Expand All @@ -1120,6 +1195,26 @@ func TestClientSessionPush(t *testing.T) {
t.Errorf("Expect %d responses but got %d", 1, respCnt)
}
}

// We can collect original responses after iterations.
for _, fut := range []*Future{fut0, fut1, fut2} {
resp, err := fut.Get()
if err != nil {
t.Errorf("Unable to call fut.Get(): %s", err)
} else if val, err := convertUint64(resp.Data[0]); err != nil || val != pushMax {
t.Errorf("Result is not %d: %v", pushMax, resp.Data)
}

tpl := struct {
Val int
}{}
err = fut.GetTyped(&tpl)
if err != nil {
t.Errorf("Unable to call fut.GetTyped(): %s", err)
} else if tpl.Val != pushMax {
t.Errorf("Result is not %d: %d", pushMax, tpl.Val)
}
}
}

const (
Expand Down

0 comments on commit fc20be0

Please sign in to comment.