Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1903660: Handle pruning of unhealthy db files on disk #406

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 62 additions & 1 deletion go-controller/pkg/ovndbmanager/ovndbmanager.go
Expand Up @@ -4,9 +4,11 @@ import (
"errors"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,6 +20,11 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
)

// retry counters for cluster statuses
// One counter per database: nbClusterStatusRetryCnt is selected for db paths
// containing "ovnnb", sbClusterStatusRetryCnt for every other path. Each one
// counts consecutive failed "cluster/status" queries; it is set back to 0 when
// a query succeeds or after resetRaftDB has been called. Writes go through
// sync/atomic.
var nbClusterStatusRetryCnt, sbClusterStatusRetryCnt int32

// maxClusterStatusRetry is the failure threshold. A failed query increments the
// counter while it is <= this value; once the counter is greater than this
// value, the next failed query calls resetRaftDB instead.
const maxClusterStatusRetry = 10

func RunDBChecker(kclient kube.Interface, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Info("Starting DB Checker to ensure cluster membership and DB consistency")
Expand Down Expand Up @@ -75,13 +82,14 @@ func ensureOvnDBState(db string, kclient kube.Interface, stopCh <-chan struct{})
func ensureLocalRaftServerID(db string) {
var dbName string
var appCtl func(args ...string) (string, string, error)

clusterStatusRetryCnt := &nbClusterStatusRetryCnt
if strings.Contains(db, "ovnnb") {
dbName = "OVN_Northbound"
appCtl = util.RunOVNNBAppCtl
} else {
dbName = "OVN_Southbound"
appCtl = util.RunOVNSBAppCtl
clusterStatusRetryCnt = &sbClusterStatusRetryCnt
}

out, stderr, err := util.RunOVSDBTool("db-sid", db)
Expand All @@ -98,8 +106,19 @@ func ensureLocalRaftServerID(db string) {
out, stderr, err = appCtl("cluster/status", dbName)
if err != nil {
klog.Warningf("Unable to get cluster status for: %s, stderr: %v, err: %v", db, stderr, err)
if atomic.LoadInt32(clusterStatusRetryCnt) > maxClusterStatusRetry {
//delete the db file and start master
resetRaftDB(db)
atomic.StoreInt32(clusterStatusRetryCnt, 0)
} else {
atomic.AddInt32(clusterStatusRetryCnt, 1)
klog.Infof("Failed to get cluster status for: %s, number of retries: %d", db, *clusterStatusRetryCnt)
}
return
}
// on retrieving cluster/status successfully reset the retry counter.
atomic.StoreInt32(clusterStatusRetryCnt, 0)

r, _ := regexp.Compile(`Address: *((ssl|tcp):[?[a-z0-9.:]+]?)`)
matches := r.FindStringSubmatch(out)
if len(matches) < 2 {
Expand Down Expand Up @@ -133,6 +152,7 @@ func ensureClusterRaftMembership(db string, kclient kube.Interface) {

var dbName string
var appCtl func(args ...string) (string, string, error)
clusterStatusRetryCnt := &nbClusterStatusRetryCnt

if strings.Contains(db, "ovnnb") {
dbName = "OVN_Northbound"
Expand All @@ -142,6 +162,7 @@ func ensureClusterRaftMembership(db string, kclient kube.Interface) {
dbName = "OVN_Southbound"
appCtl = util.RunOVNSBAppCtl
knownMembers = strings.Split(config.OvnSouth.Address, ",")
clusterStatusRetryCnt = &sbClusterStatusRetryCnt
}
for _, knownMember := range knownMembers {
server := strings.Split(knownMember, ":")
Expand All @@ -154,8 +175,19 @@ func ensureClusterRaftMembership(db string, kclient kube.Interface) {
out, stderr, err := appCtl("cluster/status", dbName)
if err != nil {
klog.Warningf("Unable to get cluster status for: %s, stderr: %v, err: %v", db, stderr, err)
if atomic.LoadInt32(clusterStatusRetryCnt) > maxClusterStatusRetry {
//delete the db file and start master
resetRaftDB(db)
atomic.StoreInt32(clusterStatusRetryCnt, 0)
} else {
atomic.AddInt32(clusterStatusRetryCnt, 1)
klog.Infof("Failed to get cluster status for: %s, number of retries: %d", db, *clusterStatusRetryCnt)
}
return
}
// on retrieving cluster/status successfully reset the retry counter.
atomic.StoreInt32(clusterStatusRetryCnt, 0)

r, _ := regexp.Compile(`([a-z0-9]{4}) at ((ssl|tcp):\[?[a-z0-9.:]+\]?)`)
members := r.FindAllStringSubmatch(out, -1)
kickedMembersCount := 0
Expand Down Expand Up @@ -209,3 +241,32 @@ func ensureClusterRaftMembership(db string, kclient kube.Interface) {
}
}
}

// resetRaftDB moves the database file at path db out of the way and then asks
// the matching OVN database server to exit.
//
// The file is renamed within its own directory to
// <base name without extension><UTC timestamp>db_bak. The "exit" command is
// sent through util.RunOVNNBAppCtl when db contains "ovnnb" and through
// util.RunOVNSBAppCtl otherwise.
//
// Failures are logged rather than returned. If the rename fails, the server is
// left running and no exit command is sent. The "Stopped" message is logged
// only when the exit command succeeded.
func resetRaftDB(db string) {
	// backup the db by renaming it and then stop the nb/sb ovsdb process.
	dbFile := filepath.Base(db)
	backupFile := strings.TrimSuffix(dbFile, filepath.Ext(dbFile)) +
		time.Now().UTC().Format("2006-01-02_150405") + "db_bak"
	backupDB := filepath.Join(filepath.Dir(db), backupFile)
	if err := os.Rename(db, backupDB); err != nil {
		// Include the underlying error so a failed backup can be diagnosed.
		klog.Warningf("Failed to back up the db to backupFile: %s, err: %v", backupFile, err)
		return
	}
	klog.Infof("Backed up the db to backupFile: %s", backupFile)

	var dbName string
	var appCtl func(args ...string) (string, string, error)
	if strings.Contains(db, "ovnnb") {
		dbName = "OVN_Northbound"
		appCtl = util.RunOVNNBAppCtl
	} else {
		dbName = "OVN_Southbound"
		appCtl = util.RunOVNSBAppCtl
	}

	if _, stderr, err := appCtl("exit"); err != nil {
		klog.Warningf("Unable to restart the ovn db: %s ,"+
			"stderr: %v, err: %v", dbName, stderr, err)
		// Do not report the db as stopped when the exit command failed.
		return
	}
	klog.Infof("Stopped %s db after backing up the db: %s", dbName, backupFile)
}