Skip to content

Commit

Permalink
Merge pull request #730 from johnSchnake/retryResults
Browse files Browse the repository at this point in the history
Retry results
  • Loading branch information
stevesloka committed Jun 5, 2019
2 parents 98d7d32 + f51d88d commit 4d4ff7a
Show file tree
Hide file tree
Showing 3 changed files with 1,814 additions and 18 deletions.
61 changes: 51 additions & 10 deletions pkg/plugin/aggregation/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"path"
"sync"
"time"

"github.com/heptio/sonobuoy/pkg/plugin"
"github.com/heptio/sonobuoy/pkg/tarball"
Expand All @@ -35,7 +36,8 @@ import (
)

const (
// gzipMimeType is the MIME type value that handleResult compares a result's
// MimeType against to decide whether to treat it as an archive.
// NOTE(review): this declaration appears twice below, presumably the old and
// new sides of a re-aligned diff line — confirm and keep only one.
gzipMimeType = "application/gzip"
gzipMimeType = "application/gzip"
// defaultRetryWindow is the retry window NewAggregator assigns to every new
// Aggregator.
defaultRetryWindow = 15 * time.Second
)

// Aggregator is responsible for taking results from an HTTP server (configured
Expand All @@ -45,18 +47,32 @@ const (
type Aggregator struct {
// OutputDir is the directory to write the node results
OutputDir string

// Results stores the check-in results the server has seen, keyed by the
// result's ExpectedResultID(). An entry is recorded even when processing
// the result returned an error.
Results map[string]*plugin.Result

// ExpectedResults stores a map of results the server should expect
ExpectedResults map[string]*plugin.ExpectedResult

// FailedResults maps a result's ExpectedResultID() to the time at which
// processing of that result last failed. An entry is added when handling
// a result returns an error and removed once the same result is handled
// successfully. While an entry exists, a repeated submission of that
// result is processed again instead of being rejected as a duplicate.
// Important if connection resets or network issues are common.
FailedResults map[string]time.Time

// resultEvents is a channel that is written to when results are seen
// by the server, so we can block until we're done. NewAggregator creates
// it with a buffer of one slot per expected result.
resultEvents chan *plugin.Result

// resultsMutex prevents race conditions if two identical results
// come in at the same time.
// NOTE(review): the lock/unlock sites are not visible in this view —
// confirm which fields it is meant to guard.
resultsMutex sync.Mutex

// retryWindow is the duration which the server will continue to block during
// Wait() after a FailedResult has been reported, even if all expected results
// are accounted for. This prevents racing the client retries that may occur.
// NewAggregator sets it to defaultRetryWindow.
retryWindow time.Duration
}

// httpError is an internal error type which allows us to unify result processing
Expand Down Expand Up @@ -87,7 +103,9 @@ func NewAggregator(outputDir string, expected []plugin.ExpectedResult) *Aggregat
OutputDir: outputDir,
Results: make(map[string]*plugin.Result, len(expected)),
ExpectedResults: make(map[string]*plugin.ExpectedResult, len(expected)),
FailedResults: make(map[string]time.Time, len(expected)),
resultEvents: make(chan *plugin.Result, len(expected)),
retryWindow: defaultRetryWindow,
}

for i, expResult := range expected {
Expand All @@ -106,6 +124,22 @@ func (a *Aggregator) Wait(stop chan bool) {
return
}
}

// Give all clients a chance to retry failed requests.
for _, failedTime := range a.FailedResults {
remainingTime := retryWindowRemaining(failedTime, time.Now(), a.retryWindow)

// A sleep for 0 or < 0 returns immediately.
time.Sleep(remainingTime)
}
}

// retryWindowRemaining reports how much of a retry window is still left.
//
// The window opens at first and lasts for window; second is the instant at
// which the remaining time is measured. The result is positive while the
// window is still open, and zero or negative once the time between first and
// second is equal to or greater than window.
func retryWindowRemaining(first, second time.Time, window time.Duration) time.Duration {
	windowEnd := first.Add(window)
	return windowEnd.Sub(second)
}

// isComplete returns true if all expected results have checked in.
Expand Down Expand Up @@ -150,21 +184,35 @@ func (a *Aggregator) processResult(result *plugin.Result) error {
}
}

