forked from anacrolix/dht
/
transaction.go
72 lines (63 loc) · 1.32 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package dht
import (
"sync"
"time"
"github.com/anacrolix/dht/krpc"
)
// Transaction tracks one query/response exchange with a remote node: the
// query that was sent, any resends of it, and the callbacks to run once the
// exchange completes or fails.
type Transaction struct {
	// remoteAddr and t together form the value returned by key().
	remoteAddr Addr
	// t is the second component of the key; presumably the KRPC transaction
	// ID — confirm against the code that constructs Transactions.
	t string

	// onResponse is called by handleResponse with the received message.
	onResponse func(krpc.Msg)
	// onTimeout is run in a new goroutine by resendCallback once the resend
	// limit is reached without a response having been recorded.
	onTimeout func()
	// onSendError is run in a new goroutine by resendCallback when a resend
	// attempt returns an error.
	onSendError func(error)

	// querySender performs the actual send; sendQuery calls it.
	querySender func() error
	// queryResendDelay yields the delay to wait before the next resend.
	queryResendDelay func() time.Duration

	// mu is held while handleResponse sets gotResponse and for the whole of
	// resendCallback.
	mu sync.Mutex
	// gotResponse is set once handleResponse has been called.
	gotResponse bool
	// timer is created by startResendTimer and re-armed by resendCallback.
	timer *time.Timer
	// retries counts the resend attempts made by resendCallback.
	retries int
	// lastSend is the time of the most recent successful querySender call.
	lastSend time.Time
}
// handleResponse records that a response has arrived and then passes the
// message to the onResponse callback.
func (t *Transaction) handleResponse(m krpc.Msg) {
	// Flip the flag inside a scoped critical section so the lock is released
	// before the callback runs; onResponse is invoked without t.mu held.
	func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		t.gotResponse = true
	}()
	t.onResponse(m)
}
// key builds the transactionKey for this transaction from the string form of
// the remote address and the t field.
func (t *Transaction) key() transactionKey {
	addr := t.remoteAddr.String()
	return transactionKey{addr, t.t}
}
// startResendTimer schedules resendCallback to run after the current resend
// delay and stores the resulting timer in t.timer.
//
// This method does not acquire t.mu itself.
// NOTE(review): presumably the caller holds t.mu or otherwise orders this
// write before the callback can read t.timer — confirm at the call site.
func (t *Transaction) startResendTimer() {
	delay := t.queryResendDelay()
	t.timer = time.AfterFunc(delay, t.resendCallback)
}
// resendCallback is the function run by the resend timer. With t.mu held it
// does one of the following:
//
//   - nothing, if a response has already been recorded;
//   - starts onTimeout in a new goroutine, if the resend limit was reached;
//   - otherwise counts a retry, resends the query, and re-arms the timer,
//     starting onSendError in a new goroutine if the resend fails.
func (t *Transaction) resendCallback() {
	// Number of resends after which the transaction is considered timed out.
	const resendLimit = 2

	t.mu.Lock()
	defer t.mu.Unlock()

	switch {
	case t.gotResponse:
		return
	case t.retries == resendLimit:
		// Callbacks run in their own goroutine, so they do not execute while
		// this function still holds t.mu.
		go t.onTimeout()
		return
	}

	t.retries++
	sendErr := t.sendQuery()
	if sendErr != nil {
		go t.onSendError(sendErr)
		return
	}

	// Reset reports whether the timer was still active. This function is the
	// timer's own callback, so an active timer here means a broken invariant.
	stillActive := t.timer.Reset(t.queryResendDelay())
	if stillActive {
		panic("timer should have fired to get here")
	}
}
// sendQuery sends the query via querySender. On success it stamps lastSend
// with the current time; on failure it returns the error and leaves lastSend
// unchanged.
func (t *Transaction) sendQuery() error {
	err := t.querySender()
	if err == nil {
		t.lastSend = time.Now()
	}
	return err
}