Skip to content

Commit

Permalink
Merge pull request #924 from sanchezl/check-endpoints-min-delay
Browse files Browse the repository at this point in the history
check-endpoints: minimize sorting delay
  • Loading branch information
openshift-merge-robot committed Aug 10, 2020
2 parents 54b098a + 626592b commit 96ceb2c
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 45 deletions.
78 changes: 40 additions & 38 deletions pkg/cmd/checkendpoints/controller/connection_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net"
"regexp"
"sync"
"time"

operatorcontrolplanev1alpha1 "github.com/openshift/api/operatorcontrolplane/v1alpha1"
Expand All @@ -19,10 +18,15 @@ import (
"github.com/openshift/cluster-kube-apiserver-operator/pkg/cmd/checkendpoints/trace"
)

const (
checkPeriod = 1 * time.Second
checkTimeout = 10 * time.Second
)

// ConnectionChecker checks a single connection and updates status when appropriate
type ConnectionChecker interface {
Run(ctx context.Context)
Stop()
Stop(ctx context.Context)
}

type GetCheckFunc func() *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck
Expand All @@ -36,10 +40,18 @@ func NewConnectionChecker(name, podName string, getCheck GetCheckFunc, client v1
client: client,
clientCertGetter: clientCertGetter,
recorder: recorder,
updates: NewUpdatesManager(checkPeriod, checkTimeout, newUpdatesProcessor(client, name)),
stop: make(chan interface{}),
}
}

func newUpdatesProcessor(client v1alpha1helpers.PodNetworkConnectivityCheckClient, name string) UpdatesProcessor {
return func(ctx context.Context, updates ...v1alpha1helpers.UpdateStatusFunc) error {
_, _, err := v1alpha1helpers.UpdateStatus(ctx, client, name, updates...)
return err
}
}

type CertificatesGetter func() []tls.Certificate

type connectionChecker struct {
Expand All @@ -50,21 +62,13 @@ type connectionChecker struct {
client v1alpha1helpers.PodNetworkConnectivityCheckClient
clientCertGetter CertificatesGetter
recorder events.Recorder
updatesLock sync.Mutex
updates []v1alpha1helpers.UpdateStatusFunc
updates UpdatesManager
stop chan interface{}
}

// add queues status updates in a queue.
func (c *connectionChecker) add(updates ...v1alpha1helpers.UpdateStatusFunc) {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
c.updates = append(c.updates, updates...)
}

// checkConnection checks the connection every second, updating status as needed
// checkConnection checks the connection periodically, updating status as needed
func (c *connectionChecker) checkConnection(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(checkPeriod)
defer ticker.Stop()
defer klog.V(1).Infof("Stopped connectivity check %s.", c.name)
for {
Expand All @@ -79,11 +83,11 @@ func (c *connectionChecker) checkConnection(ctx context.Context) {
currCheck := c.getCheck()
// if we have no check or the check isn't for us or the check has no target, report status if needed, but nothing else
if currCheck == nil || currCheck.Spec.SourcePod != c.podName || len(currCheck.Spec.TargetEndpoint) == 0 {
c.updateStatus(ctx)
c.updateStatus(ctx, false)
return
}
c.checkEndpoint(ctx, currCheck)
c.updateStatus(ctx)
c.updateStatus(ctx, false)
}()
}
}
Expand All @@ -101,43 +105,36 @@ func (c *connectionChecker) Run(ctx context.Context) {
}()
go wait.UntilWithContext(ctx2, func(ctx context.Context) {
c.checkConnection(ctx2)
}, 1*time.Second)
}, checkPeriod)
klog.V(1).Infof("Started connectivity check %s.", c.name)
<-ctx2.Done()
}

