Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check-endpoints: minimize sorting delay #924

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 40 additions & 38 deletions pkg/cmd/checkendpoints/controller/connection_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net"
"regexp"
"sync"
"time"

operatorcontrolplanev1alpha1 "github.com/openshift/api/operatorcontrolplane/v1alpha1"
Expand All @@ -19,10 +18,15 @@ import (
"github.com/openshift/cluster-kube-apiserver-operator/pkg/cmd/checkendpoints/trace"
)

const (
checkPeriod = 1 * time.Second
checkTimeout = 10 * time.Second
)

// ConnectionChecker checks a single connection and updates status when appropriate
type ConnectionChecker interface {
Run(ctx context.Context)
Stop()
Stop(ctx context.Context)
}

type GetCheckFunc func() *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck
Expand All @@ -36,10 +40,18 @@ func NewConnectionChecker(name, podName string, getCheck GetCheckFunc, client v1
client: client,
clientCertGetter: clientCertGetter,
recorder: recorder,
updates: NewUpdatesManager(checkPeriod, checkTimeout, newUpdatesProcessor(client, name)),
stop: make(chan interface{}),
}
}

func newUpdatesProcessor(client v1alpha1helpers.PodNetworkConnectivityCheckClient, name string) UpdatesProcessor {
return func(ctx context.Context, updates ...v1alpha1helpers.UpdateStatusFunc) error {
_, _, err := v1alpha1helpers.UpdateStatus(ctx, client, name, updates...)
return err
}
}

type CertificatesGetter func() []tls.Certificate

type connectionChecker struct {
Expand All @@ -50,21 +62,13 @@ type connectionChecker struct {
client v1alpha1helpers.PodNetworkConnectivityCheckClient
clientCertGetter CertificatesGetter
recorder events.Recorder
updatesLock sync.Mutex
updates []v1alpha1helpers.UpdateStatusFunc
updates UpdatesManager
stop chan interface{}
}

// add queues status updates in a queue.
func (c *connectionChecker) add(updates ...v1alpha1helpers.UpdateStatusFunc) {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
c.updates = append(c.updates, updates...)
}

// checkConnection checks the connection every second, updating status as needed
// checkConnection checks the connection periodically, updating status as needed
func (c *connectionChecker) checkConnection(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(checkPeriod)
defer ticker.Stop()
defer klog.V(1).Infof("Stopped connectivity check %s.", c.name)
for {
Expand All @@ -79,11 +83,11 @@ func (c *connectionChecker) checkConnection(ctx context.Context) {
currCheck := c.getCheck()
// if we have no check or the check isn't for us or the check has no target, report status if needed, but nothing else
if currCheck == nil || currCheck.Spec.SourcePod != c.podName || len(currCheck.Spec.TargetEndpoint) == 0 {
c.updateStatus(ctx)
c.updateStatus(ctx, false)
return
}
c.checkEndpoint(ctx, currCheck)
c.updateStatus(ctx)
c.updateStatus(ctx, false)
}()
}
}
Expand All @@ -101,43 +105,36 @@ func (c *connectionChecker) Run(ctx context.Context) {
}()
go wait.UntilWithContext(ctx2, func(ctx context.Context) {
c.checkConnection(ctx2)
}, 1*time.Second)
}, checkPeriod)
klog.V(1).Infof("Started connectivity check %s.", c.name)
<-ctx2.Done()
}

