Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
db:
image: postgres:17@sha256:8d3be35b184e70d81e54cbcbd3df3c0b47f37d06482c0dd1c140db5dbcc6a808
Expand Down
274 changes: 141 additions & 133 deletions go.mod

Large diffs are not rendered by default.

702 changes: 355 additions & 347 deletions go.sum

Large diffs are not rendered by default.

276 changes: 276 additions & 0 deletions pkg/database/label.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package database

import (
"context"
"fmt"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/updatecli/udash/pkg/model"

"github.com/stephenafamo/bob/dialect/psql"
"github.com/stephenafamo/bob/dialect/psql/im"
"github.com/stephenafamo/bob/dialect/psql/sm"
)

// InsertLabel creates a new label and inserts it into the database.
//
// It returns the ID of the newly created label.
func InsertLabel(ctx context.Context, key, value string) (string, error) {
query := psql.Insert(
im.Into("labels", "key", "value"),
im.Values(psql.Arg(key), psql.Arg(value)),
im.Returning("id"),
)

queryString, args, err := query.Build(ctx)

if err != nil {
logrus.Errorf("building query failed: %s\n\t%s", queryString, err)
return "", err
}

var id uuid.UUID
err = DB.QueryRow(ctx, queryString, args...).Scan(
&id,
)

if err != nil {
logrus.Errorf("query failed: %q\n\t%s", queryString, err)
return "", err
}

return id.String(), nil
}

// GetLabelKeyOnlyRecords returns a list of labels from the labels database table.
func GetLabelKeyOnlyRecords(ctx context.Context, startTime, endTime string, limit, page int) ([]string, int, error) {

query := psql.Select(
sm.Columns("id", "key", "created_at", "updated_at", "last_pipeline_report_at"),
sm.From("labels"),
sm.OrderBy("key"),
sm.Distinct("key"),
)

if err := applyRangeFilter(
"last_pipeline_report_at",
dateRangeFilterParams{
Query: &query,
DateRangeDays: 0,
StartTime: startTime,
EndTime: endTime,
}); err != nil {
return nil, 0, fmt.Errorf("applying last_pipeline_report_at range filter: %w", err)
}

totalCount := 0
totalQuery := psql.Select(sm.From(query), sm.Columns("count(*)"))
totalQueryString, totalArgs, err := totalQuery.Build(ctx)
if err != nil {
logrus.Errorf("building total count query failed: %s\n\t%s", totalQueryString, err)
return nil, 0, err
}

if err = DB.QueryRow(ctx, totalQueryString, totalArgs...).Scan(
&totalCount,
); err != nil {
logrus.Errorf("parsing total count result: %s", err)
}

if limit < totalCount && limit > 0 {
query.Apply(
sm.Limit(limit),
sm.Offset((page-1)*limit),
)
}

queryString, args, err := query.Build(ctx)

if err != nil {
logrus.Errorf("building query failed: %s\n\t%s", queryString, err)
return nil, 0, err
}

rows, err := DB.Query(ctx, queryString, args...)
if err != nil {
logrus.Errorf("query failed: %s\n\t%s", queryString, err)
return nil, 0, err
}
defer rows.Close()

results := []string{}

for rows.Next() {
r := model.Label{}

err = rows.Scan(&r.ID, &r.Key, &r.CreatedAt, &r.UpdatedAt, &r.LastPipelineReportAt)
if err != nil {
logrus.Errorf("scanning label row failed: %s", err)
continue
}

results = append(results, r.Key)
}

if err := rows.Err(); err != nil {
logrus.Errorf("iterating label rows failed: %s", err)
return nil, 0, err
}

return results, totalCount, nil
}

// GetLabelRecords returns a list of labels from the labels database table.
func GetLabelRecords(ctx context.Context, id, key, value, startTime, endTime string, limit, page int) ([]model.Label, int, error) {

query := psql.Select(
sm.Columns("id", "key", "value", "created_at", "updated_at", "last_pipeline_report_at"),
sm.From("labels"),
sm.OrderBy("key"),
)

if key != "" {
query.Apply(
sm.Where(psql.Quote("key").EQ(psql.Arg(key))),
)
}

if value != "" {
query.Apply(
sm.Where(psql.Quote("value").EQ(psql.Arg(value))),
)
}

if id != "" {
query.Apply(
sm.Where(psql.Quote("id").EQ(psql.Arg(id))),
)
}

if err := applyRangeFilter(
"last_pipeline_report_at",
dateRangeFilterParams{
Query: &query,
DateRangeDays: 0,
StartTime: startTime,
EndTime: endTime,
}); err != nil {
return nil, 0, fmt.Errorf("applying last_pipeline_report_at range filter: %w", err)
}

totalCount := 0
totalQuery := psql.Select(sm.From(query), sm.Columns("count(*)"))
totalQueryString, totalArgs, err := totalQuery.Build(ctx)
if err != nil {
logrus.Errorf("building total count query failed: %s\n\t%s", totalQueryString, err)
return nil, 0, err
}

if err = DB.QueryRow(ctx, totalQueryString, totalArgs...).Scan(
&totalCount,
); err != nil {
logrus.Errorf("parsing total count result: %s", err)
}

if limit < totalCount && limit > 0 {
query.Apply(
sm.Limit(limit),
sm.Offset((page-1)*limit),
)
}

queryString, args, err := query.Build(ctx)

if err != nil {
logrus.Errorf("building query failed: %s\n\t%s", queryString, err)
return nil, 0, err
}

rows, err := DB.Query(ctx, queryString, args...)
if err != nil {
logrus.Errorf("query failed: %s\n\t%s", queryString, err)
return nil, 0, err
}
defer rows.Close()

results := []model.Label{}

for rows.Next() {
r := model.Label{}

err = rows.Scan(&r.ID, &r.Key, &r.Value, &r.CreatedAt, &r.UpdatedAt, &r.LastPipelineReportAt)
if err != nil {
logrus.Errorf("scanning label row failed: %s", err)
continue
}

results = append(results, r)
}

if err := rows.Err(); err != nil {
logrus.Errorf("iterating label rows failed: %s", err)
return nil, 0, err
}

return results, totalCount, nil
}

