-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster_worker.go
108 lines (88 loc) · 3.34 KB
/
cluster_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
log "github.com/Sirupsen/logrus"
"github.com/gogo/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
)
// handleRegionHeartbeat hands a heartbeating region over to the coordinator.
//
// A region that reports no peers is considered invalid: it is logged at
// warning level and rejected with an error instead of being dispatched.
func (c *RaftCluster) handleRegionHeartbeat(region *RegionInfo) error {
	// Happy path: the region carries at least one peer, so dispatch it.
	if len(region.GetPeers()) > 0 {
		c.coordinator.dispatch(region)
		return nil
	}

	// If the region peer count is 0, then we should not handle this.
	log.Warnf("invalid region, zero region peer count - %v", region)
	return errors.Errorf("invalid region, zero region peer count - %v", region)
}
// handleAskSplit answers a split request by allocating one ID for the new
// region and one ID for each peer listed in the request's region.
//
// It returns an error when the request's region epoch is older, in either
// version or conf version, than the epoch of the region currently looked up
// by the request's start key, or when an ID allocation fails.
func (c *RaftCluster) handleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
	reqRegion := request.GetRegion()
	startKey := reqRegion.GetStartKey()
	// Only the region is needed here; the second result is intentionally unused.
	region, _ := c.GetRegionByKey(startKey)

	// If the request epoch is less than current region epoch, then returns an error.
	reqRegionEpoch := reqRegion.GetRegionEpoch()
	regionEpoch := region.GetRegionEpoch()
	if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() ||
		reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() {
		return nil, errors.Errorf("invalid region epoch, request: %v, current: %v", reqRegionEpoch, regionEpoch)
	}

	newRegionID, err := c.s.idAlloc.Alloc()
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Read the peers through the getter, like every other access to the
	// request in this function, so a request without a region cannot cause
	// a nil pointer dereference.
	peerIDs := make([]uint64, len(reqRegion.GetPeers()))
	for i := range peerIDs {
		if peerIDs[i], err = c.s.idAlloc.Alloc(); err != nil {
			return nil, errors.Trace(err)
		}
	}

	split := &pdpb.AskSplitResponse{
		NewRegionId: newRegionID,
		NewPeerIds:  peerIDs,
	}
	return split, nil
}
// checkSplitRegion verifies that left and right can be the two halves of a
// split region. Both must be non-nil, left's end key must equal right's start
// key, and left's start key must sort strictly before right's end key.
//
// An empty end key on right is accepted without the ordering comparison
// (presumably it denotes an unbounded range; confirm against the key-range
// conventions used elsewhere). Every failure yields the same
// "invalid split region" error.
func (c *RaftCluster) checkSplitRegion(left *metapb.Region, right *metapb.Region) error {
	switch {
	case left == nil || right == nil:
		return errors.New("invalid split region")
	case !bytes.Equal(left.GetEndKey(), right.GetStartKey()):
		// The two halves must be adjacent.
		return errors.New("invalid split region")
	}

	rightEnd := right.GetEndKey()
	if len(rightEnd) != 0 && bytes.Compare(left.GetStartKey(), rightEnd) >= 0 {
		// The combined range would be empty or inverted.
		return errors.New("invalid split region")
	}
	return nil
}
// handleReportSplit processes a report that a region has been split into the
// request's left and right regions.
//
// The pair is validated with checkSplitRegion; an invalid pair is logged and
// returned as an error. Otherwise the pre-split region is rebuilt from the
// two halves, the split is wrapped in an operator that is added to the
// coordinator's history cache, and an end event is posted for that operator.
func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb.ReportSplitResponse, error) {
	left, right := request.GetLeft(), request.GetRight()

	if err := c.checkSplitRegion(left, right); err != nil {
		log.Warnf("report split region is invalid - %v, %v", request, errors.ErrorStack(err))
		return nil, errors.Trace(err)
	}

	// Rebuild the origin region: start from a copy of right, take the start
	// key from left, and clear the epoch.
	origin := proto.Clone(right).(*metapb.Region)
	origin.StartKey = left.GetStartKey()
	origin.RegionEpoch = nil

	// Wrap report split as an Operator, and add it into history cache.
	splitOp := newSplitOperator(origin, left, right)
	c.coordinator.histories.add(origin.GetId(), splitOp)
	log.Infof("[region %d] region split, generate new region: %v", origin.GetId(), left)
	c.coordinator.postEvent(splitOp, evtEnd)

	return &pdpb.ReportSplitResponse{}, nil
}