// Stop
func (c *connectionChecker) Stop() {
c.updateStatus(context.TODO())
func (c *connectionChecker) Stop(ctx context.Context) {
c.updateStatus(ctx, true)
close(c.stop)
}

// updateStatus applies updates. If an error occurs applying an update,
// it remain on the queue and retried on the next call to updateStatus.
func (c *connectionChecker) updateStatus(ctx context.Context) {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
if len(c.updates) > 20 {
_, _, err := v1alpha1helpers.UpdateStatus(ctx, c.client, c.name, c.updates...)
if err != nil {
klog.Warningf("Unable to update %s: %v", c.name, err)
return
}
c.updates = nil
func (c *connectionChecker) updateStatus(ctx context.Context, flush bool) {
if err := c.updates.Process(ctx, false); err != nil {
klog.Warningf("Unable to update status of %s: %v", c.name, err)
}
}

// checkEndpoint performs the check and manages the PodNetworkConnectivityCheck.Status changes that result.
func (c *connectionChecker) checkEndpoint(ctx context.Context, check *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck) {
latencyInfo, err := c.getTCPConnectLatency(ctx, check.Spec.TargetEndpoint)
statusUpdates := manageStatusLogs(check, err, latencyInfo)
statusUpdates, timestamp := manageStatusLogs(check, err, latencyInfo)
if len(statusUpdates) > 0 {
statusUpdates = append(statusUpdates, manageStatusOutage(c.recorder))
}
if len(statusUpdates) > 0 {
statusUpdates = append(statusUpdates, manageStatusConditions)
}
c.add(statusUpdates...)
c.updates.Add(timestamp, statusUpdates...)
}

// getTCPConnectLatency connects to a tcp endpoint and collects latency info
Expand All @@ -148,7 +145,7 @@ func (c *connectionChecker) getTCPConnectLatency(ctx context.Context, address st

// tcp connection
dialer := &net.Dialer{
Timeout: 10 * time.Second,
Timeout: checkTimeout,
}
tcpConn, err := dialer.DialContext(ctx, "tcp", address)
if err != nil {
Expand Down Expand Up @@ -184,9 +181,9 @@ func isDNSError(err error) bool {
return false
}

// manageStatusLogs returns a status update function that updates the PodNetworkConnectivityCheck.Status's
// Successes/Failures logs reflect the results of the check.
func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck, checkErr error, latency *trace.LatencyInfo) []v1alpha1helpers.UpdateStatusFunc {
// manageStatusLogs returns status update functions that updates the PodNetworkConnectivityCheck.Status's
// Successes/Failures logs reflect the results of the check. The time that the check started is also returned.
func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheck, checkErr error, latency *trace.LatencyInfo) ([]v1alpha1helpers.UpdateStatusFunc, time.Time) {
var statusUpdates []v1alpha1helpers.UpdateStatusFunc
description := regexp.MustCompile(".*-to-").ReplaceAllString(check.Name, "")
host, _, _ := net.SplitHostPort(check.Spec.TargetEndpoint)
Expand All @@ -198,8 +195,9 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Reason: operatorcontrolplanev1alpha1.LogEntryReasonDNSError,
Message: fmt.Sprintf("%s: failure looking up host %s: %v", description, host, checkErr),
Latency: metav1.Duration{Duration: latency.DNS},
}))
})), latency.DNSStart
}
var overallStart time.Time
if latency.DNS != 0 {
klog.V(2).Infof("%7s | %-15s | %10s | Resolved host name %s successfully", "Success", "DNSResolve", latency.DNS, host)
statusUpdates = append(statusUpdates, v1alpha1helpers.AddSuccessLogEntry(operatorcontrolplanev1alpha1.LogEntry{
Expand All @@ -209,6 +207,10 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Message: fmt.Sprintf("%s: resolved host name %s successfully", description, host),
Latency: metav1.Duration{Duration: latency.DNS},
}))
overallStart = latency.DNSStart
}
if overallStart.IsZero() {
overallStart = latency.ConnectStart
}
if checkErr != nil {
klog.V(2).Infof("%7s | %-15s | %10s | Failed to establish a TCP connection to %s: %v", "Failure", "TCPConnectError", latency.Connect, check.Spec.TargetEndpoint, checkErr)
Expand All @@ -218,7 +220,7 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Reason: operatorcontrolplanev1alpha1.LogEntryReasonTCPConnectError,
Message: fmt.Sprintf("%s: failed to establish a TCP connection to %s: %v", description, check.Spec.TargetEndpoint, checkErr),
Latency: metav1.Duration{Duration: latency.Connect},
}))
})), overallStart
}
klog.V(2).Infof("%7s | %-15s | %10s | TCP connection to %v succeeded", "Success", "TCPConnect", latency.Connect, check.Spec.TargetEndpoint)
return append(statusUpdates, v1alpha1helpers.AddSuccessLogEntry(operatorcontrolplanev1alpha1.LogEntry{
Expand All @@ -227,11 +229,11 @@ func manageStatusLogs(check *operatorcontrolplanev1alpha1.PodNetworkConnectivity
Reason: operatorcontrolplanev1alpha1.LogEntryReasonTCPConnect,
Message: fmt.Sprintf("%s: tcp connection to %s succeeded", description, check.Spec.TargetEndpoint),
Latency: metav1.Duration{Duration: latency.Connect},
}))
})), overallStart
}

