Skip to content

Commit

Permalink
💄 Rename types -> utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Aug 11, 2016
1 parent fdcb8cc commit 989a419
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 85 deletions.
10 changes: 5 additions & 5 deletions components/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
"github.com/redBorder/rbforwarder/utils"
)

// Batch groups multiple messages
type Batch struct {
Group string
Message *types.Message
Message *utils.Message
Buff *bytes.Buffer
MessageCount uint // Current number of messages in the buffer
Next types.Next // Call to pass the message to the next handler
Next utils.Next // Call to pass the message to the next handler
}

// NewBatch creates a new instance of Batch
func NewBatch(m *types.Message, group string, next types.Next, clk clock.Clock,
func NewBatch(m *utils.Message, group string, next utils.Next, clk clock.Clock,
timeoutMillis uint, ready chan *Batch) *Batch {
payload, _ := m.PopPayload()
b := &Batch{
Expand Down Expand Up @@ -51,7 +51,7 @@ func (b *Batch) Send(cb func()) {
}

// Add merges a new message in the buffer
func (b *Batch) Add(m *types.Message) {
func (b *Batch) Add(m *utils.Message) {
newReport := m.Reports.Pop()
b.Message.Reports.Push(newReport)

Expand Down
4 changes: 2 additions & 2 deletions components/batch/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package batcher

import (
"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
"github.com/redBorder/rbforwarder/utils"
)

// Batcher allows to merge multiple messages in a single one
Expand Down Expand Up @@ -35,7 +35,7 @@ func (b *Batcher) Init(id int) {

// OnMessage is called when a new message is receive. Add the new message to
// a batch
func (b *Batcher) OnMessage(m *types.Message, next types.Next, done types.Done) {
func (b *Batcher) OnMessage(m *utils.Message, next utils.Next, done utils.Done) {
if group, exists := m.Opts["batch_group"].(string); exists {
if batch, exists := b.batches[group]; exists {
batch.Add(m)
Expand Down
40 changes: 20 additions & 20 deletions components/batch/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
"github.com/redBorder/rbforwarder/utils"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)

type NexterDoner struct {
mock.Mock
nextCalled chan *types.Message
nextCalled chan *utils.Message
}

func (nd *NexterDoner) Next(m *types.Message) {
func (nd *NexterDoner) Next(m *utils.Message) {
nd.Called(m)
nd.nextCalled <- m
}
Expand All @@ -34,12 +34,12 @@ func TestBatcher(t *testing.T) {
batcher.clk = clock.NewMock()

Convey("When a message is received with no batch group", func() {
m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 1)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)
nd.nextCalled = make(chan *utils.Message, 1)
nd.On("Next", mock.AnythingOfType("*utils.Message")).Times(1)

batcher.OnMessage(m, nd.Next, nil)

Expand All @@ -54,7 +54,7 @@ func TestBatcher(t *testing.T) {
})

Convey("When a message is received, but not yet sent", func() {
m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))
m.Opts = map[string]interface{}{
"batch_group": "group1",
Expand All @@ -81,10 +81,10 @@ func TestBatcher(t *testing.T) {
})

Convey("When the max number of messages is reached", func() {
var messages []*types.Message
var messages []*utils.Message

for i := 0; i < int(batcher.config.Limit); i++ {
m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("ABC"))
m.Opts = map[string]interface{}{
"batch_group": "group1",
Expand All @@ -95,8 +95,8 @@ func TestBatcher(t *testing.T) {
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)
nd.nextCalled = make(chan *utils.Message)
nd.On("Next", mock.AnythingOfType("*utils.Message")).Times(1)

for i := 0; i < int(batcher.config.Limit); i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
Expand All @@ -116,10 +116,10 @@ func TestBatcher(t *testing.T) {
})

Convey("When the timeout expires", func() {
var messages []*types.Message
var messages []*utils.Message

for i := 0; i < 5; i++ {
m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))
m.Opts = map[string]interface{}{
"batch_group": "group1",
Expand All @@ -130,8 +130,8 @@ func TestBatcher(t *testing.T) {
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 1)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)
nd.nextCalled = make(chan *utils.Message, 1)
nd.On("Next", mock.AnythingOfType("*utils.Message")).Times(1)

for i := 0; i < 5; i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
Expand All @@ -151,25 +151,25 @@ func TestBatcher(t *testing.T) {
})

Convey("When multiple messages are received with differente groups", func() {
m1 := types.NewMessage()
m1 := utils.NewMessage()
m1.PushPayload([]byte("MESSAGE 1"))
m1.Opts = map[string]interface{}{
"batch_group": "group1",
}
m2 := types.NewMessage()
m2 := utils.NewMessage()
m2.PushPayload([]byte("MESSAGE 2"))
m2.Opts = map[string]interface{}{
"batch_group": "group2",
}
m3 := types.NewMessage()
m3 := utils.NewMessage()
m3.PushPayload([]byte("MESSAGE 3"))
m3.Opts = map[string]interface{}{
"batch_group": "group2",
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 2)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(2)
nd.nextCalled = make(chan *utils.Message, 2)
nd.On("Next", mock.AnythingOfType("*utils.Message")).Times(2)

batcher.OnMessage(m1, nd.Next, nil)
batcher.OnMessage(m2, nd.Next, nil)
Expand Down
4 changes: 2 additions & 2 deletions components/httpsender/httpSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"net/http"

"github.com/asaskevich/govalidator"
"github.com/redBorder/rbforwarder/types"
"github.com/redBorder/rbforwarder/utils"
)

// HTTPSender is a component for the rbforwarder pipeline that sends messages
Expand All @@ -33,7 +33,7 @@ func (s *HTTPSender) Init(id int) {
}

// OnMessage is called when a new message should be sent via HTTP
func (s *HTTPSender) OnMessage(m *types.Message, next types.Next, done types.Done) {
func (s *HTTPSender) OnMessage(m *utils.Message, next utils.Next, done utils.Done) {
var u string

if s.err != nil {
Expand Down
28 changes: 14 additions & 14 deletions components/httpsender/httpSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/url"
"testing"

"github.com/redBorder/rbforwarder/types"
"github.com/redBorder/rbforwarder/utils"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)
Expand All @@ -19,7 +19,7 @@ type Doner struct {
}
}

func (d *Doner) Done(m *types.Message, code int, status string) {
func (d *Doner) Done(m *utils.Message, code int, status string) {
d.Called(m, code, status)
d.doneCalled <- struct {
code int
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestHTTPSender(t *testing.T) {
var url string
sender.Init(0)

m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))
sender.client = NewTestClient(401, func(req *http.Request) {
url = req.URL.String()
Expand All @@ -77,7 +77,7 @@ func TestHTTPSender(t *testing.T) {
}, 1),
}

d.On("Done", mock.AnythingOfType("*types.Message"),
d.On("Done", mock.AnythingOfType("*utils.Message"),
mock.AnythingOfType("int"), mock.AnythingOfType("string"))

sender.OnMessage(m, nil, d.Done)
Expand All @@ -96,7 +96,7 @@ func TestHTTPSender(t *testing.T) {
var url string
sender.Init(0)

m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))

sender.client = NewTestClient(200, func(req *http.Request) {
Expand All @@ -109,7 +109,7 @@ func TestHTTPSender(t *testing.T) {
status string
}, 1),
}
d.On("Done", mock.AnythingOfType("*types.Message"),
d.On("Done", mock.AnythingOfType("*utils.Message"),
mock.AnythingOfType("int"), mock.AnythingOfType("string"))

sender.OnMessage(m, nil, d.Done)
Expand All @@ -128,7 +128,7 @@ func TestHTTPSender(t *testing.T) {
var url string
sender.Init(0)

m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))
m.Opts["http_endpoint"] = "endpoint1"

Expand All @@ -142,7 +142,7 @@ func TestHTTPSender(t *testing.T) {
status string
}, 1),
}
d.On("Done", mock.AnythingOfType("*types.Message"),
d.On("Done", mock.AnythingOfType("*utils.Message"),
mock.AnythingOfType("int"), mock.AnythingOfType("string"))

sender.OnMessage(m, nil, d.Done)
Expand All @@ -161,7 +161,7 @@ func TestHTTPSender(t *testing.T) {
var url string
sender.Init(0)

m := types.NewMessage()
m := utils.NewMessage()

sender.client = NewTestClient(200, func(req *http.Request) {
url = req.URL.String()
Expand All @@ -173,7 +173,7 @@ func TestHTTPSender(t *testing.T) {
status string
}, 1),
}
d.On("Done", mock.AnythingOfType("*types.Message"),
d.On("Done", mock.AnythingOfType("*utils.Message"),
mock.AnythingOfType("int"), mock.AnythingOfType("string"))

sender.OnMessage(m, nil, d.Done)
Expand All @@ -191,7 +191,7 @@ func TestHTTPSender(t *testing.T) {
Convey("When a the HTTP client fails", func() {
sender.Init(0)

m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))

sender.client = NewTestClient(200, func(req *http.Request) {
Expand All @@ -204,7 +204,7 @@ func TestHTTPSender(t *testing.T) {
status string
}, 1),
}
d.On("Done", mock.AnythingOfType("*types.Message"),
d.On("Done", mock.AnythingOfType("*utils.Message"),
mock.AnythingOfType("int"), mock.AnythingOfType("string"))

sender.OnMessage(m, nil, d.Done)
Expand All @@ -224,7 +224,7 @@ func TestHTTPSender(t *testing.T) {
sender.Init(0)

Convey("When try to send messages", func() {
m := types.NewMessage()
m := utils.NewMessage()
m.PushPayload([]byte("Hello World"))
m.Opts["http_endpoint"] = "endpoint1"

Expand All @@ -235,7 +235,7 @@ func TestHTTPSender(t *testing.T) {
}, 1),
}

d.On("Done", mock.AnythingOfType("*types.Message"),
d.On("Done", mock.AnythingOfType("*utils.Message"),
mock.AnythingOfType("int"), mock.AnythingOfType("string"))

sender.OnMessage(m, nil, d.Done)
Expand Down
22 changes: 11 additions & 11 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ import (
"sync"

"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/types"
"github.com/redBorder/rbforwarder/utils"
)

type component struct {
pool chan chan *types.Message
pool chan chan *utils.Message
workers int
}

// pipeline contains the components
type pipeline struct {
components []component
input chan *types.Message
retry chan *types.Message
output chan *types.Message
input chan *utils.Message
retry chan *utils.Message
output chan *utils.Message
}

// newPipeline creates a new Backend
func newPipeline(input, retry, output chan *types.Message) *pipeline {
func newPipeline(input, retry, output chan *utils.Message) *pipeline {
var wg sync.WaitGroup
p := &pipeline{
input: input,
Expand Down Expand Up @@ -62,11 +62,11 @@ func newPipeline(input, retry, output chan *types.Message) *pipeline {
}

// PushComponent adds a new component to the pipeline
func (p *pipeline) PushComponent(composser types.Composer, w int) {
func (p *pipeline) PushComponent(composser utils.Composer, w int) {
var wg sync.WaitGroup
c := component{
workers: w,
pool: make(chan chan *types.Message, w),
pool: make(chan chan *utils.Message, w),
}

index := len(p.components)
Expand All @@ -75,19 +75,19 @@ func (p *pipeline) PushComponent(composser types.Composer, w int) {
for i := 0; i < w; i++ {
composser.Init(i)

worker := make(chan *types.Message)
worker := make(chan *utils.Message)
p.components[index].pool <- worker

wg.Add(1)
go func(i int) {
wg.Done()
for m := range worker {
composser.OnMessage(m, func(m *types.Message) {
composser.OnMessage(m, func(m *utils.Message) {
if len(p.components) >= index {
nextWorker := <-p.components[index+1].pool
nextWorker <- m
}
}, func(m *types.Message, code int, status string) {
}, func(m *utils.Message, code int, status string) {
reports := lane.NewStack()
for !m.Reports.Empty() {
rep := m.Reports.Pop().(report)
Expand Down
Loading

0 comments on commit 989a419

Please sign in to comment.