// Stop
func (c *connectionChecker) Stop() {
c.updateStatus(context.TODO())
func (c *connectionChecker) Stop(ctx context.Context) {
c.updateStatus(ctx, true)
close(c.stop)
}

// updateStatus applies updates. If an error occurs applying an update,
// it remain on the queue and retried on the next call to updateStatus.
func (c *connectionChecker) updateStatus(ctx context.Context) {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
if len(c.updates) > 20 {
_, _, err := v1alpha1helpers.UpdateStatus(ctx, c.client, c.name, c.updates...)
if err != nil {
klog.Warningf("Unable to update %s: %v", c.name, err)
return
}
c.updates = nil
func (c *connectionChecker) updateStatus(ctx context.Context, flush bool) {
if err := c.updates.Process(ctx, false); err != nil {
klog.Warningf("Unable to update status of %s: %v", c.name, err)
}
}

// checkEndpoint performs the check and manages the PodNetworkConnectivityCheck.Status changes that result.
func (c *connectionChecker) checkEndpoint(ctx context.Context, check *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck) {
latencyInfo, err := c.getTCPConnectLatency(ctx, check.Spec.TargetEndpoint)
statusUpdates := manageStatusLogs(check, err, latencyInfo)
statusUpdates, timestamp := manageStatusLogs(check, err, latencyInfo)
if len(statusUpdates) > 0 {
statusUpdates = append(statusUpdates, manageStatusOutage(c.recorder))
}
if len(statusUpdates) > 0 {
statusUpdates = append(statusUpdates, manageStatusConditions)
}
c.add(statusUpdates...)
c.updates.Add(timestamp, statusUpdates...)
}

// getTCPConnectLatency connects to a tcp endpoint and collects latency info
Expand All @@ -148,7 +145,7 @@ func (c *connectionChecker) getTCPConnectLatency(ctx context.Context, address st

// tcp connection
dialer := &net.Dialer{
Timeout: 10 * time.Second,
Timeout: checkTimeout,
}
tcpConn, err := dialer.DialContext(ctx, "tcp", address)
if err != nil {
Expand Down Expand Up @@ -184,9 +181,9 @@ func isDNSError(err error) bool {
return false
}

// manageStatusLogs returns a status update function that updates the PodNetworkConnectivityCheck.Status's
// Successes/Failures logs reflect the results of the check.
func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck, checkErr error, latency *trace.LatencyInfo) []v1alpha1helpers.UpdateStatusFunc {
// manageStatusLogs returns status update functions that updates the PodNetworkConnectivityCheck.Status's
// Successes/Failures logs reflect the results of the check. The time that the check started is also returned.
func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck, checkErr error, latency *trace.LatencyInfo) ([]v1alpha1helpers.UpdateStatusFunc, time.Time) {
var statusUpdates []v1alpha1helpers.UpdateStatusFunc
description := regexp.MustCompile(".*-to-").ReplaceAllString(check.Name, "")
host, _, _ := net.SplitHostPort(check.Spec.TargetEndpoint)
Expand All @@ -198,8 +195,9 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Reason: operatorcontrolplanev1alpha1.LogEntryReasonDNSError,
Message: fmt.Sprintf("%s: failure looking up host %s: %v", description, host, checkErr),
Latency: metav1.Duration{Duration: latency.DNS},
}))
})), latency.DNSStart
}
var overallStart time.Time
if latency.DNS != 0 {
klog.V(2).Infof("%7s | %-15s | %10s | Resolved host name %s successfully", "Success", "DNSResolve", latency.DNS, host)
statusUpdates = append(statusUpdates, v1alpha1helpers.AddSuccessLogEntry(operatorcontrolplanev1alpha1.LogEntry{
Expand All @@ -209,6 +207,10 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Message: fmt.Sprintf("%s: resolved host name %s successfully", description, host),
Latency: metav1.Duration{Duration: latency.DNS},
}))
overallStart = latency.DNSStart
}
if overallStart.IsZero() {
overallStart = latency.ConnectStart
}
if checkErr != nil {
klog.V(2).Infof("%7s | %-15s | %10s | Failed to establish a TCP connection to %s: %v", "Failure", "TCPConnectError", latency.Connect, check.Spec.TargetEndpoint, checkErr)
Expand All @@ -218,7 +220,7 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Reason: operatorcontrolplanev1alpha1.LogEntryReasonTCPConnectError,
Message: fmt.Sprintf("%s: failed to establish a TCP connection to %s: %v", description, check.Spec.TargetEndpoint, checkErr),
Latency: metav1.Duration{Duration: latency.Connect},
}))
})), overallStart
}
klog.V(2).Infof("%7s | %-15s | %10s | TCP connection to %v succeeded", "Success", "TCPConnect", latency.Connect, check.Spec.TargetEndpoint)
return append(statusUpdates, v1alpha1helpers.AddSuccessLogEntry(operatorcontrolplanev1alpha1.LogEntry{
Expand All @@ -227,11 +229,11 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Reason: operatorcontrolplanev1alpha1.LogEntryReasonTCPConnect,
Message: fmt.Sprintf("%s: tcp connection to %s succeeded", description, check.Spec.TargetEndpoint),
Latency: metav1.Duration{Duration: latency.Connect},
}))
})), overallStart
}

