Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send bot start/stop messages to slack channel #3

Merged
merged 3 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion helm/kubeops/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ replicaCount: 1

image:
repository: infracloud/kubeops
tag: "0.5"
tag: "0.6"
pullPolicy: Always

nameOverride: ""
Expand Down
75 changes: 53 additions & 22 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package controller

import (
//"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/infracloudio/kubeops/pkg/config"
"github.com/infracloudio/kubeops/pkg/events"
"github.com/infracloudio/kubeops/pkg/filterengine"
"github.com/infracloudio/kubeops/pkg/logging"
log "github.com/infracloudio/kubeops/pkg/logging"
"github.com/infracloudio/kubeops/pkg/notify"
"github.com/infracloudio/kubeops/pkg/utils"

Expand All @@ -34,10 +34,23 @@ func findNamespace(ns string) string {

// RegisterInformers creates new informer controllers to watch k8s resources
func RegisterInformers(c *config.Config) {
sendMessage("...and now my watch begins! :crossed_swords:")
startTime = time.Now().Local()

// Get resync period
rsyncTimeStr, ok := os.LookupEnv("INFORMERS_RESYNC_PERIOD")
if !ok {
rsyncTimeStr = "30"
}

rsyncTime, err := strconv.Atoi(rsyncTimeStr)
if err != nil {
log.Logger.Fatal("Error in reading INFORMERS_RESYNC_PERIOD env var.", err)
}

// Register informers for resource lifecycle events
if len(c.Resources) > 0 {
logging.Logger.Info("Registering resource lifecycle informer")
log.Logger.Info("Registering resource lifecycle informer")
for _, r := range c.Resources {
if _, ok := utils.ResourceGetterMap[r.Name]; !ok {
continue
Expand All @@ -47,15 +60,15 @@ func RegisterInformers(c *config.Config) {
continue
}
for _, ns := range r.Namespaces {
logging.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)
log.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)

watchlist := cache.NewListWatchFromClient(
utils.ResourceGetterMap[r.Name], r.Name, findNamespace(ns), fields.Everything())

_, controller := cache.NewInformer(
watchlist,
object,
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
registerEventHandlers(r.Name, r.Events),
)
stopCh := make(chan struct{})
Expand All @@ -69,14 +82,14 @@ func RegisterInformers(c *config.Config) {

// Register informers for k8s events
if len(c.Events.Types) > 0 {
logging.Logger.Info("Registering kubernetes events informer")
log.Logger.Info("Registering kubernetes events informer")
watchlist := cache.NewListWatchFromClient(
utils.KubeClient.CoreV1().RESTClient(), "events", apiV1.NamespaceAll, fields.Everything())

_, controller := cache.NewInformer(
watchlist,
&apiV1.Event{},
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
Expand All @@ -89,12 +102,12 @@ func RegisterInformers(c *config.Config) {
ns := eventObj.InvolvedObject.Namespace
eType := strings.ToLower(eventObj.Type)

logging.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
log.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
// Filter and forward
if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] ||
utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) {
logging.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
logEvent(obj, "events", "create", err)
log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
sendEvent(obj, "events", "create", err)
}
},
},
Expand All @@ -109,56 +122,65 @@ func RegisterInformers(c *config.Config) {
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm

sendMessage("my watch has ended!")
}

func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
for _, event := range events {
if event == "all" || event == "create" {
handlerFns.AddFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing add to %v: %s", resourceType, key)
logEvent(obj, resourceType, "create", err)
log.Logger.Debugf("Processing add to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "create", err)
}
}

if event == "all" || event == "update" {
handlerFns.UpdateFunc = func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
logging.Logger.Debugf("Processing update to %v: %s", resourceType, key)
logEvent(new, resourceType, "update", err)
log.Logger.Debugf("Processing update to %v: %s", resourceType, key)
sendEvent(new, resourceType, "update", err)
}
}

if event == "all" || event == "delete" {
handlerFns.DeleteFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
logEvent(obj, resourceType, "delete", err)
log.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "delete", err)
}
}
}
return handlerFns
}

func logEvent(obj interface{}, kind, eventType string, err error) {
func sendEvent(obj interface{}, kind, eventType string, err error) {
if err != nil {
logging.Logger.Error("Error while receiving event: ", err.Error())
log.Logger.Error("Error while receiving event: ", err.Error())
return
}

// Skip older events
if eventType == "create" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.CreationTimestamp.Sub(startTime).Seconds() <= 0 {
logging.Logger.Debug("Skipping older events")
log.Logger.Debug("Skipping older events")
return
}
}

// Skip older events
if eventType == "delete" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
log.Logger.Debug("Skipping older events")
return
}
}

