Skip to content

Commit

Permalink
prometheus: add initial Prometheus support
Browse files Browse the repository at this point in the history
  • Loading branch information
Quentin-M authored and jzelinskie committed Feb 24, 2016
1 parent ad0531a commit baed60e
Show file tree
Hide file tree
Showing 12 changed files with 1,048 additions and 28 deletions.
39 changes: 32 additions & 7 deletions database/pgsql/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pgsql

import (
"database/sql"
"time"

"github.com/coreos/clair/database"
cerrors "github.com/coreos/clair/utils/errors"
Expand All @@ -27,15 +28,19 @@ func (pgSQL *pgSQL) insertFeature(feature database.Feature) (int, error) {
return 0, cerrors.NewBadRequestError("could not find/insert invalid Feature")
}

// Do cache lookup.
if pgSQL.cache != nil {
promCacheQueriesTotal.WithLabelValues("feature").Inc()
promCacheQueriesTotal.WithLabelValues("feature").Inc()
id, found := pgSQL.cache.Get("feature:" + feature.Namespace.Name + ":" + feature.Name)
if found {
promCacheHitsTotal.WithLabelValues("feature").Inc()
promCacheHitsTotal.WithLabelValues("feature").Inc()
return id.(int), nil
}
}

// We do `defer observeQueryTime` here because we don't want to observe cached features.
defer observeQueryTime("insertFeature", "all", time.Now())

// Find or create Namespace.
namespaceID, err := pgSQL.insertNamespace(feature.Namespace)
if err != nil {
Expand All @@ -61,21 +66,29 @@ func (pgSQL *pgSQL) insertFeatureVersion(featureVersion database.FeatureVersion)
return 0, cerrors.NewBadRequestError("could not find/insert invalid FeatureVersion")
}

// Do cache lookup.
if pgSQL.cache != nil {
promCacheQueriesTotal.WithLabelValues("featureversion").Inc()
promCacheQueriesTotal.WithLabelValues("featureversion").Inc()
id, found := pgSQL.cache.Get("featureversion:" + featureVersion.Feature.Namespace.Name + ":" +
featureVersion.Feature.Name + ":" + featureVersion.Version.String())
if found {
promCacheHitsTotal.WithLabelValues("featureversion").Inc()
promCacheHitsTotal.WithLabelValues("featureversion").Inc()
return id.(int), nil
}
}

// We do `defer observeQueryTime` here because we don't want to observe cached featureversions.
defer observeQueryTime("insertFeatureVersion", "all", time.Now())

// Find or create Feature first.
t := time.Now()
featureID, err := pgSQL.insertFeature(featureVersion.Feature)
observeQueryTime("insertFeatureVersion", "insertFeature", t)

if err != nil {
return 0, err
}

featureVersion.Feature.ID = featureID

// Begin transaction.
Expand All @@ -85,23 +98,32 @@ func (pgSQL *pgSQL) insertFeatureVersion(featureVersion database.FeatureVersion)
return 0, handleError("insertFeatureVersion.Begin()", err)
}

// Set transaction as SERIALIZABLE.
// This is how we ensure that the data in Vulnerability_Affects_FeatureVersion is always
// consistent.
// Lock Vulnerability_Affects_FeatureVersion exclusively.
// We want to prevent InsertVulnerability to modify it.
promConcurrentLockVAFV.Inc()
defer promConcurrentLockVAFV.Dec()
t = time.Now()
_, err = tx.Exec(getQuery("l_vulnerability_affects_featureversion"))
observeQueryTime("insertFeatureVersion", "lock", t)

if err != nil {
tx.Rollback()
return 0, handleError("insertFeatureVersion.l_vulnerability_affects_featureversion", err)
}

// Find or create FeatureVersion.
var newOrExisting string

t = time.Now()
err = tx.QueryRow(getQuery("soi_featureversion"), featureID, &featureVersion.Version).
Scan(&newOrExisting, &featureVersion.ID)
observeQueryTime("insertFeatureVersion", "soi_featureversion", t)

if err != nil {
tx.Rollback()
return 0, handleError("soi_featureversion", err)
}

