cache: added region cache warmup #250

Merged
merged 1 commit on May 8, 2024
8 changes: 8 additions & 0 deletions client.go
@@ -43,6 +43,7 @@ type Client interface {
CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
expectedValue []byte) (bool, error)
SendBatch(ctx context.Context, batch []hrpc.Call) (res []hrpc.RPCResult, allOK bool)
CacheRegions(table []byte) error
Close()
}

@@ -391,3 +392,10 @@ func (c *client) CheckAndPut(p *hrpc.Mutate, family string,

return r.GetProcessed(), nil
}

// CacheRegions scans the meta region for all the regions of the given table
// and populates the region cache. This can be used to warm up the region cache.
func (c *client) CacheRegions(table []byte) error {
_, err := c.findAllRegions(context.Background(), table)
return err
}
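
For illustration, a minimal sketch of how a caller could use the new method to warm the cache at startup. The quorum address and table name are placeholders, and the import path is assumed to be the upstream gohbase module; this is not part of the PR:

package main

import (
	"log"

	"github.com/tsuna/gohbase"
)

func main() {
	// Warm the region cache once at startup so the first requests for
	// "my-table" do not each pay the cost of an hbase:meta lookup.
	client := gohbase.NewClient("zkquorum:2181")
	defer client.Close()

	if err := client.CacheRegions([]byte("my-table")); err != nil {
		log.Fatalf("failed to warm region cache: %v", err)
	}
}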
38 changes: 38 additions & 0 deletions integration_test.go
@@ -2374,3 +2374,41 @@ func TestDebugState(t *testing.T) {
assert.Equal(t, 1, len(keyRegionCache.(map[string]interface{})))
assert.Equal(t, 1, len(clientRegionCache.(map[string]interface{}))) // only have one client
}

type regionInfoAndAddr struct {
regionInfo hrpc.RegionInfo
addr string
}

// TestCacheRegions tests warming up the region cache for a table
func TestCacheRegions(t *testing.T) {
c := gohbase.NewClient(*host)
defer c.Close()

// make sure region cache is empty at startup
var jsonUnMarshalStart map[string]interface{}
jsonVal, err := gohbase.DebugState(c)
if err != nil {
t.Fatalf("DebugState returned an error: %v", err)
}
if err = json.Unmarshal(jsonVal, &jsonUnMarshalStart); err != nil {
t.Fatalf("Encountered error when unmarshalling: %v", err)
}
cacheLength := len(jsonUnMarshalStart["KeyRegionCache"].(map[string]interface{}))
if cacheLength != 0 {
t.Fatal("expected empty region cache when creating a new client")
}

if err := c.CacheRegions([]byte(table)); err != nil {
t.Fatalf("CacheRegions returned an error: %v", err)
}

var jsonUnMarshalCached map[string]interface{}
jsonVal, err = gohbase.DebugState(c)
if err != nil {
t.Fatalf("DebugState returned an error: %v", err)
}
if err = json.Unmarshal(jsonVal, &jsonUnMarshalCached); err != nil {
t.Fatalf("Encountered error when unmarshalling: %v", err)
}
// CreateTable init function starts hbase with 4 regions
cacheLength = len(jsonUnMarshalCached["KeyRegionCache"].(map[string]interface{}))
if cacheLength != 4 {
t.Fatalf("Expect 4 regions but got: %v", cacheLength)
}

}
14 changes: 9 additions & 5 deletions region/client.go
@@ -158,7 +158,8 @@ func (e NotServingRegionError) Error() string {

// client manages a connection to a RegionServer.
type client struct {
conn net.Conn
connM sync.Mutex
conn net.Conn

// Address of the RegionServer.
addr string
@@ -784,9 +785,12 @@ func (c *client) MarshalJSON() ([]byte, error) {

// If conn is nil we don't want to panic, so only read the addresses when conn is non-nil
var localAddr, remoteAddr Address
if c.conn != nil {
localAddress := c.conn.LocalAddr()
remoteAddress := c.conn.RemoteAddr()
c.connM.Lock()
conn := c.conn
c.connM.Unlock()
if conn != nil {
localAddress := conn.LocalAddr()
remoteAddress := conn.RemoteAddr()
localAddr = Address{localAddress.Network(), localAddress.String()}
remoteAddr = Address{remoteAddress.Network(), remoteAddress.String()}
}
@@ -805,7 +809,7 @@ func (c *client) MarshalJSON() ([]byte, error) {
RegionServerAddress: c.addr,
ClientType: c.ctype,
InFlight: inFlight,
Id: c.id,
Id: atomic.LoadUint32(&c.id),
Done_status: done_status,
}

2 changes: 1 addition & 1 deletion region/info.go
@@ -384,7 +384,7 @@ func (i *info) MarshalJSON() ([]byte, error) {
StopKey: string(i.stopKey),
ContextInstance: fmt.Sprintf("%p", (i.ctx)),
Err: ctxError,
Client: fmt.Sprintf("%p", (i.client)),
Client: fmt.Sprintf("%p", (i.Client())),
Available: !i.IsUnavailable(),
}
jsonVal, err := json.Marshal(state)
7 changes: 5 additions & 2 deletions region/new.go
@@ -50,13 +50,16 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.

func (c *client) Dial(ctx context.Context) error {
c.dialOnce.Do(func() {
var err error
c.conn, err = c.dialer(ctx, "tcp", c.addr)
conn, err := c.dialer(ctx, "tcp", c.addr)
if err != nil {
c.fail(fmt.Errorf("failed to dial RegionServer: %s", err))
return
}

c.connM.Lock()
c.conn = conn
c.connM.Unlock()

// time out sending hello if it takes long
if deadline, ok := ctx.Deadline(); ok {
if err = c.conn.SetWriteDeadline(deadline); err != nil {
130 changes: 130 additions & 0 deletions rpc.go
@@ -24,6 +24,11 @@ import (
"google.golang.org/protobuf/proto"
)

type regionInfoAndAddr struct {
regionInfo hrpc.RegionInfo
addr string
}

// Constants
var (
// Name of the meta region.
@@ -554,6 +559,85 @@ func (c *client) lookupRegion(ctx context.Context,
}
}

func (c *client) lookupAllRegions(ctx context.Context,
table []byte) ([]regionInfoAndAddr, error) {
var regs []regionInfoAndAddr
var err error
backoff := backoffStart
for {
// If it takes longer than regionLookupTimeout, fail so that we can sleep
lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout)
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
}).Debug("looking up regions")

regs, err = c.metaLookupForTable(lookupCtx, table)
cancel()
if err == TableNotFound {
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"err": err,
}).Debug("hbase:meta does not know about this table")

return nil, err
} else if err == ErrClientClosed {
return nil, err
}

if err == nil {
log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"regionsAndAddr": regs,
}).Debug("looked up all regions")

return regs, nil
}

log.WithFields(log.Fields{
"table": strconv.Quote(string(table)),
"backoff": backoff,
"err": err,
}).Error("failed looking up regions")

// This will be hit if there was an error locating the region
backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
if err != nil {
return nil, err
}
}
}

func (c *client) findAllRegions(ctx context.Context, table []byte) ([]regionInfoAndAddr, error) {
regs, err := c.lookupAllRegions(ctx, table)
if err != nil {
return nil, err
}
for _, regaddr := range regs {
reg, addr := regaddr.regionInfo, regaddr.addr
reg.MarkUnavailable()

if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
// Check that the region wasn't added to
// the cache while we were looking it up.
overlaps, replaced := c.regions.put(reg)
if !replaced {
// the same or younger regions are already in cache
continue
}

// otherwise, new region in cache, delete overlaps from client's cache
for _, r := range overlaps {
c.clients.del(r)
}
}

// Start a goroutine to connect to the region
go c.establishRegion(reg, addr)
Review thread on the establishRegion call above:

Collaborator:
I'm not sure if we should establish a connection immediately. I would think that just loading the cache is enough, and once one of the regions is being used, a connection will be established. No?

Contributor (author):
In the SendRPC function, we first call getRegionFromCache to retrieve the region from the cache. If the region is not found in the cache, we then call findRegion to look up the region from the meta table. It seems that we are not establishing a connection when calling getRegionFromCache, but we are establishing one in findRegion. This leads me to think that we should establish a connection when loading a region into the cache. But I could be missing something

Collaborator:
@dethi's suggestion is a good one, to make the connection establishment lazy, but I don't think it would work well today.
What you could do is remove the MarkUnavailable() and establishRegion calls. Then when an RPC is made getRegionAndClientForRPC will see that reg.Client() == nil for that region and then call reestablishRegion. The problem is that the way this works today is it calls establishRegion with "" as the addr, which results in re-looking up the region from the meta region. So, then we are back to a flurry of requests to the meta region during start up (though, only one per region).

I think we should come up with a better fix for this in a later change.

@dethi (Collaborator), May 3, 2024:
Yeah, my concern is that if you have 10k regions, this will cause 10k goroutines to start trying to establish a connection to maybe ~100 RegionServers. They will all get blocked behind a lock that ensures we only establish one client connection per RegionServer, so that part isn't a problem. It's more the impact of all these goroutines on the Go scheduler, the CPU usage of the client on startup, and what it would mean for the RegionServers as well if you have a few tens of clients doing the same thing at once. Because one of the things we do in establishRegion is a Get to validate that we have properly established the connection to the right RegionServer and that the region is online, see isRegionEstablished. That could cause quite a bit of a storm.

Collaborator:
Yeah, that sucks, and we are doing O(number of regions * number of RegionServers) work during this warmup because clientRegionCache.put is going to iterate over every RegionServer for each region added.

I still think we should try to merge this change as-is so that we can try out the tradeoffs and continue to work to improve things here. We might be starting 10k goroutines, but at least we aren't sending out 10k scan requests to the meta region.

Collaborator:
sgtm, but need to get the CI to pass first
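
As a purely illustrative sketch of the kind of later fix discussed in this thread (not part of this PR, and none of these names are gohbase APIs), the warmup fan-out could be bounded with a semaphore so that even a 10k-region table never has more than a fixed number of connection attempts in flight:

package main

import (
	"fmt"
	"sync"
)

func main() {
	regions := make([]int, 10000) // stand-in for 10k regions to warm up

	const maxInFlight = 64 // hypothetical cap on concurrent establish attempts
	sem := make(chan struct{}, maxInFlight)
	var wg sync.WaitGroup

	for i := range regions {
		wg.Add(1)
		sem <- struct{}{} // blocks once maxInFlight goroutines are running
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }()
			_ = regions[i] // an establishRegion-style call would go here
		}(i)
	}
	wg.Wait()
	fmt.Println("warmup complete")
}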

}

return regs, nil
}

func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
// The region was not in the cache, it
// must be looked up in the meta table
@@ -678,6 +762,52 @@ func (c *client) metaLookup(ctx context.Context,
return reg, addr, nil
}

// createAllRegionSearchKey creates the meta key used as the stop row when scanning all regions of a table
func createAllRegionSearchKey(table []byte) []byte {
metaKey := make([]byte, 0, len(table)+1)
metaKey = append(metaKey, table...)
// '.' is the first byte greater than ','. Meta table entries have
// the format table,startkey,timestamp. By using table + '.' as the
// stop row we scan all meta keys for the table.
metaKey = append(metaKey, '.')
return metaKey
}
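
A small worked example of the stop-key construction above, assuming a table literally named "test" (illustrative only, not part of the change):

package main

import "fmt"

func main() {
	// For table "test", meta rows look like "test,<startkey>,<timestamp>".
	// Scanning from "test" (inclusive) to "test." (exclusive) therefore
	// covers every region row of the table, since '.' is the first byte
	// greater than ','.
	start := []byte("test")
	stop := append(append([]byte{}, start...), '.')
	fmt.Printf("scan range: [%q, %q)\n", start, stop)
}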

// metaLookupForTable scans the meta table for all of the regions of the given table.
func (c *client) metaLookupForTable(ctx context.Context,
table []byte) ([]regionInfoAndAddr, error) {
metaKey := createAllRegionSearchKey(table)
rpc, err := hrpc.NewScanRange(ctx, metaTableName, table, metaKey,
hrpc.Families(infoFamily))
if err != nil {
return nil, err
}

var regions []regionInfoAndAddr
scanner := c.Scan(rpc)
for {
resp, err := scanner.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}

reg, addr, err := region.ParseRegionInfo(resp)
if err != nil {
return nil, err
}

regions = append(regions, regionInfoAndAddr{regionInfo: reg, addr: addr})
}

if len(regions) == 0 {
return nil, TableNotFound
}
return regions, nil
}

func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
namespace := reg.Namespace()
table := reg.Table()
14 changes: 14 additions & 0 deletions rpc_test.go
@@ -702,6 +702,20 @@ func TestMetaLookupCanceledContext(t *testing.T) {
}
}

func TestMetaLookupAllRegionsCanceledContext(t *testing.T) {
c := newMockClient(nil)
// pretend regionserver:0 has meta table
rc := c.clients.put("regionserver:0", c.metaRegionInfo, newRegionClientFn("regionserver:0"))
c.metaRegionInfo.SetClient(rc)

ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := c.metaLookupForTable(ctx, []byte("tablenotfound"))
if err != context.Canceled {
t.Errorf("Expected error %v, got error %v", context.Canceled, err)
}
}

func TestConcurrentRetryableError(t *testing.T) {
ctrl := test.NewController(t)
defer ctrl.Finish()
14 changes: 14 additions & 0 deletions test/mock/client.go

(Generated mock file; diff not rendered by default.)