Skip to content

Commit

Permalink
postgres: refactor receipt methods
Browse files Browse the repository at this point in the history
Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Feb 22, 2022
1 parent be87d38 commit 4650b69
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 216 deletions.
93 changes: 0 additions & 93 deletions notifier/postgres/putreceipt.go

This file was deleted.

164 changes: 136 additions & 28 deletions notifier/postgres/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package postgres
import (
"context"
"errors"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v4"
Expand All @@ -21,46 +20,155 @@ var (
Namespace: "clair",
Subsystem: "notifier",
Name: "receipt_total",
Help: "Total number of database queries issued in the receipt method.",
Help: "Total number of database queries issued in the receipt method",
},
[]string{"query"},
[]string{"query", "error"},
)
receiptDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "clair",
Subsystem: "notifier",
Name: "receipt_duration_seconds",
Help: "The duration of all queries issued in the receipt method",
Help: "Duration of all queries issued in the receipt method",
},
[]string{"query"},
[]string{"query", "error"},
)
receiptByUOIDCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "clair",
Subsystem: "notifier",
Name: "receiptbyuoid_total",
Help: "Total number of database queries issued in the receiptByUOID method",
},
[]string{"query", "error"},
)
receiptByUOIDDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "clair",
Subsystem: "notifier",
Name: "receiptbyuoid_duration_seconds",
Help: "Duration of all queries issued in the receiptByUOID method",
},
[]string{"query", "error"},
)
putReceiptCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "clair",
Subsystem: "notifier",
Name: "putreceipt_total",
Help: "Total number of database queries issued in the putReceipt method",
},
[]string{"query", "error"},
)
putReceiptAffected = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "clair",
Subsystem: "notifier",
Name: "putreceipt_affected_total",
Help: "Total number of rows affected in the putReceipt method",
},
[]string{"query", "error"},
)
putReceiptDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "clair",
Subsystem: "notifier",
Name: "putreceipt_duration_seconds",
Help: "Duration of all queries issued in the putReceipt method",
},
[]string{"query", "error"},
)
)