if newOrExisting == "exi" {
// That featureVersion already exists, return its id.
tx.Commit()
Expand All @@ -110,7 +132,10 @@ func (pgSQL *pgSQL) insertFeatureVersion(featureVersion database.FeatureVersion)

// Link the new FeatureVersion with every vulnerabilities that affect it, by inserting in
// Vulnerability_Affects_FeatureVersion.
t = time.Now()
err = linkFeatureVersionToVulnerabilities(tx, featureVersion)
observeQueryTime("insertFeatureVersion", "linkFeatureVersionToVulnerabilities", t)

if err != nil {
tx.Rollback()
return 0, err
Expand Down
5 changes: 5 additions & 0 deletions database/pgsql/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pgsql

import (
"database/sql"
"time"

cerrors "github.com/coreos/clair/utils/errors"
)
Expand All @@ -27,6 +28,8 @@ func (pgSQL *pgSQL) InsertKeyValue(key, value string) (err error) {
return cerrors.NewBadRequestError("could not insert a flag which has an empty name or value")
}

defer observeQueryTime("InsertKeyValue", "all", time.Now())

// Upsert.
//
// Note: UPSERT works only on >= PostgreSQL 9.5 which is not yet supported by AWS RDS.
Expand Down Expand Up @@ -64,6 +67,8 @@ func (pgSQL *pgSQL) InsertKeyValue(key, value string) (err error) {

// GetValue reads a single key / value tuple and returns an empty string if the key doesn't exist.
func (pgSQL *pgSQL) GetKeyValue(key string) (string, error) {
defer observeQueryTime("GetKeyValue", "all", time.Now())

var value string
err := pgSQL.QueryRow(getQuery("s_keyvalue"), key).Scan(&value)

Expand Down
26 changes: 26 additions & 0 deletions database/pgsql/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pgsql

import (
"database/sql"
"time"

"github.com/coreos/clair/database"
"github.com/coreos/clair/utils"
Expand All @@ -24,16 +25,27 @@ import (
)

func (pgSQL *pgSQL) FindLayer(name string, withFeatures, withVulnerabilities bool) (database.Layer, error) {
subquery := "all"
if withFeatures {
subquery += "/features"
} else if withVulnerabilities {
subquery += "/features+vulnerabilities"
}
defer observeQueryTime("FindLayer", subquery, time.Now())

// Find the layer
var layer database.Layer
var parentID zero.Int
var parentName zero.String
var namespaceID zero.Int
var namespaceName sql.NullString

t := time.Now()
err := pgSQL.QueryRow(getQuery("s_layer"), name).
Scan(&layer.ID, &layer.Name, &layer.EngineVersion, &parentID, &parentName, &namespaceID,
&namespaceName)
observeQueryTime("FindLayer", "s_layer", t)

if err != nil {
return layer, handleError("s_layer", err)
}
Expand All @@ -53,15 +65,22 @@ func (pgSQL *pgSQL) FindLayer(name string, withFeatures, withVulnerabilities boo

// Find its features
if withFeatures || withVulnerabilities {
t = time.Now()
featureVersions, err := pgSQL.getLayerFeatureVersions(layer.ID)
observeQueryTime("FindLayer", "getLayerFeatureVersions", t)

if err != nil {
return layer, err
}

layer.Features = featureVersions

if withVulnerabilities {
// Load the vulnerabilities that affect the FeatureVersions.
t = time.Now()
err := pgSQL.loadAffectedBy(layer.Features)
observeQueryTime("FindLayer", "loadAffectedBy", t)

if err != nil {
return layer, err
}
Expand Down Expand Up @@ -183,6 +202,8 @@ func (pgSQL *pgSQL) loadAffectedBy(featureVersions []database.FeatureVersion) er
// been modified.
// TODO(Quentin-M): This behavior should be implemented at the Feature detectors level.
func (pgSQL *pgSQL) InsertLayer(layer database.Layer) error {
tf := time.Now()

// Verify parameters
if layer.Name == "" {
log.Warning("could not insert a layer which has an empty Name")
Expand All @@ -202,6 +223,9 @@ func (pgSQL *pgSQL) InsertLayer(layer database.Layer) error {
layer.ID = existingLayer.ID
}

// We do `defer observeQueryTime` here because we don't want to observe existing layers.
defer observeQueryTime("InsertLayer", "all", tf)

// Get parent ID.
var parentID zero.Int
if layer.Parent != nil {
Expand Down Expand Up @@ -346,6 +370,8 @@ func createNV(features []database.FeatureVersion) (map[string]*database.FeatureV
}

func (pgSQL *pgSQL) DeleteLayer(name string) error {
defer observeQueryTime("DeleteLayer", "all", time.Now())

result, err := pgSQL.Exec(getQuery("r_layer"), name)
if err != nil {
return handleError("r_layer", err)
Expand Down
8 changes: 8 additions & 0 deletions database/pgsql/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (pgSQL *pgSQL) Lock(name string, owner string, duration time.Duration, rene
return false, time.Time{}
}

defer observeQueryTime("Lock", "all", time.Now())

// Prune locks.
pgSQL.pruneLocks()

Expand Down Expand Up @@ -68,6 +70,8 @@ func (pgSQL *pgSQL) Unlock(name, owner string) {
return
}

defer observeQueryTime("Unlock", "all", time.Now())

pgSQL.Exec(getQuery("r_lock"), name, owner)
}

Expand All @@ -79,6 +83,8 @@ func (pgSQL *pgSQL) FindLock(name string) (string, time.Time, error) {
return "", time.Time{}, cerrors.NewBadRequestError("could not find an invalid lock")
}

defer observeQueryTime("FindLock", "all", time.Now())

var owner string
var until time.Time
err := pgSQL.QueryRow(getQuery("f_lock"), name).Scan(&owner, &until)
Expand All @@ -91,6 +97,8 @@ func (pgSQL *pgSQL) FindLock(name string) (string, time.Time, error) {

// pruneLocks removes every expired locks from the database
func (pgSQL *pgSQL) pruneLocks() {
defer observeQueryTime("pruneLocks", "all", time.Now())

if _, err := pgSQL.Exec(getQuery("r_lock_expired")); err != nil {
handleError("r_lock_expired", err)
}
Expand Down
5 changes: 5 additions & 0 deletions database/pgsql/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package pgsql

import (
"time"

"github.com/coreos/clair/database"
cerrors "github.com/coreos/clair/utils/errors"
)
Expand All @@ -32,6 +34,9 @@ func (pgSQL *pgSQL) insertNamespace(namespace database.Namespace) (int, error) {
}
}

// We do `defer observeQueryTime` here because we don't want to observe cached namespaces.
defer observeQueryTime("insertNamespace", "all", time.Now())

var id int
err := pgSQL.QueryRow(getQuery("soi_namespace"), namespace.Name).Scan(&id)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions database/pgsql/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// do it in tx so we won't insert/update a vuln without notification and vice-versa.
// name and created doesn't matter.
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification database.VulnerabilityNotification) error {
defer observeQueryTime("insertNotification", "all", time.Now())

// Marshal old and new Vulnerabilities.
oldVulnerability, err := json.Marshal(notification.OldVulnerability)
if err != nil {
Expand All @@ -38,6 +40,8 @@ func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification database.Vulnera
// Get one available notification name (!locked && !deleted && (!notified || notified_but_timed-out)).
// Does not fill new/old vuln.
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (database.VulnerabilityNotification, error) {
defer observeQueryTime("GetAvailableNotification", "all", time.Now())

before := time.Now().Add(-renotifyInterval)

var notification database.VulnerabilityNotification
Expand All @@ -51,6 +55,8 @@ func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (da
}

func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (database.VulnerabilityNotification, error) {
defer observeQueryTime("GetNotification", "all", time.Now())

// Get Notification.
var notification database.VulnerabilityNotification
var oldVulnerability []byte
Expand All @@ -74,18 +80,23 @@ func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (database.Vuln
}

// TODO(Quentin-M): Fill LayersIntroducingVulnerability.
// And time it.

return notification, nil
}

func (pgSQL *pgSQL) SetNotificationNotified(name string) error {
defer observeQueryTime("SetNotificationNotified", "all", time.Now())

if _, err := pgSQL.Exec(getQuery("u_notification_notified"), name); err != nil {
return handleError("u_notification_notified", err)
}
return nil
}

func (pgSQL *pgSQL) DeleteNotification(name string) error {
defer observeQueryTime("DeleteNotification", "all", time.Now())

result, err := pgSQL.Exec(getQuery("r_notification"), name)
if err != nil {
return handleError("r_notification", err)
Expand Down
25 changes: 22 additions & 3 deletions database/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"path"
"runtime"
"strings"
"time"

"bitbucket.org/liamstask/goose/lib/goose"
"github.com/coreos/clair/config"
"github.com/coreos/clair/database"
"github.com/coreos/clair/utils"
cerrors "github.com/coreos/clair/utils/errors"
"github.com/coreos/pkg/capnslog"
"github.com/hashicorp/golang-lru"
Expand All @@ -39,23 +41,36 @@ var (

promErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "clair_pgsql_errors_total",
Help: "Number of errors that PostgreSQL requests generates.",
Help: "Number of errors that PostgreSQL requests generated.",
}, []string{"request"})

promCacheHitsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "clair_pgsql_cache_hits_total",
Help: "Number of cache hits that the PostgreSQL backend does.",
Help: "Number of cache hits that the PostgreSQL backend did.",
}, []string{"object"})

promCacheQueriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "clair_pgsql_cache_queries_total",
Help: "Number of cache queries that the PostgreSQL backend does.",
Help: "Number of cache queries that the PostgreSQL backend did.",
}, []string{"object"})

promQueryDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "clair_pgsql_query_duration_milliseconds",
Help: "Time it takes to execute the database query.",
}, []string{"query", "subquery"})

promConcurrentLockVAFV = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "clair_pgsql_concurrent_lock_vafv_total",
Help: "Number of transactions trying to hold the exclusive Vulnerability_Affects_FeatureVersion lock.",
})
)

func init() {
prometheus.MustRegister(promErrorsTotal)
prometheus.MustRegister(promCacheHitsTotal)
prometheus.MustRegister(promCacheQueriesTotal)
prometheus.MustRegister(promQueryDurationMilliseconds)
prometheus.MustRegister(promConcurrentLockVAFV)
}

type pgSQL struct {
Expand Down Expand Up @@ -240,3 +255,7 @@ func isErrUniqueViolation(err error) bool {
pqErr, ok := err.(*pq.Error)
return ok && pqErr.Code == "23505"
}

func observeQueryTime(query, subquery string, start time.Time) {
utils.PrometheusObserveTimeMilliseconds(promQueryDurationMilliseconds.WithLabelValues(query, subquery), start)
}

0 comments on commit baed60e

Please sign in to comment.