Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Jun 7, 2017
1 parent 25224fe commit 89dd059
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
35 changes: 23 additions & 12 deletions pd-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Client interface {
// Close closes the client.
Close()
// GetTSAsync gets a timestamp from PD, it's a nonblock function.
GetTSAsync() *TSOResponse
GetTSAsync(ctx context.Context) *TSOResponse
}

type tsoRequest struct {
Expand Down Expand Up @@ -399,6 +399,11 @@ func (c *client) Close() {
req.done <- errors.Trace(errClosing)
}

c.asyncTSOMu.Lock()
c.asyncTSOMu.cli.close()
c.asyncTSOMu.cli = nil
c.asyncTSOMu.Unlock()

c.connMu.Lock()
defer c.connMu.Unlock()
for _, cc := range c.connMu.clientConns {
Expand Down Expand Up @@ -539,9 +544,8 @@ func addrsToUrls(addrs []string) []string {
}

func (c *client) pdTSOClient() pdpb.PD_TsoClient {
ctx := context.Background()
for {
stream, err := c.leaderClient().Tso(ctx)
stream, err := c.leaderClient().Tso(c.ctx)
if err == nil {
return stream
}
Expand All @@ -557,15 +561,20 @@ func (c *client) pdTSOClient() pdpb.PD_TsoClient {

// TSOResponse is the result of GetTSAsync.
type TSOResponse struct {
wg sync.WaitGroup
ch chan struct{}
ctx context.Context
err error
physical int64
logical int64
}

// Wait gets the result data, it may block the caller if data not available yet.
func (r *TSOResponse) Wait() (int64, int64, error) {
r.wg.Wait()
select {
case <-r.ch:
case <-r.ctx.Done():
r.err = r.ctx.Err()
}
return r.physical, r.logical, errors.Trace(r.err)
}

Expand Down Expand Up @@ -601,7 +610,7 @@ func (async *asyncTSOClient) close() {
for unconsumed := range async.workCh {
for i := 0; i < len(unconsumed); i++ {
unconsumed[i].err = errors.New("tso client closed")
unconsumed[i].wg.Done()
close(unconsumed[i].ch)
}
}
}
Expand Down Expand Up @@ -670,30 +679,32 @@ func finishBatch(batch []*TSOResponse, physical, logical int64, err error) {
val.physical = physical
val.logical = logical + int64(i)
val.err = err
val.wg.Done()
close(val.ch)
}
}

func (async *asyncTSOClient) call() (*TSOResponse, error) {
func (async *asyncTSOClient) call(ctx context.Context) (*TSOResponse, error) {
async.mu.RLock()
if async.mu.unhealth {
return nil, errors.New("This asyncTSOClient maybe stale")
}
async.mu.RUnlock()

ret := &TSOResponse{}
ret.wg.Add(1)
ret := &TSOResponse{
ch: make(chan struct{}),
ctx: ctx,
}
async.input <- ret
return ret, nil
}

func (c *client) GetTSAsync() *TSOResponse {
func (c *client) GetTSAsync(ctx context.Context) *TSOResponse {
for {
c.asyncTSOMu.RLock()
cli := c.asyncTSOMu.cli
c.asyncTSOMu.RUnlock()

ret, err := cli.call()
ret, err := cli.call(ctx)
if err == nil {
return ret
}
Expand Down
8 changes: 4 additions & 4 deletions pd-client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func (s *testClientSuite) TestTSO(c *C) {
func (s *testClientSuite) TestTSOAsync(c *C) {
var tss []int64
for i := 0; i < 100; i++ {
resp := s.client.GetTSAsync()
resp.wg.Wait()
resp := s.client.GetTSAsync(context.Background())
resp.Wait()
c.Assert(resp.err, IsNil)
tss = append(tss, resp.physical<<18+resp.logical)
}
Expand All @@ -201,8 +201,8 @@ func (s *testClientSuite) TestTSOAsyncRace(c *C) {
<-begin

for j := 0; j < 100; j++ {
resp := s.client.GetTSAsync()
resp.wg.Wait()
resp := s.client.GetTSAsync(context.Background())
resp.Wait()
c.Assert(resp.err, IsNil)

}
Expand Down

0 comments on commit 89dd059

Please sign in to comment.