-
-
Notifications
You must be signed in to change notification settings - Fork 275
/
operation_base.go
185 lines (154 loc) · 4.95 KB
/
operation_base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package terminal
import (
"time"
"github.com/tevino/abool"
)
// OperationBase is the shared foundation embedded by operations. It keeps
// track of the terminal the operation is attached to, the operation ID, and
// whether the operation has been stopped.
type OperationBase struct {
	// terminal is the terminal this operation is attached to.
	terminal Terminal
	// id is the ID of this operation; it is also used as the flow ID of
	// messages created or sent by the operation.
	id uint32
	// stopped is the atomic flag that records whether the operation stopped.
	stopped abool.AtomicBool
}
// InitOperationBase attaches the operation to the terminal t and assigns it
// the operation ID opID.
// Should not be overridden by implementations.
func (op *OperationBase) InitOperationBase(t Terminal, opID uint32) {
	op.terminal, op.id = t, opID
}
// ID reports the operation ID that was assigned by InitOperationBase.
// Should not be overridden by implementations.
func (op *OperationBase) ID() uint32 {
	return op.id
}
// Type reports the type ID of the operation. The base implementation only
// returns the placeholder "unknown".
// Should be overridden by implementations to return the correct type ID.
func (op *OperationBase) Type() string {
	return "unknown"
}
// Deliver hands a message to the operation. The base implementation ignores
// the message and always reports incorrect usage.
// Meant to be overridden by implementations.
func (op *OperationBase) Deliver(_ *Msg) *Error {
	const reason = "Deliver not implemented for this operation"
	return ErrIncorrectUsage.With(reason)
}
// NewMsg builds a new data message carrying the given data, with its flow ID
// set to this operation's ID.
// Should not be overridden by implementations.
func (op *OperationBase) NewMsg(data []byte) *Msg {
	m := NewMsg(data)
	m.Type, m.FlowID = MsgTypeData, op.id

	// Debug unit leaks.
	// NOTE(review): the argument 2 is presumably a caller skip depth, so this
	// call should stay directly inside this method — confirm before moving it.
	m.debugWithCaller(2)

	return m
}
// NewEmptyMsg builds a new empty data message with its flow ID set to this
// operation's ID.
// Should not be overridden by implementations.
func (op *OperationBase) NewEmptyMsg() *Msg {
	m := NewEmptyMsg()
	m.Type, m.FlowID = MsgTypeData, op.id

	// Debug unit leaks.
	// NOTE(review): the argument 2 is presumably a caller skip depth, so this
	// call should stay directly inside this method — confirm before moving it.
	m.debugWithCaller(2)

	return m
}
// Send transmits msg to the other side through the operation's terminal.
//
// Before sending, the message is stamped with the operation ID as its flow
// ID. A data message whose unit is high priority is upgraded to a priority
// data message when UsePriorityDataMsgs is enabled. The call then waits for
// a processing slot on the message's unit.
//
// It returns the error reported by the terminal, if any. On failure the
// message is finished here, so the caller must not finish it again.
// Should not be overridden by implementations.
func (op *OperationBase) Send(msg *Msg, timeout time.Duration) *Error {
	// Stamp the message with this operation's metadata.
	msg.FlowID = op.id
	if msg.Type == MsgTypeData && msg.Unit.IsHighPriority() && UsePriorityDataMsgs {
		msg.Type = MsgTypePriorityData
	}

	// Block until the unit is granted a processing slot.
	msg.Unit.WaitForSlot()

	// Hand the message over to the terminal.
	sendErr := op.terminal.Send(msg, timeout)
	if sendErr == nil {
		return nil
	}

	// The message was not sent, so finish its unit here.
	msg.Finish()
	return sendErr
}
// Flush asks the attached terminal to send all messages it has waiting,
// passing the given timeout through to the terminal.
// Meant to be overridden by implementations.
func (op *OperationBase) Flush(timeout time.Duration) {
	op.terminal.Flush(timeout)
}
// Stopped reports whether the operation's stop flag is set.
// Should not be overridden by implementations.
func (op *OperationBase) Stopped() bool {
	return op.stopped.IsSet()
}
// markStopped sets the stop flag if it is not set yet.
// It reports whether this call was the one that set the flag.
func (op *OperationBase) markStopped() bool {
	changed := op.stopped.SetToIf(false, true)
	return changed
}
// Stop stops the operation by asking the attached terminal to stop it,
// passing along the operation itself (self) and the stop error err. The
// terminal is expected to unregister the operation and call HandleStop().
// Should not be overridden by implementations.
func (op *OperationBase) Stop(self Operation, err *Error) {
	// Delegate the actual stopping to the terminal.
	op.terminal.StopOperation(self, err)
}
// HandleStop is the hook that lets an operation shut down cleanly.
// The returned error is the error to send to the other side; the base
// implementation passes err through unchanged.
// Should never be called directly. Call Stop() instead.
// Meant to be overridden by implementations.
func (op *OperationBase) HandleStop(err *Error) (errorToSend *Error) {
	errorToSend = err
	return errorToSend
}
// Terminal reports the terminal this operation was attached to by
// InitOperationBase.
// Should not be overridden by implementations.
func (op *OperationBase) Terminal() Terminal {
	return op.terminal
}
// OneOffOperationBase is an operation base for operations that consist of
// just one message and an error return.
type OneOffOperationBase struct {
	OperationBase

	// Result receives the error passed to HandleStop when the operation is
	// stopped. It must be created with Init before use.
	Result chan *Error
}
// Init prepares the one-off operation base by creating the Result channel.
// The channel has room for a single result, so that one result can be stored
// without a receiver waiting.
func (op *OneOffOperationBase) Init() {
	const resultSlots = 1
	op.Result = make(chan *Error, resultSlots)
}
// HandleStop is the hook that lets the operation shut down cleanly. It
// publishes err on the Result channel and returns err as the error to send
// to the other side.
// Should never be called directly. Call Stop() instead.
func (op *OneOffOperationBase) HandleStop(err *Error) (errorToSend *Error) {
	// Publish the result without blocking.
	select {
	case op.Result <- err:
		// Result stored.
	default:
		// The channel cannot take the value right now; drop it.
	}

	return err
}
// MessageStreamOperationBase is an operation base for operations that
// receive a stream of messages.
// Every received message must be finished by the implementing operation.
type MessageStreamOperationBase struct {
	OperationBase

	// Delivered carries the messages handed to the operation via Deliver.
	Delivered chan *Msg
	// Ended receives the error passed to HandleStop when the operation is
	// stopped.
	Ended chan *Error
}
// Init prepares the message stream operation base by creating its channels.
// deliverQueueSize is the buffer size of the Delivered channel; the Ended
// channel always has room for a single error.
func (op *MessageStreamOperationBase) Init(deliverQueueSize int) {
	op.Ended = make(chan *Error, 1)
	op.Delivered = make(chan *Msg, deliverQueueSize)
}
// Deliver queues msg on the Delivered channel without blocking.
// It returns nil when the message was queued, and an incorrect usage error
// when the channel could not accept the message.
func (op *MessageStreamOperationBase) Deliver(msg *Msg) *Error {
	select {
	case op.Delivered <- msg:
		// Queued for the operation.
	default:
		// Nobody is ready to take the message and there is no buffer space.
		return ErrIncorrectUsage.With("request was not waiting for data")
	}

	return nil
}
// HandleStop is the hook that lets the operation shut down cleanly. It
// publishes err on the Ended channel and returns err as the error to send to
// the other side.
// Should never be called directly. Call Stop() instead.
func (op *MessageStreamOperationBase) HandleStop(err *Error) (errorToSend *Error) {
	// Signal the end of the stream without blocking.
	select {
	case op.Ended <- err:
		// End signal stored.
	default:
		// The channel cannot take the value right now; drop it.
	}

	return err
}