Skip to content

Commit

Permalink
Add sync pump (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma committed Jan 29, 2019
1 parent a43bc19 commit 38f5f0a
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 25 deletions.
9 changes: 5 additions & 4 deletions example/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ func main() {
}
ctx = stats.WithStats(ctx, client)

tasks := streams.Tasks{}
p, err := producerTask(ctx, []string{"127.0.0.1:9092"}, config)
if err != nil {
log.Fatal(err.Error())
}
tasks = append(tasks, p)

c, err := consumerTask(ctx, []string{"127.0.0.1:9092"}, config)
if err != nil {
log.Fatal(err.Error())
}
tasks = append(tasks, c)

p.Start()
c.Start()
defer c.Close()
defer p.Close()
tasks.Start()
defer tasks.Close()

<-clix.WaitForSignals()
}
Expand Down
66 changes: 57 additions & 9 deletions pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,56 @@ type Pump interface {
Close() error
}

// processorPump is an asynchronous Message Pump.
type processorPump struct {
// syncPump is an synchronous Message Pump.
type syncPump struct {
sync.Mutex

name string
processor Processor
pipe TimedPipe
}

// NewSyncPump creates a new synchronous Pump instance.
func NewSyncPump(node Node, pipe TimedPipe) Pump {
p := &syncPump{
name: node.Name(),
processor: node.Processor(),
pipe: pipe,
}

return p
}

// Accept takes a message to be processed in the Pump.
func (p *syncPump) Accept(msg *Message) error {
p.pipe.Reset()

start := nanotime()
err := p.processor.Process(msg)
if err != nil {
return err
}
latency := time.Duration(nanotime()-start) - p.pipe.Duration()

tags := []interface{}{"name", p.name}
withStats(msg.Ctx, func(s stats.Stats) {
s.Timing("node.latency", latency, 0.1, tags...)
s.Inc("node.throughput", 1, 0.1, tags...)
})

return nil
}

// Stop stops the pump, but does not close it.
func (p *syncPump) Stop() {}

// Close closes the pump.
func (p *syncPump) Close() error {
return p.processor.Close()
}

// asyncPump is an asynchronous Message Pump.
type asyncPump struct {
sync.Mutex

name string
Expand All @@ -34,9 +82,9 @@ type processorPump struct {
wg sync.WaitGroup
}

// NewPump creates a new processorPump instance.
func NewPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump {
p := &processorPump{
// NewAsyncPump creates a new asynchronous Pump instance.
func NewAsyncPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump {
p := &asyncPump{
name: node.Name(),
processor: node.Processor(),
pipe: pipe,
Expand All @@ -49,7 +97,7 @@ func NewPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump {
return p
}

func (p *processorPump) run() {
func (p *asyncPump) run() {
p.wg.Add(1)
defer p.wg.Done()

Expand Down Expand Up @@ -83,14 +131,14 @@ func (p *processorPump) run() {
}

// Accept takes a message to be processed in the Pump.
func (p *processorPump) Accept(msg *Message) error {
func (p *asyncPump) Accept(msg *Message) error {
p.ch <- msg

return nil
}

// Stop stops the pump, but does not close it.
func (p *processorPump) Stop() {
func (p *asyncPump) Stop() {
close(p.ch)

p.wg.Wait()
Expand All @@ -99,7 +147,7 @@ func (p *processorPump) Stop() {
// Close closes the pump.
//
// Stop must be called before closing the pump.
func (p *processorPump) Close() error {
func (p *asyncPump) Close() error {
return p.processor.Close()
}

Expand Down
77 changes: 69 additions & 8 deletions pump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestProcessorPump_Accept(t *testing.T) {
func TestSyncPump_Accept(t *testing.T) {
ctx := stats.WithStats(context.Background(), stats.Null)
msg := streams.NewMessageWithContext(ctx, "test", "test")
processor := new(MockProcessor)
Expand All @@ -21,7 +21,68 @@ func TestProcessorPump_Accept(t *testing.T) {
pipe := new(MockTimedPipe)
pipe.On("Reset")
pipe.On("Duration").Return(time.Duration(0))
p := streams.NewPump(node, pipe, func(error) {})
p := streams.NewSyncPump(node, pipe)
defer p.Close()

err := p.Accept(msg)

assert.NoError(t, err)
processor.AssertExpectations(t)
}

func TestSyncPump_AcceptError(t *testing.T) {
msg := streams.NewMessage("test", "test")
processor := new(MockProcessor)
processor.On("Process", msg).Return(errors.New("test"))
processor.On("Close").Return(nil)
node := streams.NewProcessorNode("test", processor)
pipe := new(MockTimedPipe)
pipe.On("Reset")
pipe.On("Duration").Return(time.Duration(0))
p := streams.NewSyncPump(node, pipe)
defer p.Close()

err := p.Accept(msg)

assert.Error(t, err)
}

func TestSyncPump_Close(t *testing.T) {
processor := new(MockProcessor)
processor.On("Close").Return(nil)
node := streams.NewProcessorNode("test", processor)
pipe := new(MockTimedPipe)
p := streams.NewSyncPump(node, pipe)

err := p.Close()

assert.NoError(t, err)
processor.AssertExpectations(t)
}

func TestSyncPump_CloseError(t *testing.T) {
processor := new(MockProcessor)
processor.On("Close").Return(errors.New("test"))
node := streams.NewProcessorNode("test", processor)
pipe := new(MockTimedPipe)
p := streams.NewSyncPump(node, pipe)

err := p.Close()

assert.Error(t, err)
}

func TestAsyncPump_Accept(t *testing.T) {
ctx := stats.WithStats(context.Background(), stats.Null)
msg := streams.NewMessageWithContext(ctx, "test", "test")
processor := new(MockProcessor)
processor.On("Process", msg).Return(nil)
processor.On("Close").Maybe().Return(nil)
node := streams.NewProcessorNode("test", processor)
pipe := new(MockTimedPipe)
pipe.On("Reset")
pipe.On("Duration").Return(time.Duration(0))
p := streams.NewAsyncPump(node, pipe, func(error) {})
defer p.Close()

err := p.Accept(msg)
Expand All @@ -32,7 +93,7 @@ func TestProcessorPump_Accept(t *testing.T) {
processor.AssertExpectations(t)
}

func TestProcessorPump_AcceptError(t *testing.T) {
func TestAsyncPump_AcceptError(t *testing.T) {
var err error

msg := streams.NewMessage("test", "test")
Expand All @@ -43,7 +104,7 @@ func TestProcessorPump_AcceptError(t *testing.T) {
pipe := new(MockTimedPipe)
pipe.On("Reset")
pipe.On("Duration").Return(time.Duration(0))
p := streams.NewPump(node, pipe, func(e error) {
p := streams.NewAsyncPump(node, pipe, func(e error) {
err = e
})
defer p.Close()
Expand All @@ -55,25 +116,25 @@ func TestProcessorPump_AcceptError(t *testing.T) {
assert.Error(t, err)
}

func TestProcessorPump_Close(t *testing.T) {
func TestAsyncPump_Close(t *testing.T) {
processor := new(MockProcessor)
processor.On("Close").Return(nil)
node := streams.NewProcessorNode("test", processor)
pipe := new(MockTimedPipe)
p := streams.NewPump(node, pipe, func(error) {})
p := streams.NewAsyncPump(node, pipe, func(error) {})

err := p.Close()

assert.NoError(t, err)
processor.AssertExpectations(t)
}

func TestProcessorPump_CloseError(t *testing.T) {
func TestAsyncPump_CloseError(t *testing.T) {
processor := new(MockProcessor)
processor.On("Close").Return(errors.New("test"))
node := streams.NewProcessorNode("test", processor)
pipe := new(MockTimedPipe)
p := streams.NewPump(node, pipe, func(error) {})
p := streams.NewAsyncPump(node, pipe, func(error) {})

err := p.Close()

Expand Down
30 changes: 28 additions & 2 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ import (
"time"
)

// TaskMode represents the task mode.
type TaskMode int8

// TaskMode types.
const (
Async TaskMode = iota
Sync
)

// ErrorFunc represents a streams error handling function.
type ErrorFunc func(error)

Expand All @@ -18,13 +27,20 @@ func WithCommitInterval(d time.Duration) TaskOptFunc {
}
}

// WithMetadataStrategy defines an strategy of metadata mergers.
// WithMetadataStrategy defines a strategy of metadata mergers.
func WithMetadataStrategy(strategy MetadataStrategy) TaskOptFunc {
return func(t *streamTask) {
t.supervisorOpts.Strategy = strategy
}
}

// WithMode defines the task mode to run in.
func WithMode(m TaskMode) TaskOptFunc {
return func(t *streamTask) {
t.mode = m
}
}

// Task represents a streams task.
type Task interface {
// Start starts the streams processors.
Expand All @@ -44,6 +60,7 @@ type streamTask struct {
topology *Topology

running bool
mode TaskMode
errorFn ErrorFunc

store Metastore
Expand All @@ -59,6 +76,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task {

t := &streamTask{
topology: topology,
mode: Async,
store: store,
errorFn: func(_ error) {},
supervisorOpts: supervisorOpts{
Expand Down Expand Up @@ -101,7 +119,7 @@ func (t *streamTask) setupTopology() {
pipe := NewPipe(t.store, t.supervisor, node.Processor(), t.resolvePumps(node.Children()))
node.Processor().WithPipe(pipe)

pump := NewPump(node, pipe.(TimedPipe), t.handleError)
pump := t.newPump(node, pipe.(TimedPipe), t.handleError)
t.pumps[node] = pump
}

Expand All @@ -113,6 +131,14 @@ func (t *streamTask) setupTopology() {
}
}

func (t *streamTask) newPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump {
if t.mode == Sync {
return NewSyncPump(node, pipe)
}

return NewAsyncPump(node, pipe, errFn)
}

func (t *streamTask) resolvePumps(nodes []Node) []Pump {
var pumps []Pump
for _, node := range nodes {
Expand Down
38 changes: 36 additions & 2 deletions task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestNewTask(t *testing.T) {
assert.Implements(t, (*streams.Task)(nil), task)
}

func TestStreamTask_ConsumesMessages(t *testing.T) {
func TestStreamTask_ConsumesAsyncMessages(t *testing.T) {
msgs := make(chan *streams.Message)
msg := streams.NewMessage("test", "test")

Expand All @@ -35,7 +35,41 @@ func TestStreamTask_ConsumesMessages(t *testing.T) {
Process("processor", p)

tp, _ := b.Build()
task := streams.NewTask(tp)
task := streams.NewTask(tp, streams.WithMode(streams.Async))
task.OnError(func(err error) {
t.FailNow()
})

err := task.Start()
if err != nil {
assert.FailNow(t, err.Error())
}

msgs <- msg

time.Sleep(time.Millisecond)

_ = task.Close()

p.AssertExpectations(t)
}

func TestStreamTask_ConsumesSyncMessages(t *testing.T) {
msgs := make(chan *streams.Message)
msg := streams.NewMessage("test", "test")

p := new(MockProcessor)
p.On("WithPipe", mock.Anything).Return(nil)
p.On("Process", msg).Return(nil)
p.On("Close").Return(nil)

b := streams.NewStreamBuilder()
b.Source("src", &chanSource{msgs: msgs}).
Map("pass-through", streams.MapperFunc(passThroughMapper)).
Process("processor", p)

tp, _ := b.Build()
task := streams.NewTask(tp, streams.WithMode(streams.Sync))
task.OnError(func(err error) {
t.FailNow()
})
Expand Down

0 comments on commit 38f5f0a

Please sign in to comment.