forked from kyma-project/kyma
/
application.go
85 lines (75 loc) · 2.97 KB
/
application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package application
import (
"log"
"net/http"
"github.com/kyma-project/kyma/components/event-bus/internal/common"
"github.com/kyma-project/kyma/components/event-bus/internal/push/actors"
"github.com/kyma-project/kyma/components/event-bus/internal/push/controllers"
pushOpts "github.com/kyma-project/kyma/components/event-bus/internal/push/opts"
"github.com/kyma-project/kyma/components/event-bus/internal/trace"
"k8s.io/client-go/tools/cache"
)
// PushApplication groups the components that NewPushApplication starts for the
// push service, together with the HTTP mux that exposes its status endpoints.
// Call Stop to shut the started components down.
type PushApplication struct {
// SubscriptionsSupervisor is the supervisor started by NewPushApplication.
// The status handlers query it, and Stop sends it a poison pill.
SubscriptionsSupervisor *actors.SubscriptionsSupervisor
// subscriptionsController is the controller started by NewPushApplication;
// Stop stops it first.
subscriptionsController *controllers.SubscriptionsController
// ServerMux has the /v1/status/live and /v1/status/ready handlers registered.
// NewPushApplication does not start an HTTP server for it.
ServerMux *http.ServeMux
// tracer is the tracer started by NewPushApplication; Stop stops it last.
tracer trace.Tracer
}
// NewPushApplication starts the push service components and returns them
// wrapped in a PushApplication.
//
// In order, it starts a tracer from the options, starts the subscriptions
// supervisor, and starts the subscriptions controller. It then registers the
// liveness and readiness handlers on a fresh ServeMux.
//
// informer is an optional argument: when at least one informer is supplied,
// the first one is handed to the controller and any further ones are ignored.
// Without an informer the controller is started with its default setup.
//
// Note that the pushOpts parameter shadows the pushOpts import alias inside the
// function body.
func NewPushApplication(pushOpts *pushOpts.Options, informer ...cache.SharedIndexInformer) *PushApplication {
	log.Println("Push :: Initializing application")

	tracer := trace.StartNewTracer(&pushOpts.Options)
	supervisor := actors.StartSubscriptionsSupervisor(pushOpts, &tracer)

	var controller *controllers.SubscriptionsController
	switch {
	case len(informer) > 0:
		// Only the first informer is used.
		controller = controllers.StartSubscriptionsControllerWithInformer(supervisor, informer[0], pushOpts)
	default:
		controller = controllers.StartSubscriptionsController(supervisor, pushOpts)
	}

	mux := http.NewServeMux()
	mux.Handle("/v1/status/live", statusLiveHandler(supervisor))
	mux.Handle("/v1/status/ready", statusReadyHandler(supervisor))

	app := &PushApplication{
		SubscriptionsSupervisor: supervisor,
		subscriptionsController: controller,
		ServerMux:               mux,
		tracer:                  tracer,
	}
	return app
}
// Stop shuts the application down in a fixed order: the subscriptions
// controller is stopped first, then the supervisor is sent a poison pill, and
// finally the tracer is stopped.
func (pa *PushApplication) Stop() {
	pa.subscriptionsController.Stop()
	pa.SubscriptionsSupervisor.PoisonPill()
	pa.tracer.Stop()
}
// Package-level status trackers shared by all requests to the status
// endpoints. The handlers log READY only when SetReady returns true.
// NOTE(review): presumably SetReady returns true only on a state change —
// confirm in common.StatusReady.
var (
	// statusLive is updated by statusLiveHandler.
	statusLive common.StatusReady
	// statusReady is updated by statusReadyHandler.
	statusReady common.StatusReady
)
// statusLiveHandler builds the handler for the liveness endpoint.
//
// The handler answers 200 OK while subscriptionsSupervisor is non-nil and
// reports that it is running. Otherwise it marks statusLive as not ready, logs
// NOT_READY and answers 502 Bad Gateway. READY is logged only when
// statusLive.SetReady() returns true.
func statusLiveHandler(subscriptionsSupervisor *actors.SubscriptionsSupervisor) http.Handler {
	probe := func(w http.ResponseWriter, _ *http.Request) {
		alive := subscriptionsSupervisor != nil && subscriptionsSupervisor.IsRunning()
		if !alive {
			statusLive.SetNotReady()
			log.Printf("statusLiveHandler :: Status: NOT_READY")
			w.WriteHeader(http.StatusBadGateway)
			return
		}

		if statusLive.SetReady() {
			log.Printf("statusLiveHandler :: Status: READY")
		}
		w.WriteHeader(http.StatusOK)
	}
	return http.HandlerFunc(probe)
}
// statusReadyHandler builds the handler for the readiness endpoint.
//
// The handler answers 200 OK while subscriptionsSupervisor is non-nil and
// reports an established NATS connection. READY is logged only when
// statusReady.SetReady() returns true.
//
// Otherwise it marks statusReady as not ready, logs NOT_READY and answers
// 502 Bad Gateway. If a supervisor exists, it also starts a reconnect to NATS
// Streaming in a background goroutine so the probe response is not delayed.
func statusReadyHandler(subscriptionsSupervisor *actors.SubscriptionsSupervisor) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if subscriptionsSupervisor != nil && subscriptionsSupervisor.IsNATSConnected() {
			if statusReady.SetReady() {
				log.Printf("statusReadyHandler :: Status: READY")
			}
			w.WriteHeader(http.StatusOK)
			return
		}

		statusReady.SetNotReady()
		log.Printf("statusReadyHandler :: Status: NOT_READY")
		w.WriteHeader(http.StatusBadGateway)

		// This branch is also reached when the supervisor is nil. Invoking the
		// reconnect on a nil supervisor from a goroutine could panic outside
		// the reach of any HTTP-level recovery, so only reconnect when there is
		// a supervisor to reconnect.
		if subscriptionsSupervisor != nil {
			// NOTE(review): every failed probe starts another reconnect
			// goroutine; confirm ReconnectToNATSStreaming tolerates
			// overlapping calls.
			go subscriptionsSupervisor.ReconnectToNATSStreaming()
		}
	})
}