// manageStatusOutage returns a status update function that manages the
// PodNetworkConnectivityCheck.Status entries based on Successes/Failures log entries.
// PodNetworkConnectivityCheck.Status.Outage entries based on Successes/Failures log entries.
func manageStatusOutage(recorder events.Recorder) v1alpha1helpers.UpdateStatusFunc {
return func(status *operatorcontrolplanev1alpha1.PodNetworkConnectivityCheckStatus) {
// This func is kept simple by assuming that only one log entry has been
Expand Down
20 changes: 14 additions & 6 deletions pkg/cmd/checkendpoints/controller/connection_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ func TestManageStatusLogs(t *testing.T) {
testDNSErr := &net.OpError{Op: "connect", Net: "tcp", Err: &net.DNSError{Err: "test error", Name: "host"}}

testCases := []struct {
name string
err error
trace *trace.LatencyInfo
initial *v1alpha1.PodNetworkConnectivityCheckStatus
expected *v1alpha1.PodNetworkConnectivityCheckStatus
name string
err error
trace *trace.LatencyInfo
initial *v1alpha1.PodNetworkConnectivityCheckStatus
expected *v1alpha1.PodNetworkConnectivityCheckStatus
expectedTimestamp time.Time
}{
{
name: "TCPConnect",
Expand All @@ -37,6 +38,7 @@ func TestManageStatusLogs(t *testing.T) {
expected: podNetworkConnectivityCheckStatus(
withSuccessEntry(tcpConnectEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "DNSResolve",
Expand All @@ -51,6 +53,7 @@ func TestManageStatusLogs(t *testing.T) {
withSuccessEntry(tcpConnectEntry(1)),
withSuccessEntry(dnsResolveEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "DNSError",
Expand All @@ -63,6 +66,7 @@ func TestManageStatusLogs(t *testing.T) {
expected: podNetworkConnectivityCheckStatus(
withFailureEntry(dnsErrorEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "TCPConnectError",
Expand All @@ -75,6 +79,7 @@ func TestManageStatusLogs(t *testing.T) {
expected: podNetworkConnectivityCheckStatus(
withFailureEntry(tcpConnectErrorEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "DNSResolveTCPConnectError",
Expand All @@ -90,6 +95,7 @@ func TestManageStatusLogs(t *testing.T) {
withFailureEntry(tcpConnectErrorEntry(1)),
withSuccessEntry(dnsResolveEntry(0)),
),
expectedTimestamp: testTime(0),
},
{
name: "SuccessSort",
Expand All @@ -111,12 +117,13 @@ func TestManageStatusLogs(t *testing.T) {
withSuccessEntry(tcpConnectEntry(1)),
withSuccessEntry(tcpConnectEntry(0)),
),
expectedTimestamp: testTime(3),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
status := tc.initial
updateStatusFuncs := manageStatusLogs(&v1alpha1.PodNetworkConnectivityCheck{
updateStatusFuncs, timestamp := manageStatusLogs(&v1alpha1.PodNetworkConnectivityCheck{
ObjectMeta: metav1.ObjectMeta{
Name: "test-to-target-endpoint",
},
Expand All @@ -128,6 +135,7 @@ func TestManageStatusLogs(t *testing.T) {
updateStatusFunc(status)
}
assert.Equal(t, tc.expected, status)
assert.Equal(t, tc.expectedTimestamp, timestamp)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *controller) Sync(ctx context.Context, syncContext factory.SyncContext)
}
}
if !keep {
updater.Stop()
updater.Stop(ctx)
delete(c.updaters, name)
}
}
Expand Down