Skip to content

Commit

Permalink
Revert "server: use etcd election for PD leader election (#336)"
Browse files Browse the repository at this point in the history
This reverts commit da1bf08.
  • Loading branch information
huachaohuang committed Oct 20, 2016
1 parent 17ed1cc commit 4f67c48
Show file tree
Hide file tree
Showing 20 changed files with 345 additions and 489 deletions.
6 changes: 0 additions & 6 deletions server/api/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const (
)

const (
errServerIsClosed = "server is closed"
errNoLeaderFound = "no leader found"
errRedirectFailed = "redirect failed"
errRedirectToNotLeader = "redirect to not leader"
Expand All @@ -42,11 +41,6 @@ func newRedirector(s *server.Server) *redirector {
}

func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
if h.s.IsClosed() {
http.Error(w, errServerIsClosed, http.StatusInternalServerError)
return
}

if h.s.IsLeader() {
next(w, r)
return
Expand Down
2 changes: 1 addition & 1 deletion server/api/redirector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *testRedirectorSuite) TestNotLeader(c *C) {
}

func mustRequest(c *C, s *server.Server) *http.Response {
unixAddr := []string{s.GetAddr(), apiPrefix, "/api/v1/leader"}
unixAddr := []string{s.GetAddr(), apiPrefix, "/api/v1/version"}
httpAddr := mustUnixAddrToHTTPAddr(c, strings.Join(unixAddr, ""))
client := newUnixSocketClient()
resp, err := client.Get(httpAddr)
Expand Down
2 changes: 1 addition & 1 deletion server/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *testClusterCacheSuite) TearDownSuite(c *C) {
}

func (s *testClusterCacheSuite) TestCache(c *C) {
leaderPd := mustGetLeader(c, s.svr)
leaderPd := mustGetLeader(c, s.client, s.svr.getLeaderPath())

conn, err := rpcConnect(leaderPd.GetAddr())
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (c *RaftCluster) saveStore(store *metapb.Store) error {

storePath := makeStoreKey(c.clusterRoot, store.GetId())

resp, err := c.s.txn().Then(clientv3.OpPut(storePath, string(storeValue))).Commit()
resp, err := c.s.leaderTxn().Then(clientv3.OpPut(storePath, string(storeValue))).Commit()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -608,7 +608,7 @@ func (c *RaftCluster) putConfig(meta *metapb.Cluster) error {
return errors.Trace(err)
}

resp, err := c.s.txn().Then(clientv3.OpPut(c.clusterRoot, string(metaValue))).Commit()
resp, err := c.s.leaderTxn().Then(clientv3.OpPut(c.clusterRoot, string(metaValue))).Commit()
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *testClusterBaseSuite) newRegion(c *C, regionID uint64, startKey []byte,
}

func (s *testClusterSuite) TestBootstrap(c *C) {
leader := mustGetLeader(c, s.svr)
leader := mustGetLeader(c, s.client, s.svr.getLeaderPath())

conn, err := rpcConnect(leader.GetAddr())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s *testClusterBaseSuite) getClusterConfig(c *C, conn net.Conn, clusterID u
}

func (s *testClusterSuite) TestGetPutConfig(c *C) {
leader := mustGetLeader(c, s.svr)
leader := mustGetLeader(c, s.client, s.svr.getLeaderPath())

conn, err := rpcConnect(leader.GetAddr())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -492,7 +492,7 @@ func (s *testClusterSuite) TestClosedChannel(c *C) {
defer cleanup()
go svr.Run()

leader := mustGetLeader(c, svr)
leader := mustGetLeader(c, svr.client, svr.getLeaderPath())

conn, err := rpcConnect(leader.GetAddr())
c.Assert(err, IsNil)
Expand Down
14 changes: 7 additions & 7 deletions server/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit(c *C) {
err := cluster.putConfig(meta)
c.Assert(err, IsNil)

leaderPD := mustGetLeader(c, s.svr)
leaderPD := mustGetLeader(c, s.client, s.svr.getLeaderPath())
conn, err := rpcConnect(leaderPD.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()
Expand Down Expand Up @@ -416,7 +416,7 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit2(c *C) {
c.Assert(cluster, NotNil)

r1, _ := cluster.getRegion([]byte("a"))
leaderPd := mustGetLeader(c, s.svr)
leaderPd := mustGetLeader(c, s.client, s.svr.getLeaderPath())
conn, err := rpcConnect(leaderPd.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()
Expand Down Expand Up @@ -459,7 +459,7 @@ func (s *testClusterWorkerSuite) TestHeartbeatChangePeer(c *C) {
region, _ := cluster.getRegion(regionKey)
c.Assert(region.Peers, HasLen, 1)

leaderPd := mustGetLeader(c, s.svr)
leaderPd := mustGetLeader(c, s.client, s.svr.getLeaderPath())

conn, err := rpcConnect(leaderPd.GetAddr())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -517,7 +517,7 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplitAddPeer(c *C) {
err := cluster.putConfig(meta)
c.Assert(err, IsNil)

leaderPD := mustGetLeader(c, s.svr)
leaderPD := mustGetLeader(c, s.client, s.svr.getLeaderPath())
conn, err := rpcConnect(leaderPD.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *testClusterWorkerSuite) TestStoreHeartbeat(c *C) {
stores := cluster.GetStores()
c.Assert(stores, HasLen, 5)

leaderPd := mustGetLeader(c, s.svr)
leaderPd := mustGetLeader(c, s.client, s.svr.getLeaderPath())
conn, err := rpcConnect(leaderPd.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()
Expand Down Expand Up @@ -581,7 +581,7 @@ func (s *testClusterWorkerSuite) TestReportSplit(c *C) {
stores := cluster.GetStores()
c.Assert(stores, HasLen, 5)

leaderPd := mustGetLeader(c, s.svr)
leaderPd := mustGetLeader(c, s.client, s.svr.getLeaderPath())
conn, err := rpcConnect(leaderPd.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()
Expand Down Expand Up @@ -621,7 +621,7 @@ func (s *testClusterWorkerSuite) TestBalanceOperatorPriority(c *C) {
})
c.Assert(err, IsNil)

leaderPd := mustGetLeader(c, s.svr)
leaderPd := mustGetLeader(c, s.client, s.svr.getLeaderPath())
conn, err := rpcConnect(leaderPd.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *conn) handleRegionHeartbeat(req *pdpb.Request) (*pdpb.Response, error)

// TODO: we can update in etcd asynchronously later.
if len(ops) > 0 {
resp, err := c.s.txn().Then(ops...).Commit()
resp, err := c.s.leaderTxn().Then(ops...).Commit()
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 3 additions & 1 deletion server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ func (s *testConnSuite) TestReconnect(c *C) {
newLeader.Close()
time.Sleep(time.Second)

// Request will fail with no majority.
// Request will fail with no leader.
for i := 0; i < 2; i++ {
svr := followers[i]
if svr != newLeader {
resp := mustRequest(c, svr)
err := resp.GetHeader().GetError()
c.Assert(err, NotNil)
c.Logf("Response error: %v", err)
c.Assert(svr.IsLeader(), IsFalse)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (alloc *idAllocator) generate() (uint64, error) {

end += allocStep
value = uint64ToBytes(end)
resp, err := alloc.s.txn().If(cmp).Then(clientv3.OpPut(key, string(value))).Commit()
resp, err := alloc.s.leaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit()
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions server/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *testAllocIDSuite) TearDownSuite(c *C) {
}

func (s *testAllocIDSuite) TestID(c *C) {
mustGetLeader(c, s.svr)
mustGetLeader(c, s.client, s.svr.getLeaderPath())

var last uint64
for i := uint64(0); i < allocStep; i++ {
Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *testAllocIDSuite) TestID(c *C) {
}

func (s *testAllocIDSuite) TestCommand(c *C) {
leader := mustGetLeader(c, s.svr)
leader := mustGetLeader(c, s.client, s.svr.getLeaderPath())

conn, err := rpcConnect(leader.GetAddr())
c.Assert(err, IsNil)
Expand Down
5 changes: 1 addition & 4 deletions server/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ func startPdWith(cfg *Config) (*Server, error) {
default:
}

go svr.Run()

// Let the server runs before returning it to prevent data race.
time.Sleep(time.Second)
svrCh <- svr
svr.Run()
}()

timer := time.NewTimer(30 * time.Second)
Expand Down
Loading

0 comments on commit 4f67c48

Please sign in to comment.