Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Heka Issue #287 Remove the match capture
Browse files Browse the repository at this point in the history
  • Loading branch information
trink committed Jun 26, 2013
1 parent a2a167c commit 82bba0e
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 32 deletions.
4 changes: 1 addition & 3 deletions filters.go
Expand Up @@ -63,9 +63,7 @@ func (hsf *HekaStatsFilter) Run(fr pipeline.FilterRunner, h pipeline.PluginHelpe
return
}

for plc := range fr.InChan() {
pack = plc.Pack

for pack = range fr.InChan() {
ns = pack.Message.GetLogger()

if tmp, ok = pack.Message.GetFieldValue("name"); !ok {
Expand Down
3 changes: 1 addition & 2 deletions outputs.go
Expand Up @@ -93,8 +93,7 @@ func (cef *CefOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er
pack *pipeline.PipelinePack
)
syslogMsg := new(SyslogMsg)
for plc := range or.InChan() {
pack = plc.Pack
for pack = range or.InChan() {

// default values
facility, priority = syslog.LOG_LOCAL4, syslog.LOG_INFO
Expand Down
3 changes: 1 addition & 2 deletions sentry.go
Expand Up @@ -112,8 +112,7 @@ func (so *SentryOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (
dataPacket: make([]byte, 0, so.config.MaxSentryBytes),
}

for plc := range or.InChan() {
pack = plc.Pack
for pack = range or.InChan() {
e = so.prepSentryMsg(pack, sentryMsg)
pack.Recycle()
if e != nil {
Expand Down
3 changes: 1 addition & 2 deletions statsd.go
Expand Up @@ -114,8 +114,7 @@ func (so *StatsdOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (
)
statsdMsg := new(StatsdMsg)

for plc := range or.InChan() {
pack = plc.Pack
for pack = range or.InChan() {
e = so.prepStatsdMsg(pack, statsdMsg)
pack.Recycle()
if e != nil {
Expand Down
18 changes: 8 additions & 10 deletions statsd_test.go
Expand Up @@ -25,9 +25,9 @@ import (
"sync"
)

func getStatsdPlc(typeStr string, payload string) (plc *pipeline.PipelineCapture) {
func getStatsdPack(typeStr string, payload string) (pack *pipeline.PipelinePack) {
recycleChan := make(chan *pipeline.PipelinePack, 1)
pack := pipeline.NewPipelinePack(recycleChan)
pack = pipeline.NewPipelinePack(recycleChan)
pack.Message.SetType(typeStr)
pack.Message.SetLogger("thenamespace")
fName, _ := message.NewField("name", "myname", "")
Expand All @@ -36,7 +36,7 @@ func getStatsdPlc(typeStr string, payload string) (plc *pipeline.PipelineCapture
pack.Message.AddField(fRate)
pack.Message.SetPayload(payload)
pack.Decoded = true
return &pipeline.PipelineCapture{Pack: pack}
return pack
}

func StatsdOutputSpec(c gs.Context) {
Expand Down Expand Up @@ -64,22 +64,21 @@ func StatsdOutputSpec(c gs.Context) {
output.statsdClient = mockStatsdClient

oth := ts.NewOutputTestHelper(ctrl)
inChan := make(chan *pipeline.PipelineCapture, 1)
inChan := make(chan *pipeline.PipelinePack, 1)
oth.MockOutputRunner.EXPECT().InChan().Return(inChan)

var wg sync.WaitGroup

c.Specify("a decr msg", func() {
plc := getStatsdPlc("counter", "-1")
pack := plc.Pack
pack := getStatsdPack("counter", "-1")
msg := new(StatsdMsg)
err := output.prepStatsdMsg(pack, msg)
c.Expect(err, gs.IsNil)
c.Expect(*msg, gs.Equals, *decrMsg)

mockStatsdClient.EXPECT().IncrementSampledCounter("thenamespace.myname",
-1, float32(.30))
inChan <- plc
inChan <- pack
close(inChan)
wg.Add(1)

Expand All @@ -92,16 +91,15 @@ func StatsdOutputSpec(c gs.Context) {
})

c.Specify("a timer msg", func() {
plc := getStatsdPlc("timer", "123")
pack := plc.Pack
pack := getStatsdPack("timer", "123")
msg := new(StatsdMsg)
err := output.prepStatsdMsg(pack, msg)
c.Expect(err, gs.IsNil)
c.Expect(*msg, gs.Equals, *timerMsg)

mockStatsdClient.EXPECT().SendSampledTiming("thenamespace.myname",
123, float32(.30))
inChan <- plc
inChan <- pack
close(inChan)
wg.Add(1)

Expand Down
16 changes: 4 additions & 12 deletions testsupport/mock_pipeline_outputrunner.go
Expand Up @@ -4,9 +4,9 @@
package testsupport

import (
sync "sync"
gomock "code.google.com/p/gomock/gomock"
pipeline "github.com/mozilla-services/heka/pipeline"
sync "sync"
time "time"
)

Expand All @@ -31,17 +31,9 @@ func (_m *MockOutputRunner) EXPECT() *_MockOutputRunnerRecorder {
return _m.recorder
}

func (_m *MockOutputRunner) Deliver(_param0 *pipeline.PipelinePack) {
_m.ctrl.Call(_m, "Deliver", _param0)
}

func (_mr *_MockOutputRunnerRecorder) Deliver(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Deliver", arg0)
}

func (_m *MockOutputRunner) InChan() chan *pipeline.PipelineCapture {
func (_m *MockOutputRunner) InChan() chan *pipeline.PipelinePack {
ret := _m.ctrl.Call(_m, "InChan")
ret0, _ := ret[0].(chan *pipeline.PipelineCapture)
ret0, _ := ret[0].(chan *pipeline.PipelinePack)
return ret0
}

Expand Down Expand Up @@ -115,7 +107,7 @@ func (_mr *_MockOutputRunnerRecorder) PluginGlobals() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "PluginGlobals")
}

func (_m *MockOutputRunner) RetainPack(_param0 *pipeline.PipelineCapture) {
func (_m *MockOutputRunner) RetainPack(_param0 *pipeline.PipelinePack) {
_m.ctrl.Call(_m, "RetainPack", _param0)
}

Expand Down
2 changes: 1 addition & 1 deletion testsupport/mock_pipeline_pluginhelper.go
Expand Up @@ -4,8 +4,8 @@
package testsupport

import (
pipeline "github.com/mozilla-services/heka/pipeline"
gomock "code.google.com/p/gomock/gomock"
pipeline "github.com/mozilla-services/heka/pipeline"
)

// Mock of PluginHelper interface
Expand Down

0 comments on commit 82bba0e

Please sign in to comment.