Skip to content

Commit

Permalink
Merge pull request cschleiden#240 from lovromazgon/monoprocess-backend
Browse files Browse the repository at this point in the history
Monoprocess backend
  • Loading branch information
cschleiden committed Sep 23, 2023
2 parents 07a9af1 + 90442ea commit 0ee3530
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 56 deletions.
32 changes: 32 additions & 0 deletions backend/monoprocess/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package monoprocess

import (
"context"
"errors"

"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/diag"
)

var _ diag.Backend = (*monoprocessBackend)(nil)

func (b *monoprocessBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
if diagBackend, ok := b.Backend.(diag.Backend); ok {
return diagBackend.GetWorkflowInstance(ctx, instance)
}
return nil, errors.New("not implemented")
}

func (b *monoprocessBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
if diagBackend, ok := b.Backend.(diag.Backend); ok {
return diagBackend.GetWorkflowInstances(ctx, afterInstanceID, afterExecutionID, count)
}
return nil, errors.New("not implemented")
}

func (b *monoprocessBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
if diagBackend, ok := b.Backend.(diag.Backend); ok {
return diagBackend.GetWorkflowTree(ctx, instance)
}
return nil, errors.New("not implemented")
}
156 changes: 156 additions & 0 deletions backend/monoprocess/monoprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package monoprocess

import (
"context"
"log/slog"
"reflect"
"time"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/workflow"
)

type monoprocessBackend struct {
backend.Backend

workflowSignal chan struct{}
activitySignal chan struct{}
signalTimeout time.Duration

logger *slog.Logger
}

var _ backend.Backend = (*monoprocessBackend)(nil)

// NewMonoprocessBackend wraps an existing backend and improves its responsiveness
// in case the backend and worker are running in the same process. This backend
// uses channels to notify the worker every time there is a new task ready to be
// worked on. Note that only one worker will be notified.
// IMPORTANT: Only use this backend when the backend and worker are running in
// the same process.
func NewMonoprocessBackend(b backend.Backend) *monoprocessBackend {
mb := &monoprocessBackend{
Backend: b,
workflowSignal: make(chan struct{}, 1),
activitySignal: make(chan struct{}, 1),
logger: b.Logger(),
}
return mb
}

func (b *monoprocessBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) {
// loop until either we find a task or the context is cancelled
for {
if w, err := b.Backend.GetWorkflowTask(ctx); w != nil || err != nil {
return w, err
}
b.logger.DebugContext(ctx, "worker waiting for workflow task signal")
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-b.workflowSignal:
b.logger.DebugContext(ctx, "worker got a workflow task signal")
}
}
}

func (b *monoprocessBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTask, error) {
// loop until either we find a task or the context is cancelled
for {
if a, err := b.Backend.GetActivityTask(ctx); a != nil || err != nil {
return a, err
}
b.logger.DebugContext(ctx, "worker waiting for activity task signal")
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-b.activitySignal:
b.logger.DebugContext(ctx, "worker got an activity task signal")
}
}
}

func (b *monoprocessBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
if err := b.Backend.CreateWorkflowInstance(ctx, instance, event); err != nil {
return err
}
b.notifyWorkflowWorker(ctx)
return nil
}

func (b *monoprocessBackend) CompleteWorkflowTask(
ctx context.Context,
task *backend.WorkflowTask,
instance *workflow.Instance,
state core.WorkflowInstanceState,
executedEvents, activityEvents, timerEvents []*history.Event,
workflowEvents []history.WorkflowEvent,
) error {
if err := b.Backend.CompleteWorkflowTask(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents); err != nil {
return err
}

if len(activityEvents) > 0 {
b.notifyActivityWorker(ctx)
}

for _, e := range timerEvents {
attr, ok := e.Attributes.(*history.TimerFiredAttributes)
if !ok {
b.logger.WarnContext(ctx, "unknown attributes type in timer event", "type", reflect.TypeOf(e.Attributes).String())
continue
}
b.logger.DebugContext(ctx, "scheduling timer to notify workflow worker")
// Note that the worker will be notified even if the timer event gets
// cancelled. This is ok, because the poller will simply find no task
// and continue.
time.AfterFunc(attr.At.Sub(time.Now()), func() { b.notifyWorkflowWorker(context.Background()) })
}

b.notifyWorkflowWorker(ctx)
return nil
}

