forked from lni/dragonboat
/
sessionmanager.go
135 lines (119 loc) · 4.36 KB
/
sessionmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright 2017-2019 Lei Ni (nilei81@gmail.com) and other Dragonboat authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rsm
import (
"io"
sm "github.com/lni/dragonboat/v3/statemachine"
)
// Compile-time check that *SessionManager satisfies ILoadableSessions.
var _ ILoadableSessions = (*SessionManager)(nil)
// SessionManager is the wrapper struct that implements the client session
// related functionalities used in the IManagedStateMachine interface.
type SessionManager struct {
// sessions is the store every SessionManager method operates on.
sessions *lrusession
}
// NewSessionManager creates a SessionManager whose session store is built by
// newLRUSession using LRUMaxSessionCount.
func NewSessionManager() *SessionManager {
	manager := new(SessionManager)
	manager.sessions = newLRUSession(LRUMaxSessionCount)
	return manager
}
// GetSessionHash returns the uint64 hash reported by the underlying session
// store, representing the state of the session manager.
func (ds *SessionManager) GetSessionHash() uint64 {
	hash := ds.sessions.getHash()
	return hash
}
// UpdateRespondedTo updates the responded to value of the specified client
// session by calling clearTo on it with respondedTo converted to a
// RaftSeriesID. The receiver's own state is not touched.
func (ds *SessionManager) UpdateRespondedTo(session *Session,
	respondedTo uint64) {
	upTo := RaftSeriesID(respondedTo)
	session.clearTo(upTo)
}
// RegisterClientID registers a new client session for clientID.
//
// When no session is stored for clientID, a new one is created and added to
// the session store, and the returned sm.Result has its Value set to
// clientID. When a session is already stored for clientID, a warning is
// logged and a zero-valued sm.Result is returned.
//
// It panics (via plog.Panicf) if the session store hands back a session whose
// ClientID differs from the one that was looked up.
func (ds *SessionManager) RegisterClientID(clientID uint64) sm.Result {
	id := RaftClientID(clientID)
	if es, ok := ds.sessions.getSession(id); ok {
		// The store must never return a session that belongs to another client.
		if es.ClientID != id {
			plog.Panicf("returned an unexpected session, got id %d, want %d",
				es.ClientID, clientID)
		}
		plog.Warningf("client ID %d already exist", clientID)
		return sm.Result{}
	}
	s := newSession(id)
	ds.sessions.addSession(id, *s)
	return sm.Result{Value: clientID}
}
// UnregisterClientID removes the session stored for clientID.
//
// When a session is stored for clientID it is deleted from the session store
// and the returned sm.Result has its Value set to clientID. When no such
// session exists, a zero-valued sm.Result is returned.
//
// It panics (via plog.Panicf) if the session store hands back a session whose
// ClientID differs from the one that was looked up.
func (ds *SessionManager) UnregisterClientID(clientID uint64) sm.Result {
	id := RaftClientID(clientID)
	es, ok := ds.sessions.getSession(id)
	if !ok {
		return sm.Result{}
	}
	// The store must never return a session that belongs to another client.
	if es.ClientID != id {
		plog.Panicf("returned an unexpected session, got id %d, want %d",
			es.ClientID, clientID)
	}
	ds.sessions.delSession(id)
	return sm.Result{Value: clientID}
}
// ClientRegistered looks up the session stored for clientID. It returns the
// values produced by the session store lookup: the session and a flag
// reporting whether the client exists in the system.
//
// It panics (via plog.Panicf) if the session store hands back a session whose
// ClientID differs from the one that was looked up.
func (ds *SessionManager) ClientRegistered(clientID uint64) (*Session, bool) {
	id := RaftClientID(clientID)
	es, ok := ds.sessions.getSession(id)
	// The store must never return a session that belongs to another client.
	if ok && es.ClientID != id {
		plog.Panicf("returned an unexpected session, got id %d, want %d",
			es.ClientID, clientID)
	}
	return es, ok
}
// UpdateRequired reports how a request identified by seriesID should be
// handled for the given session. The returned tuple is
// (request result, responded before, update required):
//
//   - session.hasResponded is true: zero sm.Result, true, false.
//   - session.getResponse finds a result: that result, false, false.
//   - otherwise: zero sm.Result, false, true.
func (ds *SessionManager) UpdateRequired(session *Session,
	seriesID uint64) (sm.Result, bool, bool) {
	id := RaftSeriesID(seriesID)
	if session.hasResponded(id) {
		return sm.Result{}, true, false
	}
	if cached, found := session.getResponse(id); found {
		return cached, false, false
	}
	return sm.Result{}, false, true
}
// MustHaveClientSeries panics when the given session already holds a response
// for seriesID, as reported by session.getResponse; otherwise it does nothing.
func (ds *SessionManager) MustHaveClientSeries(session *Session,
	seriesID uint64) {
	if _, found := session.getResponse(RaftSeriesID(seriesID)); found {
		panic("already has response in session")
	}
}
// AddResponse records result on the session under seriesID by calling
// session.addResponse with seriesID converted to a RaftSeriesID.
func (ds *SessionManager) AddResponse(session *Session,
	seriesID uint64, result sm.Result) {
	id := RaftSeriesID(seriesID)
	session.addResponse(id, result)
}
// SaveSessions writes the sessions to the provided io.Writer by delegating to
// the underlying session store, and returns whatever error that call reports.
func (ds *SessionManager) SaveSessions(writer io.Writer) error {
	err := ds.sessions.save(writer)
	return err
}
// LoadSessions restores sessions from the provided io.Reader by delegating to
// the underlying session store with the given SSVersion, and returns whatever
// error that call reports.
func (ds *SessionManager) LoadSessions(reader io.Reader, v SSVersion) error {
	err := ds.sessions.load(reader, v)
	return err
}