Skip to content

Commit

Permalink
notifier/database: refactor notification system and add initial Prome…
Browse files Browse the repository at this point in the history
…theus support
  • Loading branch information
Quentin-M authored and jzelinskie committed Feb 24, 2016
1 parent b8b7be3 commit ad0531a
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 123 deletions.
5 changes: 2 additions & 3 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ type Datastore interface {
DeleteVulnerability(namespaceName, name string) error

// Notifications
CountAvailableNotifications() (int, error)
GetAvailableNotification(renotifyInterval time.Duration) (string, error)
GetNotification(name string, limit, page int) (string, interface{}, error)
GetAvailableNotification(renotifyInterval time.Duration) (VulnerabilityNotification, error) // Does not fill old/new Vulnerabilities.
GetNotification(name string, limit, page int) (VulnerabilityNotification, error)
SetNotificationNotified(name string) error
DeleteNotification(name string) error

Expand Down
37 changes: 20 additions & 17 deletions database/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package database

import "github.com/coreos/clair/utils/types"
import (
"time"

"github.com/coreos/clair/utils/types"
)

// ID is only meant to be used by database implementations and should never be used for anything else.
type Model struct {
Expand All @@ -25,10 +29,10 @@ type Layer struct {
Model

Name string
EngineVersion int `json:",omitempty"`
Parent *Layer `json:",omitempty"`
Namespace *Namespace `json:",omitempty"`
Features []FeatureVersion `json:",omitempty"`
EngineVersion int
Parent *Layer
Namespace *Namespace
Features []FeatureVersion
}

type Namespace struct {
Expand All @@ -42,15 +46,14 @@ type Feature struct {

Name string
Namespace Namespace
// FixedBy map[types.Version]Vulnerability // <<-- WRONG.
}

type FeatureVersion struct {
Model

Feature Feature
Version types.Version
AffectedBy []Vulnerability `json:",omitempty"`
AffectedBy []Vulnerability
}

type Vulnerability struct {
Expand All @@ -62,21 +65,21 @@ type Vulnerability struct {
Link string
Severity types.Priority

FixedIn []FeatureVersion `json:",omitempty"`
//Affects []FeatureVersion
FixedIn []FeatureVersion
LayersIntroducingVulnerability []Layer

// For output purposes. Only make sense when the vulnerability
// is already about a specific Feature/FeatureVersion.
FixedBy types.Version `json:",omitempty"`
}

type NewVulnerabilityNotification struct {
VulnerabilityID int
}
type VulnerabilityNotification struct {
Name string

type NewVulnerabilityNotificationPage struct {
Vulnerability Vulnerability
Layers []Layer
}
Created time.Time
Notified time.Time
Deleted time.Time

// ...
OldVulnerability *Vulnerability
NewVulnerability Vulnerability
}
4 changes: 4 additions & 0 deletions database/pgsql/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func (pgSQL *pgSQL) insertFeature(feature database.Feature) (int, error) {
}

if pgSQL.cache != nil {
promCacheQueriesTotal.WithLabelValues("feature").Inc()
id, found := pgSQL.cache.Get("feature:" + feature.Namespace.Name + ":" + feature.Name)
if found {
promCacheHitsTotal.WithLabelValues("feature").Inc()
return id.(int), nil
}
}
Expand Down Expand Up @@ -60,9 +62,11 @@ func (pgSQL *pgSQL) insertFeatureVersion(featureVersion database.FeatureVersion)
}

if pgSQL.cache != nil {
promCacheQueriesTotal.WithLabelValues("featureversion").Inc()
id, found := pgSQL.cache.Get("featureversion:" + featureVersion.Feature.Namespace.Name + ":" +
featureVersion.Feature.Name + ":" + featureVersion.Version.String())
if found {
promCacheHitsTotal.WithLabelValues("featureversion").Inc()
return id.(int), nil
}
}
Expand Down
15 changes: 8 additions & 7 deletions database/pgsql/migrations/20151222113213_Initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,18 @@ CREATE INDEX ON Lock (owner);


-- -----------------------------------------------------
-- Table Notification
-- Table VulnerabilityNotification
-- -----------------------------------------------------
CREATE TABLE IF NOT EXISTS Notification (
CREATE TABLE IF NOT EXISTS Vulnerability_Notification (
id SERIAL PRIMARY KEY,
name VARCHAR(64) NOT NULL UNIQUE,
kind VARCHAR(64) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE,
notified_at TIMESTAMP WITH TIME ZONE NULL,
deleted_at TIMESTAMP WITH TIME ZONE NULL,
data TEXT);
old_vulnerability TEXT,
new_vulnerability TEXT);

CREATE INDEX ON Notification (notified_at, deleted_at);
CREATE INDEX ON Vulnerability_Notification (notified_at);

-- +goose Down

Expand All @@ -165,7 +166,7 @@ DROP TABLE IF EXISTS Namespace,
Vulnerability,
Vulnerability_FixedIn_Feature,
Vulnerability_Affects_FeatureVersion,
Vulnerability_Notification,
KeyValue,
Lock,
Notification
Lock
CASCADE;
2 changes: 2 additions & 0 deletions database/pgsql/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func (pgSQL *pgSQL) insertNamespace(namespace database.Namespace) (int, error) {
}

if pgSQL.cache != nil {
promCacheQueriesTotal.WithLabelValues("namespace").Inc()
if id, found := pgSQL.cache.Get("namespace:" + namespace.Name); found {
promCacheHitsTotal.WithLabelValues("namespace").Inc()
return id.(int), nil
}
}
Expand Down
79 changes: 40 additions & 39 deletions database/pgsql/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package pgsql
import (
"database/sql"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/coreos/clair/database"
Expand All @@ -13,15 +11,22 @@ import (
)

// do it in tx so we won't insert/update a vuln without notification and vice-versa.
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) error {
kind := reflect.Indirect(reflect.ValueOf(notification)).Type().String()
data, err := json.Marshal(notification)
// name and created doesn't matter.
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification database.VulnerabilityNotification) error {
// Marshal old and new Vulnerabilities.
oldVulnerability, err := json.Marshal(notification.OldVulnerability)
if err != nil {
tx.Rollback()
return cerrors.NewBadRequestError("could not marshal notification in insertNotification")
return cerrors.NewBadRequestError("could not marshal old Vulnerability in insertNotification")
}
newVulnerability, err := json.Marshal(notification.NewVulnerability)
if err != nil {
tx.Rollback()
return cerrors.NewBadRequestError("could not marshal new Vulnerability in insertNotification")
}

_, err = tx.Exec(getQuery("i_notification"), uuid.New(), kind, data)
// Insert Notification.
_, err = tx.Exec(getQuery("i_notification"), uuid.New(), oldVulnerability, newVulnerability)
if err != nil {
tx.Rollback()
return handleError("i_notification", err)
Expand All @@ -30,51 +35,47 @@ func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) err
return nil
}

func (pgSQL *pgSQL) CountAvailableNotifications() (int, error) {
var count int
err := pgSQL.QueryRow(getQuery("c_notification_available")).Scan(&count)
// Get one available notification name (!locked && !deleted && (!notified || notified_but_timed-out)).
// Does not fill new/old vuln.
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (database.VulnerabilityNotification, error) {
before := time.Now().Add(-renotifyInterval)

var notification database.VulnerabilityNotification
err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&notification.Name,
&notification.Created, &notification.Notified, &notification.Deleted)
if err != nil {
return 0, handleError("c_notification_available", err)
return notification, handleError("s_notification_available", err)
}

return count, nil
return notification, nil
}

// Get one available notification (!locked && !deleted && (!notified || notified_but_timed-out)).
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (string, error) {
before := time.Now().Add(-renotifyInterval)
func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (database.VulnerabilityNotification, error) {
// Get Notification.
var notification database.VulnerabilityNotification
var oldVulnerability []byte
var newVulnerability []byte

var name string
err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&name)
err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&notification.Name,
&notification.Created, &notification.Notified, &notification.Deleted, &newVulnerability,
&oldVulnerability)
if err != nil {
return "", handleError("s_notification_available", err)
return notification, handleError("s_notification", err)
}

return name, nil
}

func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (string, interface{}, error) {
var kind, data string
err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&kind, &data)
// Unmarshal old and new Vulnerabilities.
err = json.Unmarshal(oldVulnerability, notification.OldVulnerability)
if err != nil {
return "", struct{}{}, handleError("s_notification", err)
return notification, cerrors.NewBadRequestError("could not unmarshal old Vulnerability in GetNotification")
}
err = json.Unmarshal(newVulnerability, &notification.NewVulnerability)
if err != nil {
return notification, cerrors.NewBadRequestError("could not unmarshal new Vulnerability in GetNotification")
}

return constructNotification(kind, data, limit, page)
}

func constructNotification(kind, data string, limit, page int) (string, interface{}, error) {
switch kind {
case "NotificationNewVulnerability":
var notificationPage database.NewVulnerabilityNotificationPage

// TODO: Request database to fill NewVulnerabilityNotificationPage properly.
// TODO(Quentin-M): Fill LayersIntroducingVulnerability.

return kind, notificationPage, nil
default:
msg := fmt.Sprintf("could not construct notification, '%s' is an unknown notification type", kind)
return "", struct{}{}, cerrors.NewBadRequestError(msg)
}
return notification, nil
}

func (pgSQL *pgSQL) SetNotificationNotified(name string) error {
Expand Down
39 changes: 31 additions & 8 deletions database/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,32 @@ import (
"github.com/hashicorp/golang-lru"
"github.com/lib/pq"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
)

var log = capnslog.NewPackageLogger("github.com/coreos/clair", "pgsql")
var (
log = capnslog.NewPackageLogger("github.com/coreos/clair", "pgsql")

promErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "clair_pgsql_errors_total",
Help: "Number of errors that PostgreSQL requests generates.",
}, []string{"request"})

promCacheHitsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "clair_pgsql_cache_hits_total",
Help: "Number of cache hits that the PostgreSQL backend does.",
}, []string{"object"})

promCacheQueriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "clair_pgsql_cache_queries_total",
Help: "Number of cache queries that the PostgreSQL backend does.",
}, []string{"object"})
)

func init() {
prometheus.MustRegister(promCacheHitsTotal)
prometheus.MustRegister(promCacheQueriesTotal)
}

type pgSQL struct {
*sql.DB
Expand Down Expand Up @@ -198,17 +221,17 @@ func OpenForTest(name string, withTestData bool) (*pgSQLTest, error) {
// handleError logs an error with an extra description and masks the error if it's an SQL one.
// This ensures we never return plain SQL errors and leak anything.
func handleError(desc string, err error) error {
if _, ok := err.(*pq.Error); ok {
log.Errorf("%s: %v", desc, err)
return database.ErrBackendException
} else if err == sql.ErrNoRows {
if err == sql.ErrNoRows {
return cerrors.ErrNotFound
} else if err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") {
log.Errorf("%s: %v", desc, err)
return database.ErrBackendException
}

log.Errorf("%s: %v", desc, err)
promErrorsTotal.WithLabelValues(desc).Inc()

if _, o := err.(*pq.Error); o || err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") {
return database.ErrBackendException
}

return err
}

Expand Down
23 changes: 10 additions & 13 deletions database/pgsql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,24 @@ func init() {
AND name = $2`

// notification.go
queries["i_notification"] = `INSERT INTO Notification(name, kind, data) VALUES($1, $2, $3)`
queries["i_notification"] = `
INSERT INTO Vulnerability_Notification(name, created_at, old_vulnerability, new_vulnerability)
VALUES($1, CURRENT_TIMESTAMP, $2, $3)`

queries["r_notification"] = `UPDATE Notification SET deleted_at = CURRENT_TIMESTAMP`

queries["c_notification_available"] = `
SELECT COUNT(name) FROM Notification
FROM Notification
WHERE notified_at = NULL
AND name NOT IN (SELECT name FROM Lock)
ORDER BY Random()
LIMIT 1`
queries["r_notification"] = `UPDATE Vulnerability_Notification SET deleted_at = CURRENT_TIMESTAMP`

queries["s_notification_available"] = `
SELECT name FROM Notification
FROM Notification
SELECT name, created_at, notified_at, deleted_at
FROM Vulnerability_Notification
WHERE notified_at = NULL OR notified_at < $1
AND name NOT IN (SELECT name FROM Lock)
ORDER BY Random()
LIMIT 1`

queries["s_notification"] = `SELECT data FROM Notification WHERE name = $1`
queries["s_notification"] = `
SELECT name, created_at, notified_at, deleted_at, old_vulnerability, new_vulnerability
FROM Vulnerability_Notification
WHERE name = $1`

// complex_test.go
queries["s_complextest_featureversion_affects"] = `
Expand Down
2 changes: 1 addition & 1 deletion database/pgsql/vulnerability.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (pgSQL *pgSQL) insertVulnerability(vulnerability database.Vulnerability) er
}

// Create NewVulnerabilityNotification.
notification := database.NewVulnerabilityNotification{VulnerabilityID: vulnerability.ID}
notification := database.VulnerabilityNotification{NewVulnerability: vulnerability}
if err := pgSQL.insertNotification(tx, notification); err != nil {
return err
}
Expand Down
Loading

0 comments on commit ad0531a

Please sign in to comment.