Skip to content

Commit

Permalink
Merge pull request #3 from infracloudio/fix-notification-msgs
Browse files Browse the repository at this point in the history
Send bot start/stop messages to slack channel
  • Loading branch information
sanketsudake authored Jan 3, 2019
2 parents 334bdda + 0f6218c commit 55f636f
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 35 deletions.
2 changes: 1 addition & 1 deletion helm/kubeops/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ replicaCount: 1

image:
repository: infracloud/kubeops
tag: "0.5"
tag: "0.6"
pullPolicy: Always

nameOverride: ""
Expand Down
81 changes: 59 additions & 22 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package controller

import (
//"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/infracloudio/kubeops/pkg/config"
"github.com/infracloudio/kubeops/pkg/events"
"github.com/infracloudio/kubeops/pkg/filterengine"
"github.com/infracloudio/kubeops/pkg/logging"
log "github.com/infracloudio/kubeops/pkg/logging"
"github.com/infracloudio/kubeops/pkg/notify"
"github.com/infracloudio/kubeops/pkg/utils"

Expand All @@ -22,6 +22,11 @@ import (

var startTime time.Time

const (
controllerStartMsg = "...and now my watch begins! :crossed_swords:"
controllerStopMsg = "my watch has ended!"
)

func findNamespace(ns string) string {
if ns == "all" {
return apiV1.NamespaceAll
Expand All @@ -34,10 +39,23 @@ func findNamespace(ns string) string {

// RegisterInformers creates new informer controllers to watch k8s resources
func RegisterInformers(c *config.Config) {
sendMessage(controllerStartMsg)
startTime = time.Now().Local()

// Get resync period
rsyncTimeStr, ok := os.LookupEnv("INFORMERS_RESYNC_PERIOD")
if !ok {
rsyncTimeStr = "30"
}

rsyncTime, err := strconv.Atoi(rsyncTimeStr)
if err != nil {
log.Logger.Fatal("Error in reading INFORMERS_RESYNC_PERIOD env var.", err)
}

// Register informers for resource lifecycle events
if len(c.Resources) > 0 {
logging.Logger.Info("Registering resource lifecycle informer")
log.Logger.Info("Registering resource lifecycle informer")
for _, r := range c.Resources {
if _, ok := utils.ResourceGetterMap[r.Name]; !ok {
continue
Expand All @@ -47,15 +65,15 @@ func RegisterInformers(c *config.Config) {
continue
}
for _, ns := range r.Namespaces {
logging.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)
log.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)

watchlist := cache.NewListWatchFromClient(
utils.ResourceGetterMap[r.Name], r.Name, findNamespace(ns), fields.Everything())

_, controller := cache.NewInformer(
watchlist,
object,
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
registerEventHandlers(r.Name, r.Events),
)
stopCh := make(chan struct{})
Expand All @@ -69,14 +87,14 @@ func RegisterInformers(c *config.Config) {

// Register informers for k8s events
if len(c.Events.Types) > 0 {
logging.Logger.Info("Registering kubernetes events informer")
log.Logger.Info("Registering kubernetes events informer")
watchlist := cache.NewListWatchFromClient(
utils.KubeClient.CoreV1().RESTClient(), "events", apiV1.NamespaceAll, fields.Everything())

_, controller := cache.NewInformer(
watchlist,
&apiV1.Event{},
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
Expand All @@ -89,12 +107,12 @@ func RegisterInformers(c *config.Config) {
ns := eventObj.InvolvedObject.Namespace
eType := strings.ToLower(eventObj.Type)

logging.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
log.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
// Filter and forward
if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] ||
utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) {
logging.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
logEvent(obj, "events", "create", err)
log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
sendEvent(obj, "events", "create", err)
}
},
},
Expand All @@ -109,56 +127,65 @@ func RegisterInformers(c *config.Config) {
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm

sendMessage(controllerStopMsg)
}

func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
for _, event := range events {
if event == "all" || event == "create" {
handlerFns.AddFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing add to %v: %s", resourceType, key)
logEvent(obj, resourceType, "create", err)
log.Logger.Debugf("Processing add to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "create", err)
}
}

if event == "all" || event == "update" {
handlerFns.UpdateFunc = func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
logging.Logger.Debugf("Processing update to %v: %s", resourceType, key)
logEvent(new, resourceType, "update", err)
log.Logger.Debugf("Processing update to %v: %s", resourceType, key)
sendEvent(new, resourceType, "update", err)
}
}

if event == "all" || event == "delete" {
handlerFns.DeleteFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
logEvent(obj, resourceType, "delete", err)
log.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "delete", err)
}
}
}
return handlerFns
}

func logEvent(obj interface{}, kind, eventType string, err error) {
func sendEvent(obj interface{}, kind, eventType string, err error) {
if err != nil {
logging.Logger.Error("Error while receiving event: ", err.Error())
log.Logger.Error("Error while receiving event: ", err.Error())
return
}

// Skip older events
if eventType == "create" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.CreationTimestamp.Sub(startTime).Seconds() <= 0 {
logging.Logger.Debug("Skipping older events")
log.Logger.Debug("Skipping older events")
return
}
}

// Skip older events
if eventType == "delete" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
log.Logger.Debug("Skipping older events")
return
}
}

// Check if Notify disabled
if !config.Notify {
logging.Logger.Info("Skipping notification")
log.Logger.Info("Skipping notification")
return
}

Expand All @@ -168,5 +195,15 @@ func logEvent(obj interface{}, kind, eventType string, err error) {

// Send notification to communication chennel
notifier := notify.NewSlack()
notifier.Send(event)
notifier.SendEvent(event)
}

func sendMessage(msg string) {
if len(msg) <= 0 {
log.Logger.Warn("sendMessage received string with length 0. Hence skipping.")
return
}

notifier := notify.NewSlack()
notifier.SendMessage(msg)
}
12 changes: 10 additions & 2 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var LevelMap map[string]Level
func init() {
LevelMap = make(map[string]Level)
LevelMap["create"] = Info
LevelMap["Update"] = Debug
LevelMap["update"] = Warn
LevelMap["delete"] = Critical
LevelMap["error"] = Error
LevelMap["Warning"] = Critical
Expand All @@ -69,7 +69,15 @@ func New(object interface{}, eventType string, kind string) Event {
Kind: objectTypeMeta.Kind,
Level: LevelMap[eventType],
Type: eventType,
TimeStamp: objectMeta.CreationTimestamp.Time,
}

// Add TimeStamps
if eventType == "create" {
event.TimeStamp = objectMeta.CreationTimestamp.Time
}

if eventType == "delete" {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
}

if kind != "events" {
Expand Down
16 changes: 12 additions & 4 deletions pkg/execute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ var validNotifierCommands = map[string]bool{

var kubectlBinary = "/usr/local/bin/kubectl"

const (
notifierStopMsg = "Brace yourselves, notifications are coming."
notifierStartMsg = "Sure! I won't send you notifications anymore."
unsupportedCmdMsg = "Command not supported. Please run '@kubeops help' to see supported commands."
)

// Executor is an interface for processes to execute commands
type Executor interface {
Execute() string
Expand All @@ -59,7 +65,7 @@ func (e *DefaultExecutor) Execute() string {
if validNotifierCommands[args[0]] {
return runNotifierCommand(args)
}
return "Command not supported. Please run '@kubeops help' to see supported commands"
return unsupportedCmdMsg
}

func printHelp() string {
Expand Down Expand Up @@ -88,7 +94,7 @@ func printHelp() string {
}

func printDefaultMsg() string {
return "Command not supported. Please run '@kubeops help' to see supported commands"
return unsupportedCmdMsg
}

func runKubectlCommand(args []string) string {
Expand Down Expand Up @@ -132,11 +138,13 @@ func runNotifierCommand(args []string) string {
}
if args[1] == "start" {
config.Notify = true
return "Notifier started!"
log.Logger.Info("Notifier enabled")
return notifierStartMsg
}
if args[1] == "stop" {
config.Notify = false
return "Notifier stopped!"
log.Logger.Info("Notifier disabled")
return notifierStopMsg
}
if args[1] == "status" {
if config.Notify == false {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filterengine/filters/ingress_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (iv *IngressValidator) Run(object interface{}, event *events.Event) {
}
_, err := ValidServicePort(serviceName, ns, int32(servicePort))
if err != nil {
event.Messages = append(event.Messages, "Warning: Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Messages = append(event.Messages, "Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Level = events.Warn
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import (

// Notifier to send event notification on the communication channels
type Notifier interface {
Send(events.Event) error
SendEvent(events.Event) error
SendMessage(string) error
}
31 changes: 27 additions & 4 deletions pkg/notify/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewSlack() Notifier {
}
}

// Send event notification to slack
func (s *Slack) Send(event events.Event) error {
// SendEvent sends event notification to slack
func (s *Slack) SendEvent(event events.Event) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", event))

api := slack.New(s.Token)
Expand All @@ -63,7 +63,12 @@ func (s *Slack) Send(event events.Event) error {
},
},
Footer: "kubeops",
Ts: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)),
}

// Add timestamp
ts := json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10))
if ts > "0" {
attachment.Ts = ts
}

if event.Namespace != "" {
Expand Down Expand Up @@ -114,7 +119,6 @@ func (s *Slack) Send(event events.Event) error {
attachment.Color = attachmentColor[event.Level]
params.Attachments = []slack.Attachment{attachment}

log.Logger.Infof("Sending message on %v with token %s", s.Channel, s.Token)
channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
Expand All @@ -124,3 +128,22 @@ func (s *Slack) Send(event events.Event) error {
log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}

// SendMessage sends message to slack channel
func (s *Slack) SendMessage(msg string) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", msg))

api := slack.New(s.Token)
params := slack.PostMessageParameters{
AsUser: true,
}

channelID, timestamp, err := api.PostMessage(s.Channel, msg, params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
return err
}

log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}

0 comments on commit 55f636f

Please sign in to comment.