diff --git a/client.go b/client.go index 76132896..00bd3e85 100644 --- a/client.go +++ b/client.go @@ -43,6 +43,7 @@ type Client interface { CheckAndPut(p *hrpc.Mutate, family string, qualifier string, expectedValue []byte) (bool, error) SendBatch(ctx context.Context, batch []hrpc.Call) (res []hrpc.RPCResult, allOK bool) + CacheRegions(table []byte) error Close() } @@ -391,3 +392,10 @@ func (c *client) CheckAndPut(p *hrpc.Mutate, family string, return r.GetProcessed(), nil } + +// CacheRegions scan the meta region to get all the regions and populate to cache. +// This can be used to warm up region cache +func (c *client) CacheRegions(table []byte) error { + _, err := c.findAllRegions(context.Background(), table) + return err +} diff --git a/integration_test.go b/integration_test.go index 63389ed9..7918d1dc 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2374,3 +2374,41 @@ func TestDebugState(t *testing.T) { assert.Equal(t, 1, len(keyRegionCache.(map[string]interface{}))) assert.Equal(t, 1, len(clientRegionCache.(map[string]interface{}))) // only have one client } + +type regionInfoAndAddr struct { + regionInfo hrpc.RegionInfo + addr string +} + +// Test loading region cache +func TestCacheRegions(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + + // make sure region cache is empty at startup + var jsonUnMarshalStart map[string]interface{} + jsonVal, err := gohbase.DebugState(c) + err = json.Unmarshal(jsonVal, &jsonUnMarshalStart) + if err != nil { + t.Fatalf("Encoutered eror when Unmarshalling: %v", err) + } + cacheLength := len(jsonUnMarshalStart["KeyRegionCache"].(map[string]interface{})) + if cacheLength != 0 { + t.Fatal("expected empty region cache when creating a new client") + } + + c.CacheRegions([]byte(table)) + + var jsonUnMarshalCached map[string]interface{} + jsonVal, err = gohbase.DebugState(c) + err = json.Unmarshal(jsonVal, &jsonUnMarshalCached) + if err != nil { + t.Fatalf("Encoutered eror when Unmarshalling: %v", err) + } + // CreateTable init function starts hbase with 4 regions + cacheLength = len(jsonUnMarshalCached["KeyRegionCache"].(map[string]interface{})) + if cacheLength != 4 { + t.Fatalf("Expect 4 regions but got: %v", cacheLength) + } + +} diff --git a/region/client.go b/region/client.go index acd15703..7336b66d 100644 --- a/region/client.go +++ b/region/client.go @@ -158,7 +158,8 @@ func (e NotServingRegionError) Error() string { // client manages a connection to a RegionServer. type client struct { - conn net.Conn + connM sync.Mutex + conn net.Conn // Address of the RegionServer. addr string @@ -784,9 +785,12 @@ func (c *client) MarshalJSON() ([]byte, error) { // if conn is nil then we don't want to panic. So just get the addresses if conn is not nil var localAddr, remoteAddr Address - if c.conn != nil { - localAddress := c.conn.LocalAddr() - remoteAddress := c.conn.RemoteAddr() + c.connM.Lock() + conn := c.conn + c.connM.Unlock() + if conn != nil { + localAddress := conn.LocalAddr() + remoteAddress := conn.RemoteAddr() localAddr = Address{localAddress.Network(), localAddress.String()} remoteAddr = Address{remoteAddress.Network(), remoteAddress.String()} } @@ -805,7 +809,7 @@ func (c *client) MarshalJSON() ([]byte, error) { RegionServerAddress: c.addr, ClientType: c.ctype, InFlight: inFlight, - Id: c.id, + Id: atomic.LoadUint32(&c.id), Done_status: done_status, } diff --git a/region/info.go b/region/info.go index 2b6a3431..4000e559 100644 --- a/region/info.go +++ b/region/info.go @@ -384,7 +384,7 @@ func (i *info) MarshalJSON() ([]byte, error) { StopKey: string(i.stopKey), ContextInstance: fmt.Sprintf("%p", (i.ctx)), Err: ctxError, - Client: fmt.Sprintf("%p", (i.client)), + Client: fmt.Sprintf("%p", (i.Client())), Available: !i.IsUnavailable(), } jsonVal, err := json.Marshal(state) diff --git a/region/new.go b/region/new.go index 4ed37f2f..27fca777 100644 --- a/region/new.go +++ b/region/new.go @@ -50,13 +50,16 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time. func (c *client) Dial(ctx context.Context) error { c.dialOnce.Do(func() { - var err error - c.conn, err = c.dialer(ctx, "tcp", c.addr) + conn, err := c.dialer(ctx, "tcp", c.addr) if err != nil { c.fail(fmt.Errorf("failed to dial RegionServer: %s", err)) return } + c.connM.Lock() + c.conn = conn + c.connM.Unlock() + // time out send hello if it take long if deadline, ok := ctx.Deadline(); ok { if err = c.conn.SetWriteDeadline(deadline); err != nil { diff --git a/rpc.go b/rpc.go index e9166a38..0b20ce27 100644 --- a/rpc.go +++ b/rpc.go @@ -24,6 +24,11 @@ import ( "google.golang.org/protobuf/proto" ) +type regionInfoAndAddr struct { + regionInfo hrpc.RegionInfo + addr string +} + // Constants var ( // Name of the meta region. @@ -554,6 +559,85 @@ func (c *client) lookupRegion(ctx context.Context, } } +func (c *client) lookupAllRegions(ctx context.Context, + table []byte) ([]regionInfoAndAddr, error) { + var regs []regionInfoAndAddr + var err error + backoff := backoffStart + for { + // If it takes longer than regionLookupTimeout, fail so that we can sleep + lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout) + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + }).Debug("looking up regions") + + regs, err = c.metaLookupForTable(lookupCtx, table) + cancel() + if err == TableNotFound { + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + "err": err, + }).Debug("hbase:meta does not know about this table") + + return nil, err + } else if err == ErrClientClosed { + return nil, err + } + + if err == nil { + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + "regionsAndAddr": regs, + }).Debug("looked up all regions") + + return regs, nil + } + + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + "backoff": backoff, + "err": err, + }).Error("failed looking up regions") + + // This will be hit if there was an error locating the region + backoff, err = sleepAndIncreaseBackoff(ctx, backoff) + if err != nil { + return nil, err + } + } +} + +func (c *client) findAllRegions(ctx context.Context, table []byte) ([]regionInfoAndAddr, error) { + regs, err := c.lookupAllRegions(ctx, table) + if err != nil { + return nil, err + } + for _, regaddr := range regs { + reg, addr := regaddr.regionInfo, regaddr.addr + reg.MarkUnavailable() + + if reg != c.metaRegionInfo && reg != c.adminRegionInfo { + // Check that the region wasn't added to + // the cache while we were looking it up. + overlaps, replaced := c.regions.put(reg) + if !replaced { + // the same or younger regions are already in cache + continue + } + + // otherwise, new region in cache, delete overlaps from client's cache + for _, r := range overlaps { + c.clients.del(r) + } + } + + // Start a goroutine to connect to the region + go c.establishRegion(reg, addr) + } + + return regs, nil +} + func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) { // The region was not in the cache, it // must be looked up in the meta table @@ -678,6 +762,52 @@ func (c *client) metaLookup(ctx context.Context, return reg, addr, nil } +// Creates the META key to search for all regions +func createAllRegionSearchKey(table []byte) []byte { + metaKey := make([]byte, 0, len(table)+1) + metaKey = append(metaKey, table...) + // '.' is the first byte greater than ','. Meta table entry has + // the format table,key,timestamp. By adding '.' to the stop row + // we scan all keys for table + metaKey = append(metaKey, '.') + return metaKey +} + +// metaLookupForTable checks meta table for all the region in which the given table is. +func (c *client) metaLookupForTable(ctx context.Context, + table []byte) ([]regionInfoAndAddr, error) { + metaKey := createAllRegionSearchKey(table) + rpc, err := hrpc.NewScanRange(ctx, metaTableName, table, metaKey, + hrpc.Families(infoFamily)) + if err != nil { + return nil, err + } + + var regions []regionInfoAndAddr + scanner := c.Scan(rpc) + for { + resp, err := scanner.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + reg, addr, err := region.ParseRegionInfo(resp) + if err != nil { + return nil, err + } + + regions = append(regions, regionInfoAndAddr{regionInfo: reg, addr: addr}) + } + + if len(regions) == 0 { + return nil, TableNotFound + } + return regions, nil +} + func fullyQualifiedTable(reg hrpc.RegionInfo) []byte { namespace := reg.Namespace() table := reg.Table() diff --git a/rpc_test.go b/rpc_test.go index 17727f69..c87b2e8c 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -702,6 +702,20 @@ func TestMetaLookupCanceledContext(t *testing.T) { } } +func TestMetaLookupAllRegionsCanceledContext(t *testing.T) { + c := newMockClient(nil) + // pretend regionserver:0 has meta table + rc := c.clients.put("regionserver:0", c.metaRegionInfo, newRegionClientFn("regionserver:0")) + c.metaRegionInfo.SetClient(rc) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.metaLookupForTable(ctx, []byte("tablenotfound")) + if err != context.Canceled { + t.Errorf("Expected error %v, got error %v", context.Canceled, err) + } +} + func TestConcurrentRetryableError(t *testing.T) { ctrl := test.NewController(t) defer ctrl.Finish() diff --git a/test/mock/client.go b/test/mock/client.go index cdb6fb60..fb3e926c 100644 --- a/test/mock/client.go +++ b/test/mock/client.go @@ -50,6 +50,20 @@ func (mr *MockClientMockRecorder) Append(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Append", reflect.TypeOf((*MockClient)(nil).Append), arg0) } +// CacheRegions mocks base method. +func (m *MockClient) CacheRegions(arg0 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CacheRegions", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CacheRegions indicates an expected call of CacheRegions. +func (mr *MockClientMockRecorder) CacheRegions(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheRegions", reflect.TypeOf((*MockClient)(nil).CacheRegions), arg0) +} + // CheckAndPut mocks base method. func (m *MockClient) CheckAndPut(arg0 *hrpc.Mutate, arg1, arg2 string, arg3 []byte) (bool, error) { m.ctrl.T.Helper()