forked from jjeffery/stomp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
153 lines (134 loc) · 3.48 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package server
import (
"github.com/jjeffery/stomp/frame"
"github.com/jjeffery/stomp/server/client"
"github.com/jjeffery/stomp/server/queue"
"github.com/jjeffery/stomp/server/topic"
"log"
"net"
"strings"
"time"
)
// requestProcessor receives requests from client connections over ch and
// routes them to the queue manager or the topic manager, depending on the
// destination of each request.
type requestProcessor struct {
// server is the Server this processor was created for; Listen derives the
// per-connection configuration from it.
server *Server
// ch carries requests to the Serve loop. It is handed to every client
// connection created in Listen.
ch chan client.Request
// tm looks up topics for destinations that do not start with QueuePrefix.
tm *topic.Manager
// qm looks up queues for destinations that start with QueuePrefix.
qm *queue.Manager
// NOTE(review): stop is neither read nor written by the code in this file.
stop bool // has stop been requested
}
// newRequestProcessor builds the request processor for server.
//
// The processor gets a request channel buffered to 128 entries, a fresh topic
// manager, and a queue manager backed by server.QueueStorage. When the server
// has no queue storage configured, in-memory queue storage is used instead.
func newRequestProcessor(server *Server) *requestProcessor {
	p := new(requestProcessor)
	p.server = server
	p.ch = make(chan client.Request, 128)
	p.tm = topic.NewManager()

	if server.QueueStorage != nil {
		p.qm = queue.NewManager(server.QueueStorage)
	} else {
		// No storage configured: fall back to in-memory queues.
		p.qm = queue.NewManager(queue.NewMemoryQueueStorage())
	}

	return p
}
// Serve starts accepting connections on l in a background goroutine (see
// Listen) and then processes client requests from proc.ch on the calling
// goroutine, one request at a time.
//
// Destinations that start with QueuePrefix are routed to the queue manager;
// every other destination is routed to the topic manager. Requeue requests
// are only honoured for queues.
//
// The request loop has no exit path, so Serve does not return normally, even
// after Listen has stopped accepting connections. It panics if an enqueue or
// requeue request carries a frame without a destination header.
func (proc *requestProcessor) Serve(l net.Listener) error {
	go proc.Listen(l)

	// destinationOf returns the destination header of the frame carried by an
	// enqueue or requeue request.
	destinationOf := func(r client.Request) string {
		dest, ok := r.Frame.Contains(frame.Destination)
		if !ok {
			// should not happen, already checked in lower layer
			panic("missing destination")
		}
		return dest
	}

	for {
		r := <-proc.ch
		switch r.Op {
		case client.SubscribeOp:
			dest := r.Sub.Destination()
			if isQueueDestination(dest) {
				q := proc.qm.Find(dest)
				// todo error handling
				q.Subscribe(r.Sub)
			} else {
				t := proc.tm.Find(dest)
				t.Subscribe(r.Sub)
			}

		case client.UnsubscribeOp:
			dest := r.Sub.Destination()
			if isQueueDestination(dest) {
				q := proc.qm.Find(dest)
				// todo error handling
				q.Unsubscribe(r.Sub)
			} else {
				t := proc.tm.Find(dest)
				t.Unsubscribe(r.Sub)
			}

		case client.EnqueueOp:
			dest := destinationOf(r)
			if isQueueDestination(dest) {
				q := proc.qm.Find(dest)
				q.Enqueue(r.Frame)
			} else {
				t := proc.tm.Find(dest)
				t.Enqueue(r.Frame)
			}

		case client.RequeueOp:
			dest := destinationOf(r)
			// only requeue to queues, should never happen for topics
			if isQueueDestination(dest) {
				q := proc.qm.Find(dest)
				q.Requeue(r.Frame)
			}
		}
	}
}
// isQueueDestination reports whether dest begins with QueuePrefix. Serve
// treats such destinations as queues and all other destinations as topics.
func isQueueDestination(dest string) bool {
	hasQueuePrefix := strings.HasPrefix(dest, QueuePrefix)
	return hasQueuePrefix
}
// Listen accepts connections from l until Accept fails with a permanent
// error. Each accepted connection is handed to client.NewConn together with
// the server-derived configuration and the processor's request channel.
//
// Temporary accept errors are retried with a back-off that starts at 5ms,
// doubles on every consecutive failure and is capped at 5s; the back-off is
// reset after a successful Accept. A permanent error is logged and ends the
// accept loop.
func (proc *requestProcessor) Listen(l net.Listener) {
	const (
		initialDelay = 5 * time.Millisecond
		maxDelay     = 5 * time.Second
	)

	conf := newConfig(proc.server)
	var delay time.Duration // how long to sleep on accept failure

	for {
		rw, err := l.Accept()
		if err != nil {
			// NOTE(review): net.Error.Temporary is deprecated in recent Go
			// releases; it is kept here to preserve the retry behaviour.
			netErr, ok := err.(net.Error)
			if !ok || !netErr.Temporary() {
				// Permanent failure: report it instead of stopping silently,
				// since no further connections will be accepted.
				log.Printf("stomp: Accept error: %v; no longer accepting connections", err)
				return
			}

			if delay == 0 {
				delay = initialDelay
			} else {
				delay *= 2
			}
			if delay > maxDelay {
				delay = maxDelay
			}
			log.Printf("stomp: Accept error: %v; retrying in %v", err, delay)
			time.Sleep(delay)
			continue
		}
		delay = 0

		// conf gives the connection access to the server's configuration
		// parameters. The value returned by NewConn is not needed here.
		_ = client.NewConn(conf, rw, proc.ch)
	}
}
// config exposes the settings of a Server through the HeartBeat and
// Authenticate methods. Listen passes a *config to client.NewConn.
type config struct {
// server supplies the HeartBeat and Authenticator settings.
server *Server
}
// newConfig returns a config that reads its settings from s.
func newConfig(s *Server) *config {
	c := new(config)
	c.server = s
	return c
}
// HeartBeat returns the heart-beat duration configured on the server, or
// DefaultHeartBeat when the server's value is zero.
func (c *config) HeartBeat() time.Duration {
	if configured := c.server.HeartBeat; configured != 0 {
		return configured
	}
	return DefaultHeartBeat
}
// Authenticate reports whether login and passcode are accepted. The decision
// is delegated to the server's Authenticator; when the server has none, every
// login is accepted.
func (c *config) Authenticate(login, passcode string) bool {
	auth := c.server.Authenticator
	if auth == nil {
		// no authentication defined
		return true
	}
	return auth.Authenticate(login, passcode)
}