// manageStatusOutage returns a status update function that manages the
// PodNetworkConnectivityCheck.Status entries based on Successes/Failures log entries.
// PodNetworkConnectivityCheck.Status.Outage entries based on Successes/Failures log entries.
func manageStatusOutage(recorder events.Recorder) v1alpha1helpers.UpdateStatusFunc {
return func(status *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheckStatus) {
// This func is kept simple by assuming that only one log entry has been
Expand Down
20 changes: 14 additions & 6 deletions pkg/cmd/checkendpoints/controller/connection_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ func TestManageStatusLogs(t *testing.T) {
testDNSErr := &net.OpError{Op: "connect", Net: "tcp", Err: &net.DNSError{Err: "test error", Name: "host"}}

testCases := []struct {
name string
err error
trace *trace.LatencyInfo
initial *v1alpha1.PodNetworkConnectivityCheckStatus
expected *v1alpha1.PodNetworkConnectivityCheckStatus
name string
err error
trace *trace.LatencyInfo
initial *v1alpha1.PodNetworkConnectivityCheckStatus
expected *v1alpha1.PodNetworkConnectivityCheckStatus
expectedTimestamp time.Time
}{
{
name: "TCPConnect",
Expand All @@ -37,6 +38,7 @@ func TestManageStatusLogs(t *testing.T) {
expected: podNetworkConnectivityCheckStatus(
withSuccessEntry(tcpConnectEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "DNSResolve",
Expand All @@ -51,6 +53,7 @@ func TestManageStatusLogs(t *testing.T) {
withSuccessEntry(tcpConnectEntry(1)),
withSuccessEntry(dnsResolveEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "DNSError",
Expand All @@ -63,6 +66,7 @@ func TestManageStatusLogs(t *testing.T) {
expected: podNetworkConnectivityCheckStatus(
withFailureEntry(dnsErrorEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "TCPConnectError",
Expand All @@ -75,6 +79,7 @@ func TestManageStatusLogs(t *testing.T) {
expected: podNetworkConnectivityCheckStatus(
withFailureEntry(tcpConnectErrorEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "DNSResolveTCPConnectError",
Expand All @@ -90,6 +95,7 @@ func TestManageStatusLogs(t *testing.T) {
withFailureEntry(tcpConnectErrorEntry(1)),
withSuccessEntry(dnsResolveEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "SuccessSort",
Expand All @@ -111,12 +117,13 @@ func TestManageStatusLogs(t *testing.T) {
withSuccessEntry(tcpConnectEntry(1)),
withSuccessEntry(tcpConnectEntry(0)),
),
expectedTimestamp: testTime(3),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
status := tc.initial
updateStatusFuncs := manageStatusLogs(&v1alpha1.PodNetworkConnectivityCheck{
updateStatusFuncs, timestamp := manageStatusLogs(&v1alpha1.PodNetworkConnectivityCheck{
ObjectMeta: metav1.ObjectMeta{
Name: "test-to-target-endpoint",
},
Expand All @@ -128,6 +135,7 @@ func TestManageStatusLogs(t *testing.T) {
updateStatusFunc(status)
}
assert.Equal(t, tc.expected, status)
assert.Equal(t, tc.expectedTimestamp, timestamp)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *controller) Sync(ctx context.Context, syncContext factory.SyncContext)
}
}
if !keep {
updater.Stop()
updater.Stop(ctx)
delete(c.updaters, name)
}
}
Expand Down

0 comments on commit 96ceb2c

Please sign in to comment.