// Don't allow duplicates
if a.isResultDuplicate(result) {
// Don't allow duplicates unless it failed to process fully.
isDup := a.isResultDuplicate(result)
_, hadErrs := a.FailedResults[resultID]
if isDup && !hadErrs {
return &httpError{
err: fmt.Errorf("result %v already received", resultID),
code: http.StatusConflict,
}
}

// Send an event that we got this result even if we get an error, so
// that Wait() doesn't hang forever on problems.
defer func() {
a.Results[result.ExpectedResultID()] = result
a.resultEvents <- result
}()

if err := a.handleResult(result); err != nil {
// Drop a breadcrumb so that we reconsider new results from this result.
a.FailedResults[result.ExpectedResultID()] = time.Now()
return &httpError{
err: fmt.Errorf("error handling result %v: %v", resultID, err),
code: http.StatusInternalServerError,
}
}

// Upon success, we no longer want to keep processing duplicate results.
delete(a.FailedResults, result.ExpectedResultID())

return nil
}

Expand Down Expand Up @@ -223,13 +271,6 @@ func (a *Aggregator) IngestResults(resultsCh <-chan *plugin.Result) {
// handleResult takes a given plugin Result and writes it out to the
// filesystem, signaling to the resultEvents channel when complete.
func (a *Aggregator) handleResult(result *plugin.Result) error {
// Send an event that we got this result even if we get an error, so
// that Wait() doesn't hang forever on problems.
defer func() {
a.Results[result.ExpectedResultID()] = result
a.resultEvents <- result
}()

if result.MimeType == gzipMimeType {
return a.handleArchiveResult(result)
}
Expand Down
161 changes: 153 additions & 8 deletions pkg/plugin/aggregation/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ package aggregation

import (
"bytes"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"testing"
"testing/iotest"
"time"

"github.com/heptio/sonobuoy/pkg/backplane/ca/authtest"
"github.com/heptio/sonobuoy/pkg/plugin"
Expand All @@ -33,7 +38,7 @@ import (

func TestAggregation(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
Expand Down Expand Up @@ -61,7 +66,7 @@ func TestAggregation(t *testing.T) {

func TestAggregation_noExtension(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
Expand All @@ -88,7 +93,7 @@ func TestAggregation_noExtension(t *testing.T) {

func TestAggregation_tarfile(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{ResultType: "e2e"},
{ResultType: "e2e"},
}

fileBytes := []byte("foo")
Expand Down Expand Up @@ -125,7 +130,7 @@ func TestAggregation_tarfile(t *testing.T) {

func TestAggregation_wrongnodes(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
Expand All @@ -147,8 +152,8 @@ func TestAggregation_wrongnodes(t *testing.T) {

func TestAggregation_duplicates(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
plugin.ExpectedResult{NodeName: "node12", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node12", ResultType: "systemd_logs"},
}
withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
URL, err := NodeResultURL(srv.URL, "node1", "systemd_logs")
Expand All @@ -174,9 +179,150 @@ func TestAggregation_duplicates(t *testing.T) {
})
}

func TestAggregation_duplicatesWithErrors(t *testing.T) {
// Setup aggregator with expected results and preload the test data/info
// that we want to transmit/compare against.
dir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
t.Fatalf("Could not create temp directory: %v", err)
}
defer os.RemoveAll(dir)
outpath := filepath.Join(dir, "systemd_logs", "results", "node1")
testDataPath := "./testdata/fakeLogData.txt"
testinfo, err := os.Stat(testDataPath)
if err != nil {
t.Fatalf("Could not stat test file: %v", err)
}
testDataReader, err := os.Open(testDataPath)
if err != nil {
t.Fatalf("Could not open test data file: %v", err)
}
defer testDataReader.Close()

expected := []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node12", ResultType: "systemd_logs"},
}
agg := NewAggregator(dir, expected)

// Send first result and force an error in processing.
errReader := iotest.TimeoutReader(testDataReader)
err = agg.processResult(&plugin.Result{Body: errReader, NodeName: "node1", ResultType: "systemd_logs"})
if err == nil {
t.Fatal("Expected error processing this due to reading error, instead got nil.")
}

// Confirm results are recorded but they are partial results.
realinfo, err := os.Stat(outpath)
if err != nil {
t.Fatalf("Could not stat output file: %v", err)
}
if realinfo.Size() == testinfo.Size() {
t.Fatal("Expected truncated results for first result (simulating error), but got all the data.")
}

// Retry the result without an error this time.
_, err = testDataReader.Seek(0, 0)
if err != nil {
t.Fatalf("Could not rewind test data file: %v", err)
}
err = agg.processResult(&plugin.Result{Body: testDataReader, NodeName: "node1", ResultType: "systemd_logs"})
if err != nil {
t.Errorf("Expected no error processing this result, got %v", err)
}

// Confirm the new results overwrite the old ones.
realinfo, err = os.Stat(outpath)
if err != nil {
t.Fatalf("Could not stat output file: %v", err)
}
if realinfo.Size() != testinfo.Size() {
t.Errorf("Expected all the data to be transmitted. Expected data size %v but got %v.", testinfo.Size(), realinfo.Size())
}
}

// TestAggregation_RetryWindow ensures that the server Wait() method
// gives clients a chance to retry if their results were not processed correctly.
//
// Every case runs as its own subtest. That way a failure is reported under the
// case description, and the test data file opened for a case is closed when
// that case finishes instead of staying open until the whole test returns.
func TestAggregation_RetryWindow(t *testing.T) {
	// Setup a shared output directory and the expected results used by every
	// case.
	dir, err := ioutil.TempDir("", "sonobuoy_server_test")
	if err != nil {
		t.Fatalf("Could not create temp directory: %v", err)
	}
	defer os.RemoveAll(dir)

	testDataPath := "./testdata/fakeLogData.txt"
	testRetryWindow := 1 * time.Second
	// Tolerance applied in both directions when comparing the measured wait
	// against the expected one, to absorb processing time.
	testBufferDuration := 200 * time.Millisecond
	expected := []plugin.ExpectedResult{
		{NodeName: "node1", ResultType: "systemd_logs"},
	}

	testCases := []struct {
		desc string
		// postProcessSleep is how long to sleep between processing the
		// result and calling Wait().
		postProcessSleep time.Duration
		// simulateErr makes processing of the result fail.
		simulateErr bool
		// expectExtraWait is how long Wait() is expected to block.
		expectExtraWait time.Duration
	}{
		{
			desc:            "Error causes us to wait at least the retry window",
			simulateErr:     true,
			expectExtraWait: testRetryWindow,
		}, {
			desc:             "Retry window is sliding",
			simulateErr:      true,
			postProcessSleep: 500 * time.Millisecond,
			expectExtraWait:  500 * time.Millisecond,
		}, {
			desc:             "Retry window can slide to 0",
			simulateErr:      true,
			postProcessSleep: testRetryWindow,
			expectExtraWait:  0,
		}, {
			desc:            "No retry window without error",
			simulateErr:     false,
			expectExtraWait: 0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			agg := NewAggregator(dir, expected)
			// Shorten retry window for testing.
			agg.retryWindow = testRetryWindow

			testDataReader, err := os.Open(testDataPath)
			if err != nil {
				t.Fatalf("Could not open test data file: %v", err)
			}
			// Scoped to the subtest, so the file is closed per case.
			defer testDataReader.Close()

			var r io.Reader
			if tc.simulateErr {
				r = iotest.TimeoutReader(testDataReader)
			} else {
				r = strings.NewReader("foo")
			}

			err = agg.processResult(&plugin.Result{Body: r, NodeName: "node1", ResultType: "systemd_logs"})
			if err == nil && tc.simulateErr {
				t.Fatal("Expected error processing this due to reading error, instead got nil.")
			}
			if err != nil && !tc.simulateErr {
				// Surface this directly rather than as a timing failure below.
				t.Fatalf("Expected no error processing this result, got %v", err)
			}

			// Measure how long Wait() blocks after the optional sleep.
			time.Sleep(tc.postProcessSleep)
			start := time.Now()
			agg.Wait(make(chan bool))
			waitTime := time.Since(start)

			// Allow a buffer on either side to avoid raciness due to processing time.
			diffTime := waitTime - tc.expectExtraWait
			if diffTime > testBufferDuration || diffTime < -testBufferDuration {
				t.Errorf("Expected Wait() to block for about %v (+/- %v) with a retry window of %v, instead waited %v",
					tc.expectExtraWait, testBufferDuration, agg.retryWindow, waitTime)
			}
		})
	}
}

func TestAggregation_errors(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{ResultType: "e2e"},
{ResultType: "e2e"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
Expand All @@ -202,7 +348,6 @@ func withAggregator(t *testing.T, expected []plugin.ExpectedResult, callback fun
dir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
t.Fatal("Could not create temp directory")
t.FailNow()
return
}
defer os.RemoveAll(dir)
Expand Down
Loading

0 comments on commit 4d4ff7a

Please sign in to comment.