// Check if Notify disabled
if !config.Notify {
logging.Logger.Info("Skipping notification")
log.Logger.Info("Skipping notification")
return
}

Expand All @@ -168,5 +190,14 @@ func logEvent(obj interface{}, kind, eventType string, err error) {

// Send notification to communication chennel
notifier := notify.NewSlack()
notifier.Send(event)
notifier.SendEvent(event)
}

func sendMessage(msg string) {
Copy link
Contributor

@sanketsudake sanketsudake Jan 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep this function in notifier package

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ssudake21 we already have notifier.SendMessage(msg) function in notifier package

if len(msg) <= 0 {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log warning here

}

notifier := notify.NewSlack()
notifier.SendMessage(msg)
}
12 changes: 10 additions & 2 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var LevelMap map[string]Level
func init() {
LevelMap = make(map[string]Level)
LevelMap["create"] = Info
LevelMap["Update"] = Debug
LevelMap["update"] = Warn
LevelMap["delete"] = Critical
LevelMap["error"] = Error
LevelMap["Warning"] = Critical
Expand All @@ -69,7 +69,15 @@ func New(object interface{}, eventType string, kind string) Event {
Kind: objectTypeMeta.Kind,
Level: LevelMap[eventType],
Type: eventType,
TimeStamp: objectMeta.CreationTimestamp.Time,
}

// Add TimeStamps
if eventType == "create" {
event.TimeStamp = objectMeta.CreationTimestamp.Time
}

if eventType == "delete" {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
}

if kind != "events" {
Expand Down
6 changes: 4 additions & 2 deletions pkg/execute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,13 @@ func runNotifierCommand(args []string) string {
}
if args[1] == "start" {
config.Notify = true
return "Notifier started!"
log.Logger.Info("Notifier enabled")
return "Brace yourselves, notifications are coming."
PrasadG193 marked this conversation as resolved.
Show resolved Hide resolved
}
if args[1] == "stop" {
config.Notify = false
return "Notifier stopped!"
log.Logger.Info("Notifier disabled")
return "Sure! I won't send you notifications anymore."
}
if args[1] == "status" {
if config.Notify == false {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filterengine/filters/ingress_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (iv *IngressValidator) Run(object interface{}, event *events.Event) {
}
_, err := ValidServicePort(serviceName, ns, int32(servicePort))
if err != nil {
event.Messages = append(event.Messages, "Warning: Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Messages = append(event.Messages, "Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Level = events.Warn
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import (

// Notifier to send event notification on the communication channels
type Notifier interface {
Send(events.Event) error
SendEvent(events.Event) error
SendMessage(string) error
}
31 changes: 27 additions & 4 deletions pkg/notify/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewSlack() Notifier {
}
}

// Send event notification to slack
func (s *Slack) Send(event events.Event) error {
// SendEvent sends event notification to slack
func (s *Slack) SendEvent(event events.Event) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", event))

api := slack.New(s.Token)
Expand All @@ -63,7 +63,12 @@ func (s *Slack) Send(event events.Event) error {
},
},
Footer: "kubeops",
Ts: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)),
}

// Add timestamp
ts := json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10))
if ts > "0" {
attachment.Ts = ts
}

if event.Namespace != "" {
Expand Down Expand Up @@ -114,7 +119,6 @@ func (s *Slack) Send(event events.Event) error {
attachment.Color = attachmentColor[event.Level]
params.Attachments = []slack.Attachment{attachment}

log.Logger.Infof("Sending message on %v with token %s", s.Channel, s.Token)
channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
Expand All @@ -124,3 +128,22 @@ func (s *Slack) Send(event events.Event) error {
log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}

// SendMessage sends message to slack channel
func (s *Slack) SendMessage(msg string) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", msg))

api := slack.New(s.Token)
params := slack.PostMessageParameters{
AsUser: true,
}

channelID, timestamp, err := api.PostMessage(s.Channel, msg, params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
return err
}

log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}