Skip to content

Commit

Permalink
cache: added region cache warmup
Browse files Browse the repository at this point in the history
This change adds an option in gohbase to prepopulate its region
cache. It does a single scan of the meta table and caches all
regions of the given table.
  • Loading branch information
gogochickenleg committed May 6, 2024
1 parent fa78846 commit 5aac5a1
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 0 deletions.
8 changes: 8 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Client interface {
CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
expectedValue []byte) (bool, error)
SendBatch(ctx context.Context, batch []hrpc.Call) (res []hrpc.RPCResult, allOK bool)
CacheRegions(table []byte) error
Close()
}

Expand Down Expand Up @@ -391,3 +392,10 @@ func (c *client) CheckAndPut(p *hrpc.Mutate, family string,

return r.GetProcessed(), nil
}

// CacheRegions warms up the region cache for the given table by running
// findAllRegions with a background context. Only the error from that lookup
// is reported; the list of regions it returns is discarded.
func (c *client) CacheRegions(table []byte) error {
	if _, err := c.findAllRegions(context.Background(), table); err != nil {
		return err
	}
	return nil
}
41 changes: 41 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2374,3 +2374,44 @@ func TestDebugState(t *testing.T) {
assert.Equal(t, 1, len(keyRegionCache.(map[string]interface{})))
assert.Equal(t, 1, len(clientRegionCache.(map[string]interface{}))) // only have one client
}

// regionInfoAndAddr bundles an hrpc.RegionInfo with an address string.
// NOTE(review): nothing in the visible test code references this type —
// confirm it is actually needed in this file.
type regionInfoAndAddr struct {
// regionInfo is the region's descriptor.
regionInfo hrpc.RegionInfo
// addr is an address string; presumably the region server hosting the
// region — TODO confirm.
addr string
}

// TestCacheRegions verifies that CacheRegions warms up the client's region
// cache: a freshly created client must report an empty KeyRegionCache in its
// debug state, and after CacheRegions the cache must hold the 4 regions the
// test table is created with.
func TestCacheRegions(t *testing.T) {
	c := gohbase.NewClient(*host)
	defer c.Close()

	// keyRegionCacheLen returns the number of entries under "KeyRegionCache"
	// in the client's debug state. Every step is checked so that a failure in
	// DebugState is not masked by the subsequent Unmarshal call, and a missing
	// or malformed entry fails the test instead of panicking.
	keyRegionCacheLen := func() int {
		t.Helper()
		state, err := gohbase.DebugState(c)
		if err != nil {
			t.Fatalf("DebugState failed: %v", err)
		}
		var parsed map[string]interface{}
		if err := json.Unmarshal(state, &parsed); err != nil {
			t.Fatalf("unmarshalling debug state failed: %v", err)
		}
		cache, ok := parsed["KeyRegionCache"].(map[string]interface{})
		if !ok {
			t.Fatalf("KeyRegionCache is missing or has unexpected type %T",
				parsed["KeyRegionCache"])
		}
		return len(cache)
	}

	// make sure region cache is empty at startup
	if n := keyRegionCacheLen(); n != 0 {
		t.Fatalf("expected empty region cache when creating a new client, got %d entries", n)
	}

	if err := c.CacheRegions([]byte(table)); err != nil {
		t.Fatalf("CacheRegions failed: %v", err)
	}
	// Give the work kicked off by CacheRegions a moment to settle before
	// invoking DebugState, addressing potential data race issues. A few
	// seconds is enough; sleeping for minutes only stalls the test suite.
	time.Sleep(3 * time.Second)

	// CreateTable init function starts hbase with 4 regions
	if n := keyRegionCacheLen(); n != 4 {
		t.Fatalf("Expect 4 regions but got: %v", n)
	}
}
130 changes: 130 additions & 0 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"google.golang.org/protobuf/proto"
)

// regionInfoAndAddr pairs a region with the address string that was parsed
// alongside it from a meta table row (see metaLookupForTable).
type regionInfoAndAddr struct {
// regionInfo is the region parsed from the meta row.
regionInfo hrpc.RegionInfo
// addr is the address handed to establishRegion when connecting to the
// region; presumably the hosting region server's host:port — TODO confirm.
addr string
}

// Constants
var (
// Name of the meta region.
Expand Down Expand Up @@ -554,6 +559,85 @@ func (c *client) lookupRegion(ctx context.Context,
}
}

// lookupAllRegions asks the meta table for every region of the given table,
// retrying with an increasing backoff until a lookup succeeds.
//
// Each attempt is bounded by c.regionLookupTimeout. The loop stops early and
// returns the error when the lookup reports TableNotFound or ErrClientClosed,
// or when sleepAndIncreaseBackoff fails while waiting between attempts.
func (c *client) lookupAllRegions(ctx context.Context,
	table []byte) ([]regionInfoAndAddr, error) {
	backoff := backoffStart
	for {
		// If it takes longer than regionLookupTimeout, fail so that we can sleep
		attemptCtx, stop := context.WithTimeout(ctx, c.regionLookupTimeout)
		log.WithFields(log.Fields{
			"table": strconv.Quote(string(table)),
		}).Debug("looking up regions")

		found, err := c.metaLookupForTable(attemptCtx, table)
		stop()

		switch err {
		case nil:
			log.WithFields(log.Fields{
				"table":          strconv.Quote(string(table)),
				"regionsAndAddr": found,
			}).Debug("looked up all regions")
			return found, nil
		case TableNotFound:
			log.WithFields(log.Fields{
				"table": strconv.Quote(string(table)),
				"err":   err,
			}).Debug("hbase:meta does not know about this table")
			return nil, err
		case ErrClientClosed:
			return nil, err
		}

		// Any other failure is treated as transient: log it, back off, retry.
		log.WithFields(log.Fields{
			"table":   strconv.Quote(string(table)),
			"backoff": backoff,
			"err":     err,
		}).Error("failed looking up regions")

		if backoff, err = sleepAndIncreaseBackoff(ctx, backoff); err != nil {
			return nil, err
		}
	}
}

