Skip to content

Commit

Permalink
notifier: Allow custom notifiers to be registered.
Browse files Browse the repository at this point in the history
  • Loading branch information
Quentin-M committed Dec 16, 2015
1 parent 7ee1481 commit 3ff8bfa
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 120 deletions.
3 changes: 1 addition & 2 deletions clair.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func Boot(config *config.Config) {

// Start notifier
st.Begin()
notifier := notifier.New(config.Notifier)
go notifier.Serve(st)
go notifier.Run(config.Notifier, st)

// Start API
st.Begin()
Expand Down
1 change: 1 addition & 0 deletions cmd/clair/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/pkg/capnslog"

// Register components
_ "github.com/coreos/clair/notifier/notifiers"
_ "github.com/coreos/clair/updater/fetchers"
_ "github.com/coreos/clair/worker/detectors/os"
_ "github.com/coreos/clair/worker/detectors/packages"
Expand Down
16 changes: 9 additions & 7 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ updater:
# Use 0 to disable the updater entirely.
interval: 2h
notifier:
# HTTP endpoint that will receive notifications with POST requests.
endpoint:
# Server name and path to certificates to call the endpoint securely with TLS and client certificate auth.
servername:
cafile:
keyfile:
certfile:
# Configuration for HTTP notifier
http:
# Endpoint that will receive notifications with POST requests.
endpoint:
# Server name and path to certificates to call the endpoint securely with TLS and client certificate auth.
servername:
cafile:
keyfile:
certfile:
8 changes: 2 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ type UpdaterConfig struct {
Interval time.Duration
}

// NotifierConfig is the configuration for the Notifier service.
// NotifierConfig is the configuration for the Notifier service and its registered notifiers.
type NotifierConfig struct {
Endpoint string
ServerName string
CertFile string
KeyFile string
CAFile string
Params map[string]interface{} `yaml:",inline"`
}

// APIConfig is the configuration for the API service.
Expand Down
6 changes: 2 additions & 4 deletions docs/Notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

This tool can send notifications to external services when specific events happen, such as vulnerability updates.

For now, it only supports transmitting them to an HTTP endpoint using POST requests, but it may be extended quite easily.
To enable the notification system, specify the following command-line arguments:

--notifier-type=http --notifier-http-url="http://your-notification-endpoint"
For now, it only supports transmitting them to an HTTP endpoint using POST requests, but it can be extended quite easily by registering a new Notifier kind.
To enable the notification system, you simply have to specify the appropriate configuration. See the [example configuration](../config.example.yaml).

# Types of notifications

Expand Down
157 changes: 56 additions & 101 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@
package notifier

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/coreos/pkg/capnslog"
Expand Down Expand Up @@ -52,77 +45,79 @@ type Notification struct {
Content interface{}
}

// A Notifier dispatches notifications to an HTTP endpoint.
type Notifier struct {
lockIdentifier string
endpoint string
client *http.Client
}
var notifiers = make(map[string]Notifier)

// New initializes a new Notifier from the specified configuration.
func New(config *config.NotifierConfig) *Notifier {
if config == nil {
return &Notifier{}
}
// Notifier represents anything that can transmit notifications.
type Notifier interface {
// Configure attempts to initialize the notifier with the provided configuration.
// It returns whether the notifier is enabled or not.
Configure(*config.NotifierConfig) (bool, error)
// Send transmits the specified notification.
Send(notification *Notification) error
}

// Validate endpoint URL.
if _, err := url.Parse(config.Endpoint); err != nil {
log.Error("could not create a notifier with an invalid endpoint URL")
return &Notifier{}
// RegisterNotifier makes a Fetcher available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func RegisterNotifier(name string, n Notifier) {
if name == "" {
panic("notifier: could not register a Notifier with an empty name")
}

// Initialize TLS.
tlsConfig, err := loadTLSClientConfig(config)
if err != nil {
log.Fatalf("could not initialize client cert authentication: %s\n", err)
}
if tlsConfig != nil {
log.Info("notifier configured with client certificate authentication")
if n == nil {
panic("notifier: could not register a nil Notifier")
}

httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
if _, dup := notifiers[name]; dup {
panic("notifier: RegisterNotifier called twice for " + name)
}

return &Notifier{
lockIdentifier: uuid.New(),
endpoint: config.Endpoint,
client: httpClient,
}
notifiers[name] = n
}

// Serve starts the Notifier.
func (n *Notifier) Serve(stopper *utils.Stopper) {
// Run starts the Notifier service.
func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
defer stopper.End()

// Do not run the updater if the endpoint is empty.
if n.endpoint == "" {
log.Infof("notifier service is disabled.")
// Configure registered notifiers.
for notifierName, notifier := range notifiers {
if configured, err := notifier.Configure(config); configured {
log.Infof("notifier '%s' configured\n", notifierName)
} else {
delete(notifiers, notifierName)
if err != nil {
log.Errorf("could not configure notifier '%s': %s", notifierName, err)
}
}
}

// Do not run the updater if there is no notifier enabled.
if len(notifiers) == 0 {
log.Infof("notifier service is disabled")
return
}

// Register healthchecker.
health.RegisterHealthchecker("notifier", n.Healthcheck)
whoAmI := uuid.New()
log.Infof("notifier service started. lock identifier: %s\n", whoAmI)

log.Infof("notifier service started. endpoint: %s. lock identifier: %s\n", n.endpoint, n.lockIdentifier)
// Register healthchecker.
health.RegisterHealthchecker("notifier", Healthcheck)

for {
// Find task.
// TODO(Quentin-M): Combine node and notification.
node, notification := n.findTask(stopper)
node, notification := findTask(whoAmI, stopper)
if node == "" && notification == nil {
break
}

// Handle task.
done := make(chan bool, 1)
go func() {
if n.handleTask(node, notification) {
if handleTask(notification) {
database.MarkNotificationAsSent(node)
}
database.Unlock(node, n.lockIdentifier)
database.Unlock(node, whoAmI)
done <- true
}()

Expand All @@ -133,15 +128,15 @@ func (n *Notifier) Serve(stopper *utils.Stopper) {
case <-done:
break outer
case <-time.After(refreshLockDuration):
database.Lock(node, lockDuration, n.lockIdentifier)
database.Lock(node, lockDuration, whoAmI)
}
}
}

log.Info("notifier service stopped")
}

func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notification) {
func findTask(whoAmI string, stopper *utils.Stopper) (string, database.Notification) {
for {
// Find a notification to send.
node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper())
Expand All @@ -158,14 +153,14 @@ func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notificati
}

// Lock the notification.
if hasLock, _ := database.Lock(node, lockDuration, n.lockIdentifier); hasLock {
if hasLock, _ := database.Lock(node, lockDuration, whoAmI); hasLock {
log.Infof("found and locked a notification: %s", notification.GetName())
return node, notification
}
}
}

func (n *Notifier) handleTask(node string, notification database.Notification) bool {
func handleTask(notification database.Notification) bool {
// Get notification content.
// TODO(Quentin-M): Split big notifications.
notificationContent, err := notification.GetContent()
Expand All @@ -175,67 +170,27 @@ func (n *Notifier) handleTask(node string, notification database.Notification) b
}

// Create notification.
payload := Notification{
payload := &Notification{
Name: notification.GetName(),
Type: notification.GetType(),
Content: notificationContent,
}

// Marshal notification.
jsonPayload, err := json.Marshal(payload)
if err != nil {
log.Errorf("could not marshal content of notification '%s': %s", notification.GetName(), err)
return false
}

// Send notification.
resp, err := n.client.Post(n.endpoint, "application/json", bytes.NewBuffer(jsonPayload))
defer resp.Body.Close()
if err != nil || (resp.StatusCode != 200 && resp.StatusCode != 201) {
log.Errorf("could not send notification '%s': (%d) %s", notification.GetName(), resp.StatusCode, err)
return false
// TODO(Quentin-M): Backoff / MaxRetries
for notifierName, notifier := range notifiers {
if err := notifier.Send(payload); err != nil {
log.Errorf("could not send notification '%s' to notifier '%s': %s", notification.GetName(), notifierName, err)
return false
}
}

log.Infof("successfully sent notification '%s'\n", notification.GetName())
return true
}

// Healthcheck returns the health of the notifier service.
func (n *Notifier) Healthcheck() health.Status {
func Healthcheck() health.Status {
queueSize, err := database.CountNotificationsToSend()
return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}}
}

// loadTLSClientConfig initializes a *tls.Config using the given notifier
// configuration.
//
// If no certificates are given, (nil, nil) is returned.
// The CA certificate is optional and falls back to the system default.
func loadTLSClientConfig(cfg *config.NotifierConfig) (*tls.Config, error) {
if cfg.CertFile == "" || cfg.KeyFile == "" {
return nil, nil
}

cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}

var caCertPool *x509.CertPool
if cfg.CAFile != "" {
caCert, err := ioutil.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
}

tlsConfig := &tls.Config{
ServerName: cfg.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

return tlsConfig, nil
}
Loading

0 comments on commit 3ff8bfa

Please sign in to comment.