Skip to content

Commit

Permalink
gohbase: modify behavior of scanner.Next()
Browse files Browse the repository at this point in the history
- In case context expires during scanning, return actual context.Err()
  instead of io.EOF. This fixes an issue where TableNotFound is
  returned to client in case context is expired during region lookup.
- Get rid of pre-fetching of next results. This can be trivially
  done by the callers if desired.

Change-Id: I89befc64126e9610845c4f8132ebff2dd2fa411d
  • Loading branch information
timoha committed May 24, 2018
1 parent 0a34d3c commit 7748655
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 276 deletions.
12 changes: 4 additions & 8 deletions hrpc/scan.go
Expand Up @@ -38,15 +38,11 @@ const (
// otherwise Close method should be called.
type Scanner interface {
// Next returns a row at a time.
// Once all rows are returned, subsequent calls will return nil and io.EOF.
// Once all rows are returned, subsequent calls will return io.EOF error.
//
// In case of an error or Close() was called, only the first call to Next() will
// return partial result (could be not a complete row) and the actual error,
// the subsequent calls will return nil and io.EOF.
//
// In case a scan rpc has an expired context, partial result and io.EOF will be
// returned. Clients should check the error of the context they passed if they
// want to if the scanner was closed because of the deadline.
// In case of an error, only the first call to Next() will return partial
// result (could be not a complete row) and the actual error,
// the subsequent calls will return io.EOF error.
//
// This method is thread safe.
Next() (*Result, error)
Expand Down
11 changes: 4 additions & 7 deletions integration_test.go
Expand Up @@ -1013,7 +1013,7 @@ func checkResultRow(t *testing.T, res *hrpc.Result, expectedRow string, err, exp
}

func TestScannerClose(t *testing.T) {
key := "TestScannerClose"
key := t.Name()
c := gohbase.NewClient(*host)
defer c.Close()

Expand Down Expand Up @@ -1055,7 +1055,7 @@ func TestScannerClose(t *testing.T) {
}

func TestScannerContextCancel(t *testing.T) {
key := "TestScanner"
key := t.Name()
c := gohbase.NewClient(*host)
defer c.Close()

Expand Down Expand Up @@ -1086,11 +1086,8 @@ func TestScannerContextCancel(t *testing.T) {

cancel()

// make sure we get io.EOF eventually
for {
if _, err = scanner.Next(); err == io.EOF {
break
}
if _, err = scanner.Next(); err != context.Canceled {
t.Fatalf("unexpected error %v, expected %v", err, context.Canceled)
}
}

Expand Down
9 changes: 6 additions & 3 deletions region/test_new.go
Expand Up @@ -262,9 +262,7 @@ func (c *testClient) QueueRPC(call hrpc.Call) {
if bytes.HasSuffix(call.Key(), bytes.Repeat([]byte{0}, 17)) {
// meta region probe, return empty to signify that region is online
call.ResultChan() <- hrpc.RPCResult{}
return
}
if bytes.HasPrefix(call.Key(), []byte("test,")) {
} else if bytes.HasPrefix(call.Key(), []byte("test,")) {
call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
Results: []*pb.Result{metaRow}}}
} else if bytes.HasPrefix(call.Key(), []byte("test1,,")) {
Expand All @@ -273,6 +271,11 @@ func (c *testClient) QueueRPC(call hrpc.Call) {
} else if bytes.HasPrefix(call.Key(), []byte("nsre,,")) {
call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
Results: []*pb.Result{nsreRegion}}}
} else if bytes.HasPrefix(call.Key(), []byte("tablenotfound,")) {
call.ResultChan() <- hrpc.RPCResult{Msg: &pb.ScanResponse{
Results: []*pb.Result{},
MoreResults: proto.Bool(false),
}}
} else {
call.ResultChan() <- hrpc.RPCResult{Msg: makeRegionResult(call.Key())}
}
Expand Down
6 changes: 0 additions & 6 deletions rpc.go
Expand Up @@ -367,12 +367,6 @@ func (c *client) metaLookup(ctx context.Context,

scanner := c.Scan(rpc)
resp, err := scanner.Next()
// TODO: the scanner might quietly fetch one
// more row before we close.
// Need to implement Limit(int) to avoid this behavior.
// There's a server side support for limit starting hbase 2.0
// otherwise we need to add tricky code on client side which might
// introduce unnecessary complexity.
scanner.Close()
if err == io.EOF {
return nil, "", TableNotFound
Expand Down
38 changes: 38 additions & 0 deletions rpc_test.go
Expand Up @@ -675,6 +675,44 @@ func TestErrConnotFindRegion(t *testing.T) {
}
}

func TestMetaLookupTableNotFound(t *testing.T) {
c := newMockClient(nil)

rc, err := region.NewClient(context.Background(), "regionserver:0",
region.RegionClient, 0, 0, "root", region.DefaultReadTimeout)
if err != nil {
t.Fatal(err)
}
// pretend regionserver:0 has meta table
c.metaRegionInfo.SetClient(rc)
c.clients.put(rc, c.metaRegionInfo)

_, _, err = c.metaLookup(context.Background(), []byte("tablenotfound"), []byte(t.Name()))
if err != TableNotFound {
t.Errorf("Expected error %v, got error %v", TableNotFound, err)
}
}

func TestMetaLookupCanceledContext(t *testing.T) {
c := newMockClient(nil)

rc, err := region.NewClient(context.Background(), "regionserver:0",
region.RegionClient, 0, 0, "root", region.DefaultReadTimeout)
if err != nil {
t.Fatal(err)
}
// pretend regionserver:0 has meta table
c.metaRegionInfo.SetClient(rc)
c.clients.put(rc, c.metaRegionInfo)

ctx, cancel := context.WithCancel(context.Background())
cancel()
_, _, err = c.metaLookup(ctx, []byte("tablenotfound"), []byte(t.Name()))
if err != context.Canceled {
t.Errorf("Expected error %v, got error %v", context.Canceled, err)
}
}

func TestConcurrentRetryableError(t *testing.T) {
ctrl := test.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit 7748655

Please sign in to comment.