Skip to content

Commit

Permalink
Add support for elasticsearch interface
Browse files Browse the repository at this point in the history
  • Loading branch information
PrasadG193 committed Mar 22, 2019
1 parent 81553b8 commit 79fcde3
Show file tree
Hide file tree
Showing 276 changed files with 50,409 additions and 31 deletions.
26 changes: 26 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Expand Up @@ -21,3 +21,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/olivere/elastic"
version = "6.0.0"
6 changes: 4 additions & 2 deletions cmd/botkube/main.go
Expand Up @@ -17,8 +17,10 @@ func main() {
}
log.Logger.Info(fmt.Sprintf("Configuration:: %+v\n", Config))

sb := slack.NewSlackBot()
go sb.Start()
if Config.Communications.Slack.Enable {
sb := slack.NewSlackBot()
go sb.Start()
}

controller.RegisterInformers(Config)
}
15 changes: 15 additions & 0 deletions config.yaml
Expand Up @@ -115,10 +115,25 @@ recommendations: true

# Channels configuration
communications:
# Settings for Slack
slack:
enable: false
channel: 'SLACK_CHANNEL'
token: 'SLACK_API_TOKEN'

# Settings for ELS
elasticsearch:
enable: false
server: 'ELASTICSEARCH_ADDRESS' # e.g https://example.com:9243
username: 'ELASTICSEARCH_USERNAME'
password: 'ELASTICSEARCH_PASSWORD'
# ELS index settings
index:
name: botkube
type: botkube-event
shards: 1
replicas: 0

# Setting to support multiple clusters
settings:
# Cluster name to differentiate incoming messages
Expand Down
15 changes: 15 additions & 0 deletions helm/botkube/values.yaml
Expand Up @@ -130,10 +130,25 @@ config:

# Channels configuration
communications:
# Settings for Slack
slack:
enable: false
channel: 'SLACK_CHANNEL'
token: 'SLACK_API_TOKEN'

# Settings for ELS
elasticsearch:
enable: false
server: 'ELASTICSEARCH_ADDRESS' # e.g https://example.com:9243
username: 'ELASTICSEARCH_USERNAME'
password: 'ELASTICSEARCH_PASSWORD'
# ELS index settings
index:
name: botkube
type: botkube-event
shards: 1
replicas: 0

# Setting to support multiple clusters
settings:
# Cluster name to differentiate incoming messages
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/config.go
Expand Up @@ -37,14 +37,34 @@ type Resource struct {
// Communications channels to send events to
type Communications struct {
Slack Slack

ElasticSearch ElasticSearch
}

// Slack configuration to authentication and send notifications
type Slack struct {
Enable bool
Channel string
Token string
}

// ElasticSearch config auth settings
type ElasticSearch struct {
Enable bool
Username string
Password string
Server string
Index Index
}

// Index settings for ELS
type Index struct {
Name string
Type string
Shards int
Replicas int
}

// Settings for multicluster support
type Settings struct {
ClusterName string
Expand Down
43 changes: 26 additions & 17 deletions pkg/controller/controller.go
Expand Up @@ -40,7 +40,7 @@ func findNamespace(ns string) string {

// RegisterInformers creates new informer controllers to watch k8s resources
func RegisterInformers(c *config.Config) {
sendMessage(fmt.Sprintf(controllerStartMsg, c.Settings.ClusterName))
sendMessage(c, fmt.Sprintf(controllerStartMsg, c.Settings.ClusterName))
startTime = time.Now().Local()

// Get resync period
Expand Down Expand Up @@ -75,7 +75,7 @@ func RegisterInformers(c *config.Config) {
watchlist,
object,
time.Duration(rsyncTime)*time.Minute,
registerEventHandlers(r.Name, r.Events),
registerEventHandlers(c, r.Name, r.Events),
)
stopCh := make(chan struct{})
defer close(stopCh)
Expand Down Expand Up @@ -113,7 +113,7 @@ func RegisterInformers(c *config.Config) {
if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] ||
utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) {
log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
sendEvent(obj, "events", "create", err)
sendEvent(obj, c, "events", "create", err)
}
},
},
Expand All @@ -128,39 +128,39 @@ func RegisterInformers(c *config.Config) {
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm
sendMessage(fmt.Sprintf(controllerStopMsg, c.Settings.ClusterName))
sendMessage(c, fmt.Sprintf(controllerStopMsg, c.Settings.ClusterName))
}

func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
func registerEventHandlers(c *config.Config, resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
for _, event := range events {
if event == "all" || event == "create" {
handlerFns.AddFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
log.Logger.Debugf("Processing add to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "create", err)
sendEvent(obj, c, resourceType, "create", err)
}
}

if event == "all" || event == "update" {
handlerFns.UpdateFunc = func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
log.Logger.Debugf("Processing update to %v: %s", resourceType, key)
sendEvent(new, resourceType, "update", err)
sendEvent(new, c, resourceType, "update", err)
}
}

if event == "all" || event == "delete" {
handlerFns.DeleteFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
log.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "delete", err)
sendEvent(obj, c, resourceType, "delete", err)
}
}
}
return handlerFns
}

