forked from kyma-project/kyma
/
main.go
93 lines (78 loc) · 2.1 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"os"
"os/signal"
"regexp"
"strconv"
"syscall"
"github.com/kyma-project/kyma/components/event-bus/cmd/event-bus-publish/application"
"github.com/kyma-project/kyma/components/event-bus/internal/publish"
)
const (
allowedIDChars = `^[a-zA-Z0-9_\-]+$`
)
var (
isValidID = regexp.MustCompile(allowedIDChars).MatchString
sema chan struct{}
)
func main() {
log.Println("Publish :: Starting up")
publishOpts := publish.ParseFlags()
startPublish(publishOpts)
}
func startPublish(publishOpts *publish.Options) {
if !isValidID(publishOpts.ClientID) {
log.Fatal("invalid client_id ", publishOpts.ClientID)
}
// enforce a limit of concurrent requests processed in parallel.
sema = make(chan struct{}, publishOpts.NoOfConcurrentRequests)
publishApplication := application.NewPublishApplication(publishOpts)
defer publishApplication.Stop()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
rtr := limitParallelRequests(publishApplication.ServerMux)
if publishOpts.TraceRequests {
logger(rtr)
}
srv := &http.Server{
Addr: ":" + strconv.Itoa(publishOpts.Port),
Handler: rtr,
}
go func() {
log.Fatal(srv.ListenAndServe())
}()
killSignal := <-interrupt
switch killSignal {
case os.Interrupt:
log.Println("Got os interrupt...")
case syscall.SIGTERM:
log.Println("Got SIGTERM")
}
log.Println("The service is shutting down....")
srv.Shutdown(context.Background())
log.Println("Done..")
}
func limitParallelRequests(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sema <- struct{}{} // acquire a semaphore
defer func() { <-sema }()
h.ServeHTTP(w, r)
})
}
func logger(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("%s requested %s", r.RemoteAddr, r.URL)
dump, err := httputil.DumpRequest(r, true)
log.Printf("%q", dump)
if err != nil {
http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
return
}
h.ServeHTTP(w, r)
})
}