Skip to content
This repository was archived by the owner on Aug 29, 2018. It is now read-only.

Commit 7fd1910

Browse files
author
Abhishek Gupta
authored
Merge pull request #6449 from ironcladlou/logshifter-wedge-fix
Fix input/output deadlock condition
2 parents aa81418 + 0171a60 commit 7fd1910

3 files changed

Lines changed: 106 additions & 17 deletions

File tree

logshifter/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestDropOnBlockedOutput(t *testing.T) {
9191
// and the writer is unblocked.
9292
ag.AssertStatsEqual(t, map[string]float64{
9393
"input.read": 1000,
94-
"input.drop": 998,
94+
"input.evict": 998,
9595
"output.write": 2,
9696
})
9797

logshifter/input.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
// be placed on the channel:
1515
//
1616
// input.read => number of messages processed during a Read call
17-
// input.drop => number of messages dropped from queue during a Read call
17+
// input.queue => number of messages queued during a Read call
18+
// input.drop => number of new messages dropped during a Read call
19+
// input.evict => number of old messages evicted from the queue during a Read call
1820
// input.read.duration => time taken to process a line from reader (micros)
1921
type Input struct {
2022
bufferSize int
@@ -40,17 +42,20 @@ func (input *Input) Read() *sync.WaitGroup {
4042
return wg
4143
}
4244

45+
func (input *Input) record(name string, val float64) {
46+
if input.statsChannel != nil {
47+
input.statsChannel <- Stat{name: name, value: val}
48+
}
49+
}
50+
4351
// Private synchronous portion of Read.
4452
func (input *Input) read() {
4553
reader := bufio.NewReaderSize(input.reader, input.bufferSize)
4654

4755
for {
4856
line, _, err := reader.ReadLine()
4957

50-
var start time.Time
51-
if input.statsChannel != nil {
52-
start = time.Now()
53-
}
58+
start := time.Now()
5459

5560
if err != nil {
5661
break
@@ -64,24 +69,39 @@ func (input *Input) read() {
6469

6570
copy(cp, line)
6671

67-
if input.statsChannel != nil {
68-
input.statsChannel <- Stat{name: "input.read", value: 1.0}
69-
}
72+
input.record("input.read", 1.0)
7073

7174
select {
7275
case input.queue <- cp:
7376
// queued
77+
input.record("input.queue", 1.0)
7478
default:
75-
// evict the oldest entry to make room
76-
<-input.queue
77-
if input.statsChannel != nil {
78-
input.statsChannel <- Stat{name: "input.drop", value: 1.0}
79+
// try to evict the oldest entry to make room
80+
select {
81+
case <-input.queue:
82+
input.record("input.evict", 1.0)
83+
// try again to queue
84+
select {
85+
case input.queue <- cp:
86+
// queued
87+
input.record("input.queue", 1.0)
88+
default:
89+
// no room, drop it
90+
input.record("input.drop", 1.0)
91+
}
92+
default:
93+
// queue is already empty, try to queue
94+
select {
95+
case input.queue <- cp:
96+
// queued
97+
input.record("input.queue", 1.0)
98+
default:
99+
// no room, drop it
100+
input.record("input.drop", 1.0)
101+
}
79102
}
80-
input.queue <- cp
81103
}
82104

83-
if input.statsChannel != nil {
84-
input.statsChannel <- Stat{name: "input.read.duration", value: float64(time.Now().Sub(start).Nanoseconds()) / float64(1000)}
85-
}
105+
input.record("input.read.duration", float64(time.Now().Sub(start).Nanoseconds())/float64(1000))
86106
}
87107
}

logshifter/input_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"strings"
6+
"sync"
7+
"testing"
8+
)
9+
10+
// TestInputBlocking ensures that Input never blocks, regardless of the queue
11+
// consumption rate.
12+
func TestInputBlocking(t *testing.T) {
13+
messageCount := int64(10000)
14+
messageSize := 1024
15+
16+
// Use a small queue to help ensure an input backup.
17+
queue := make(chan []byte, 1)
18+
19+
// Set up a fast output sink to help ensure an input backup.
20+
go func() {
21+
for {
22+
<-queue
23+
}
24+
}()
25+
26+
// Collect stats.
27+
stats := make(map[string]float64)
28+
statsCh := make(chan Stat)
29+
stop := make(chan struct{})
30+
statsWg := sync.WaitGroup{}
31+
statsWg.Add(1)
32+
go func() {
33+
for {
34+
select {
35+
case <-stop:
36+
statsWg.Done()
37+
return
38+
case s := <-statsCh:
39+
stats[s.name] = stats[s.name] + s.value
40+
}
41+
}
42+
}()
43+
44+
// Create an input Reader.
45+
var data string
46+
var i int64 = 0
47+
for ; i < messageCount; i++ {
48+
data += strings.Repeat("0", messageSize-1) + "\n"
49+
}
50+
buffer := bytes.NewBufferString(data)
51+
t.Logf("created %d byte test input (%d lines @ %d bytes each)\n", buffer.Len(), messageCount, messageSize)
52+
53+
// Read until the test reader closes.
54+
input := &Input{
55+
reader: buffer,
56+
queue: queue,
57+
bufferSize: messageSize,
58+
statsChannel: statsCh,
59+
}
60+
input.read()
61+
62+
// If execution reaches this point, the Input hasn't wedged and the test is
63+
// successful.
64+
65+
// Shut down the stats collector.
66+
close(stop)
67+
statsWg.Wait()
68+
t.Logf("stats: %#v", stats)
69+
}

0 commit comments

Comments
 (0)