// receipt returns a receipt for a given notification id
//
// if the receipt does not exist a ErrNoReceipt is returned
func receipt(ctx context.Context, pool *pgxpool.Pool, id uuid.UUID) (notifier.Receipt, error) {
const (
query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE notification_id = $1`
)
// Receipt returns the Receipt for a given notification ID.
func (s *Store) Receipt(ctx context.Context, id uuid.UUID) (notifier.Receipt, error) {
const query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE notification_id = $1::uuid;`
var r notifier.Receipt
f := getReceipt(ctx, &r, query, `query`, id, statusMetrics{
counter: receiptCounter,
dur: receiptDuration,
})
return r, s.pool.AcquireFunc(ctx, f)
}

// ReceiptByUOID returns the Receipt for a given UpdateOperation ID.
func (s *Store) ReceiptByUOID(ctx context.Context, id uuid.UUID) (notifier.Receipt, error) {
const query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE uo_id = $1::uuid;`
var r notifier.Receipt
start := time.Now()
row := pool.QueryRow(ctx, query, id.String())
err := row.Scan(
&r.UOID,
&r.NotificationID,
&r.Status,
&r.TS,
)
switch {
case errors.Is(err, pgx.ErrNoRows):
return r, clairerror.ErrNoReceipt{id}
case err != nil:
return r, clairerror.ErrReceipt{id, err}
f := getReceipt(ctx, &r, query, `query`, id, statusMetrics{
counter: receiptByUOIDCounter,
dur: receiptByUOIDDuration,
})
return r, s.pool.AcquireFunc(ctx, f)
}

func getReceipt(ctx context.Context, r *notifier.Receipt, query, name string, id uuid.UUID, m statusMetrics) func(*pgxpool.Conn) error {
return func(c *pgxpool.Conn) error {
var err error
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
m.dur.WithLabelValues(`query`, errLabel(err)).Observe(v)
}))
defer timer.ObserveDuration()
err = c.QueryRow(ctx, query, id).Scan(
&r.UOID,
&r.NotificationID,
&r.Status,
&r.TS,
)
receiptCounter.WithLabelValues("query", errLabel(err)).Add(1)
switch {
case errors.Is(err, pgx.ErrNoRows):
return &clairerror.ErrNoReceipt{
NotificationID: id,
}
case err != nil:
return &clairerror.ErrReceipt{
NotificationID: id,
E: err,
}
}
return nil
}
receiptCounter.WithLabelValues("query").Add(1)
receiptDuration.WithLabelValues("query").Observe(time.Since(start).Seconds())
}

return r, nil
func (s *Store) PutReceipt(ctx context.Context, updater string, r notifier.Receipt) error {
const (
insertNotification = `INSERT INTO notification (id) VALUES ($1);`
insertReceipt = `INSERT INTO receipt (notification_id, uo_id, status, ts) VALUES ($1, $2, $3, CURRENT_TIMESTAMP);`
insertUpdateOperation = `INSERT INTO notifier_update_operation (updater, uo_id, ts) VALUES ($1, $2, CURRENT_TIMESTAMP);`
)
txOpt := pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
}
metrics := statusMetrics{
dur: putReceiptDuration,
counter: putReceiptCounter,
affected: putReceiptAffected,
}
err := s.pool.BeginTxFunc(ctx, txOpt, func(tx pgx.Tx) error {
if err := txExec(ctx, metrics, tx,
`insertNotification`,
insertNotification,
[]interface{}{r.NotificationID}); err != nil {
return err
}
if err := txExec(ctx, metrics, tx,
`insertUpdateOperation`,
insertUpdateOperation,
[]interface{}{updater, r.UOID}); err != nil {
return err
}
if err := txExec(ctx, metrics, tx,
`insertReceipt`,
insertReceipt,
[]interface{}{r.NotificationID, r.UOID, r.Status}); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}
66 changes: 0 additions & 66 deletions notifier/postgres/receiptbyuoid.go

This file was deleted.

29 changes: 0 additions & 29 deletions notifier/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func (s *Store) PutNotifications(ctx context.Context, opts notifier.PutOpts) err
return putNotifications(ctx, s.pool, opts)
}

func (s *Store) PutReceipt(ctx context.Context, updater string, r notifier.Receipt) error {
return putReceipt(ctx, s.pool, updater, r)
}

// DeleteNotifications garbage collects all notifications associated
// with a notification id.
//
Expand All @@ -56,31 +52,6 @@ func (s *Store) DeleteNotifications(ctx context.Context, id uuid.UUID) error {
return deleteNotifications(ctx, s.pool, id)
}

// Receipt returns the Receipt for a given notification id
func (s *Store) Receipt(ctx context.Context, id uuid.UUID) (notifier.Receipt, error) {
return receipt(ctx, s.pool, id)
}

// ReceiptByUOID returns the Receipt for a given notification UOID
func (s *Store) ReceiptByUOID(ctx context.Context, id uuid.UUID) (notifier.Receipt, error) {
return receiptByUOID(ctx, s.pool, id)
}

// Created returns a slice of notification ids in created status
func (s *Store) Created(ctx context.Context) ([]uuid.UUID, error) {
return created(ctx, s.pool)
}

// Failed returns a slice of notification ids in failed status
func (s *Store) Failed(ctx context.Context) ([]uuid.UUID, error) {
return failed(ctx, s.pool)
}

// Deleted returns a slice of notification ids in deleted status
func (s *Store) Deleted(ctx context.Context) ([]uuid.UUID, error) {
return deleted(ctx, s.pool)
}

func errLabel(e error) string {
if e == nil {
return `false`
Expand Down

0 comments on commit 4650b69

Please sign in to comment.