Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions extensions/queue/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ go_library(
name = "sql",
srcs = [
"config.go",
"errors.go",
"mock_stores.go",
"publisher.go",
"stores.go",
"subscriber.go",
"validation.go",
],
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
visibility = ["//visibility:public"],
deps = [
"//entities/queue",
"//extensions/queue",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
Expand All @@ -24,6 +27,7 @@ go_test(
srcs = [
"config_test.go",
"publisher_test.go",
"subscriber_test.go",
],
embed = [":sql"],
deps = [
Expand Down
12 changes: 12 additions & 0 deletions extensions/queue/sql/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package sql

import "fmt"

// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed
type ErrAlreadyAcknowledged struct {
DeliveryID string
}

func (e *ErrAlreadyAcknowledged) Error() string {
return fmt.Sprintf("delivery %s already acknowledged or nacked", e.DeliveryID)
}
265 changes: 250 additions & 15 deletions extensions/queue/sql/mock_stores.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions extensions/queue/sql/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type publisher struct {
config Config
logger *zap.SugaredLogger
metrics tally.Scope
messageStore MessageStore
messageStore messageStore
mu sync.RWMutex
closed bool
}

// NewPublisher creates a publisher with the given configuration and dependencies
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher {
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore) *publisher {
return &publisher{
config: config,
logger: logger,
Expand Down
Loading