Skip to content

Commit

Permalink
Merge pull request #154 from alexanderConstantinescu/back-port-ovs-retry
Browse files Browse the repository at this point in the history
Bug 1853193: [release-4.5] Backport OVS-retries
  • Loading branch information
openshift-merge-robot committed Jul 21, 2020
2 parents 7150d11 + df08413 commit b71c64a
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 33 deletions.
39 changes: 22 additions & 17 deletions pkg/network/node/metrics.go → pkg/network/node/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build linux

package node
package metrics

import (
"fmt"
Expand All @@ -14,22 +14,26 @@ import (

utilruntime "k8s.io/apimachinery/pkg/util/runtime"

"github.com/openshift/sdn/pkg/network/node/ovs"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)

const (
SDNNamespace = "openshift"
SDNSubsystem = "sdn"
hostLocalDataDir = "/var/lib/cni/networks"
SDNNamespace = "openshift"
SDNSubsystem = "sdn"

OVSFlowsKey = "ovs_flows"
OVSOperationsKey = "ovs_operations"
ARPCacheAvailableEntriesKey = "arp_cache_entries"
PodIPsKey = "pod_ips"
PodOperationsErrorsKey = "pod_operations_errors"
PodOperationsLatencyKey = "pod_operations_latency"
VnidNotFoundErrorsKey = "vnid_not_found_errors"

// OVS Operation result type
OVSOperationSuccess = "success"
OVSOperationFailure = "failure"
// Pod Operation types
PodOperationSetup = "setup"
PodOperationTeardown = "teardown"
Expand All @@ -44,6 +48,15 @@ var (
Help: "Number of Open vSwitch flows",
},
)
OVSOperationsResult = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: SDNNamespace,
Subsystem: SDNSubsystem,
Name: OVSOperationsKey,
Help: "Cumulative number of OVS operations by result type",
},
[]string{"result_type"},
)

ARPCacheAvailableEntries = metrics.NewGauge(
&metrics.GaugeOpts{
Expand Down Expand Up @@ -106,6 +119,7 @@ var registerMetrics sync.Once
func RegisterMetrics() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(OVSFlows)
legacyregistry.MustRegister(OVSOperationsResult)
legacyregistry.MustRegister(ARPCacheAvailableEntries)
legacyregistry.MustRegister(PodIPs)
legacyregistry.MustRegister(PodOperationsErrors)
Expand All @@ -114,26 +128,17 @@ func RegisterMetrics() {
})
}

// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
// SinceInMicroseconds gets the time since the specified start in microseconds.
func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start) / time.Microsecond)
}

