From 5a4430ee79a8434b3f1e3f50c1f0128355b8702f Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Wed, 2 Jan 2019 14:12:57 +0530 Subject: [PATCH 1/3] Fixes #2: Send bot start/stop messages to slack channel - Fix timestamps in delete notifications --- pkg/controller/controller.go | 75 +++++++++++++------ pkg/events/events.go | 12 ++- pkg/execute/executor.go | 6 +- pkg/filterengine/filters/ingress_validator.go | 2 +- pkg/notify/notify.go | 3 +- pkg/notify/slack.go | 31 +++++++- 6 files changed, 97 insertions(+), 32 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 677f1a17e..631788e40 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1,9 +1,9 @@ package controller import ( - //"fmt" "os" "os/signal" + "strconv" "strings" "syscall" "time" @@ -11,7 +11,7 @@ import ( "github.com/infracloudio/kubeops/pkg/config" "github.com/infracloudio/kubeops/pkg/events" "github.com/infracloudio/kubeops/pkg/filterengine" - "github.com/infracloudio/kubeops/pkg/logging" + log "github.com/infracloudio/kubeops/pkg/logging" "github.com/infracloudio/kubeops/pkg/notify" "github.com/infracloudio/kubeops/pkg/utils" @@ -34,10 +34,23 @@ func findNamespace(ns string) string { // RegisterInformers creates new informer controllers to watch k8s resources func RegisterInformers(c *config.Config) { + sendMessage("...and now my watch begins! :crossed_swords:") startTime = time.Now().Local() + + // Get resync period + rsyncTimeStr, ok := os.LookupEnv("INFORMERS_RESYNC_PERIOD") + if !ok { + rsyncTimeStr = "30" + } + + rsyncTime, err := strconv.Atoi(rsyncTimeStr) + if err != nil { + log.Logger.Fatal("Error in reading INFORMERS_RESYNC_PERIOD env var.", err) + } + // Register informers for resource lifecycle events if len(c.Resources) > 0 { - logging.Logger.Info("Registering resource lifecycle informer") + log.Logger.Info("Registering resource lifecycle informer") for _, r := range c.Resources { if _, ok := utils.ResourceGetterMap[r.Name]; !ok { continue @@ -47,7 +60,7 @@ func RegisterInformers(c *config.Config) { continue } for _, ns := range r.Namespaces { - logging.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns) + log.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns) watchlist := cache.NewListWatchFromClient( utils.ResourceGetterMap[r.Name], r.Name, findNamespace(ns), fields.Everything()) @@ -55,7 +68,7 @@ func RegisterInformers(c *config.Config) { _, controller := cache.NewInformer( watchlist, object, - 30*time.Minute, + time.Duration(rsyncTime)*time.Minute, registerEventHandlers(r.Name, r.Events), ) stopCh := make(chan struct{}) @@ -69,14 +82,14 @@ func RegisterInformers(c *config.Config) { // Register informers for k8s events if len(c.Events.Types) > 0 { - logging.Logger.Info("Registering kubernetes events informer") + log.Logger.Info("Registering kubernetes events informer") watchlist := cache.NewListWatchFromClient( utils.KubeClient.CoreV1().RESTClient(), "events", apiV1.NamespaceAll, fields.Everything()) _, controller := cache.NewInformer( watchlist, &apiV1.Event{}, - 30*time.Minute, + time.Duration(rsyncTime)*time.Minute, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) @@ -89,12 +102,12 @@ func RegisterInformers(c *config.Config) { ns := eventObj.InvolvedObject.Namespace eType := strings.ToLower(eventObj.Type) - logging.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType) + log.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType) // Filter and forward if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] || utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) { - logging.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace) - logEvent(obj, "events", "create", err) + log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace) + sendEvent(obj, "events", "create", err) } }, }, @@ -109,7 +122,7 @@ func RegisterInformers(c *config.Config) { signal.Notify(sigterm, syscall.SIGTERM) signal.Notify(sigterm, syscall.SIGINT) <-sigterm - + sendMessage("my watch has ended!") } func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) { @@ -117,33 +130,33 @@ func registerEventHandlers(resourceType string, events []string) (handlerFns cac if event == "all" || event == "create" { handlerFns.AddFunc = func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) - logging.Logger.Debugf("Processing add to %v: %s", resourceType, key) - logEvent(obj, resourceType, "create", err) + log.Logger.Debugf("Processing add to %v: %s", resourceType, key) + sendEvent(obj, resourceType, "create", err) } } if event == "all" || event == "update" { handlerFns.UpdateFunc = func(old, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) - logging.Logger.Debugf("Processing update to %v: %s", resourceType, key) - logEvent(new, resourceType, "update", err) + log.Logger.Debugf("Processing update to %v: %s", resourceType, key) + sendEvent(new, resourceType, "update", err) } } if event == "all" || event == "delete" { handlerFns.DeleteFunc = func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) - logging.Logger.Debugf("Processing delete to %v: %s", resourceType, key) - logEvent(obj, resourceType, "delete", err) + log.Logger.Debugf("Processing delete to %v: %s", resourceType, key) + sendEvent(obj, resourceType, "delete", err) } } } return handlerFns } -func logEvent(obj interface{}, kind, eventType string, err error) { +func sendEvent(obj interface{}, kind, eventType string, err error) { if err != nil { - logging.Logger.Error("Error while receiving event: ", err.Error()) + log.Logger.Error("Error while receiving event: ", err.Error()) return } @@ -151,14 +164,23 @@ func logEvent(obj interface{}, kind, eventType string, err error) { if eventType == "create" { objectMeta := utils.GetObjectMetaData(obj) if objectMeta.CreationTimestamp.Sub(startTime).Seconds() <= 0 { - logging.Logger.Debug("Skipping older events") + log.Logger.Debug("Skipping older events") + return + } + } + + // Skip older events + if eventType == "delete" { + objectMeta := utils.GetObjectMetaData(obj) + if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 { + log.Logger.Debug("Skipping older events") return } } // Check if Notify disabled if !config.Notify { - logging.Logger.Info("Skipping notification") + log.Logger.Info("Skipping notification") return } @@ -168,5 +190,14 @@ func logEvent(obj interface{}, kind, eventType string, err error) { // Send notification to communication chennel notifier := notify.NewSlack() - notifier.Send(event) + notifier.SendEvent(event) +} + +func sendMessage(msg string) { + if len(msg) <= 0 { + return + } + + notifier := notify.NewSlack() + notifier.SendMessage(msg) } diff --git a/pkg/events/events.go b/pkg/events/events.go index 1391beab1..bc2b9f0d6 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -51,7 +51,7 @@ var LevelMap map[string]Level func init() { LevelMap = make(map[string]Level) LevelMap["create"] = Info - LevelMap["Update"] = Debug + LevelMap["update"] = Warn LevelMap["delete"] = Critical LevelMap["error"] = Error LevelMap["Warning"] = Critical @@ -69,7 +69,15 @@ func New(object interface{}, eventType string, kind string) Event { Kind: objectTypeMeta.Kind, Level: LevelMap[eventType], Type: eventType, - TimeStamp: objectMeta.CreationTimestamp.Time, + } + + // Add TimeStamps + if eventType == "create" { + event.TimeStamp = objectMeta.CreationTimestamp.Time + } + + if eventType == "delete" { + event.TimeStamp = objectMeta.DeletionTimestamp.Time } if kind != "events" { diff --git a/pkg/execute/executor.go b/pkg/execute/executor.go index 52a56af64..76a0a4583 100644 --- a/pkg/execute/executor.go +++ b/pkg/execute/executor.go @@ -132,11 +132,13 @@ func runNotifierCommand(args []string) string { } if args[1] == "start" { config.Notify = true - return "Notifier started!" + log.Logger.Info("Notifier enabled") + return "Brace yourselves, notifications are coming." } if args[1] == "stop" { config.Notify = false - return "Notifier stopped!" + log.Logger.Info("Notifier disabled") + return "Sure! I won't send you notifications anymore." } if args[1] == "status" { if config.Notify == false { diff --git a/pkg/filterengine/filters/ingress_validator.go b/pkg/filterengine/filters/ingress_validator.go index 455a26fea..c14d7dcde 100644 --- a/pkg/filterengine/filters/ingress_validator.go +++ b/pkg/filterengine/filters/ingress_validator.go @@ -38,7 +38,7 @@ func (iv *IngressValidator) Run(object interface{}, event *events.Event) { } _, err := ValidServicePort(serviceName, ns, int32(servicePort)) if err != nil { - event.Messages = append(event.Messages, "Warning: Service "+serviceName+" used in ingress config does not exist or port not exposed\n") + event.Messages = append(event.Messages, "Service "+serviceName+" used in ingress config does not exist or port not exposed\n") event.Level = events.Warn } } diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index 76b590e34..e4ff1f8f3 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -6,5 +6,6 @@ import ( // Notifier to send event notification on the communication channels type Notifier interface { - Send(events.Event) error + SendEvent(events.Event) error + SendMessage(string) error } diff --git a/pkg/notify/slack.go b/pkg/notify/slack.go index a6a7360d7..ae4e69d2b 100644 --- a/pkg/notify/slack.go +++ b/pkg/notify/slack.go @@ -40,8 +40,8 @@ func NewSlack() Notifier { } } -// Send event notification to slack -func (s *Slack) Send(event events.Event) error { +// SendEvent sends event notification to slack +func (s *Slack) SendEvent(event events.Event) error { log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", event)) api := slack.New(s.Token) @@ -63,7 +63,12 @@ func (s *Slack) Send(event events.Event) error { }, }, Footer: "kubeops", - Ts: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)), + } + + // Add timestamp + ts := json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)) + if ts > "0" { + attachment.Ts = ts } if event.Namespace != "" { @@ -114,7 +119,6 @@ func (s *Slack) Send(event events.Event) error { attachment.Color = attachmentColor[event.Level] params.Attachments = []slack.Attachment{attachment} - log.Logger.Infof("Sending message on %v with token %s", s.Channel, s.Token) channelID, timestamp, err := api.PostMessage(s.Channel, "", params) if err != nil { log.Logger.Errorf("Error in sending slack message %s", err.Error()) @@ -124,3 +128,22 @@ func (s *Slack) Send(event events.Event) error { log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp) return nil } + +// SendMessage sends message to slack channel +func (s *Slack) SendMessage(msg string) error { + log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", msg)) + + api := slack.New(s.Token) + params := slack.PostMessageParameters{ + AsUser: true, + } + + channelID, timestamp, err := api.PostMessage(s.Channel, msg, params) + if err != nil { + log.Logger.Errorf("Error in sending slack message %s", err.Error()) + return err + } + + log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp) + return nil +} From f85b330c68e6c74246bcc6d68fa321289ffc6df0 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Wed, 2 Jan 2019 14:14:18 +0530 Subject: [PATCH 2/3] Update image tag --- helm/kubeops/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm/kubeops/values.yaml b/helm/kubeops/values.yaml index 326173d12..d298426fc 100644 --- a/helm/kubeops/values.yaml +++ b/helm/kubeops/values.yaml @@ -6,7 +6,7 @@ replicaCount: 1 image: repository: infracloud/kubeops - tag: "0.5" + tag: "0.6" pullPolicy: Always nameOverride: "" From 0f6218c17e6b270fe3bee5e4fb54edd21f9167fb Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Thu, 3 Jan 2019 11:23:02 +0530 Subject: [PATCH 3/3] kubeops#2: Address review comments --- pkg/controller/controller.go | 10 ++++++++-- pkg/execute/executor.go | 14 ++++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 631788e40..35bd71cda 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -22,6 +22,11 @@ import ( var startTime time.Time +const ( + controllerStartMsg = "...and now my watch begins! :crossed_swords:" + controllerStopMsg = "my watch has ended!" +) + func findNamespace(ns string) string { if ns == "all" { return apiV1.NamespaceAll @@ -34,7 +39,7 @@ func findNamespace(ns string) string { // RegisterInformers creates new informer controllers to watch k8s resources func RegisterInformers(c *config.Config) { - sendMessage("...and now my watch begins! :crossed_swords:") + sendMessage(controllerStartMsg) startTime = time.Now().Local() // Get resync period @@ -122,7 +127,7 @@ func RegisterInformers(c *config.Config) { signal.Notify(sigterm, syscall.SIGTERM) signal.Notify(sigterm, syscall.SIGINT) <-sigterm - sendMessage("my watch has ended!") + sendMessage(controllerStopMsg) } func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) { @@ -195,6 +200,7 @@ func sendEvent(obj interface{}, kind, eventType string, err error) { func sendMessage(msg string) { if len(msg) <= 0 { + log.Logger.Warn("sendMessage received string with length 0. Hence skipping.") return } diff --git a/pkg/execute/executor.go b/pkg/execute/executor.go index 76a0a4583..34d18b32e 100644 --- a/pkg/execute/executor.go +++ b/pkg/execute/executor.go @@ -33,6 +33,12 @@ var validNotifierCommands = map[string]bool{ var kubectlBinary = "/usr/local/bin/kubectl" +const ( + notifierStopMsg = "Brace yourselves, notifications are coming." + notifierStartMsg = "Sure! I won't send you notifications anymore." + unsupportedCmdMsg = "Command not supported. Please run '@kubeops help' to see supported commands." +) + // Executor is an interface for processes to execute commands type Executor interface { Execute() string @@ -59,7 +65,7 @@ func (e *DefaultExecutor) Execute() string { if validNotifierCommands[args[0]] { return runNotifierCommand(args) } - return "Command not supported. Please run '@kubeops help' to see supported commands" + return unsupportedCmdMsg } func printHelp() string { @@ -88,7 +94,7 @@ func printHelp() string { } func printDefaultMsg() string { - return "Command not supported. Please run '@kubeops help' to see supported commands" + return unsupportedCmdMsg } func runKubectlCommand(args []string) string { @@ -133,12 +139,12 @@ func runNotifierCommand(args []string) string { if args[1] == "start" { config.Notify = true log.Logger.Info("Notifier enabled") - return "Brace yourselves, notifications are coming." + return notifierStartMsg } if args[1] == "stop" { config.Notify = false log.Logger.Info("Notifier disabled") - return "Sure! I won't send you notifications anymore." + return notifierStopMsg } if args[1] == "status" { if config.Notify == false {