-
Notifications
You must be signed in to change notification settings - Fork 0
/
agent.go
116 lines (96 loc) · 2.37 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package gostun
import (
"errors"
"sync"
"time"
)
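/*
Describes Agent.TimeOutHandle: it terminates every transaction whose deadline
is before the given time, and does not return until all of their handlers
have processed TransactionTimeOutErr.
It returns ErrAgent if the agent has already been closed.
*/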
var (
// ErrAgent is returned by Agent.TimeOutHandle when the agent's closed flag is set.
ErrAgent = errors.New("agent closed")
// TransactionTimeOutErr is the error placed in MessageObj.Err and delivered
// to the handler of each transaction that Agent.TimeOutHandle expires.
TransactionTimeOutErr = errors.New("transaction is timed out")
)
// Agent keeps the set of in-progress transactions, keyed by transaction ID,
// and delivers events (a received message or a timeout) to their handlers.
type Agent struct {
// transactions holds the registered, not yet completed transactions.
transactions map[transactionID]TransactionAgent
// mux is locked by TimeOutHandle around its access to closed and transactions.
mux sync.Mutex
// nonHandler, when non-nil, receives messages whose transaction ID is not registered.
nonHandler Handler
// closed marks the agent as closed; TimeOutHandle then returns ErrAgent.
closed bool
}
// transactionID is the fixed-size array used as the key of Agent.transactions.
// Its length is TransactionIDSize (12 bytes / 96 bits according to the original note).
type transactionID [TransactionIDSize]byte
// TransactionAgent describes one transaction in progress.
type TransactionAgent struct {
// ID is the identifier of the transaction.
ID transactionID
// Timeout is the deadline; TimeOutHandle expires the transaction when
// Timeout is before the time passed to it.
Timeout time.Time
// handler is called by ProcessHandle with the matching message, or by
// TimeOutHandle with TransactionTimeOutErr.
handler Handler
}
// AgentHandle wraps a single Handler. NewAgent takes the handler field of a
// zero-value AgentHandle (a nil Handler) as the new agent's nonHandler.
type AgentHandle struct {
// handler is the wrapped Handler; nil in the zero value.
handler Handler
}
// Handler is the callback interface through which an Agent reports events.
// The original note likens it to the handler pattern of net/http.
type Handler interface {
// HandleEvent is called with the event for a transaction: a received
// message (from ProcessHandle) or a timeout error (from TimeOutHandle).
HandleEvent(e MessageObj)
}
// MessageObj is the event value passed to Handler.HandleEvent.
type MessageObj struct {
// Msg is the received message; ProcessHandle sets it and leaves Err nil.
Msg *Message
// Err is the failure reason; TimeOutHandle sets it to
// TransactionTimeOutErr and leaves Msg nil.
Err error
}
// NewAgent returns an Agent with an empty transaction table. The fallback
// handler for unregistered transactions is taken from a zero-value
// AgentHandle, so it starts out nil.
func NewAgent() *Agent {
	var fallback AgentHandle
	return &Agent{
		transactions: make(map[transactionID]TransactionAgent),
		nonHandler:   fallback.handler,
	}
}
// ProcessHandle dispatches the received message m to the handler registered
// for m.TransactionID and removes that transaction from the agent.
//
// If no transaction with that ID is registered, the event is passed to the
// agent's nonHandler when one is set; otherwise it is dropped.
//
// The transaction table is read and modified while holding a.mux, the same
// lock TimeOutHandle takes, so the two methods do not race on the map. The
// handler itself is invoked only after the lock has been released, so a
// handler may call back into the agent.
//
// It returns ErrAgent if the agent is closed, and nil otherwise.
func (a *Agent) ProcessHandle(m *Message) error {
	a.mux.Lock()
	if a.closed {
		a.mux.Unlock()
		return ErrAgent
	}
	tr, ok := a.transactions[m.TransactionID]
	delete(a.transactions, m.TransactionID) // a transaction is handled at most once
	fallback := a.nonHandler                // snapshot while the lock is held
	a.mux.Unlock()

	e := MessageObj{Msg: m}
	switch {
	case ok:
		tr.handler.HandleEvent(e)
	case fallback != nil:
		// The transaction is not registered.
		fallback.HandleEvent(e)
	}
	return nil
}
/*
The value for RTO SHOULD be cached by a client after the completion
of the transaction, and used as the starting value for RTO for the
next transaction to the same server (based on equality of IP
address).
*/
// TimeOutHandle runs the timeout path: it expires every registered
// transaction whose Timeout is before trate. Expired transactions are removed
// from the agent while a.mux is held; once the lock is released, each of
// their handlers receives a MessageObj whose Err is TransactionTimeOutErr.
//
// It returns ErrAgent if the agent is closed, and nil otherwise.
func (a *Agent) TimeOutHandle(trate time.Time) error {
	a.mux.Lock()
	if a.closed {
		a.mux.Unlock()
		return ErrAgent
	}
	expired := make([]Handler, 0, 100)
	for id, pending := range a.transactions {
		if !pending.Timeout.Before(trate) {
			continue
		}
		expired = append(expired, pending.handler)
		// Deleting the entry currently being visited is permitted while
		// ranging over a map.
		delete(a.transactions, id)
	}
	a.mux.Unlock()

	// Notify outside the lock so handlers are free to use the agent.
	timedOut := MessageObj{Err: TransactionTimeOutErr}
	for _, h := range expired {
		h.HandleEvent(timedOut)
	}
	return nil
}