// Copyright 2017-2019 Lei Ni (nilei81@gmail.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dragonboat
import (
"crypto/md5"
"encoding/binary"
"errors"
"fmt"
"io"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"
"github.com/lni/dragonboat/client"
"github.com/lni/dragonboat/internal/settings"
"github.com/lni/dragonboat/internal/utils/random"
"github.com/lni/dragonboat/logger"
pb "github.com/lni/dragonboat/raftpb"
)
const (
badKeyCheck bool = false
sysGcMillisecond uint64 = 15000
)
var (
defaultGCTick uint64 = 2
plog = logger.GetLogger("dragonboat")
)
var (
// ErrInvalidSession indicates that the specified client session is invalid.
ErrInvalidSession = errors.New("invalid session")
// ErrTimeoutTooSmall indicates that the specified timeout value is too small.
ErrTimeoutTooSmall = errors.New("specified timeout value is too small")
// ErrPayloadTooBig indicates that the payload is too big.
ErrPayloadTooBig = errors.New("payload is too big")
// ErrSystemBusy indicates that the system is too busy to handle the request.
ErrSystemBusy = errors.New("system is too busy try again later")
// ErrClusterClosed indicates that the requested cluster is being shut down.
ErrClusterClosed = errors.New("raft cluster already closed")
// ErrBadKey indicates that the key is bad; retrying the request is recommended.
ErrBadKey = errors.New("bad key try again later")
// ErrPendingConfigChangeExist indicates that there is already a pending
// membership change in the system.
ErrPendingConfigChangeExist = errors.New("pending config change request exist")
// ErrTimeout indicates that the operation timed out.
ErrTimeout = errors.New("timeout")
// ErrSystemStopped indicates that the system is being shut down.
ErrSystemStopped = errors.New("system stopped")
// ErrCanceled indicates that the request has been canceled.
ErrCanceled = errors.New("request canceled")
// ErrRejected indicates that the request has been rejected.
ErrRejected = errors.New("request rejected")
)
// IsTempError returns a boolean value indicating whether the specified error
// is a temporary error worth retrying later with the exact same input,
// potentially on a more suitable NodeHost instance.
func IsTempError(err error) bool {
return err == ErrSystemBusy ||
err == ErrBadKey ||
err == ErrPendingConfigChangeExist ||
err == ErrClusterClosed ||
err == ErrSystemStopped
}
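// The following is an illustrative sketch, not part of the dragonboat API: a
// caller can use IsTempError to decide whether the exact same input is safe to
// resubmit. The submit function and the retry parameters below are
// hypothetical placeholders supplied by the application.
//
//	func submitWithRetry(submit func() error, attempts int) error {
//		var err error
//		for i := 0; i < attempts; i++ {
//			if err = submit(); err == nil || !IsTempError(err) {
//				return err
//			}
//			time.Sleep(100 * time.Millisecond)
//		}
//		return err
//	}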
// RequestResultCode is the result code returned to the client to indicate the
// outcome of the request.
type RequestResultCode int
// RequestResult is the result struct returned for the request.
type RequestResult struct {
// code is the result state of the request.
code RequestResultCode
// result is the value returned by the Update method of the IStateMachine
// instance. It is only valid when making a proposal and the code value is
// requestCompleted.
result uint64
}
// Timeout returns a boolean value indicating whether the request timed out.
func (rr *RequestResult) Timeout() bool {
return rr.code == requestTimeout
}
// Completed returns a boolean value indicating whether the request completed
// successfully. For proposals, it means the proposal has been committed by the
// Raft cluster and applied on the local node. For ReadIndex operations, it
// means the cluster is now ready for a local read.
func (rr *RequestResult) Completed() bool {
return rr.code == requestCompleted
}
// Terminated returns a boolean value indicating whether the request was
// terminated because the requested Raft cluster is being shut down.
func (rr *RequestResult) Terminated() bool {
return rr.code == requestTerminated
}
// Rejected returns a boolean value indicating whether the request was rejected.
// For a proposal, it means that the client session instance used is not
// registered or has been evicted on the server side. When requesting a client
// session to be registered, Rejected means that another client session with
// the same client ID has already been registered. When requesting a client
// session to be unregistered, Rejected means the specified client session is
// not found on the server side. For a membership change request, it means the
// request is out of order and thus ignored. Note that the out-of-order check
// when making membership changes is only imposed when IMasterClient is used in
// NodeHost.
func (rr *RequestResult) Rejected() bool {
return rr.code == requestRejected
}
// GetResult returns the result value of the request. When making a proposal,
// the returned result is the value returned by the Update method of the
// IStateMachine instance.
func (rr *RequestResult) GetResult() uint64 {
return rr.result
}
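// The following is an illustrative sketch of how a RequestResult is typically
// consumed. It assumes rs is a *RequestState previously returned to the
// application and done is a hypothetical channel used by the caller to abandon
// the wait; neither is defined in this file.
//
//	select {
//	case rr := <-rs.CompletedC:
//		switch {
//		case rr.Completed():
//			_ = rr.GetResult() // value returned by IStateMachine.Update
//		case rr.Timeout():
//			// the request timed out
//		case rr.Rejected():
//			// the request was rejected, see Rejected above
//		case rr.Terminated():
//			// the Raft cluster is being shut down
//		}
//	case <-done:
//		// the caller gave up waiting
//	}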
const (
requestTimeout RequestResultCode = iota
requestCompleted
requestTerminated
requestRejected
)
var requestResultCodeName = [...]string{
"RequestTimeout",
"RequestCompleted",
"RequestTerminated",
"RequestRejected",
}
func (c RequestResultCode) String() string {
return requestResultCodeName[uint64(c)]
}
func getTerminatedResult() RequestResult {
return RequestResult{
code: requestTerminated,
}
}
func getTimeoutResult() RequestResult {
return RequestResult{
code: requestTimeout,
}
}
const (
maxProposalPayloadSize = settings.MaxProposalPayloadSize
)
type logicalClock struct {
tick uint64
lastGcTime uint64
gcTick uint64
tickInMillisecond uint64
}
func (p *logicalClock) increaseTick() {
atomic.AddUint64(&p.tick, 1)
}
func (p *logicalClock) getTick() uint64 {
return atomic.LoadUint64(&p.tick)
}
func (p *logicalClock) getTimeoutTick(timeout time.Duration) uint64 {
timeoutMs := uint64(timeout.Nanoseconds() / 1000000)
return timeoutMs / p.tickInMillisecond
}
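// As an example of the arithmetic above, assuming tickInMillisecond is 100, a
// 5*time.Second timeout maps to 5000/100 = 50 ticks, while a timeout shorter
// than 100 milliseconds maps to 0 ticks and is reported to callers as
// ErrTimeoutTooSmall.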
// ICompleteHandler is a handler interface that will be invoked when the
// request is completed. This interface is used by language bindings;
// applications are not expected to use it directly.
type ICompleteHandler interface {
Notify(RequestResult)
Release()
}
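// The following is an illustrative sketch of a minimal ICompleteHandler
// implementation that forwards the result to a caller supplied callback. It is
// hypothetical; actual language bindings provide their own implementations.
//
//	type callbackHandler struct {
//		onResult func(RequestResult)
//	}
//
//	func (h *callbackHandler) Notify(r RequestResult) { h.onResult(r) }
//	func (h *callbackHandler) Release()               {}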
// RequestState is the object used to provide the request result to users.
type RequestState struct {
data []byte
key uint64
clientID uint64
seriesID uint64
respondedTo uint64
deadline uint64
completeHandler ICompleteHandler
// CompletedC is a channel for delivering the request result to users.
CompletedC chan RequestResult
pool *sync.Pool
}
func (r *RequestState) notify(result RequestResult) {
if r.completeHandler == nil {
select {
case r.CompletedC <- result:
default:
plog.Panicf("RequestState.CompletedC is full")
}
} else {
r.completeHandler.Notify(result)
r.completeHandler.Release()
r.Release()
}
}
// Release puts the RequestState object back into the sync.Pool.
func (r *RequestState) Release() {
if r.pool != nil {
r.data = nil
r.deadline = 0
r.key = 0
r.seriesID = 0
r.clientID = 0
r.respondedTo = 0
r.completeHandler = nil
r.pool.Put(r)
}
}
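// Based on how notify and Release use the pool and CompletedC fields, the
// sync.Pool is expected to produce *RequestState instances roughly along the
// lines of the sketch below. This is an assumption for illustration, not a
// quote of the actual pool setup.
//
//	var pool *sync.Pool
//	pool = &sync.Pool{
//		New: func() interface{} {
//			return &RequestState{
//				pool:       pool,
//				CompletedC: make(chan RequestResult, 1),
//			}
//		},
//	}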
type proposalShard struct {
mu sync.Mutex
proposals *entryQueue
pending map[uint64]*RequestState
pool *sync.Pool
stopped bool
expireNotified uint64
logicalClock
}
type keyGenerator struct {
randMu sync.Mutex
rand *rand.Rand
}
func (k *keyGenerator) nextKey() uint64 {
k.randMu.Lock()
v := k.rand.Uint64()
k.randMu.Unlock()
return v
}
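// pendingProposal tracks pending proposals. Requests are partitioned into ps
// proposalShard instances by key (see propose below) so that concurrent
// proposals do not all contend on a single mutex.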
type pendingProposal struct {
shards []*proposalShard
keyg []*keyGenerator
ps uint64
}
type systemCtxGcTime struct {
ctx pb.SystemCtx
expireTime uint64
}
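// pendingReadIndex tracks pending read index requests. Requests are queued by
// read() under a user generated ctx, batched under a shared system ctx that is
// handed to the Raft node, and completed in applied() once the applied index
// reaches the index reported for that system ctx.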
type pendingReadIndex struct {
seqKey uint64
mu sync.Mutex
// user generated ctx->requestState
pending map[uint64]*RequestState
// system ctx->appliedIndex
batches map[pb.SystemCtx]uint64
// user generated ctx->batched system ctx
mapping map[uint64]pb.SystemCtx
// cached system ctx used to call node.ReadIndex
system pb.SystemCtx
systemGcTime []systemCtxGcTime
requests *readIndexQueue
stopped bool
pool *sync.Pool
logicalClock
}
type pendingConfigChange struct {
mu sync.Mutex
pending *RequestState
confChangeC chan<- *RequestState
logicalClock
}
func newPendingConfigChange(confChangeC chan<- *RequestState,
tickInMillisecond uint64) *pendingConfigChange {
gcTick := defaultGCTick
if gcTick == 0 {
panic("invalid gcTick")
}
lcu := logicalClock{
tickInMillisecond: tickInMillisecond,
gcTick: gcTick,
}
p := &pendingConfigChange{
confChangeC: confChangeC,
logicalClock: lcu,
}
return p
}
func (p *pendingConfigChange) close() {
p.mu.Lock()
defer p.mu.Unlock()
if p.confChangeC != nil {
if p.pending != nil {
p.pending.notify(getTerminatedResult())
p.pending = nil
}
close(p.confChangeC)
p.confChangeC = nil
}
}
func (p *pendingConfigChange) request(cc pb.ConfigChange,
timeout time.Duration) (*RequestState, error) {
p.mu.Lock()
defer p.mu.Unlock()
timeoutTick := p.getTimeoutTick(timeout)
if timeoutTick == 0 {
return nil, ErrTimeoutTooSmall
}
if p.pending != nil {
return nil, ErrPendingConfigChangeExist
}
if p.confChangeC == nil {
return nil, ErrClusterClosed
}
data, err := cc.Marshal()
if err != nil {
panic(err)
}
req := &RequestState{
key: random.LockGuardedRand.Uint64(),
data: data,
deadline: p.getTick() + timeoutTick,
CompletedC: make(chan RequestResult, 1),
}
select {
case p.confChangeC <- req:
p.pending = req
return req, nil
default:
}
return nil, ErrSystemBusy
}
func (p *pendingConfigChange) gc() {
p.mu.Lock()
defer p.mu.Unlock()
if p.pending == nil {
return
}
now := p.getTick()
if now-p.lastGcTime < p.gcTick {
return
}
p.lastGcTime = now
if p.pending.deadline < now {
p.pending.notify(getTimeoutResult())
p.pending = nil
}
}
func (p *pendingConfigChange) apply(key uint64, rejected bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.pending == nil {
return
}
var v RequestResult
if rejected {
v.code = requestRejected
} else {
v.code = requestCompleted
}
if p.pending.key == key {
p.pending.notify(v)
p.pending = nil
}
}
func newPendingReadIndex(pool *sync.Pool, requests *readIndexQueue,
tickInMillisecond uint64) *pendingReadIndex {
gcTick := defaultGCTick
if gcTick == 0 {
panic("invalid gcTick")
}
lcu := logicalClock{
tickInMillisecond: tickInMillisecond,
gcTick: gcTick,
}
p := &pendingReadIndex{
pending: make(map[uint64]*RequestState),
batches: make(map[pb.SystemCtx]uint64),
mapping: make(map[uint64]pb.SystemCtx),
systemGcTime: make([]systemCtxGcTime, 0),
requests: requests,
logicalClock: lcu,
pool: pool,
}
p.system = p.nextSystemCtx()
return p
}
func (p *pendingReadIndex) close() {
p.mu.Lock()
defer p.mu.Unlock()
p.stopped = true
if p.requests != nil {
p.requests.close()
tmp := p.requests.get()
for _, v := range tmp {
v.notify(getTerminatedResult())
}
}
for _, v := range p.pending {
v.notify(getTerminatedResult())
}
}
func (p *pendingReadIndex) read(handler ICompleteHandler,
timeout time.Duration) (*RequestState, error) {
timeoutTick := p.getTimeoutTick(timeout)
if timeoutTick == 0 {
return nil, ErrTimeoutTooSmall
}
req := p.pool.Get().(*RequestState)
req.completeHandler = handler
req.key = p.nextUserCtx()
req.deadline = p.getTick() + timeoutTick
if len(req.CompletedC) > 0 {
req.CompletedC = make(chan RequestResult, 1)
}
ok, closed := p.requests.add(req)
if closed {
return nil, ErrClusterClosed
}
if !ok {
return nil, ErrSystemBusy
}
return req, nil
}
func (p *pendingReadIndex) nextUserCtx() uint64 {
return atomic.AddUint64(&p.seqKey, 1)
}
func (p *pendingReadIndex) nextSystemCtx() pb.SystemCtx {
for {
v := pb.SystemCtx{
Low: random.LockGuardedRand.Uint64(),
High: random.LockGuardedRand.Uint64(),
}
if v.Low != 0 && v.High != 0 {
return v
}
}
}
func (p *pendingReadIndex) nextCtx() pb.SystemCtx {
p.mu.Lock()
defer p.mu.Unlock()
v := p.system
p.system = p.nextSystemCtx()
expireTick := sysGcMillisecond / p.tickInMillisecond
p.systemGcTime = append(p.systemGcTime,
systemCtxGcTime{
ctx: v,
expireTime: p.getTick() + expireTick,
})
return v
}
func (p *pendingReadIndex) peepNextCtx() pb.SystemCtx {
p.mu.Lock()
v := p.system
p.mu.Unlock()
return v
}
func (p *pendingReadIndex) addReadyToRead(readStates []pb.ReadyToRead) {
if len(readStates) == 0 {
return
}
p.mu.Lock()
for _, v := range readStates {
p.batches[v.SystemCtx] = v.Index
}
p.mu.Unlock()
}
func (p *pendingReadIndex) addPendingRead(system pb.SystemCtx,
reqs []*RequestState) {
p.mu.Lock()
defer p.mu.Unlock()
if p.stopped {
return
}
for _, req := range reqs {
if _, ok := p.pending[req.key]; ok {
panic("key already in the pending map")
}
p.pending[req.key] = req
p.mapping[req.key] = system
}
}
func (p *pendingReadIndex) applied(applied uint64) {
// FIXME:
// when there is no pending request, we still want a chance to clean up
// systemGcTime. sysGcMillisecond is how many milliseconds are allowed before a
// readIndex message is considered to have timed out. as we send one readIndex
// message per millisecond by default, we expect the max length of systemGcTime
// to be less than sysGcMillisecond. however, that one message per millisecond
// rate is not taken into consideration when checking systemGcTime below, this
// needs to be fixed.
p.mu.Lock()
if p.stopped {
p.mu.Unlock()
return
}
if len(p.pending) == 0 &&
uint64(len(p.systemGcTime)) < sysGcMillisecond {
p.mu.Unlock()
return
}
now := p.getTick()
toDelete := make([]uint64, 0)
for userKey, req := range p.pending {
systemCtx, ok := p.mapping[userKey]
if !ok {
panic("mapping is missing")
}
bindex, bok := p.batches[systemCtx]
if !bok || bindex > applied {
continue
} else {
toDelete = append(toDelete, userKey)
var v RequestResult
if req.deadline > now {
v.code = requestCompleted
} else {
v.code = requestTimeout
}
req.notify(v)
}
}
for _, v := range toDelete {
delete(p.pending, v)
delete(p.mapping, v)
}
if now-p.lastGcTime < p.gcTick {
p.mu.Unlock()
return
}
p.lastGcTime = now
p.gc(now)
p.mu.Unlock()
}
func (p *pendingReadIndex) gc(now uint64) {
toDeleteCount := 0
for _, v := range p.systemGcTime {
if v.expireTime < now {
delete(p.batches, v.ctx)
toDeleteCount++
} else {
break
}
}
if toDeleteCount > 0 {
p.systemGcTime = p.systemGcTime[toDeleteCount:]
}
if len(p.pending) > 0 {
toDelete := make([]uint64, 0)
for userKey, req := range p.pending {
if req.deadline < now {
req.notify(getTimeoutResult())
toDelete = append(toDelete, userKey)
}
}
for _, v := range toDelete {
delete(p.pending, v)
delete(p.mapping, v)
}
}
}
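// getRandomGenerator returns a keyGenerator seeded from the process ID, the
// current time, the cluster and node IDs, the node address and the partition
// index, so that different partitions and nodes generate independent proposal
// key streams.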
func getRandomGenerator(clusterID uint64,
nodeID uint64, addr string, partition uint64) *keyGenerator {
pid := os.Getpid()
nano := time.Now().UnixNano()
seedStr := fmt.Sprintf("%d-%d-%d-%d-%s-%d",
pid, nano, clusterID, nodeID, addr, partition)
m := md5.New()
if _, err := io.WriteString(m, seedStr); err != nil {
panic(err)
}
md5sum := m.Sum(nil)
seed := binary.LittleEndian.Uint64(md5sum)
return &keyGenerator{rand: rand.New(rand.NewSource(int64(seed)))}
}
func newPendingProposal(pool *sync.Pool,
proposals *entryQueue, clusterID uint64, nodeID uint64, raftAddress string,
tickInMillisecond uint64) *pendingProposal {
ps := uint64(16)
p := &pendingProposal{
shards: make([]*proposalShard, ps),
keyg: make([]*keyGenerator, ps),
ps: ps,
}
for i := uint64(0); i < ps; i++ {
p.shards[i] = newPendingProposalShard(pool,
proposals, tickInMillisecond)
p.keyg[i] = getRandomGenerator(clusterID, nodeID, raftAddress, i)
}
return p
}
func (p *pendingProposal) propose(session *client.Session,
cmd []byte, handler ICompleteHandler,
timeout time.Duration) (*RequestState, error) {
key := p.nextKey(session.ClientID)
pp := p.shards[key%p.ps]
return pp.propose(session, cmd, key, handler, timeout)
}
func (p *pendingProposal) close() {
for _, pp := range p.shards {
pp.close()
}
}
func (p *pendingProposal) applied(clientID uint64,
seriesID uint64, key uint64, result uint64, rejected bool) {
pp := p.shards[key%p.ps]
pp.applied(clientID, seriesID, key, result, rejected)
}
func (p *pendingProposal) nextKey(clientID uint64) uint64 {
return p.keyg[clientID%p.ps].nextKey()
}
func (p *pendingProposal) increaseTick() {
for i := uint64(0); i < p.ps; i++ {
p.shards[i].increaseTick()
}
}
func (p *pendingProposal) gc() {
for i := uint64(0); i < p.ps; i++ {
pp := p.shards[i]
pp.gc()
}
}
func newPendingProposalShard(pool *sync.Pool,
proposals *entryQueue, tickInMillisecond uint64) *proposalShard {
gcTick := defaultGCTick
if gcTick == 0 {
panic("invalid gcTick")
}
lcu := logicalClock{
tickInMillisecond: tickInMillisecond,
gcTick: gcTick,
}
p := &proposalShard{
proposals: proposals,
pending: make(map[uint64]*RequestState),
logicalClock: lcu,
pool: pool,
}
return p
}
func (p *proposalShard) propose(session *client.Session,
cmd []byte, key uint64, handler ICompleteHandler,
timeout time.Duration) (*RequestState, error) {
timeoutTick := p.getTimeoutTick(timeout)
if timeoutTick == 0 {
return nil, ErrTimeoutTooSmall
}
if uint64(len(cmd)) > maxProposalPayloadSize {
return nil, ErrPayloadTooBig
}
entry := pb.Entry{
Key: key,
ClientID: session.ClientID,
SeriesID: session.SeriesID,
RespondedTo: session.RespondedTo,
Cmd: prepareProposalPayload(cmd),
}
req := p.pool.Get().(*RequestState)
req.clientID = session.ClientID
req.seriesID = session.SeriesID
req.completeHandler = handler
req.key = entry.Key
req.deadline = p.getTick() + timeoutTick
if len(req.CompletedC) > 0 {
req.CompletedC = make(chan RequestResult, 1)
}
p.mu.Lock()
if badKeyCheck {
_, ok := p.pending[entry.Key]
if ok {
plog.Warningf("bad key")
p.mu.Unlock()
return nil, ErrBadKey
}
}
p.pending[entry.Key] = req
p.mu.Unlock()
added, stopped := p.proposals.add(entry)
if stopped {
plog.Warningf("dropping proposals, cluster stopped")
p.mu.Lock()
delete(p.pending, entry.Key)
p.mu.Unlock()
return nil, ErrClusterClosed
}
if !added {
p.mu.Lock()
delete(p.pending, entry.Key)
p.mu.Unlock()
plog.Warningf("dropping proposals, overloaded")
return nil, ErrSystemBusy
}
return req, nil
}
func (p *proposalShard) close() {
p.mu.Lock()
defer p.mu.Unlock()
p.stopped = true
if p.proposals != nil {
p.proposals.close()
}
for _, c := range p.pending {
c.notify(getTerminatedResult())
}
}
func (p *proposalShard) getProposal(clientID uint64,
seriesID uint64, key uint64, now uint64) *RequestState {
p.mu.Lock()
if p.stopped {
p.mu.Unlock()
return nil
}
ps, ok := p.pending[key]
if ok && ps.deadline >= now {
if ps.clientID == clientID && ps.seriesID == seriesID {
delete(p.pending, key)
p.mu.Unlock()
return ps
}
}
p.mu.Unlock()
return nil
}
func (p *proposalShard) applied(clientID uint64,
seriesID uint64, key uint64, result uint64, rejected bool) {
now := p.getTick()
var code RequestResultCode
if rejected {
code = requestRejected
} else {
code = requestCompleted
}
ps := p.getProposal(clientID, seriesID, key, now)
if ps != nil {
ps.notify(RequestResult{code: code, result: result})
}
tick := p.getTick()
if tick != p.expireNotified {
p.gcAt(now)
p.expireNotified = tick
}
}
func (p *proposalShard) gc() {
now := p.getTick()
p.gcAt(now)
}
func (p *proposalShard) gcAt(now uint64) {
p.mu.Lock()
defer p.mu.Unlock()
if p.stopped {
return
}
if now-p.lastGcTime < p.gcTick {
return
}
p.lastGcTime = now
deletedKeys := make(map[uint64]bool)
for key, pRec := range p.pending {
if pRec.deadline < now {
pRec.notify(getTimeoutResult())
deletedKeys[key] = true
}
}
if len(deletedKeys) == 0 {
return
}
for key := range deletedKeys {
delete(p.pending, key)
}
}
func prepareProposalPayload(cmd []byte) []byte {
dst := make([]byte, len(cmd))
copy(dst, cmd)
return dst
}