Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ e2e-test: build-all-linux ## Run end-to-end tests (hermetic, auto-builds binarie

fmt: ## Format Go and YAML code
@echo "Formatting Go code..."
@$(BAZEL) run @rules_go//go -- run golang.org/x/tools/cmd/goimports@$(GOIMPORTS_VERSION) -w .
@find . -name '*.go' -not -path './pkg/*' -not -path './bazel-*' | xargs $(BAZEL) run @rules_go//go -- run golang.org/x/tools/cmd/goimports@$(GOIMPORTS_VERSION) -w
@echo "Formatting YAML files..."
@$(BAZEL) run @rules_go//go -- run github.com/google/yamlfmt/cmd/yamlfmt@$(YAMLFMT_VERSION)
@echo "Formatting complete!"
Expand Down
6 changes: 6 additions & 0 deletions extension/queue/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gomock(
out = "mock_stores.go",
mockgen_tool = _MOCKGEN,
package = "mysql",
self_package = "github.com/uber/submitqueue/extension/queue/mysql",
source = "stores.go",
source_importpath = "github.com/uber/submitqueue/extension/queue/mysql",
)
Expand All @@ -17,6 +18,7 @@ go_library(
name = "mysql",
srcs = [
"constants.go",
"delivery_state_store.go",
"errors.go",
"message_store.go",
":mock_stores_src",
Expand All @@ -26,10 +28,12 @@ go_library(
"sql.go",
"stores.go",
"subscriber.go",
"subscriber_heartbeat_store.go",
],
importpath = "github.com/uber/submitqueue/extension/queue/mysql",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//entity/queue",
"//extension/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand All @@ -41,11 +45,13 @@ go_library(
go_test(
name = "mysql_test",
srcs = [
"delivery_state_store_test.go",
"message_store_test.go",
"offset_store_test.go",
"partition_lease_store_test.go",
"publisher_test.go",
"sql_test.go",
"subscriber_heartbeat_store_test.go",
"subscriber_test.go",
],
embed = [":mysql"],
Expand Down
7 changes: 0 additions & 7 deletions extension/queue/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@ package mysql
// Common constants for frequently repeated strings across stores

const (
// Tag key (used in every Tagged() call)
tagErrorType = "error_type"

// Common log field names (used extensively across all stores)
logTopic = "topic"
logPartitionKey = "partition_key"
logMessageID = "message_id"
logError = "error"

// Error types used across multiple methods/stores
errorBeginTx = "begin_transaction"
errorCommit = "commit"
)
12 changes: 3 additions & 9 deletions extension/queue/mysql/ctl/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ func newTopicStatsCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
rows := [][]string{
{"Topic", stats.Topic},
{"Total Messages", strconv.FormatInt(stats.TotalMessages, 10)},
{"Visible Messages", strconv.FormatInt(stats.VisibleMessages, 10)},
{"Invisible Messages", strconv.FormatInt(stats.InvisibleMessages, 10)},
{"DLQ Count", strconv.FormatInt(stats.DLQCount, 10)},
{"Partitions", strconv.FormatInt(stats.PartitionCount, 10)},
{"Consumer Groups", strconv.FormatInt(stats.ConsumerGroupCount, 10)},
Expand Down Expand Up @@ -181,16 +179,15 @@ func newListMessagesCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
if *jsonOut {
return lib.FormatJSON(os.Stdout, messages)
}
headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "INVISIBLE_UNTIL", "CREATED_AT"}
headers := []string{"OFFSET", "ID", "PARTITION", "CREATED_AT", "PUBLISHED_AT"}
var rows [][]string
for _, m := range messages {
rows = append(rows, []string{
strconv.FormatInt(m.Offset, 10),
m.ID,
m.PartitionKey,
strconv.Itoa(m.RetryCount),
lib.FormatMillis(m.InvisibleUntil),
lib.FormatMillis(m.CreatedAt),
lib.FormatMillis(m.PublishedAt),
})
}
lib.FormatTable(os.Stdout, headers, rows)
Expand Down Expand Up @@ -226,8 +223,6 @@ func newInspectMessageCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command
{"ID", detail.ID},
{"Topic", detail.Topic},
{"Partition", detail.PartitionKey},
{"Retry Count", strconv.Itoa(detail.RetryCount)},
{"Invisible Until", lib.FormatMillis(detail.InvisibleUntil)},
{"Created At", lib.FormatMillis(detail.CreatedAt)},
{"Published At", lib.FormatMillis(detail.PublishedAt)},
{"Payload", string(detail.Payload)},
Expand Down Expand Up @@ -321,14 +316,13 @@ func newListDLQCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
if *jsonOut {
return lib.FormatJSON(os.Stdout, messages)
}
headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "CREATED_AT"}
headers := []string{"OFFSET", "ID", "PARTITION", "CREATED_AT"}
var rows [][]string
for _, m := range messages {
rows = append(rows, []string{
strconv.FormatInt(m.Offset, 10),
m.ID,
m.PartitionKey,
strconv.Itoa(m.RetryCount),
lib.FormatMillis(m.CreatedAt),
})
}
Expand Down
33 changes: 6 additions & 27 deletions extension/queue/mysql/ctl/lib/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ type MessageSummary struct {
Topic string
// PartitionKey determines message distribution
PartitionKey string
// RetryCount tracks retries on the current topic
RetryCount int
// InvisibleUntil is the epoch milliseconds until which the message is hidden
InvisibleUntil int64
// CreatedAt is the epoch milliseconds when the message was created
CreatedAt int64
// PublishedAt is the epoch milliseconds when the message was published
Expand Down Expand Up @@ -116,10 +112,6 @@ type TopicStats struct {
Topic string
// TotalMessages is the total number of messages
TotalMessages int64
// VisibleMessages is the count of messages currently visible for consumption
VisibleMessages int64
// InvisibleMessages is the count of messages hidden by visibility timeout
InvisibleMessages int64
// DLQCount is the number of messages in the DLQ for this topic
DLQCount int64
// PartitionCount is the number of distinct partitions
Expand Down Expand Up @@ -154,7 +146,6 @@ func (s *AdminStore) ListTopics(ctx context.Context) ([]TopicInfo, error) {
// GetTopicStats returns detailed statistics for a topic.
func (s *AdminStore) GetTopicStats(ctx context.Context, topic string, dlqSuffix string) (TopicStats, error) {
stats := TopicStats{Topic: topic}
nowMs := time.Now().UnixMilli()

// Total messages
err := s.db.QueryRowContext(ctx,
Expand All @@ -165,18 +156,6 @@ func (s *AdminStore) GetTopicStats(ctx context.Context, topic string, dlqSuffix
return stats, fmt.Errorf("count total: %w", err)
}

// Visible messages (invisible_until <= now)
err = s.db.QueryRowContext(ctx,
fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE topic = ? AND invisible_until <= ?", mysql.MessagesTableName),
topic, nowMs,
).Scan(&stats.VisibleMessages)
if err != nil {
return stats, fmt.Errorf("count visible: %w", err)
}

// Invisible messages
stats.InvisibleMessages = stats.TotalMessages - stats.VisibleMessages

// DLQ count
dlqTopic := topic + dlqSuffix
err = s.db.QueryRowContext(ctx,
Expand Down Expand Up @@ -215,12 +194,12 @@ func (s *AdminStore) ListMessages(ctx context.Context, topic string, partition s

if partition != "" {
rows, err = s.db.QueryContext(ctx,
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at FROM %s WHERE topic = ? AND partition_key = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, created_at, published_at FROM %s WHERE topic = ? AND partition_key = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
topic, partition, limit,
)
} else {
rows, err = s.db.QueryContext(ctx,
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at FROM %s WHERE topic = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, created_at, published_at FROM %s WHERE topic = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
topic, limit,
)
}
Expand All @@ -232,7 +211,7 @@ func (s *AdminStore) ListMessages(ctx context.Context, topic string, partition s
var messages []MessageSummary
for rows.Next() {
var m MessageSummary
if err := rows.Scan(&m.Offset, &m.ID, &m.Topic, &m.PartitionKey, &m.RetryCount, &m.InvisibleUntil, &m.CreatedAt, &m.PublishedAt); err != nil {
if err := rows.Scan(&m.Offset, &m.ID, &m.Topic, &m.PartitionKey, &m.CreatedAt, &m.PublishedAt); err != nil {
return nil, fmt.Errorf("scan message row: %w", err)
}
messages = append(messages, m)
Expand All @@ -246,9 +225,9 @@ func (s *AdminStore) InspectMessage(ctx context.Context, topic string, messageID
var metadataJSON []byte

err := s.db.QueryRowContext(ctx,
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at, payload, metadata, failed_at, failure_count, last_error, original_topic FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName),
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, created_at, published_at, payload, metadata, failed_at, failure_count, last_error, original_topic FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName),
topic, messageID,
).Scan(&d.Offset, &d.ID, &d.Topic, &d.PartitionKey, &d.RetryCount, &d.InvisibleUntil, &d.CreatedAt, &d.PublishedAt, &d.Payload, &metadataJSON, &d.FailedAt, &d.FailureCount, &d.LastError, &d.OriginalTopic)
).Scan(&d.Offset, &d.ID, &d.Topic, &d.PartitionKey, &d.CreatedAt, &d.PublishedAt, &d.Payload, &metadataJSON, &d.FailedAt, &d.FailureCount, &d.LastError, &d.OriginalTopic)
if err == sql.ErrNoRows {
return d, false, nil
}
Expand Down Expand Up @@ -323,7 +302,7 @@ func (s *AdminStore) RequeueDLQ(ctx context.Context, topic string, messageID str
// Insert into original topic with reset fields
nowMs := time.Now().UnixMilli()
_, err = tx.ExecContext(ctx,
fmt.Sprintf("INSERT INTO %s (topic, partition_key, id, payload, metadata, retry_count, invisible_until, created_at, published_at, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, 0, 0, ?, ?, 0, 0, '', '')", mysql.MessagesTableName),
fmt.Sprintf("INSERT INTO %s (topic, partition_key, id, payload, metadata, created_at, published_at, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, '', '')", mysql.MessagesTableName),
topic, partitionKey, messageID, payload, metadataJSON, createdAt, nowMs,
)
if err != nil {
Expand Down
24 changes: 8 additions & 16 deletions extension/queue/mysql/ctl/lib/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ func TestGetTopicStats(t *testing.T) {
WithArgs("orders").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(100))

// Visible messages
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\? AND invisible_until <= \\?").
WithArgs("orders", sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(80))

// DLQ count
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\?").
WithArgs("orders_dlq").
Expand All @@ -99,8 +94,6 @@ func TestGetTopicStats(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "orders", stats.Topic)
assert.Equal(t, int64(100), stats.TotalMessages)
assert.Equal(t, int64(80), stats.VisibleMessages)
assert.Equal(t, int64(20), stats.InvisibleMessages)
assert.Equal(t, int64(3), stats.DLQCount)
assert.Equal(t, int64(4), stats.PartitionCount)
assert.Equal(t, int64(2), stats.ConsumerGroupCount)
Expand All @@ -114,9 +107,9 @@ func TestListMessages(t *testing.T) {

store := NewAdminStore(db)

rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at"}).
AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000).
AddRow(2, "msg-2", "orders", "repo-1", 1, 5000, 2000, 2000)
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at"}).
AddRow(1, "msg-1", "orders", "repo-1", 1000, 1000).
AddRow(2, "msg-2", "orders", "repo-1", 2000, 2000)
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? ORDER BY `offset` LIMIT \\?").
WithArgs("orders", 50).
WillReturnRows(rows)
Expand All @@ -127,7 +120,6 @@ func TestListMessages(t *testing.T) {
assert.Equal(t, "msg-1", messages[0].ID)
assert.Equal(t, int64(1), messages[0].Offset)
assert.Equal(t, "msg-2", messages[1].ID)
assert.Equal(t, 1, messages[1].RetryCount)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -138,8 +130,8 @@ func TestListMessagesWithPartition(t *testing.T) {

store := NewAdminStore(db)

rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at"}).
AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000)
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at"}).
AddRow(1, "msg-1", "orders", "repo-1", 1000, 1000)
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND partition_key = \\? ORDER BY `offset` LIMIT \\?").
WithArgs("orders", "repo-1", 10).
WillReturnRows(rows)
Expand All @@ -158,8 +150,8 @@ func TestInspectMessage(t *testing.T) {

store := NewAdminStore(db)

rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"}).
AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000, []byte("hello"), []byte(`{"key":"val"}`), 0, 0, "", "")
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"}).
AddRow(1, "msg-1", "orders", "repo-1", 1000, 1000, []byte("hello"), []byte(`{"key":"val"}`), 0, 0, "", "")
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?").
WithArgs("orders", "msg-1").
WillReturnRows(rows)
Expand All @@ -181,7 +173,7 @@ func TestInspectMessageNotFound(t *testing.T) {

store := NewAdminStore(db)

rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"})
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"})
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?").
WithArgs("orders", "missing").
WillReturnRows(rows)
Expand Down
Loading