func gatherPeriodicMetrics(ovs ovs.Interface) {
updateOVSMetrics(ovs)
// GatherPeriodicMetrics is used to periodically gather metrics.
func GatherPeriodicMetrics() {
updateARPMetrics()
updatePodIPMetrics()
}

// updateOVSMetrics dumps the flows through the supplied OVS interface and
// stores how many were returned in the OVSFlows gauge. If the dump fails the
// gauge is left as it was and the error is handed to utilruntime.HandleError.
func updateOVSMetrics(ovs ovs.Interface) {
	dumped, dumpErr := ovs.DumpFlows("")
	if dumpErr != nil {
		utilruntime.HandleError(fmt.Errorf("failed to dump OVS flows for metrics: %v", dumpErr))
		return
	}
	OVSFlows.Set(float64(len(dumped)))
}

func updateARPMetrics() {
var used int
data, err := ioutil.ReadFile("/proc/net/arp")
Expand Down
6 changes: 4 additions & 2 deletions pkg/network/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/vishvananda/netlink"
"k8s.io/klog"

metrics "github.com/openshift/sdn/pkg/network/node/metrics"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -184,7 +185,7 @@ func New(c *OsdnNodeConfig) (*OsdnNode, error) {
egressIP: newEgressIPWatcher(oc, c.NodeIP, c.MasqueradeBit),
}

RegisterMetrics()
metrics.RegisterMetrics()

return plugin, nil
}
Expand Down Expand Up @@ -406,7 +407,8 @@ func (node *OsdnNode) Start() error {

go kwait.Forever(node.policy.SyncVNIDRules, time.Hour)
go kwait.Forever(func() {
gatherPeriodicMetrics(node.oc.ovs)
metrics.GatherPeriodicMetrics()
node.oc.ovs.UpdateOVSMetrics()
}, time.Minute*2)

return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/network/node/ovs/fake_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ func (fake *ovsFake) NewTransaction() Transaction {
return &ovsFakeTx{fake: fake, flows: []string{}}
}

// UpdateOVSMetrics is a no-op on the fake OVS implementation: the body is
// empty, so no flows are dumped and no metric is updated.
func (fake *ovsFake) UpdateOVSMetrics() {
}

// sort.Interface support
type ovsFlows []OvsFlow

Expand Down
54 changes: 46 additions & 8 deletions pkg/network/node/ovs/ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"fmt"
"strconv"
"strings"
"time"

"k8s.io/klog"

metrics "github.com/openshift/sdn/pkg/network/node/metrics"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"k8s.io/utils/exec"
)

Expand Down Expand Up @@ -78,6 +81,9 @@ type Interface interface {

// NewTransaction begins a new OVS transaction.
NewTransaction() Transaction

// UpdateOVSMetrics runs DumpFlows and sets the OVS flows gauge to the current number of flows
UpdateOVSMetrics()
}

// Transaction manages a single set of OVS flow modifications
Expand All @@ -102,6 +108,12 @@ const (
OVS_VSCTL = "ovs-vsctl"
)

// ovsBackoff is the retry schedule handed to utilwait.ExponentialBackoff when
// running OVS commands: up to 10 attempts, starting with a 500ms wait that is
// multiplied by 1.25 after each attempt.
var ovsBackoff = utilwait.Backoff{
	Steps:    10,
	Duration: 500 * time.Millisecond,
	Factor:   1.25,
}

// ovsExec implements ovs.Interface via calls to ovs-ofctl and ovs-vsctl
type ovsExec struct {
execer exec.Interface
Expand Down Expand Up @@ -182,7 +194,17 @@ func (ovsif *ovsExec) execWithStdin(cmd string, stdinArgs []string, args ...stri
}

func (ovsif *ovsExec) exec(cmd string, args ...string) (string, error) {
return ovsif.execWithStdin(cmd, nil, args...)
var output string
var err error
return output, utilwait.ExponentialBackoff(ovsBackoff, func() (bool, error) {
output, err = ovsif.execWithStdin(cmd, nil, args...)
if err == nil {
metrics.OVSOperationsResult.WithLabelValues(metrics.OVSOperationSuccess).Inc()
return true, nil
}
metrics.OVSOperationsResult.WithLabelValues(metrics.OVSOperationFailure).Inc()
return false, nil
})
}

func validateColumns(columns ...string) error {
Expand Down Expand Up @@ -403,6 +425,15 @@ func (ovsif *ovsExec) bundle(flows []string) error {
return err
}

// UpdateOVSMetrics dumps the flows via DumpFlows and records how many were
// returned in the metrics.OVSFlows gauge. When the dump fails the gauge is
// not modified and the error is reported through utilruntime.HandleError.
func (ovsif *ovsExec) UpdateOVSMetrics() {
	dumped, dumpErr := ovsif.DumpFlows("")
	if dumpErr != nil {
		utilruntime.HandleError(fmt.Errorf("failed to dump OVS flows for metrics: %v", dumpErr))
		return
	}
	metrics.OVSFlows.Set(float64(len(dumped)))
}

// ovsExecTx implements ovs.Transaction and maintains current flow context
type ovsExecTx struct {
ovsif *ovsExec
Expand All @@ -424,9 +455,16 @@ func (tx *ovsExecTx) DeleteFlows(flow string, args ...interface{}) {
}

func (tx *ovsExecTx) Commit() error {
err := tx.ovsif.bundle(tx.flows)

// Reset flow context
tx.flows = []string{}
return err
defer func() {
tx.flows = []string{}
}()
return utilwait.ExponentialBackoff(ovsBackoff, func() (bool, error) {
err := tx.ovsif.bundle(tx.flows)
if err == nil {
metrics.OVSOperationsResult.WithLabelValues(metrics.OVSOperationSuccess).Inc()
return true, nil
}
metrics.OVSOperationsResult.WithLabelValues(metrics.OVSOperationFailure).Inc()
return false, nil
})
}
22 changes: 21 additions & 1 deletion pkg/network/node/ovs/ovs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func TestTransactionSuccess(t *testing.T) {
ensureTestResults(t, fexec)

// Test Failed transaction
fakeCmd = addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 bundle br0 -", "", fmt.Errorf("Something bad happened"))
for i := 0; i < ovsBackoff.Steps; i++ {
fakeCmd = addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 bundle br0 -", "", fmt.Errorf("Something bad happened"))
}
otx = ovsif.NewTransaction()
otx.AddFlow("flow1")
otx.DeleteFlows("flow2")
Expand All @@ -122,6 +124,24 @@ func TestTransactionSuccess(t *testing.T) {
"flow delete flow2",
}
ensureInputFlows(t, fakeCmd, expectedInputFlows)

// Test a transaction that fails several times but finally succeeds
for i := 0; i < ovsBackoff.Steps-1; i++ {
fakeCmd = addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 bundle br0 -", "", fmt.Errorf("Something bad happened"))
}
fakeCmd = addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 bundle br0 -", "", nil)
otx = ovsif.NewTransaction()
otx.AddFlow("flow1")
otx.DeleteFlows("flow2")
if err = otx.Commit(); err != nil {
t.Fatalf("Unexpected error from command: %v", err)
}
ensureTestResults(t, fexec)
expectedInputFlows = []string{
"flow add flow1",
"flow delete flow2",
}
ensureInputFlows(t, fakeCmd, expectedInputFlows)
}

func TestDumpFlows(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/network/node/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
networkv1 "github.com/openshift/api/network/v1"
"github.com/openshift/sdn/pkg/network/common"
"github.com/openshift/sdn/pkg/network/node/cniserver"
metrics "github.com/openshift/sdn/pkg/network/node/metrics"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
Expand Down Expand Up @@ -273,7 +274,7 @@ func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.Po
}
if err != nil {
klog.Warningf("CNI_ADD %s failed: %v", pk, err)
PodOperationsErrors.WithLabelValues(PodOperationSetup).Inc()
metrics.PodOperationsErrors.WithLabelValues(metrics.PodOperationSetup).Inc()
result.Err = err
}
case cniserver.CNI_UPDATE:
Expand All @@ -296,7 +297,7 @@ func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.Po
result.Err = m.podHandler.teardown(request)
if result.Err != nil {
klog.Warningf("CNI_DEL %s failed: %v", pk, result.Err)
PodOperationsErrors.WithLabelValues(PodOperationTeardown).Inc()
metrics.PodOperationsErrors.WithLabelValues(metrics.PodOperationTeardown).Inc()
}
default:
result.Err = fmt.Errorf("unhandled CNI request %v", request.Command)
Expand Down Expand Up @@ -459,7 +460,7 @@ func podIsExited(p *kcontainer.Pod) bool {

// Set up all networking (host/container veth, OVS flows, IPAM, loopback, etc)
func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *runningPod, error) {
defer PodOperationsLatency.WithLabelValues(PodOperationSetup).Observe(sinceInMicroseconds(time.Now()))
defer metrics.PodOperationsLatency.WithLabelValues(metrics.PodOperationSetup).Observe(metrics.SinceInMicroseconds(time.Now()))

// Release any IPAM allocations if the setup failed
var success bool
Expand Down Expand Up @@ -521,7 +522,7 @@ func (m *podManager) update(req *cniserver.PodRequest) (uint32, error) {

// Clean up all pod networking (clear OVS flows, release IPAM lease, remove host/container veth)
func (m *podManager) teardown(req *cniserver.PodRequest) error {
defer PodOperationsLatency.WithLabelValues(PodOperationTeardown).Observe(sinceInMicroseconds(time.Now()))
defer metrics.PodOperationsLatency.WithLabelValues(metrics.PodOperationTeardown).Observe(metrics.SinceInMicroseconds(time.Now()))

errList := []error{}

Expand Down
3 changes: 2 additions & 1 deletion pkg/network/node/vnids.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"k8s.io/klog"

metrics "github.com/openshift/sdn/pkg/network/node/metrics"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -113,7 +114,7 @@ func (vmap *nodeVNIDMap) WaitAndGetVNID(name string) (uint32, error) {
// We may find netid when we check with api server but we will
// still treat this as an error if we don't find it in vnid map.
// So that we can imply insufficient timeout if we see many VnidNotFoundErrors.
VnidNotFoundErrors.Inc()
metrics.VnidNotFoundErrors.Inc()

netns, err := vmap.networkClient.NetworkV1().NetNamespaces().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
Expand Down

0 comments on commit b71c64a

Please sign in to comment.