Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge branch 'erewh0n-fix-file-polling-input' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rafrombrc committed Feb 24, 2015
2 parents f91b9b2 + d7cf3c0 commit af72598
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions plugins/file/file_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func FileOutputSpec(c gs.Context) {

c.Specify("when interval triggers first", func() {
timerChan <- time.Now()
runtime.Gosched()
select {
case <-fileOutput.batchChan:
default:
Expand Down
6 changes: 5 additions & 1 deletion plugins/file/file_polling_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (input *FilePollingInput) Stop() {

func (input *FilePollingInput) packDecorator(pack *pipeline.PipelinePack) {
pack.Message.SetType("heka.file.polling")
pack.Message.SetHostname(input.hostname)

field, err := message.NewField("TickerInterval", int(input.TickerInterval), "")
if err != nil {
input.runner.LogError(
Expand All @@ -76,8 +78,10 @@ func (input *FilePollingInput) Run(runner pipeline.InputRunner,
input.runner = runner
input.hostname = helper.PipelineConfig().Hostname()
tickChan := runner.Ticker()

sRunner := runner.NewSplitterRunner("")
if !sRunner.UseMsgBytes() {
sRunner.SetPackDecorator(input.packDecorator)
}

for {
select {
Expand Down
2 changes: 2 additions & 0 deletions plugins/file/file_polling_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func FilePollingInputSpec(c gs.Context) {
c.Assume(err, gs.IsNil)

ith.MockInputRunner.EXPECT().NewSplitterRunner("").Return(ith.MockSplitterRunner)
ith.MockSplitterRunner.EXPECT().UseMsgBytes().Return(false)
ith.MockSplitterRunner.EXPECT().SetPackDecorator(gomock.Any())
splitCall := ith.MockSplitterRunner.EXPECT().SplitStream(gomock.Any(),
nil).Return(io.EOF).Times(2)
splitCall.Do(func(f *os.File, del Deliverer) {
Expand Down

0 comments on commit af72598

Please sign in to comment.