Skip to content

Commit

Permalink
Allow transactions to timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Dec 1, 2023
1 parent ae58c85 commit 28994a1
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 49 deletions.
4 changes: 2 additions & 2 deletions cmd/event-service/di/inject_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

var sqliteAdaptersSet = wire.NewSet(
newSqliteDB,
sqlite.NewDatabaseMutex,
sqlite.NewTransactionRunner,

sqlite.NewTransactionProvider,
wire.Bind(new(app.TransactionProvider), new(*sqlite.TransactionProvider)),
Expand All @@ -31,7 +31,7 @@ var sqliteAdaptersSet = wire.NewSet(

var sqliteTestAdaptersSet = wire.NewSet(
newSqliteDB,
sqlite.NewDatabaseMutex,
sqlite.NewTransactionRunner,

sqlite.NewTestTransactionProvider,

Expand Down
9 changes: 9 additions & 0 deletions cmd/event-service/di/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/boreq/errors"
"github.com/hashicorp/go-multierror"
"github.com/planetary-social/nos-event-service/internal/migrations"
"github.com/planetary-social/nos-event-service/service/adapters/sqlite"
"github.com/planetary-social/nos-event-service/service/app"
"github.com/planetary-social/nos-event-service/service/domain/downloader"
"github.com/planetary-social/nos-event-service/service/ports/http"
Expand All @@ -21,6 +22,7 @@ type Service struct {
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
eventSavedEventSubscriber *sqlitepubsub.EventSavedEventSubscriber
metricsTimer *timer.Metrics
transactionRunner *sqlite.TransactionRunner
migrationsRunner *migrations.Runner
migrations migrations.Migrations
migrationsProgressCallback migrations.ProgressCallback
Expand All @@ -33,6 +35,7 @@ func NewService(
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber,
eventSavedEventSubscriber *sqlitepubsub.EventSavedEventSubscriber,
metricsTimer *timer.Metrics,
transactionRunner *sqlite.TransactionRunner,
migrationsRunner *migrations.Runner,
migrations migrations.Migrations,
migrationsProgressCallback migrations.ProgressCallback,
Expand All @@ -44,6 +47,7 @@ func NewService(
receivedEventSubscriber: receivedEventSubscriber,
eventSavedEventSubscriber: eventSavedEventSubscriber,
metricsTimer: metricsTimer,
transactionRunner: transactionRunner,
migrationsRunner: migrationsRunner,
migrations: migrations,
migrationsProgressCallback: migrationsProgressCallback,
Expand Down Expand Up @@ -90,6 +94,11 @@ func (s Service) Run(ctx context.Context) error {
errCh <- errors.Wrap(s.metricsTimer.Run(ctx), "metrics timer error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.transactionRunner.Run(ctx), "transaction runner error")
}()

var err error
for i := 0; i < runners; i++ {
err = multierror.Append(err, errors.Wrap(<-errCh, "error returned by runner"))
Expand Down
15 changes: 8 additions & 7 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service/adapters/sqlite/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestPubSub_PublishingMessagesWithIdenticalUUIDsReturnsAnError(t *testing.T)
require.NoError(t, err)

err = adapters.PubSub.Publish(ctx, fixtures.SomeString(), msg)
require.EqualError(t, err, "transaction error: error calling the provided function: UNIQUE constraint failed: pubsub.uuid")
require.EqualError(t, err, "transaction error: received an error: error calling the callback: error calling the adapters callback: UNIQUE constraint failed: pubsub.uuid")
}

func TestPubSub_NackedMessagesAreRetried(t *testing.T) {
Expand Down
130 changes: 91 additions & 39 deletions service/adapters/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sqlite
import (
"context"
"database/sql"
"sync"

"github.com/boreq/errors"
"github.com/hashicorp/go-multierror"
Expand All @@ -30,6 +29,7 @@ type TestedItems struct {
MigrationsRunner *migrations.Runner
Migrations migrations.Migrations
MigrationsProgressCallback migrations.ProgressCallback
TransactionRunner *TransactionRunner
}

func Open(conf config.Config) (*sql.DB, error) {
Expand All @@ -47,12 +47,12 @@ type TransactionProvider = GenericTransactionProvider[app.Adapters]
func NewTransactionProvider(
db *sql.DB,
fn AdaptersFactoryFn,
mutex *DatabaseMutex,
runner *TransactionRunner,
) *TransactionProvider {
return &TransactionProvider{
db: db,
fn: fn,
mutex: mutex,
db: db,
fn: fn,
runner: runner,
}
}

Expand All @@ -62,60 +62,118 @@ type TestTransactionProvider = GenericTransactionProvider[TestAdapters]
func NewTestTransactionProvider(
db *sql.DB,
fn TestAdaptersFactoryFn,
mutex *DatabaseMutex,
runner *TransactionRunner,
) *TestTransactionProvider {
return &TestTransactionProvider{
db: db,
fn: fn,
mutex: mutex,
db: db,
fn: fn,
runner: runner,
}
}

type PubSubTxTransactionProvider = GenericTransactionProvider[*sql.Tx]

func NewPubSubTxTransactionProvider(
db *sql.DB,
mutex *DatabaseMutex,
runner *TransactionRunner,
) *PubSubTxTransactionProvider {
return &PubSubTxTransactionProvider{
db: db,
fn: func(db *sql.DB, tx *sql.Tx) (*sql.Tx, error) {
return tx, nil
},
mutex: mutex,
runner: runner,
}
}

type GenericAdaptersFactoryFn[T any] func(*sql.DB, *sql.Tx) (T, error)

type GenericTransactionProvider[T any] struct {
db *sql.DB
fn GenericAdaptersFactoryFn[T]
mutex *DatabaseMutex
db *sql.DB
fn GenericAdaptersFactoryFn[T]
runner *TransactionRunner
}

func (t *GenericTransactionProvider[T]) Transact(ctx context.Context, f func(context.Context, T) error) error {
t.mutex.Lock()
defer t.mutex.Unlock()
func (t *GenericTransactionProvider[T]) Transact(ctx context.Context, fn func(context.Context, T) error) error {
transactionFunc := t.makeTransactionFunc(fn)
return t.runner.TryRun(ctx, transactionFunc)
}

tx, err := t.db.BeginTx(ctx, nil)
if err != nil {
return errors.Wrap(err, "error starting the transaction")
func (t *GenericTransactionProvider[T]) makeTransactionFunc(fn func(context.Context, T) error) TransactionFunc {
return func(ctx context.Context, db *sql.DB, tx *sql.Tx) error {
adapters, err := t.fn(t.db, tx)
if err != nil {
return errors.Wrap(err, "error building the adapters")
}

if err := fn(ctx, adapters); err != nil {
return errors.Wrap(err, "error calling the adapters callback")
}

return nil
}
}

adapters, err := t.fn(t.db, tx)
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
err = multierror.Append(err, errors.Wrap(rollbackErr, "rollback error"))
type TransactionFunc func(context.Context, *sql.DB, *sql.Tx) error

type TransactionRunner struct {
db *sql.DB
chIn chan transactionTask
}

func NewTransactionRunner(db *sql.DB) *TransactionRunner {
return &TransactionRunner{
db: db,
chIn: make(chan transactionTask),
}
}

func (t *TransactionRunner) TryRun(ctx context.Context, fn TransactionFunc) error {
resultCh := make(chan error)

select {
case t.chIn <- newTransactionTask(ctx, fn, resultCh):
case <-ctx.Done():
return ctx.Err()
}

select {
case err := <-resultCh:
return errors.Wrap(err, "received an error")
case <-ctx.Done():
return ctx.Err()
}
}

func (t *TransactionRunner) Run(ctx context.Context) error {
for {
select {
case task := <-t.chIn:
select {
case task.ResultCh <- t.run(task.Ctx, task.Fn):
continue
case <-task.Ctx.Done():
continue
case <-ctx.Done():
return ctx.Err()
}
case <-ctx.Done():
return ctx.Err()
}
return errors.Wrap(err, "error building the adapters")
}
}

if err := f(ctx, adapters); err != nil {
func (t *TransactionRunner) run(ctx context.Context, fn TransactionFunc) error {
tx, err := t.db.BeginTx(ctx, nil)
if err != nil {
return errors.Wrap(err, "error starting the transaction")
}

if err := fn(ctx, t.db, tx); err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
err = multierror.Append(err, errors.Wrap(rollbackErr, "rollback error"))
}
return errors.Wrap(err, "error calling the provided function")
return errors.Wrap(err, "error calling the callback")
}

if err := tx.Commit(); err != nil {
Expand All @@ -125,18 +183,12 @@ func (t *GenericTransactionProvider[T]) Transact(ctx context.Context, f func(con
return nil
}

type DatabaseMutex struct {
m sync.Mutex
}

func NewDatabaseMutex() *DatabaseMutex {
return &DatabaseMutex{}
}

func (m *DatabaseMutex) Lock() {
m.m.Lock()
type transactionTask struct {
Ctx context.Context
Fn TransactionFunc
ResultCh chan<- error
}

func (m *DatabaseMutex) Unlock() {
m.m.Unlock()
func newTransactionTask(ctx context.Context, fn TransactionFunc, resultCh chan<- error) transactionTask {
return transactionTask{Ctx: ctx, Fn: fn, ResultCh: resultCh}
}
9 changes: 9 additions & 0 deletions service/adapters/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/cmd/event-service/di"
"github.com/planetary-social/nos-event-service/service/adapters/sqlite"
"github.com/stretchr/testify/require"
Expand All @@ -18,5 +19,13 @@ func NewTestAdapters(ctx context.Context, tb testing.TB) sqlite.TestedItems {
err = adapters.MigrationsRunner.Run(ctx, adapters.Migrations, adapters.MigrationsProgressCallback)
require.NoError(tb, err)

go func() {
if err := adapters.TransactionRunner.Run(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
panic(err)
}
}
}()

return adapters
}
4 changes: 4 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ var (
ErrPublicKeyToMonitorNotFound = errors.New("public key to monitor not found")
)

const (
applicationHandlerTimeout = 30 * time.Second
)

type TransactionProvider interface {
Transact(context.Context, func(context.Context, Adapters) error) error
}
Expand Down
11 changes: 11 additions & 0 deletions service/app/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
Expand All @@ -22,6 +23,11 @@ func NewDatabaseRelaySource(transactionProvider TransactionProvider, logger logg
}

func (m *DatabaseRelaySource) GetRelays(ctx context.Context) ([]domain.RelayAddress, error) {
start := time.Now()
defer func() {
m.logger.Debug().WithField("duration", time.Since(start)).Message("got relays")
}()

var maybeResult []domain.MaybeRelayAddress
if err := m.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error {
tmp, err := adapters.Relays.List(ctx)
Expand Down Expand Up @@ -69,6 +75,11 @@ func NewDatabasePublicKeySource(transactionProvider TransactionProvider, logger
}

func (d *DatabasePublicKeySource) GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error) {
start := time.Now()
defer func() {
d.logger.Debug().WithField("duration", time.Since(start)).Message("got public keys")
}()

result := internal.NewEmptySet[domain.PublicKey]()

if err := d.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error {
Expand Down
3 changes: 3 additions & 0 deletions service/app/handler_add_public_key_to_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func NewAddPublicKeyToMonitorHandler(
func (h *AddPublicKeyToMonitorHandler) Handle(ctx context.Context, cmd AddPublicKeyToMonitor) (err error) {
defer h.metrics.StartApplicationCall("addPublicKeyToMonitor").End(&err)

ctx, cancel := context.WithTimeout(ctx, applicationHandlerTimeout)
defer cancel()

publicKeyToMonitor, err := domain.NewPublicKeyToMonitor(
cmd.publicKey,
time.Now(),
Expand Down
Loading

0 comments on commit 28994a1

Please sign in to comment.