forked from boramalper/magnetico
-
Notifications
You must be signed in to change notification settings - Fork 5
/
transport.go
176 lines (148 loc) · 4.58 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package mainline
import (
"errors"
"log"
"net"
"time"
"github.com/anacrolix/torrent/bencode"
)
var (
//Throttle rate that transport will have at Start time. Set <= 0 for unlimited requests
DefaultThrottleRate = -1
)
type Transport struct {
conn *net.UDPConn
laddr *net.UDPAddr
started bool
buffer []byte
// OnMessage is the function that will be called when Transport receives a packet that is
// successfully unmarshalled as a syntactically correct Message (but -of course- the checking
// the semantic correctness of the Message is left to Protocol).
onMessage func(*Message, *net.UDPAddr)
throttlingRate int //available messages per second. If <=0, it is considered disabled
throttleTicketsChannel chan struct{} //channel giving tickets (allowance) to make send a message
}
func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr)) *Transport {
t := new(Transport)
/* The field size sets a theoretical limit of 65,535 bytes (8 byte header + 65,527 bytes of
* data) for a UDP datagram. However the actual limit for the data length, which is imposed by
* the underlying IPv4 protocol, is 65,507 bytes (65,535 − 8 byte UDP header − 20 byte IP
* header).
*
* In IPv6 jumbograms it is possible to have UDP packets of size greater than 65,535 bytes.
* RFC 2675 specifies that the length field is set to zero if the length of the UDP header plus
* UDP data is greater than 65,535.
*
* https://en.wikipedia.org/wiki/User_Datagram_Protocol
*/
t.buffer = make([]byte, 65507)
t.onMessage = onMessage
t.throttleTicketsChannel = make(chan struct{})
t.SetThrottle(DefaultThrottleRate)
var err error
t.laddr, err = net.ResolveUDPAddr("udp", laddr)
if err != nil {
log.Panicf("Could not resolve the UDP address for the trawler! %v", err)
}
return t
}
// Sets t throttle rate at runtime
func (t *Transport) SetThrottle(rate int) {
t.throttlingRate = rate
}
func (t *Transport) Start() {
// Why check whether the Transport `t` started or not, here and not -for instance- in
// t.Terminate()?
// Because in t.Terminate() the programmer (i.e. you & me) would stumble upon an error while
// trying close an uninitialised net.UDPConn or something like that: it's mostly harmless
// because its effects are immediate. But if you try to start a Transport `t` for the second
// (or the third, 4th, ...) time, it will keep spawning goroutines and any small mistake may
// end up in a debugging horror.
// Here ends my justification.
if t.started {
log.Panicln("Attempting to Start() a mainline/Transport that has been already started! (Programmer error.)")
}
t.started = true
var err error
t.conn, err = net.ListenUDP("udp", t.laddr)
if err != nil {
log.Fatalf("Could NOT bind the socket! %v", err)
}
go t.readMessages()
go t.Throttle()
}
func (t *Transport) Terminate() {
t.conn.Close()
}
// readMessages is a goroutine!
func (t *Transport) readMessages() {
for {
n, from, err := t.conn.ReadFromUDP(t.buffer)
if err != nil {
break
}
if n == 0 {
/* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit
* zero-length datagrams. When such a datagram is received, the return value (n) is 0.
*/
continue
}
var msg Message
err = bencode.Unmarshal(t.buffer[:n], &msg)
if err != nil {
// couldn't unmarshal packet data
continue
}
t.onMessage(&msg, from)
}
}
// Manages throttling for transport. To be called as a routine at Start time. Should never return.
func (t *Transport) Throttle() {
if t.throttlingRate > 0 {
resetChannel := make(chan struct{})
dealer := func(resetRequest chan struct{}) {
ticketGiven := 0
tooManyTicketGiven := false
for {
select {
case <-t.throttleTicketsChannel:
{
ticketGiven++
if ticketGiven >= t.throttlingRate {
tooManyTicketGiven = true
break
}
}
case <-resetRequest:
{
return
}
}
if tooManyTicketGiven {
break
}
}
<-resetRequest
}
go dealer(resetChannel)
for range time.Tick(1 * time.Second) {
resetChannel <- struct{}{}
go dealer(resetChannel)
}
} else {
//no limit, keep giving tickets to whoever requests it
for {
<-t.throttleTicketsChannel
}
}
}
func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) error {
//get ticket
t.throttleTicketsChannel <- struct{}{}
data, err := bencode.Marshal(msg)
if err != nil {
return errors.New("could not marshal an outgoing message! (programmer error)")
}
_, err = t.conn.WriteToUDP(data, addr)
return err
}