Skip to content
This repository

Features/no message holder #93

Merged
merged 3 commits into from about 1 year ago

2 participants

Rob Miller Mike Trinkala
Rob Miller
Collaborator

Been talking about doing this for a while, I was digging into the MGI anyway, just made sense. I think it's a big improvement.

added some commits April 08, 2013
Rob Miller Use PipelinePacks inside the MGI infrastructure rather than having a …
…separate

messageHolder struct that serves essentially the same purpose.
5606511
Rob Miller Merge branch 'dev' into features/no-messageHolder
Conflicts:
	pipeline/pipeline_runner.go
	pipeline/sandbox_filter.go
42b0a6b
Mike Trinkala trink commented on the diff April 09, 2013
pipeline/report.go
@@ -67,33 +67,33 @@ func PopulateReportMsg(pr PluginRunner, msg *message.Message) (err error) {
67 67
 }
68 68
 
69 69
 // Generate and return recycle channel and plugin report messages.
70  
-func (pc *PipelineConfig) reports() (reports map[string]*messageHolder) {
71  
-	reports = make(map[string]*messageHolder)
  70
+func (pc *PipelineConfig) reports() (reports map[string]*PipelinePack) {
  71
+	reports = make(map[string]*PipelinePack)
2
Mike Trinkala Collaborator
trink added a note April 09, 2013

Since the packs are stored in a map until the report is completed the report generator can exhaust the pipeline pack supply if the number of Plugins exceed the config.PoolSize (hanging the system). With the ability to dynamically add plugins this is now more of an issue.

Rob Miller Collaborator
rafrombrc added a note April 09, 2013

This is true, but the problem already exists on dev, so it's not a blocker for this merge. Issue #97 opened re: the PipelinePack exhaustion issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Rob Miller rafrombrc merged commit 3a75660 into from April 09, 2013
Rob Miller rafrombrc closed this April 09, 2013
Rob Miller rafrombrc deleted the branch April 09, 2013
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 3 unique commits by 1 author.

Apr 08, 2013
Rob Miller Use PipelinePacks inside the MGI infrastructure rather than having a …
…separate

messageHolder struct that serves essentially the same purpose.
5606511
Rob Miller Merge branch 'dev' into features/no-messageHolder
Conflicts:
	pipeline/pipeline_runner.go
	pipeline/sandbox_filter.go
42b0a6b
Apr 09, 2013
Rob Miller Merge branch 'dev' into features/no-messageHolder 64460cd
This page is out of date. Refresh to see the latest.
14  pipeline/decoders.go
@@ -67,7 +67,7 @@ func (dm *decoderManager) makeDecoder(name string) (dRunner DecoderRunner, ok bo
67 67
 		decoder := wrapper.Create().(Decoder)
68 68
 		dRunner = NewDecoderRunner(name, decoder, dm)
69 69
 		dm.wg.Add(1)
70  
-		dRunner.Start(dm.wg)
  70
+		dRunner.Start(dm.config, dm.wg)
71 71
 	}
72 72
 	return
73 73
 }
@@ -87,7 +87,7 @@ func (dm *decoderManager) fromStopped(name string) (dRunner DecoderRunner, ok bo
87 87
 	dm.lock.Unlock()
88 88
 	if ok {
89 89
 		dm.wg.Add(1)
90  
-		dRunner.Start(dm.wg)
  90
+		dRunner.Start(dm.config, dm.wg)
91 91
 	}
92 92
 	return
93 93
 }
@@ -146,7 +146,7 @@ func (dm *decoderManager) NewDecoders() (decoders map[string]DecoderRunner) {
146 146
 			decoder = wrapper.Create().(Decoder)
147 147
 			runner = NewDecoderRunner(name, decoder, dm)
148 148
 			dm.wg.Add(1)
149  
-			runner.Start(dm.wg)
  149
+			runner.Start(dm.config, dm.wg)
150 150
 		}
151 151
 		decoders[name] = runner
152 152
 		dSlice = append(dSlice, runner)
@@ -178,7 +178,7 @@ func (dm *decoderManager) RunningDecoders() (decoders map[string]DecoderRunner)
178 178
 type DecoderRunner interface {
179 179
 	PluginRunner
180 180
 	Decoder() Decoder
181  
-	Start(wg *sync.WaitGroup)
  181
+	Start(h PluginHelper, wg *sync.WaitGroup)
182 182
 	InChan() chan *PipelinePack
183 183
 	UUID() string
184 184
 	OrigName() string
@@ -205,7 +205,7 @@ func (dr *dRunner) Decoder() Decoder {
205 205
 	return dr.plugin.(Decoder)
206 206
 }
207 207
 
208  
-func (dr *dRunner) Start(wg *sync.WaitGroup) {
  208
+func (dr *dRunner) Start(h PluginHelper, wg *sync.WaitGroup) {
209 209
 	dr.inChan = make(chan *PipelinePack, PIPECHAN_BUFSIZE)
210 210
 	go func() {
211 211
 		var pack *PipelinePack
@@ -219,7 +219,7 @@ func (dr *dRunner) Start(wg *sync.WaitGroup) {
219 219
 				if Stopping {
220 220
 					wg.Done()
221 221
 				} else {
222  
-					dr.Start(wg)
  222
+					dr.Start(h, wg)
223 223
 				}
224 224
 			}
225 225
 		}()
@@ -232,7 +232,7 @@ func (dr *dRunner) Start(wg *sync.WaitGroup) {
232 232
 				continue
233 233
 			}
234 234
 			pack.Decoded = true
235  
-			pack.Config.Router().InChan <- pack
  235
+			h.Router().InChan <- pack
236 236
 		}
237 237
 		dr.mgr.unregDecoder(dr.uuid)
238 238
 		dr.LogMessage("stopped")
12  pipeline/decoders_test.go
@@ -152,7 +152,7 @@ func DecodersSpec(c gospec.Context) {
152 152
 			timestampJson, *msg.Logger, *msg.Severity, *msg.Payload,
153 153
 			fieldsJson, *msg.EnvVersion, *msg.Pid, *msg.Hostname)
154 154
 
155  
-		pipelinePack := NewPipelinePack(config)
  155
+		pipelinePack := NewPipelinePack(config.RecycleChan)
156 156
 		pipelinePack.MsgBytes = []byte(jsonString)
157 157
 		jsonDecoder := new(JsonDecoder)
158 158
 
@@ -227,7 +227,7 @@ func DecodersSpec(c gospec.Context) {
227 227
 		msg := getTestMessage()
228 228
 		encoded, err := proto.Marshal(msg)
229 229
 		c.Assume(err, gs.IsNil)
230  
-		pack := NewPipelinePack(config)
  230
+		pack := NewPipelinePack(config.RecycleChan)
231 231
 		decoder := new(ProtobufDecoder)
232 232
 
233 233
 		c.Specify("decodes a msgpack message", func() {
@@ -251,11 +251,11 @@ func DecodersSpec(c gospec.Context) {
251 251
 	c.Specify("Recovers from a panic in `Decode()`", func() {
252 252
 		decoder := new(PanicDecoder)
253 253
 		dRunner := NewDecoderRunner("panic", decoder, nil)
254  
-		pack := NewPipelinePack(config)
  254
+		pack := NewPipelinePack(config.RecycleChan)
255 255
 		var wg sync.WaitGroup
256 256
 		wg.Add(1)
257 257
 		Stopping = true
258  
-		dRunner.Start(&wg)
  258
+		dRunner.Start(config, &wg)
259 259
 		dRunner.InChan() <- pack // No panic ==> success
260 260
 		wg.Wait()
261 261
 		Stopping = false
@@ -274,7 +274,7 @@ func BenchmarkDecodeJSON(b *testing.B) {
274 274
 		fieldsJson, *msg.EnvVersion, *msg.Pid, *msg.Hostname)
275 275
 
276 276
 	config := NewPipelineConfig(1)
277  
-	pipelinePack := NewPipelinePack(config)
  277
+	pipelinePack := NewPipelinePack(config.RecycleChan)
278 278
 	pipelinePack.MsgBytes = []byte(jsonString)
279 279
 	jsonDecoder := new(JsonDecoder)
280 280
 	b.StartTimer()
@@ -296,7 +296,7 @@ func BenchmarkDecodeProtobuf(b *testing.B) {
296 296
 	msg := getTestMessage()
297 297
 	encoded, _ := proto.Marshal(msg)
298 298
 	config := NewPipelineConfig(1)
299  
-	pack := NewPipelinePack(config)
  299
+	pack := NewPipelinePack(config.RecycleChan)
300 300
 	decoder := new(ProtobufDecoder)
301 301
 	pack.MsgBytes = encoded
302 302
 	b.StartTimer()
16  pipeline/filters.go
@@ -85,11 +85,11 @@ func (this *CounterFilter) tally() {
85 85
 	this.rate = float64(msgsSent) / elapsedTime.Seconds()
86 86
 	this.rates = append(this.rates, this.rate)
87 87
 
88  
-	outMsg := MessageGenerator.Retrieve()
89  
-	outMsg.Message.SetType("heka.counter-output")
90  
-	outMsg.Message.SetPayload(fmt.Sprintf("Got %d messages. %0.2f msg/sec",
  88
+	pack := MessageGenerator.Retrieve()
  89
+	pack.Message.SetType("heka.counter-output")
  90
+	pack.Message.SetPayload(fmt.Sprintf("Got %d messages. %0.2f msg/sec",
91 91
 		this.count, this.rate))
92  
-	MessageGenerator.Inject(outMsg)
  92
+	MessageGenerator.Inject(pack)
93 93
 
94 94
 	samples := len(this.rates)
95 95
 	if samples == 10 { // generate a summary every 10 samples
@@ -101,12 +101,12 @@ func (this *CounterFilter) tally() {
101 101
 			sum += val
102 102
 		}
103 103
 		mean := sum / float64(samples)
104  
-		outMsg = MessageGenerator.Retrieve()
105  
-		outMsg.Message.SetType("heka.counter-output")
106  
-		outMsg.Message.SetPayload(
  104
+		pack = MessageGenerator.Retrieve()
  105
+		pack.Message.SetType("heka.counter-output")
  106
+		pack.Message.SetPayload(
107 107
 			fmt.Sprintf("AGG Sum. Min: %0.2f    Max: %0.2f    Mean: %0.2f",
108 108
 				min, max, mean))
109  
-		MessageGenerator.Inject(outMsg)
  109
+		MessageGenerator.Inject(pack)
110 110
 		this.rates = this.rates[:0]
111 111
 	}
112 112
 }
60  pipeline/inputs.go
@@ -30,7 +30,6 @@ import (
30 30
 	"strconv"
31 31
 	"strings"
32 32
 	"sync"
33  
-	"sync/atomic"
34 33
 	"time"
35 34
 )
36 35
 
@@ -415,62 +414,56 @@ func (self *TcpInput) Stop() {
415 414
 var MessageGenerator = new(msgGenerator)
416 415
 
417 416
 type msgGenerator struct {
418  
-	RouterChan  chan *messageHolder
  417
+	RouterChan  chan *PipelinePack
419 418
 	OutputChan  chan outputMsg
420  
-	RecycleChan chan *messageHolder
  419
+	RecycleChan chan *PipelinePack
421 420
 	hostname    string
422 421
 	pid         int32
423 422
 }
424 423
 
425 424
 func (self *msgGenerator) Init() {
426  
-	self.RouterChan = make(chan *messageHolder, PoolSize)
  425
+	self.RouterChan = make(chan *PipelinePack, PoolSize)
427 426
 	self.OutputChan = make(chan outputMsg, PoolSize)
428  
-	self.RecycleChan = make(chan *messageHolder, PoolSize)
  427
+	self.RecycleChan = make(chan *PipelinePack, PoolSize)
429 428
 	for i := 0; i < PoolSize; i++ {
430  
-		msg := messageHolder{new(Message), 1}
431  
-		self.RecycleChan <- &msg
  429
+		self.RecycleChan <- NewPipelinePack(self.RecycleChan)
432 430
 	}
433 431
 	self.hostname, _ = os.Hostname()
434 432
 	self.pid = int32(os.Getpid())
435 433
 }
436 434
 
437 435
 // Retrieve a message for use by the MessageGenerator.
438  
-func (self *msgGenerator) Retrieve() (msg *messageHolder) {
439  
-	msg = <-self.RecycleChan
440  
-	msg.Message.SetTimestamp(time.Now().UnixNano())
441  
-	msg.Message.SetUuid(uuid.NewRandom())
442  
-	msg.Message.SetHostname(self.hostname)
443  
-	msg.Message.SetPid(self.pid)
444  
-	msg.RefCount = 1
445  
-	return msg
  436
+func (self *msgGenerator) Retrieve() (pack *PipelinePack) {
  437
+	pack = <-self.RecycleChan
  438
+	pack.Message.SetTimestamp(time.Now().UnixNano())
  439
+	pack.Message.SetUuid(uuid.NewRandom())
  440
+	pack.Message.SetHostname(self.hostname)
  441
+	pack.Message.SetPid(self.pid)
  442
+	pack.RefCount = 1
  443
+	return
446 444
 }
447 445
 
448 446
 // Injects a message using the MessageGenerator.
449  
-func (self *msgGenerator) Inject(msg *messageHolder) {
450  
-	self.RouterChan <- msg
  447
+func (self *msgGenerator) Inject(pack *PipelinePack) {
  448
+	self.RouterChan <- pack
451 449
 }
452 450
 
453 451
 // Sends a message directly to a specific output.
454  
-func (self *msgGenerator) Output(name string, msg *messageHolder) {
455  
-	outMsg := outputMsg{name, msg}
  452
+func (self *msgGenerator) Output(name string, pack *PipelinePack) {
  453
+	outMsg := outputMsg{name, pack}
456 454
 	self.OutputChan <- outMsg
457 455
 }
458 456
 
459 457
 // MessageGeneratorInput
460 458
 type MessageGeneratorInput struct {
461  
-	routerChan  chan *messageHolder
  459
+	routerChan  chan *PipelinePack
462 460
 	outputChan  chan outputMsg
463  
-	recycleChan chan *messageHolder
464  
-}
465  
-
466  
-type messageHolder struct {
467  
-	Message  *Message
468  
-	RefCount int32
  461
+	recycleChan chan *PipelinePack
469 462
 }
470 463
 
471 464
 type outputMsg struct {
472 465
 	outputName string
473  
-	msg        *messageHolder
  466
+	pack       *PipelinePack
474 467
 }
475 468
 
476 469
 func (self *MessageGeneratorInput) Init(config interface{}) error {
@@ -483,21 +476,18 @@ func (self *MessageGeneratorInput) Init(config interface{}) error {
483 476
 
484 477
 func (self *MessageGeneratorInput) Run(ir InputRunner, h PluginHelper) (err error) {
485 478
 	var pack *PipelinePack
486  
-	var msgHolder *messageHolder
487 479
 	var outMsg outputMsg
488 480
 	var output OutputRunner
489 481
 	ok := true
490  
-	packSupply := ir.InChan()
491 482
 	outChan := h.Router().InChan
492 483
 
493 484
 	for ok {
494 485
 		output = nil
495  
-		pack = <-packSupply
496 486
 		select {
497  
-		case msgHolder, ok = <-self.routerChan:
  487
+		case pack, ok = <-self.routerChan:
498 488
 			// if !ok we'll fall through below
499 489
 		case outMsg = <-self.outputChan:
500  
-			msgHolder = outMsg.msg
  490
+			pack = outMsg.pack
501 491
 			if output, ok = h.Output(outMsg.outputName); !ok {
502 492
 				ir.LogError(fmt.Errorf("No '%s' output", outMsg.outputName))
503 493
 				ok = true // still deliver to the router; is this what we want?
@@ -505,18 +495,12 @@ func (self *MessageGeneratorInput) Run(ir InputRunner, h PluginHelper) (err erro
505 495
 		}
506 496
 
507 497
 		if ok {
508  
-			msgHolder.Message.Copy(pack.Message)
509 498
 			pack.Decoded = true
510 499
 			if output != nil {
511 500
 				output.Deliver(pack)
512 501
 			} else {
513 502
 				outChan <- pack
514 503
 			}
515  
-			cnt := atomic.AddInt32(&msgHolder.RefCount, -1)
516  
-			if cnt == 0 {
517  
-				msgHolder.Message = new(Message)
518  
-				self.recycleChan <- msgHolder
519  
-			}
520 504
 		}
521 505
 	}
522 506
 
2  pipeline/inputs_test.go
@@ -75,7 +75,7 @@ func InputsSpec(c gs.Context) {
75 75
 	config := NewPipelineConfig(1)
76 76
 	ith := new(InputTestHelper)
77 77
 	ith.Msg = getTestMessage()
78  
-	ith.Pack = NewPipelinePack(config)
  78
+	ith.Pack = NewPipelinePack(config.RecycleChan)
79 79
 
80 80
 	// Specify localhost, but we're not really going to use the network
81 81
 	ith.AddrStr = "localhost:55565"
8  pipeline/mock_decoderrunner_test.go
@@ -103,12 +103,12 @@ func (_mr *_MockDecoderRunnerRecorder) SetName(arg0 interface{}) *gomock.Call {
103 103
 	return _mr.mock.ctrl.RecordCall(_mr.mock, "SetName", arg0)
104 104
 }
105 105
 
106  
-func (_m *MockDecoderRunner) Start(_param0 *sync.WaitGroup) {
107  
-	_m.ctrl.Call(_m, "Start", _param0)
  106
+func (_m *MockDecoderRunner) Start(_param0 PluginHelper, _param1 *sync.WaitGroup) {
  107
+	_m.ctrl.Call(_m, "Start", _param0, _param1)
108 108
 }
109 109
 
110  
-func (_mr *_MockDecoderRunnerRecorder) Start(arg0 interface{}) *gomock.Call {
111  
-	return _mr.mock.ctrl.RecordCall(_mr.mock, "Start", arg0)
  110
+func (_mr *_MockDecoderRunnerRecorder) Start(arg0, arg1 interface{}) *gomock.Call {
  111
+	return _mr.mock.ctrl.RecordCall(_mr.mock, "Start", arg0, arg1)
112 112
 }
113 113
 
114 114
 func (_m *MockDecoderRunner) UUID() string {
2  pipeline/mock_outputrunner_test.go
@@ -4,8 +4,8 @@
4 4
 package pipeline
5 5
 
6 6
 import (
7  
-	gomock "code.google.com/p/gomock/gomock"
8 7
 	sync "sync"
  8
+	gomock "code.google.com/p/gomock/gomock"
9 9
 	time "time"
10 10
 )
11 11
 
5  pipeline/outputs_test.go
@@ -71,10 +71,9 @@ func OutputsSpec(c gs.Context) {
71 71
 		config.Path = tmpFilePath
72 72
 
73 73
 		msg := getTestMessage()
74  
-		pack := NewPipelinePack(pConfig)
  74
+		pack := NewPipelinePack(pConfig.RecycleChan)
75 75
 		pack.Message = msg
76 76
 		pack.Decoded = true
77  
-		pack.Config.RecycleChan = make(chan *PipelinePack, 1)
78 77
 		plc := &PipelineCapture{Pack: pack}
79 78
 
80 79
 		toString := func(outData interface{}) string {
@@ -239,7 +238,7 @@ func OutputsSpec(c gs.Context) {
239 238
 		tcpOutput.connection = ts.NewMockConn(ctrl)
240 239
 
241 240
 		msg := getTestMessage()
242  
-		pack := NewPipelinePack(pConfig)
  241
+		pack := NewPipelinePack(pConfig.RecycleChan)
243 242
 		pack.Message = msg
244 243
 		pack.Decoded = true
245 244
 		plc := &PipelineCapture{Pack: pack}
28  pipeline/pipeline_runner.go
@@ -158,12 +158,12 @@ func (foRunner *foRunner) Filter() Filter {
158 158
 }
159 159
 
160 160
 type PipelinePack struct {
161  
-	MsgBytes []byte
162  
-	Message  *message.Message
163  
-	Config   *PipelineConfig
164  
-	Decoded  bool
165  
-	RefCount int32
166  
-	Signer   string
  161
+	MsgBytes    []byte
  162
+	Message     *message.Message
  163
+	RecycleChan chan *PipelinePack
  164
+	Decoded     bool
  165
+	RefCount    int32
  166
+	Signer      string
167 167
 }
168 168
 
169 169
 type PipelineCapture struct {
@@ -171,16 +171,16 @@ type PipelineCapture struct {
171 171
 	Captures map[string]string
172 172
 }
173 173
 
174  
-func NewPipelinePack(config *PipelineConfig) (pack *PipelinePack) {
  174
+func NewPipelinePack(recycleChan chan *PipelinePack) (pack *PipelinePack) {
175 175
 	msgBytes := make([]byte, message.MAX_MESSAGE_SIZE)
176 176
 	message := &message.Message{}
177 177
 
178 178
 	return &PipelinePack{
179  
-		MsgBytes: msgBytes,
180  
-		Message:  message,
181  
-		Config:   config,
182  
-		Decoded:  false,
183  
-		RefCount: int32(1),
  179
+		MsgBytes:    msgBytes,
  180
+		Message:     message,
  181
+		RecycleChan: recycleChan,
  182
+		Decoded:     false,
  183
+		RefCount:    int32(1),
184 184
 	}
185 185
 }
186 186
 
@@ -199,7 +199,7 @@ func (p *PipelinePack) Recycle() {
199 199
 	cnt := atomic.AddInt32(&p.RefCount, -1)
200 200
 	if cnt == 0 {
201 201
 		p.Zero()
202  
-		p.Config.RecycleChan <- p
  202
+		p.RecycleChan <- p
203 203
 	}
204 204
 }
205 205
 
@@ -232,7 +232,7 @@ func Run(config *PipelineConfig) {
232 232
 
233 233
 	// Initialize all of the PipelinePacks that we'll need
234 234
 	for i := 0; i < config.PoolSize; i++ {
235  
-		config.RecycleChan <- NewPipelinePack(config)
  235
+		config.RecycleChan <- NewPipelinePack(config.RecycleChan)
236 236
 	}
237 237
 
238 238
 	config.Router().Start()
53  pipeline/report.go
@@ -67,33 +67,33 @@ func PopulateReportMsg(pr PluginRunner, msg *message.Message) (err error) {
67 67
 }
68 68
 
69 69
 // Generate and return recycle channel and plugin report messages.
70  
-func (pc *PipelineConfig) reports() (reports map[string]*messageHolder) {
71  
-	reports = make(map[string]*messageHolder)
  70
+func (pc *PipelineConfig) reports() (reports map[string]*PipelinePack) {
  71
+	reports = make(map[string]*PipelinePack)
72 72
 	var (
73 73
 		f      *message.Field
74  
-		holder *messageHolder
  74
+		pack   *PipelinePack
75 75
 		msg    *message.Message
76 76
 		err, e error
77 77
 	)
78 78
 
79  
-	holder = MessageGenerator.Retrieve()
80  
-	msg = holder.Message
  79
+	pack = MessageGenerator.Retrieve()
  80
+	msg = pack.Message
81 81
 	newIntField(msg, "InChanCapacity", cap(pc.RecycleChan))
82 82
 	newIntField(msg, "InChanLength", len(pc.RecycleChan))
83 83
 	msg.SetType("heka.recycler-report")
84  
-	reports["RecycleChan"] = holder
  84
+	reports["RecycleChan"] = pack
85 85
 
86  
-	holder = MessageGenerator.Retrieve()
87  
-	msg = holder.Message
  86
+	pack = MessageGenerator.Retrieve()
  87
+	msg = pack.Message
88 88
 	newIntField(msg, "InChanCapacity", cap(pc.Router().InChan))
89 89
 	newIntField(msg, "InChanLength", len(pc.Router().InChan))
90 90
 	msg.SetType("heka.router-report")
91  
-	reports["Router"] = holder
  91
+	reports["Router"] = pack
92 92
 
93  
-	getReport := func(runner PluginRunner) (holder *messageHolder) {
94  
-		holder = MessageGenerator.Retrieve()
95  
-		if err = PopulateReportMsg(runner, holder.Message); err != nil {
96  
-			msg = holder.Message
  93
+	getReport := func(runner PluginRunner) (pack *PipelinePack) {
  94
+		pack = MessageGenerator.Retrieve()
  95
+		if err = PopulateReportMsg(runner, pack.Message); err != nil {
  96
+			msg = pack.Message
97 97
 			f, e = message.NewField("Error", err.Error(), message.Field_RAW)
98 98
 			if e == nil {
99 99
 				msg.AddField(f)
@@ -106,11 +106,11 @@ func (pc *PipelineConfig) reports() (reports map[string]*messageHolder) {
106 106
 	var dRunner DecoderRunner
107 107
 
108 108
 	for name, runner := range pc.InputRunners {
109  
-		holder = getReport(runner)
110  
-		if len(holder.Message.Fields) > 0 || holder.Message.GetPayload() != "" {
111  
-			reports[name] = holder
  109
+		pack = getReport(runner)
  110
+		if len(pack.Message.Fields) > 0 || pack.Message.GetPayload() != "" {
  111
+			reports[name] = pack
112 112
 		} else {
113  
-			MessageGenerator.RecycleChan <- holder
  113
+			pack.Recycle()
114 114
 		}
115 115
 		for _, dRunner = range runner.DecoderSource().RunningDecoders() {
116 116
 			reports[dRunner.Name()] = getReport(dRunner)
@@ -134,24 +134,23 @@ func (pc *PipelineConfig) allReportsMsg() {
134 134
 	var line string
135 135
 	reports := pc.reports()
136 136
 
137  
-	for name, holder := range reports {
  137
+	for name, pack := range reports {
138 138
 		line = fmt.Sprintf("%s:", name)
139 139
 		payload = append(payload, line)
140  
-		for _, field := range holder.Message.Fields {
  140
+		for _, field := range pack.Message.Fields {
141 141
 			line = fmt.Sprintf("\t%s:\t%v", field.GetName(), field.GetValue())
142 142
 			payload = append(payload, line)
143 143
 		}
144  
-		if holder.Message.GetPayload() != "" {
145  
-			line = fmt.Sprintf("\tPayload:\t%s", holder.Message.GetPayload())
  144
+		if pack.Message.GetPayload() != "" {
  145
+			line = fmt.Sprintf("\tPayload:\t%s", pack.Message.GetPayload())
146 146
 			payload = append(payload, line)
147 147
 		}
148 148
 		payload = append(payload, "")
149  
-		holder.Message = new(message.Message)
150  
-		MessageGenerator.RecycleChan <- holder
  149
+		pack.Recycle()
151 150
 	}
152 151
 
153  
-	holder := MessageGenerator.Retrieve()
154  
-	holder.Message.SetType("heka.all-report")
155  
-	holder.Message.SetPayload(strings.Join(payload, "\n"))
156  
-	MessageGenerator.Inject(holder)
  152
+	pack := MessageGenerator.Retrieve()
  153
+	pack.Message.SetType("heka.all-report")
  154
+	pack.Message.SetPayload(strings.Join(payload, "\n"))
  155
+	MessageGenerator.Inject(pack)
157 156
 }
10  pipeline/sandbox_filter.go
@@ -70,11 +70,11 @@ func (this *SandboxFilter) Init(config interface{}) (err error) {
70 70
 	}
71 71
 
72 72
 	this.sb.InjectMessage(func(s string) {
73  
-		msg := MessageGenerator.Retrieve()
74  
-		msg.Message.SetType("heka.sandbox")
75  
-		msg.Message.SetLogger(path.Base(this.sbc.ScriptFilename))
76  
-		msg.Message.SetPayload(s)
77  
-		MessageGenerator.Inject(msg)
  73
+		pack := MessageGenerator.Retrieve()
  74
+		pack.Message.SetType("heka.sandbox")
  75
+		pack.Message.SetLogger(path.Base(this.sbc.ScriptFilename))
  76
+		pack.Message.SetPayload(s)
  77
+		MessageGenerator.Inject(pack)
78 78
 	})
79 79
 
80 80
 	return err
10  pipeline/stat_filter.go
@@ -59,11 +59,11 @@ func (s *StatFilter) Run(fr FilterRunner, h PluginHelper) (err error) {
59 59
 
60 60
 		// We matched, generate appropriate metrics
61 61
 		for _, met := range s.metrics {
62  
-			m := MessageGenerator.Retrieve()
63  
-			m.Message.SetType(met.Type_)
64  
-			m.Message.SetLogger(InterpolateString(met.Name, captures))
65  
-			m.Message.SetPayload(InterpolateString(met.Value, captures))
66  
-			MessageGenerator.Inject(m)
  62
+			pack := MessageGenerator.Retrieve()
  63
+			pack.Message.SetType(met.Type_)
  64
+			pack.Message.SetLogger(InterpolateString(met.Name, captures))
  65
+			pack.Message.SetPayload(InterpolateString(met.Value, captures))
  66
+			MessageGenerator.Inject(pack)
67 67
 		}
68 68
 	}
69 69
 
10  pipeline/statsd_input_writer.go
@@ -277,10 +277,10 @@ func (sm *statMonitor) Flush() {
277 277
 		numStats++
278 278
 	}
279 279
 	fmt.Fprintf(buffer, "statsd.numStats %d %d\n", numStats, nowUnix)
280  
-	newMsg := MessageGenerator.Retrieve()
281  
-	newMsg.Message.SetType("statmetric")
282  
-	newMsg.Message.SetTimestamp(now.UnixNano())
283  
-	newMsg.Message.SetPayload(buffer.String())
284  
-	MessageGenerator.Inject(newMsg)
  280
+	pack := MessageGenerator.Retrieve()
  281
+	pack.Message.SetType("statmetric")
  282
+	pack.Message.SetTimestamp(now.UnixNano())
  283
+	pack.Message.SetPayload(buffer.String())
  284
+	MessageGenerator.Inject(pack)
285 285
 	return
286 286
 }
3  pipeline/whisper_test.go
@@ -114,9 +114,8 @@ func WhisperOutputSpec(c gospec.Context) {
114 114
 			o.dbs[statName] = mockWr
115 115
 		}
116 116
 
117  
-		pack := NewPipelinePack(pConfig)
  117
+		pack := NewPipelinePack(pConfig.RecycleChan)
118 118
 		pack.Message.SetPayload(strings.Join(lines, "\n"))
119  
-		pack.Config.RecycleChan = make(chan *PipelinePack, 1) // don't block on recycle
120 119
 		plc := &PipelineCapture{Pack: pack}
121 120
 
122 121
 		c.Specify("turns statmetric lines into points", func() {
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.