// InitLabels takes a map of labels and ensures that they exist in the database, creating them if necessary.
func InitLabels(ctx context.Context, labels map[string]string) ([]uuid.UUID, error) {
errs := []error{}
labelIDs := []uuid.UUID{}

for labelKey, labelValue := range labels {
if labelKey == "" {
errs = append(errs, fmt.Errorf("missing key, ignoring label:\t%q:%q", labelKey, labelValue))
continue
}

if labelValue == "" {
errs = append(errs, fmt.Errorf("missing value, ignoring label:\t%q:%q", labelKey, labelValue))
continue
}

labelRecords, totalCount, err := GetLabelRecords(ctx, "", labelKey, labelValue, "", "", 0, 1)
if err != nil {
errs = append(errs, fmt.Errorf("failed to get labels: %s", err))
continue
}

switch totalCount {
case 0:
id, err := InsertLabel(ctx, labelKey, labelValue)
if err != nil {
err := fmt.Errorf("insert label data: %s", err)
errs = append(errs, err)
continue
}

parsedID, err := uuid.Parse(id)
if err != nil {
errs = append(errs, fmt.Errorf("parsing id: %s", err))
continue
}

labelIDs = append(labelIDs, parsedID)
case 1:
if labelValue == labelRecords[0].Value {
labelIDs = append(labelIDs, labelRecords[0].ID)
}
default:
errMsg := fmt.Errorf("something went wrong multiple labels found for key %s", labelKey)
logrus.Error(errMsg)
errs = append(errs, errMsg)
}
}

if len(errs) > 0 {
for i := range errs {
logrus.Errorln(errs[i])
}
return nil, fmt.Errorf("something went wrong during label creation")
}

return labelIDs, nil
}
90 changes: 90 additions & 0 deletions pkg/database/label_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package database

import (
"context"
"fmt"
"strings"

"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/psql"
"github.com/stephenafamo/bob/dialect/psql/dialect"
"github.com/stephenafamo/bob/dialect/psql/sm"
)

// labelFilterParams holds parameters for applying a date range filter to a query.
type labelFilterParams struct {
Query *bob.BaseQuery[*dialect.SelectQuery]
Labels map[string]string
StartTime string
EndTime string
Ctx context.Context
}

func applyLabelFilter(params labelFilterParams) error {

if params.Ctx == nil {
params.Ctx = context.Background()
}

if len(params.Labels) == 0 {
return nil
}

errs := []error{}
for key, value := range params.Labels {
if key == "" {
errs = append(errs, fmt.Errorf("label key cannot be empty"))
continue
}

results, totalCounts, err := GetLabelRecords(
params.Ctx,
"",
key,
value,
params.StartTime,
params.EndTime,
0,
1,
)
if err != nil {
errs = append(errs, fmt.Errorf("failed getting label records: %s", err))
continue
}

if totalCounts == 0 {
if value == "" {
errs = append(errs, fmt.Errorf("label not found for key %s", key))
} else {
errs = append(errs, fmt.Errorf("label not found for %s=%s", key, value))
}
continue
}

ids := make([]string, 0, len(results))
for i := range results {
ids = append(ids, results[i].ID.String())
}

if len(ids) == 0 {
if value == "" {
errs = append(errs, fmt.Errorf("no label ids found for key %s", key))
} else {
errs = append(errs, fmt.Errorf("no label ids found for %s=%s", key, value))
}
continue
}

params.Query.Apply(
sm.Where(
psql.Raw(`label_ids && ?`, fmt.Sprintf("{%s}", strings.Join(ids, ","))),
),
)
}

if len(errs) > 0 {
return fmt.Errorf("errors occurred while applying label filter: %v", errs)
}

return nil
}
4 changes: 4 additions & 0 deletions pkg/database/migrations/000007_create_labels_tables.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP INDEX IF EXISTS idx_labels_key;
DROP INDEX IF EXISTS idx_labels_value;
ALTER TABLE IF EXISTS labels DROP CONSTRAINT IF EXISTS labels_key_value_unique;
DROP TABLE IF EXISTS labels;
19 changes: 19 additions & 0 deletions pkg/database/migrations/000007_create_labels_tables.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This migration will create the labels table in the database, which will store the labels associated with the database.
BEGIN;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE IF NOT EXISTS labels(
id uuid DEFAULT uuid_generate_v4 (),
key VARCHAR NOT NULL,
value VARCHAR NOT NULL,
created_at TIMESTAMP,
updated_at TIMESTAMP,
last_pipeline_report_at TIMESTAMP,
CONSTRAINT labels_key_value_unique UNIQUE (key, value)
);
ALTER TABLE labels ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE labels ALTER COLUMN updated_at SET DEFAULT now();

CREATE INDEX IF NOT EXISTS idx_labels_key ON labels (key);
CREATE INDEX IF NOT EXISTS idx_labels_value ON labels (value);

COMMIT;
Loading
Loading