func sendEvent(obj interface{}, kind, eventType string, err error) {
func sendEvent(obj interface{}, c *config.Config, kind, eventType string, err error) {
if err != nil {
log.Logger.Error("Error while receiving event: ", err.Error())
return
Expand All @@ -178,7 +178,7 @@ func sendEvent(obj interface{}, kind, eventType string, err error) {
// Skip older events
if eventType == "delete" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
if objectMeta.DeletionTimestamp != nil && objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
log.Logger.Debug("Skipping older events")
return
}
Expand All @@ -199,17 +199,26 @@ func sendEvent(obj interface{}, kind, eventType string, err error) {
return
}

// Send notification to communication chennel
notifier := notify.NewSlack()
notifier.SendEvent(event)
var notifier notify.Notifier
// Send notification to communication channel
if c.Communications.Slack.Enable {
notifier = notify.NewSlack(c)
go notifier.SendEvent(event)
}

if c.Communications.ElasticSearch.Enable {
notifier = notify.NewElasticSearch(c)
go notifier.SendEvent(event)
}
}

func sendMessage(msg string) {
func sendMessage(c *config.Config, msg string) {
if len(msg) <= 0 {
log.Logger.Warn("sendMessage received string with length 0. Hence skipping.")
return
}

notifier := notify.NewSlack()
notifier.SendMessage(msg)
if c.Communications.Slack.Enable {
notifier := notify.NewSlack(c)
notifier.SendMessage(msg)
}
}
4 changes: 3 additions & 1 deletion pkg/events/events.go
Expand Up @@ -77,7 +77,9 @@ func New(object interface{}, eventType string, kind string) Event {
}

if eventType == "delete" {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
if objectMeta.DeletionTimestamp != nil {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
}
}

if kind != "events" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filterengine/filters/image_tag_checker.go
Expand Up @@ -43,5 +43,5 @@ func (f *ImageTagChecker) Run(object interface{}, event *events.Event) {
event.Recommendations = append(event.Recommendations, ":latest tag used in image '"+c.Image+"' of Container '"+c.Name+"' should be avoided.\n")
}
}
log.Logger.Info("Image tag filter successful!")
log.Logger.Debug("Image tag filter successfull!")
}
100 changes: 100 additions & 0 deletions pkg/notify/elasticsearch.go
@@ -0,0 +1,100 @@
package notify

import (
"context"
"fmt"

"github.com/infracloudio/botkube/pkg/config"
"github.com/infracloudio/botkube/pkg/events"
log "github.com/infracloudio/botkube/pkg/logging"
"github.com/olivere/elastic"
)

// ElasticSearch contains auth cred and index setting
type ElasticSearch struct {
Username string
Password string
Server string
Index string
Shards int
Replicas int
Type string
}

// NewElasticSearch returns new Slack object
func NewElasticSearch(c *config.Config) Notifier {
return &ElasticSearch{
Username: c.Communications.ElasticSearch.Username,
Password: c.Communications.ElasticSearch.Password,
Server: c.Communications.ElasticSearch.Server,
Index: c.Communications.ElasticSearch.Index.Name,
Type: c.Communications.ElasticSearch.Index.Type,
Shards: c.Communications.ElasticSearch.Index.Shards,
Replicas: c.Communications.ElasticSearch.Index.Replicas,
}
}

type mapping struct {
Settings settings `json:"settings"`
}

type settings struct {
Index index `json:"index"`
}
type index struct {
Shards int `json:"number_of_shards"`
Replicas int `json:"number_of_replicas"`
}

// SendEvent sends event notification to slack
func (e *ElasticSearch) SendEvent(event events.Event) error {
log.Logger.Debug(fmt.Sprintf(">> Sending to ElasticSearch: %+v", event))
ctx := context.Background()
client, err := elastic.NewClient(elastic.SetURL(e.Server), elastic.SetBasicAuth(e.Username, e.Password), elastic.SetSniff(false), elastic.SetHealthcheck(false), elastic.SetGzip(true))
if err != nil {
log.Logger.Error(fmt.Sprintf("Failed to create els client. Error:%s", err.Error()))
return err
}

// Create index if not exists
exists, err := client.IndexExists(e.Index).Do(ctx)
if err != nil {
log.Logger.Error(fmt.Sprintf("Failed to get index. Error:%s", err.Error()))
return err
}
if !exists {
// Create a new index.
mapping := mapping{
Settings: settings{
index{
Shards: e.Shards,
Replicas: e.Replicas,
},
},
}
_, err := client.CreateIndex(e.Index).BodyJson(mapping).Do(ctx)
if err != nil {
log.Logger.Error(fmt.Sprintf("Failed to create index. Error:%s", err.Error()))
return err
}
}

// Send event to els
_, err = client.Index().Index(e.Index).Type(e.Type).BodyJson(event).Do(ctx)
if err != nil {
log.Logger.Error(fmt.Sprintf("Failed to post data to els. Error:%s", err.Error()))
return err
}
_, err = client.Flush().Index(e.Index).Do(ctx)
if err != nil {
log.Logger.Error(fmt.Sprintf("Failed to flush data to els. Error:%s", err.Error()))
return err
}
log.Logger.Debugf("Event successfully sent to ElasticSearch index %s", e.Index)
return nil
}

// SendMessage sends message to slack channel
func (e *ElasticSearch) SendMessage(msg string) error {
return nil
}

0 comments on commit 79fcde3

Please sign in to comment.