-
Notifications
You must be signed in to change notification settings - Fork 2
/
mq.go
98 lines (82 loc) · 2.45 KB
/
mq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"flag"
"log"
"net/http"
"time"
)
// Command-line configuration. The values are filled in from the flags
// registered in init.
var (
	// Sizing passed to NewStore.
	workers, peers int
	// Storage root passed to NewStore, and the port and address that make up
	// the HTTP listen address.
	root, port, address string
)
// FrontHandler is the server's single HTTP entry point. Its ServeHTTP method
// asks Router for a match on the request method and path and then calls the
// endpoint function stored in Endpoints under the matched route's name.
type FrontHandler struct {
// Store is handed to every Session created for a matched request.
Store *Store
// Router resolves a request method and URL path to a route match.
Router *Router
// Endpoints maps a route name to the function that serves that route.
Endpoints map[string]func(*Session)
}
// Session carries the per-request state passed to an endpoint function.
type Session struct {
// Store is the FrontHandler's store; every session points at the same one.
Store *Store
// Match is the route match the Router produced for this request.
Match *RouteMatch
// Request is the incoming HTTP request.
Request *http.Request
// Response is the writer the endpoint uses to reply.
Response http.ResponseWriter
}
// ServeHTTP implements http.Handler. It looks up a route for the request's
// method and URL path and hands a fresh Session to the endpoint registered
// under the matched route's name.
//
// Responses produced here (endpoints write their own):
//   - 404 Not Found when no route matches.
//   - 500 Internal Server Error when a route matches but no endpoint function
//     is registered for its name.
func (h *FrontHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
	match := h.Router.Match(request.Method, request.URL.Path)
	if match == nil {
		response.WriteHeader(http.StatusNotFound)
		return
	}

	// A missing key (or a nil map) yields a nil func; calling it would panic,
	// so treat a route without an endpoint as a server-side error instead.
	endpoint := h.Endpoints[match.Name]
	if endpoint == nil {
		response.WriteHeader(http.StatusInternalServerError)
		return
	}

	endpoint(&Session{
		Store:    h.Store,
		Match:    match,
		Request:  request,
		Response: response,
	})
}
// init registers the command-line flags and their defaults. Parsing is done
// in main rather than here: flag.Parse in an init function runs before other
// packages (notably testing) have registered their own flags, which makes
// `go test` on this package exit with "flag provided but not defined".
func init() {
	flag.IntVar(&workers, "workers", 8, "Number of workers")
	flag.IntVar(&peers, "peers", 0, "Number of peers")
	flag.StringVar(&root, "root", "/tmp/mq", "File system storage path")
	flag.StringVar(&port, "port", "8080", "Port to listen on")
	flag.StringVar(&address, "address", "0.0.0.0", "Address to listen on")
}

// main parses the flags, prepares the store, registers the routes and their
// endpoint functions, and serves HTTP on address:port until the server fails.
func main() {
	// Must happen before any flag-backed variable is read below.
	flag.Parse()

	store := NewStore(workers, peers, root)

	// Our storage mechanism needs to make sure our folders
	// and workers are standing up.
	store.PrepareFolders()
	store.PrepareWorkers()

	router := &Router{}
	router.AddRoute("GetQueue", "GET", "^/(?P<queue>[a-z]+)$")
	router.AddRoute("CreateQueue", "PUT", "^/(?P<queue>[a-z]+)$")
	router.AddRoute("DeleteQueue", "DELETE", "^/(?P<queue>[a-z]+)$")
	router.AddRoute("CreateMessage", "POST", "^/(?P<queue>[a-z]+)/messages$")
	router.AddRoute("GetMessage", "GET", "^/(?P<queue>[a-z]+)/messages$")
	router.AddRoute("DeleteMessage", "DELETE", "^/(?P<queue>[a-z]+)/messages/(?P<message>[a-z0-9-]+)$")

	handler := &FrontHandler{
		Store:  store,
		Router: router,
		// Our handler functions by name. Each key must equal the name given
		// to AddRoute above, because that is the name a RouteMatch carries.
		Endpoints: map[string]func(*Session){
			"GetQueue":      GetQueue,
			"CreateQueue":   CreateQueue,
			"DeleteQueue":   DeleteQueue,
			"CreateMessage": CreateMessage,
			"GetMessage":    GetMessage,
			"DeleteMessage": DeleteMessage,
		},
	}

	server := &http.Server{
		Addr:           address + ":" + port,
		Handler:        handler,
		ReadTimeout:    120 * time.Second,
		WriteTimeout:   120 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}

	// ListenAndServe only returns on failure and its error is never nil, so
	// log it and exit with a non-zero status instead of returning normally.
	log.Fatal(server.ListenAndServe())
}