func (b *monoprocessBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error {
if err := b.Backend.CompleteActivityTask(ctx, instance, activityID, event); err != nil {
return err
}
b.notifyWorkflowWorker(ctx)
return nil
}

func (b *monoprocessBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error {
if err := b.Backend.CancelWorkflowInstance(ctx, instance, cancelEvent); err != nil {
return err
}
b.notifyWorkflowWorker(ctx)
return nil
}

func (b *monoprocessBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error {
if err := b.Backend.SignalWorkflow(ctx, instanceID, event); err != nil {
return err
}
b.notifyWorkflowWorker(ctx)
return nil
}

func (b *monoprocessBackend) notifyActivityWorker(ctx context.Context) {
select {
case b.activitySignal <- struct{}{}:
b.logger.DebugContext(ctx, "signalled a new activity task to worker")
default:
// the signal channel already contains a signal, no need to add another
}
}

func (b *monoprocessBackend) notifyWorkflowWorker(ctx context.Context) {
select {
case b.workflowSignal <- struct{}{}:
b.logger.DebugContext(ctx, "signalled a new workflow task to worker")
default:
// the signal channel already contains a signal, no need to add another
}
}
47 changes: 47 additions & 0 deletions backend/monoprocess/monoprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package monoprocess

import (
"context"
"errors"
"testing"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/backend/sqlite"
"github.com/cschleiden/go-workflows/backend/test"
)

func Test_MonoprocessBackend(t *testing.T) {
if testing.Short() {
t.Skip()
}

test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
// Disable sticky workflow behavior for the test execution
options = append(options, backend.WithStickyTimeout(0))

return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
}, nil)
}

func Test_EndToEndMonoprocessBackend(t *testing.T) {
if testing.Short() {
t.Skip()
}

test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
// Disable sticky workflow behavior for the test execution
options = append(options, backend.WithStickyTimeout(0))

return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
}, nil)
}

var _ test.TestBackend = (*monoprocessBackend)(nil)

func (b *monoprocessBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
if testBackend, ok := b.Backend.(test.TestBackend); ok {
return testBackend.GetFutureEvents(ctx)
}
return nil, errors.New("not implemented")
}
54 changes: 54 additions & 0 deletions backend/sqlite/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,64 @@ import (
"strings"
"time"

"github.com/cschleiden/go-workflows/backend/test"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
)

var _ test.TestBackend = (*sqliteBackend)(nil)

func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
tx, err := sb.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()

// There is no index on `visible_at`, but this is okay for test only usage.
futureEvents, err := tx.QueryContext(
ctx,
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
)
if err != nil {
return nil, fmt.Errorf("getting history: %w", err)
}

f := make([]*history.Event, 0)

for futureEvents.Next() {
var instanceID, executionID string
var attributes []byte

fe := &history.Event{}

if err := futureEvents.Scan(
&fe.ID,
&fe.SequenceID,
&instanceID,
&executionID,
&fe.Type,
&fe.Timestamp,
&fe.ScheduleEventID,
&attributes,
&fe.VisibleAt,
); err != nil {
return nil, fmt.Errorf("scanning event: %w", err)
}

a, err := history.DeserializeAttributes(fe.Type, attributes)
if err != nil {
return nil, fmt.Errorf("deserializing attributes: %w", err)
}

fe.Attributes = a

f = append(f, fe)
}

return f, nil
}

func getPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance) ([]*history.Event, error) {
now := time.Now()
events, err := tx.QueryContext(
Expand Down
56 changes: 0 additions & 56 deletions backend/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package sqlite

import (
"context"
"fmt"
"testing"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/backend/test"
)

Expand Down Expand Up @@ -35,56 +32,3 @@ func Test_EndToEndSqliteBackend(t *testing.T) {
return NewInMemoryBackend(options...)
}, nil)
}

var _ test.TestBackend = (*sqliteBackend)(nil)

func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
tx, err := sb.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()

// There is no index on `visible_at`, but this is okay for test only usage.
futureEvents, err := tx.QueryContext(
ctx,
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
)
if err != nil {
return nil, fmt.Errorf("getting history: %w", err)
}

f := make([]*history.Event, 0)

for futureEvents.Next() {
var instanceID, executionID string
var attributes []byte

fe := &history.Event{}

if err := futureEvents.Scan(
&fe.ID,
&fe.SequenceID,
&instanceID,
&executionID,
&fe.Type,
&fe.Timestamp,
&fe.ScheduleEventID,
&attributes,
&fe.VisibleAt,
); err != nil {
return nil, fmt.Errorf("scanning event: %w", err)
}

a, err := history.DeserializeAttributes(fe.Type, attributes)
if err != nil {
return nil, fmt.Errorf("deserializing attributes: %w", err)
}

fe.Attributes = a

f = append(f, fe)
}

return f, nil
}

0 comments on commit 0ee3530

Please sign in to comment.