Skip to content

Commit

Permalink
notifier/database: draft new notification system
Browse files Browse the repository at this point in the history
  • Loading branch information
Quentin-M authored and jzelinskie committed Feb 24, 2016
1 parent 5759af5 commit c60d005
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 59 deletions.
5 changes: 3 additions & 2 deletions clair.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/coreos/clair/api"
"github.com/coreos/clair/config"
"github.com/coreos/clair/database/pgsql"
"github.com/coreos/clair/notifier"
"github.com/coreos/clair/updater"
"github.com/coreos/clair/utils"
"github.com/coreos/pkg/capnslog"
Expand All @@ -46,8 +47,8 @@ func Boot(config *config.Config) {
defer db.Close()

// Start notifier
// st.Begin()
// go notifier.Run(config.Notifier, st)
st.Begin()
go notifier.Run(config.Notifier, db, st)

// Start API
st.Begin()
Expand Down
5 changes: 4 additions & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ updater:
# Use 0 to disable the updater entirely.
interval: 2h
notifier:
# How many attempts will the notifier do when a notifier backend fails.
# Number of attempts that the notifier does when a notification backend fails
# before it gives up temporarly and try to d
attempts: 3
# After a notification has been sent
renotifyInterval: 2h
# Configuration for HTTP notifier
http:
# Endpoint that will receive notifications with POST requests.
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type UpdaterConfig struct {
// NotifierConfig is the configuration for the Notifier service and its registered notifiers.
type NotifierConfig struct {
Attempts int
RenotifyInterval time.Duration
Params map[string]interface{} `yaml:",inline"`
}

Expand All @@ -71,6 +72,7 @@ var DefaultConfig = Config{
},
Notifier: &NotifierConfig{
Attempts: 5,
RenotifyInterval: 2 * time.Hour,
},
}

Expand Down
9 changes: 5 additions & 4 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type Datastore interface {
FindVulnerability(namespaceName, name string) (Vulnerability, error)

// Notifications
// InsertNotifications([]Notification) error
// FindNotificationToSend() (Notification, error)
// CountNotificationsToSend() (int, error)
// MarkNotificationAsSent(id string)
CountAvailableNotifications() (int, error)
GetAvailableNotification(renotifyInterval time.Duration) (string, error)
GetNotification(name string, limit, page int) (string, interface{}, error)
SetNotificationNotified(name string) error
DeleteNotification(name string) error

// Key/Value
InsertKeyValue(key, value string) error
Expand Down
11 changes: 11 additions & 0 deletions database/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,14 @@ type Vulnerability struct {
// is already about a specific Feature/FeatureVersion.
FixedBy types.Version `json:",omitempty"`
}

type NewVulnerabilityNotification struct {
VulnerabilityID int
}

type NewVulnerabilityNotificationPage struct {
Vulnerability Vulnerability
Layers []Layer
}

// ...
17 changes: 16 additions & 1 deletion database/pgsql/migrations/20151222113213_Initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ CREATE TABLE IF NOT EXISTS Lock (

CREATE INDEX ON Lock (owner);


-- -----------------------------------------------------
-- Table Notification
-- -----------------------------------------------------
CREATE TABLE IF NOT EXISTS Notification (
id SERIAL PRIMARY KEY,
name VARCHAR(64) NOT NULL UNIQUE,
kind VARCHAR(64) NOT NULL,
notified_at TIMESTAMP WITH TIME ZONE NULL,
deleted_at TIMESTAMP WITH TIME ZONE NULL,
data TEXT);

CREATE INDEX ON Notification (notified_at, deleted_at);

-- +goose Down

DROP TABLE IF EXISTS Namespace,
Expand All @@ -152,5 +166,6 @@ DROP TABLE IF EXISTS Namespace,
Vulnerability_FixedIn_Feature,
Vulnerability_Affects_FeatureVersion,
KeyValue,
Lock
Lock,
Notification
CASCADE;
103 changes: 103 additions & 0 deletions database/pgsql/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package pgsql

import (
"database/sql"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/coreos/clair/database"
cerrors "github.com/coreos/clair/utils/errors"
"github.com/pborman/uuid"
)

// do it in tx so we won't insert/update a vuln without notification and vice-versa.
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) error {
kind := reflect.Indirect(reflect.ValueOf(notification)).Type()
data, err := json.Marshal(notification)
if err != nil {
tx.Rollback()
return cerrors.NewBadRequestError("could not marshal notification in insertNotification")
}

_, err = tx.Exec(getQuery("i_notification"), uuid.New(), kind, data)
if err != nil {
tx.Rollback()
return handleError("i_notification", err)
}

return nil
}

func (pgSQL *pgSQL) CountAvailableNotifications() (int, error) {
var count int
err := pgSQL.QueryRow(getQuery("c_notification_available")).Scan(&count)
if err != nil {
return 0, handleError("c_notification_available", err)
}

return count, nil
}

// Get one available notification (!locked && !deleted && (!notified || notified_but_timed-out)).
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (string, error) {
before := time.Now().Add(-renotifyInterval)

var name string
err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&name)
if err != nil {
return "", handleError("s_notification_available", err)
}

return name, nil
}

func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (string, interface{}, error) {
var kind, data string
err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&kind, &data)
if err != nil {
return "", struct{}{}, handleError("s_notification", err)
}

return constructNotification(kind, data, limit, page)
}