// findAllRegions looks up every region of the given table and feeds the
// result into the client's caches.
//
// Each region returned by lookupAllRegions is marked unavailable. Unless it
// is the meta or admin region, it is then put into c.regions: if the put did
// not replace anything the region is skipped, otherwise every overlapping
// region reported by the put is deleted from c.clients. Finally a goroutine
// running establishRegion is started for the region. The full lookup result
// is returned regardless of which regions were skipped.
func (c *client) findAllRegions(ctx context.Context, table []byte) ([]regionInfoAndAddr, error) {
	found, err := c.lookupAllRegions(ctx, table)
	if err != nil {
		return nil, err
	}

	for i := range found {
		info, address := found[i].regionInfo, found[i].addr
		info.MarkUnavailable()

		isSpecial := info == c.metaRegionInfo || info == c.adminRegionInfo
		if !isSpecial {
			// The region may have been cached by someone else while we
			// were busy looking it up.
			stale, inserted := c.regions.put(info)
			if !inserted {
				// the same or younger regions are already in cache
				continue
			}

			// A new region went into the cache: drop the regions it
			// overlaps from the client's cache.
			for _, old := range stale {
				c.clients.del(old)
			}
		}

		// Connect to the region in the background.
		go c.establishRegion(info, address)
	}

	return found, nil
}

func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
// The region was not in the cache, it
// must be looked up in the meta table
Expand Down Expand Up @@ -678,6 +762,52 @@ func (c *client) metaLookup(ctx context.Context,
return reg, addr, nil
}

// createAllRegionSearchKey builds the exclusive stop row for a meta table
// scan that should return every region of the given table.
//
// Meta table rows have the format table,key,timestamp, so all rows of a table
// start with the table name followed by ','. The stop row is the table name
// followed by '-', which is the byte immediately after ',' (0x2C -> 0x2D).
// Using a larger byte such as '.' (0x2E) would make a scan starting at the
// table name also cover rows of other tables named "<table>-...", because
// '-' sorts before '.'.
func createAllRegionSearchKey(table []byte) []byte {
	metaKey := make([]byte, 0, len(table)+1)
	metaKey = append(metaKey, table...)
	// ',' + 1: the smallest byte that sorts after the ',' separator.
	metaKey = append(metaKey, ','+1)
	return metaKey
}

// metaLookupForTable scans the meta table for all regions of the given table.
//
// The scan covers the info family over the row range starting at the table
// name and ending at the key built by createAllRegionSearchKey. Every row is
// parsed with region.ParseRegionInfo into a region and its address. Any scan
// or parse error aborts the lookup; if the scan yields no rows at all,
// TableNotFound is returned.
//
// NOTE(review): the scanner is not explicitly closed when parsing fails
// mid-scan — confirm whether that path needs a Close call.
func (c *client) metaLookupForTable(ctx context.Context,
	table []byte) ([]regionInfoAndAddr, error) {
	stopRow := createAllRegionSearchKey(table)
	scanReq, err := hrpc.NewScanRange(ctx, metaTableName, table, stopRow,
		hrpc.Families(infoFamily))
	if err != nil {
		return nil, err
	}

	var found []regionInfoAndAddr
	rows := c.Scan(scanReq)
	// io.EOF from Next marks the end of the scan.
	for row, err := rows.Next(); err != io.EOF; row, err = rows.Next() {
		if err != nil {
			return nil, err
		}

		info, address, parseErr := region.ParseRegionInfo(row)
		if parseErr != nil {
			return nil, parseErr
		}

		found = append(found, regionInfoAndAddr{regionInfo: info, addr: address})
	}

	if len(found) == 0 {
		return nil, TableNotFound
	}
	return found, nil
}

func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
namespace := reg.Namespace()
table := reg.Table()
Expand Down
14 changes: 14 additions & 0 deletions rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,20 @@ func TestMetaLookupCanceledContext(t *testing.T) {
}
}

// TestMetaLookupAllRegionsCanceledContext checks that metaLookupForTable
// reports context.Canceled when it is called with an already canceled
// context.
func TestMetaLookupAllRegionsCanceledContext(t *testing.T) {
	mc := newMockClient(nil)

	// pretend regionserver:0 has meta table
	const metaAddr = "regionserver:0"
	metaClient := mc.clients.put(metaAddr, mc.metaRegionInfo, newRegionClientFn(metaAddr))
	mc.metaRegionInfo.SetClient(metaClient)

	// Cancel up front so the lookup starts with a dead context.
	canceledCtx, cancel := context.WithCancel(context.Background())
	cancel()

	_, err := mc.metaLookupForTable(canceledCtx, []byte("tablenotfound"))
	if want := context.Canceled; err != want {
		t.Errorf("Expected error %v, got error %v", want, err)
	}
}

func TestConcurrentRetryableError(t *testing.T) {
ctrl := test.NewController(t)
defer ctrl.Finish()
Expand Down
14 changes: 14 additions & 0 deletions test/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5aac5a1

Please sign in to comment.