Skip to content

Commit

Permalink
Add Object Annotation filter
Browse files Browse the repository at this point in the history
This commit, 
- enables filtering of events based on annotations present in objects at run time.
- annotation `botkube.io/disable: true` disables event notifications for the annotated object
- annotation `botkube.io/channel: <channel_name>` sends events notifications of the annotated object to the mentioned channel.
- adds func `ExtractAnnotations()`. It extract annotations from Event.InvolvedObject and adds them to event.Metadata.Annotations
- implements individual actions using internal functions.
- adds unit tests for internal functions.
- replaces Init() with InitialiseKubeClient() to decouple config.yaml and KubeClinet dependencies from unit testing
  • Loading branch information
codenio committed Aug 7, 2019
1 parent 5013c3b commit afca609
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pkg/events/events.go
Expand Up @@ -42,6 +42,7 @@ type Event struct {
Error string
Level Level
Cluster string
Channel string
TimeStamp time.Time
Count int32
Action string
Expand Down Expand Up @@ -204,7 +205,7 @@ func (event *Event) Message() (msg string) {
switch event.Type {
case config.CreateEvent, config.DeleteEvent, config.UpdateEvent:
msg = fmt.Sprintf(
"%s `%s` in of cluster `%s`, namespace `%s` has been %s:\n```%s```",
"%s `%s` of cluster `%s`, namespace `%s` has been %s:\n```%s```",
event.Kind,
event.Name,
event.Cluster,
Expand Down
@@ -0,0 +1,83 @@
package filters

import (
"github.com/infracloudio/botkube/pkg/events"
"github.com/infracloudio/botkube/pkg/filterengine"
log "github.com/infracloudio/botkube/pkg/logging"
"github.com/infracloudio/botkube/pkg/utils"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// DisableAnnotation is the object disable annotation
DisableAnnotation string = "botkube.io/disable"
// ChannelAnnotation is the multichannel support annotation
ChannelAnnotation string = "botkube.io/channel"
)

// ObjectAnnotationChecker add recommendations to the event object if pod created without any labels
type ObjectAnnotationChecker struct {
Description string
}

// Register filter
func init() {
filterengine.DefaultFilterEngine.Register(ObjectAnnotationChecker{
Description: "Checks if annotations botkube.io/* present in object specs and filters them.",
})
}

// Run filters and modifies event struct
func (f ObjectAnnotationChecker) Run(object interface{}, event *events.Event) {

// get objects metadata
obj := utils.GetObjectMetaData(object)

// if eventObj, ok := object.(*apiV1.Event); ok {
// // check annotations of the involved object
// eventObj = utils.ExtractAnnotaions(eventObj)
// obj = eventObj.ObjectMeta
// log.Logger.Debugf("Event Object >>> %+v",eventObj)
// }

// Check annotations in object
if isObjectNotifDisabled(obj) {
event.Skip = true
log.Logger.Debug("Object Notification Disable through annotations")
}

if channel, ok := reconfigureChannel(obj); ok {
event.Channel = channel
log.Logger.Debugf("Redirecting Event Notifications to channel: %s", channel)
}

log.Logger.Debug("Object annotations filter successful!")
}

// Describe filter
func (f ObjectAnnotationChecker) Describe() string {
return f.Description
}

// isObjectNotifDisabled checks annotation botkube.io/disable
// annotation botkube.io/disable disables the event notifications from objects
func isObjectNotifDisabled(obj metaV1.ObjectMeta) bool {

if obj.Annotations[DisableAnnotation] == "true" {
log.Logger.Debug("Skipping Disabled Event Notifications!")
return true
}
return false
}

// reconfigureChannel checks annotation botkube.io/channel
// annotation botkube.io/channel directs event notifications to channels
// based on the channel names present in them
// Note: Add botkube app into the desired channel to receive notifications
func reconfigureChannel(obj metaV1.ObjectMeta) (string, bool) {
// redirect messages to channels based on annotations
if channel, ok := obj.Annotations[ChannelAnnotation]; ok {
return channel, true
}
return "", false
}
@@ -0,0 +1,50 @@
package filters

import (
"testing"

metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestIsObjectNotifDisabled(t *testing.T) {
tests := map[string]struct {
annotaion metaV1.ObjectMeta
expected bool
}{
`Empty ObjectMeta`: {metaV1.ObjectMeta{}, false},
`ObjectMeta with some annotations`: {metaV1.ObjectMeta{Annotations: map[string]string{"foo": "bar"}}, false},
`ObjectMeta with disable false`: {metaV1.ObjectMeta{Annotations: map[string]string{"botkube.io/disable": "false"}}, false},
`ObjectMeta with disable true`: {metaV1.ObjectMeta{Annotations: map[string]string{"botkube.io/disable": "true"}}, true},
}
for name, test := range tests {
name, test := name, test
t.Run(name, func(t *testing.T) {
if actual := isObjectNotifDisabled(test.annotaion); actual != test.expected {
t.Errorf("expected: %+v != actual: %+v\n", test.expected, actual)
}
})
}
}

func TestReconfigureChannel(t *testing.T) {
tests := map[string]struct {
objectMeta metaV1.ObjectMeta
expectedChannel string
expectedBool bool
}{
`Empty ObjectMeta`: {metaV1.ObjectMeta{}, "", false},
`ObjectMeta with some annotations`: {metaV1.ObjectMeta{Annotations: map[string]string{"foo": "bar"}}, "", false},
`ObjectMeta with channel ""`: {metaV1.ObjectMeta{Annotations: map[string]string{"botkube.io/channel": ""}}, "", false},
`ObjectMeta with channel foo-channel`: {metaV1.ObjectMeta{Annotations: map[string]string{"botkube.io/channel": "foo-channel"}}, "foo-channel", true},
}
for name, test := range tests {
name, test := name, test
t.Run(name, func(t *testing.T) {
if actualChannel, actualBool := reconfigureChannel(test.objectMeta); actualBool != test.expectedBool {
if actualChannel != test.expectedChannel {
t.Errorf("expected: %+v != actual: %+v\n", test.expectedChannel, actualChannel)
}
}
})
}
}
31 changes: 25 additions & 6 deletions pkg/notify/slack.go
Expand Up @@ -145,13 +145,32 @@ func (s *Slack) SendEvent(event events.Event) error {
attachment.Color = attachmentColor[event.Level]
params.Attachments = []slack.Attachment{attachment}

channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
return err
// non empty value in event.channel demands redirection of events to a different channel
if event.Channel != "" {
channelID, timestamp, err := api.PostMessage(event.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
// send error message to default channel
if err.Error() == "channel_not_found" {
msg := fmt.Sprintf("Unable to send message to Channel `%s`: `%s`\n```add Botkube app to the Channel %s\nMissed events follows below:```", event.Channel, err.Error(), event.Channel)
go s.SendMessage(msg)
// sending missed event to default channel
// reset event.Channel and send event
event.Channel = ""
go s.SendEvent(event)
}
return err
}
log.Logger.Debugf("Event successfully sent to channel %s at %s", channelID, timestamp)
} else {
// empty value in event.channel sends notifications to default channel.
channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
return err
}
log.Logger.Debugf("Event successfully sent to channel %s at %s", channelID, timestamp)
}

log.Logger.Debugf("Event successfully sent to channel %s at %s", channelID, timestamp)
return nil
}

Expand Down
138 changes: 137 additions & 1 deletion pkg/utils/utils.go
Expand Up @@ -32,7 +32,14 @@ var (
KubeClient kubernetes.Interface
)

func init() {
// InitialiseKubeClient func initialises
// - KubeClient interface to interact with the cluster
// - RtObjectMap, ResourceGetterMap in memory maps
// - ClusterNamespaces with list of namespaces available in the cluster
//
// Note: Init() is replaced by InitialiseKubeClient() to
// decouple config.yaml and KubeClinet dependencies from unit testing
func InitialiseKubeClient() {
kubeConfig, err := rest.InClusterConfig()
if err != nil {
kubeconfigPath := os.Getenv("KUBECONFIG")
Expand Down Expand Up @@ -138,6 +145,17 @@ func GetObjectMetaData(obj interface{}) metaV1.ObjectMeta {
switch object := obj.(type) {
case *apiV1.Event:
objectMeta = object.ObjectMeta
// pass InvolvedObject`s annotations into Event`s annotations
// for filtering event objects based on InvolvedObject`s annotations
if len(objectMeta.Annotations) == 0 {
objectMeta.Annotations = ExtractAnnotaions(object)
} else {
// Append InvolvedObject`s annotations to existing event object`s annotations map
for key, value := range ExtractAnnotaions(object) {
objectMeta.Annotations[key] = value
}
}

case *apiV1.Pod:
objectMeta = object.ObjectMeta
case *apiV1.Node:
Expand Down Expand Up @@ -236,3 +254,121 @@ func DeleteDoubleWhiteSpace(slice []string) []string {
}
return result
}

// ExtractAnnotaions returns annotations of InvolvedObject for the given event
func ExtractAnnotaions(obj *apiV1.Event) map[string]string {

switch obj.InvolvedObject.Kind {
case "Pod":
object, err := KubeClient.CoreV1().Pods(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Node":
object, err := KubeClient.CoreV1().Nodes().Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Namespace":
object, err := KubeClient.CoreV1().Namespaces().Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "PersistentVolume":
object, err := KubeClient.CoreV1().PersistentVolumes().Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "PersistentVolumeClaim":
object, err := KubeClient.CoreV1().PersistentVolumeClaims(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "ReplicationController":
object, err := KubeClient.CoreV1().ReplicationControllers(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Service":
object, err := KubeClient.CoreV1().Services(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Secret":
object, err := KubeClient.CoreV1().Secrets(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "ConfigMap":
object, err := KubeClient.CoreV1().ConfigMaps(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "DaemonSet":
object, err := KubeClient.ExtensionsV1beta1().DaemonSets(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Ingress":
object, err := KubeClient.ExtensionsV1beta1().Ingresses(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)

case "ReplicaSet":
object, err := KubeClient.ExtensionsV1beta1().ReplicaSets(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Deployment":
object, err := KubeClient.ExtensionsV1beta1().Deployments(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Job":
object, err := KubeClient.BatchV1().Jobs(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "Role":
object, err := KubeClient.RbacV1().Roles(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "RoleBinding":
object, err := KubeClient.RbacV1().RoleBindings(obj.InvolvedObject.Namespace).Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "ClusterRole":
object, err := KubeClient.RbacV1().ClusterRoles().Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
case "ClusterRoleBinding":
object, err := KubeClient.RbacV1().ClusterRoleBindings().Get(obj.InvolvedObject.Name, metaV1.GetOptions{})
if err == nil {
return object.ObjectMeta.Annotations
}
log.Logger.Error(err)
}

return map[string]string{}
}

0 comments on commit afca609

Please sign in to comment.