Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send bot start/stop messages to slack channel #3

Merged
merged 3 commits into from Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion helm/kubeops/values.yaml
Expand Up @@ -6,7 +6,7 @@ replicaCount: 1

image:
repository: infracloud/kubeops
tag: "0.5"
tag: "0.6"
pullPolicy: Always

nameOverride: ""
Expand Down
81 changes: 59 additions & 22 deletions pkg/controller/controller.go
@@ -1,17 +1,17 @@
package controller

import (
//"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/infracloudio/kubeops/pkg/config"
"github.com/infracloudio/kubeops/pkg/events"
"github.com/infracloudio/kubeops/pkg/filterengine"
"github.com/infracloudio/kubeops/pkg/logging"
log "github.com/infracloudio/kubeops/pkg/logging"
"github.com/infracloudio/kubeops/pkg/notify"
"github.com/infracloudio/kubeops/pkg/utils"

Expand All @@ -22,6 +22,11 @@ import (

var startTime time.Time

const (
controllerStartMsg = "...and now my watch begins! :crossed_swords:"
controllerStopMsg = "my watch has ended!"
)

func findNamespace(ns string) string {
if ns == "all" {
return apiV1.NamespaceAll
Expand All @@ -34,10 +39,23 @@ func findNamespace(ns string) string {

// RegisterInformers creates new informer controllers to watch k8s resources
func RegisterInformers(c *config.Config) {
sendMessage(controllerStartMsg)
startTime = time.Now().Local()

// Get resync period
rsyncTimeStr, ok := os.LookupEnv("INFORMERS_RESYNC_PERIOD")
if !ok {
rsyncTimeStr = "30"
}

rsyncTime, err := strconv.Atoi(rsyncTimeStr)
if err != nil {
log.Logger.Fatal("Error in reading INFORMERS_RESYNC_PERIOD env var.", err)
}

// Register informers for resource lifecycle events
if len(c.Resources) > 0 {
logging.Logger.Info("Registering resource lifecycle informer")
log.Logger.Info("Registering resource lifecycle informer")
for _, r := range c.Resources {
if _, ok := utils.ResourceGetterMap[r.Name]; !ok {
continue
Expand All @@ -47,15 +65,15 @@ func RegisterInformers(c *config.Config) {
continue
}
for _, ns := range r.Namespaces {
logging.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)
log.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)

watchlist := cache.NewListWatchFromClient(
utils.ResourceGetterMap[r.Name], r.Name, findNamespace(ns), fields.Everything())

_, controller := cache.NewInformer(
watchlist,
object,
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
registerEventHandlers(r.Name, r.Events),
)
stopCh := make(chan struct{})
Expand All @@ -69,14 +87,14 @@ func RegisterInformers(c *config.Config) {

// Register informers for k8s events
if len(c.Events.Types) > 0 {
logging.Logger.Info("Registering kubernetes events informer")
log.Logger.Info("Registering kubernetes events informer")
watchlist := cache.NewListWatchFromClient(
utils.KubeClient.CoreV1().RESTClient(), "events", apiV1.NamespaceAll, fields.Everything())

_, controller := cache.NewInformer(
watchlist,
&apiV1.Event{},
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
Expand All @@ -89,12 +107,12 @@ func RegisterInformers(c *config.Config) {
ns := eventObj.InvolvedObject.Namespace
eType := strings.ToLower(eventObj.Type)

logging.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
log.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
// Filter and forward
if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] ||
utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) {
logging.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
logEvent(obj, "events", "create", err)
log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
sendEvent(obj, "events", "create", err)
}
},
},
Expand All @@ -109,56 +127,65 @@ func RegisterInformers(c *config.Config) {
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm

sendMessage(controllerStopMsg)
}

func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
for _, event := range events {
if event == "all" || event == "create" {
handlerFns.AddFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing add to %v: %s", resourceType, key)
logEvent(obj, resourceType, "create", err)
log.Logger.Debugf("Processing add to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "create", err)
}
}

if event == "all" || event == "update" {
handlerFns.UpdateFunc = func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
logging.Logger.Debugf("Processing update to %v: %s", resourceType, key)
logEvent(new, resourceType, "update", err)
log.Logger.Debugf("Processing update to %v: %s", resourceType, key)
sendEvent(new, resourceType, "update", err)
}
}

if event == "all" || event == "delete" {
handlerFns.DeleteFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
logEvent(obj, resourceType, "delete", err)
log.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "delete", err)
}
}
}
return handlerFns
}

func logEvent(obj interface{}, kind, eventType string, err error) {
func sendEvent(obj interface{}, kind, eventType string, err error) {
if err != nil {
logging.Logger.Error("Error while receiving event: ", err.Error())
log.Logger.Error("Error while receiving event: ", err.Error())
return
}

// Skip older events
if eventType == "create" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.CreationTimestamp.Sub(startTime).Seconds() <= 0 {
logging.Logger.Debug("Skipping older events")
log.Logger.Debug("Skipping older events")
return
}
}

// Skip older events
if eventType == "delete" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
log.Logger.Debug("Skipping older events")
return
}
}

// Check if Notify disabled
if !config.Notify {
logging.Logger.Info("Skipping notification")
log.Logger.Info("Skipping notification")
return
}

Expand All @@ -168,5 +195,15 @@ func logEvent(obj interface{}, kind, eventType string, err error) {

// Send notification to communication chennel
notifier := notify.NewSlack()
notifier.Send(event)
notifier.SendEvent(event)
}

func sendMessage(msg string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep this function in notifier package

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ssudake21 we already have notifier.SendMessage(msg) function in notifier package

if len(msg) <= 0 {
log.Logger.Warn("sendMessage received string with length 0. Hence skipping.")
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log warning here

}

notifier := notify.NewSlack()
notifier.SendMessage(msg)
}
12 changes: 10 additions & 2 deletions pkg/events/events.go
Expand Up @@ -51,7 +51,7 @@ var LevelMap map[string]Level
func init() {
LevelMap = make(map[string]Level)
LevelMap["create"] = Info
LevelMap["Update"] = Debug
LevelMap["update"] = Warn
LevelMap["delete"] = Critical
LevelMap["error"] = Error
LevelMap["Warning"] = Critical
Expand All @@ -69,7 +69,15 @@ func New(object interface{}, eventType string, kind string) Event {
Kind: objectTypeMeta.Kind,
Level: LevelMap[eventType],
Type: eventType,
TimeStamp: objectMeta.CreationTimestamp.Time,
}

// Add TimeStamps
if eventType == "create" {
event.TimeStamp = objectMeta.CreationTimestamp.Time
}

if eventType == "delete" {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
}

if kind != "events" {
Expand Down
16 changes: 12 additions & 4 deletions pkg/execute/executor.go
Expand Up @@ -33,6 +33,12 @@ var validNotifierCommands = map[string]bool{

var kubectlBinary = "/usr/local/bin/kubectl"

const (
notifierStopMsg = "Brace yourselves, notifications are coming."
notifierStartMsg = "Sure! I won't send you notifications anymore."
unsupportedCmdMsg = "Command not supported. Please run '@kubeops help' to see supported commands."
)

// Executor is an interface for processes to execute commands
type Executor interface {
Execute() string
Expand All @@ -59,7 +65,7 @@ func (e *DefaultExecutor) Execute() string {
if validNotifierCommands[args[0]] {
return runNotifierCommand(args)
}
return "Command not supported. Please run '@kubeops help' to see supported commands"
return unsupportedCmdMsg
}

func printHelp() string {
Expand Down Expand Up @@ -88,7 +94,7 @@ func printHelp() string {
}

func printDefaultMsg() string {
return "Command not supported. Please run '@kubeops help' to see supported commands"
return unsupportedCmdMsg
}

func runKubectlCommand(args []string) string {
Expand Down Expand Up @@ -132,11 +138,13 @@ func runNotifierCommand(args []string) string {
}
if args[1] == "start" {
config.Notify = true
return "Notifier started!"
log.Logger.Info("Notifier enabled")
return notifierStartMsg
}
if args[1] == "stop" {
config.Notify = false
return "Notifier stopped!"
log.Logger.Info("Notifier disabled")
return notifierStopMsg
}
if args[1] == "status" {
if config.Notify == false {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filterengine/filters/ingress_validator.go
Expand Up @@ -38,7 +38,7 @@ func (iv *IngressValidator) Run(object interface{}, event *events.Event) {
}
_, err := ValidServicePort(serviceName, ns, int32(servicePort))
if err != nil {
event.Messages = append(event.Messages, "Warning: Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Messages = append(event.Messages, "Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Level = events.Warn
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/notify/notify.go
Expand Up @@ -6,5 +6,6 @@ import (

// Notifier to send event notification on the communication channels
type Notifier interface {
Send(events.Event) error
SendEvent(events.Event) error
SendMessage(string) error
}
31 changes: 27 additions & 4 deletions pkg/notify/slack.go
Expand Up @@ -40,8 +40,8 @@ func NewSlack() Notifier {
}
}

// Send event notification to slack
func (s *Slack) Send(event events.Event) error {
// SendEvent sends event notification to slack
func (s *Slack) SendEvent(event events.Event) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", event))

api := slack.New(s.Token)
Expand All @@ -63,7 +63,12 @@ func (s *Slack) Send(event events.Event) error {
},
},
Footer: "kubeops",
Ts: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)),
}

// Add timestamp
ts := json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10))
if ts > "0" {
attachment.Ts = ts
}

if event.Namespace != "" {
Expand Down Expand Up @@ -114,7 +119,6 @@ func (s *Slack) Send(event events.Event) error {
attachment.Color = attachmentColor[event.Level]
params.Attachments = []slack.Attachment{attachment}

log.Logger.Infof("Sending message on %v with token %s", s.Channel, s.Token)
channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
Expand All @@ -124,3 +128,22 @@ func (s *Slack) Send(event events.Event) error {
log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}

// SendMessage sends message to slack channel
func (s *Slack) SendMessage(msg string) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", msg))

api := slack.New(s.Token)
params := slack.PostMessageParameters{
AsUser: true,
}

channelID, timestamp, err := api.PostMessage(s.Channel, msg, params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
return err
}

log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}