func constructNotification(kind, data string, limit, page int) (string, interface{}, error) {
switch kind {
case "NotificationNewVulnerability":
var notificationPage database.NewVulnerabilityNotificationPage

// TODO: Request database to fill NewVulnerabilityNotificationPage properly.

return kind, notificationPage, nil
default:
msg := fmt.Sprintf("could not construct notification, '%s' is an unknown notification type", kind)
return "", struct{}{}, cerrors.NewBadRequestError(msg)
}
}

func (pgSQL *pgSQL) SetNotificationNotified(name string) error {
if _, err := pgSQL.Exec(getQuery("u_notification_notified"), name); err != nil {
return handleError("u_notification_notified", err)
}
return nil
}

func (pgSQL *pgSQL) DeleteNotification(name string) error {
result, err := pgSQL.Exec(getQuery("r_notification"), name)
if err != nil {
return handleError("r_notification", err)
}

affected, err := result.RowsAffected()
if err != nil {
return handleError("r_notification.RowsAffected()", err)
}

if affected <= 0 {
return cerrors.ErrNotFound
}

return nil
}
23 changes: 23 additions & 0 deletions database/pgsql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,29 @@ func init() {
queries["f_featureversion_by_feature"] = `
SELECT id, version FROM FeatureVersion WHERE feature_id = $1`

// notification.go
queries["i_notification"] = `INSERT INTO Notification(name, kind, data) VALUES($1, $2, $3)`

queries["r_notification"] = `UPDATE Notification SET deleted_at = CURRENT_TIMESTAMP`

queries["c_notification_available"] = `
SELECT COUNT(name) FROM Notification
FROM Notification
WHERE notified_at = NULL
AND name NOT IN (SELECT name FROM Lock)
ORDER BY Random()
LIMIT 1`

queries["s_notification_available"] = `
SELECT name FROM Notification
FROM Notification
WHERE notified_at = NULL OR notified_at < $1
AND name NOT IN (SELECT name FROM Lock)
ORDER BY Random()
LIMIT 1`

queries["s_notification"] = `SELECT data FROM Notification WHERE name = $1`

// complex_test.go
queries["s_complextest_featureversion_affects"] = `
SELECT v.name
Expand Down
6 changes: 6 additions & 0 deletions database/pgsql/vulnerability.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ func (pgSQL *pgSQL) insertVulnerability(vulnerability database.Vulnerability) er
tx.Rollback()
return handleError("i_vulnerability", err)
}

// Create NewVulnerabilityNotification.
notification := database.NewVulnerabilityNotification{VulnerabilityID: vulnerability.ID}
if err := pgSQL.insertNotification(tx, notification); err != nil {
return err
}
} else {
// Update vulnerability
if vulnerability.Description != existingVulnerability.Description ||
Expand Down

0 comments on commit c60d005

Please sign in to comment.