Skip to content

Commit

Permalink
🐛 fix(codereview:changes) small things like nameing and the correct w…
Browse files Browse the repository at this point in the history
…ay to close the deduplicator
  • Loading branch information
vladshub committed Dec 30, 2019
1 parent 979554b commit c2d8b8c
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 236 deletions.
14 changes: 14 additions & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,17 @@ type Logged interface {
SetLogger(entry logrus.FieldLogger)
Log() logrus.FieldLogger
}

// Deduplicator abstracts the way to manages the duplications
type Deduplicator interface {
// StoreMessageID stores the message id in the storage
StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error
// MessageIDExists checks if message exists in storage
MessageIDExists(logger logrus.FieldLogger, id string) (bool, error)
// Deletes all data from the storage of the duplicator
Purge(logger logrus.FieldLogger) error
// Starts the background process which cleans the storage of the duplicator
Start(logger logrus.FieldLogger)
// Stops the background process of cleaning
Stop(logger logrus.FieldLogger)
}
6 changes: 3 additions & 3 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/deduplicator/implementation"
"github.com/wework/grabbit/gbus/deduplicator"
"github.com/wework/grabbit/gbus/saga"
"github.com/wework/grabbit/gbus/serialization"
"github.com/wework/grabbit/gbus/tx/mysql"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
if builder.usingPingTimeout {
gb.DbPingTimeout = builder.dbPingTimeout
}
gb.Deduplicator = implementation.NewDeduplicator(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge, gb.Log())
gb.Deduplicator = deduplicator.New(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge)

//TODO move this into the NewSagaStore factory methods
if builder.purgeOnStartup {
Expand All @@ -121,7 +121,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
panic(errMsg)
}
err = gb.Deduplicator.Purge()
err = gb.Deduplicator.Purge(gb.Log())
if err != nil {
errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator")
panic(errMsg)
Expand Down
39 changes: 19 additions & 20 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/wework/grabbit/gbus/deduplicator"
"github.com/wework/grabbit/gbus/metrics"

"github.com/opentracing-contrib/go-amqp/amqptracer"
Expand Down Expand Up @@ -61,7 +60,7 @@ type DefaultBus struct {
Serializer Serializer
DLX string
DeduplicationPolicy DeduplicationPolicy
Deduplicator deduplicator.Store
Deduplicator Deduplicator
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
Expand Down Expand Up @@ -225,7 +224,7 @@ func (b *DefaultBus) Start() error {
return startErr
}

b.Deduplicator.Start()
b.Deduplicator.Start(b.Log())

//declare queue
var q amqp.Queue
Expand Down Expand Up @@ -285,23 +284,23 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i)

w := &worker{
consumerTag: tag,
channel: amqpChan,
q: b.serviceQueue,
rpcq: b.rpcQueue,
svcName: b.SvcName,
txProvider: b.TxProvider,
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors,
delicatePolicy: b.DeduplicationPolicy,
duplicateStore: b.Deduplicator,
consumerTag: tag,
channel: amqpChan,
q: b.serviceQueue,
rpcq: b.rpcQueue,
svcName: b.SvcName,
txProvider: b.TxProvider,
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors,
deduplicationPolicy: b.DeduplicationPolicy,
deduplicator: b.Deduplicator,
}

err := w.Start()
Expand Down
162 changes: 162 additions & 0 deletions gbus/deduplicator/deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package deduplicator

import (
"database/sql"
"sync"
"time"

"emperror.dev/errors"
"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/tx"
)

var _ gbus.Deduplicator = &dedup{}

type dedup struct {
svcName string
policy gbus.DeduplicationPolicy
txProvider gbus.TxProvider
age time.Duration
ticker *time.Ticker
done chan bool
tableName string
started bool
startStopMutex sync.Mutex
}

func (d *dedup) Purge(logger logrus.FieldLogger) (err error) {
truncateSQL := "TRUNCATE TABLE " + d.tableName
txp, err := d.txProvider.New()
if err != nil {
logger.WithError(err).WithField("table_name", d.tableName).Error("failed purging duplicates table")
return err
}
defer func() {
if err != nil {
serr := txp.Rollback()
logger.WithError(serr).Error("failed rolling back transaction after purge")
err = errors.Append(err, serr)
}
err = txp.Commit()
}()
_, err = txp.Exec(truncateSQL)
if err != nil {
logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing truncate on table")
return err
}
logger.WithField("table_name", d.tableName).Info("successfully truncated table")
return nil
}

func (d *dedup) Start(l logrus.FieldLogger) {
d.startStopMutex.Lock()
defer d.startStopMutex.Unlock()
logger := d.decoratedLog(l)
d.ticker = time.NewTicker(time.Minute)
d.done = make(chan bool)
deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?"
go func() {
for {
select {
case <-d.done:
return
case <-d.ticker.C:
oldest := time.Now().Add(-1 * d.age)
tx, err := d.txProvider.New()
if err != nil {
logger.WithError(err).Error("failed to acquire a tx")
continue
}
result, err := tx.Exec(deleteQuery, oldest)
if err != nil && err != sql.ErrNoRows {
logger.WithError(err).Error("failed executing delete query")
continue
}
n, err := result.RowsAffected()
if err != nil {
logger.WithError(err).Error("failed to get count of affected rows")
} else {
logger.WithField("table_name", d.tableName).WithField("rows_deleted", n).
Info("successfully cleanup duplicates table")
}
}
}
}()
d.started = true
}

func (d *dedup) decoratedLog(l logrus.FieldLogger) logrus.FieldLogger {
logger := l.WithField("grabbit", "dedup")
return logger
}

func (d *dedup) Stop(logger logrus.FieldLogger) {
d.decoratedLog(logger).Info("shutting down deduplicator")
d.startStopMutex.Lock()
defer d.startStopMutex.Unlock()
if d.started {
d.ticker.Stop()
close(d.done)
d.started = false
}
}

//
func (d *dedup) StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error {
insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)"
_, err := tx.Exec(insertSQL, id)
if err != nil {
d.decoratedLog(logger).WithError(err).Error("failed to insert the id of the message into the dedup table")
return err
}
return nil
}

// MessageIDExists checks if a message id is in the deduplication table and returns an error if it fails
func (d *dedup) MessageIDExists(l logrus.FieldLogger, id string) (bool, error) {
logger := d.decoratedLog(l)
if d.policy == gbus.DeduplicationPolicyNone {
logger.Debug("duplication policy is none")
return false, nil
}
tx, err := d.txProvider.New()
if err != nil {
logger.WithError(err).Error("failed getting tx from txProvider")
return true, err
}
defer func() {
err = tx.Rollback()
if err != nil {
logger.WithError(err).Error("could not commit tx for query MessageIDExists")
}
}()
selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)"

var exists bool
err = tx.QueryRow(selectSQL, id).Scan(&exists)
if err != nil && err == sql.ErrNoRows {
logger.WithField("table_name", d.tableName).Debug("no rows in result set when looking for messages in duplicates table")
return false, nil
}

if err != nil {
logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing lookup query in duplicates table")
return true, err
}

return exists, nil
}

func New(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration) gbus.Deduplicator {
d := &dedup{
svcName: svcName,
policy: policy,
txProvider: txProvider,
age: age,
tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"),
started: false,
}
return d
}
Loading

0 comments on commit c2d8b8c

Please sign in to comment.