Skip to content

Commit

Permalink
sqlserver: limit concurrent database operations
Browse files Browse the repository at this point in the history
  • Loading branch information
mialinx committed Aug 19, 2021
1 parent a636399 commit 9be21a6
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 5 deletions.
9 changes: 9 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
SQLServerBlobKeyFile = "SQLSERVER_BLOB_KEY_FILE"
SQLServerBlobLockFile = "SQLSERVER_BLOB_LOCK_FILE"
SQLServerConnectionString = "SQLSERVER_CONNECTION_STRING"
SQLServerDBConcurency = "SQLSERVER_DB_CONCURENCY"

EndpointSourceSetting = "S3_ENDPOINT_SOURCE"
EndpointPortSetting = "S3_ENDPOINT_PORT"
Expand Down Expand Up @@ -154,6 +155,10 @@ var (
MongoDBLastWriteUpdateInterval: "3s",
}

SQLServerDefaultSettings = map[string]string{
SQLServerDBConcurency: "10",
}

PGDefaultSettings = map[string]string{
PgWalSize: "16",
}
Expand Down Expand Up @@ -308,6 +313,7 @@ var (
SQLServerBlobKeyFile: true,
SQLServerBlobLockFile: true,
SQLServerConnectionString: true,
SQLServerDBConcurency: true,
}

MysqlAllowedSettings = map[string]bool{
Expand All @@ -334,6 +340,7 @@ var (
Turbo bool
)

// nolint: gocyclo
func ConfigureSettings(currentType string) {
if len(defaultConfigValues) == 0 {
defaultConfigValues = commonDefaultConfigValues
Expand All @@ -343,6 +350,8 @@ func ConfigureSettings(currentType string) {
dbSpecificDefaultSettings = PGDefaultSettings
case MONGO:
dbSpecificDefaultSettings = MongoDefaultSettings
case SQLSERVER:
dbSpecificDefaultSettings = SQLServerDefaultSettings
}

for k, v := range dbSpecificDefaultSettings {
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
}
err = runParallel(func(i int) error {
return backupSingleDatabase(ctx, db, backupName, dbnames[i], compression)
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall backup failed: %v", err)

sentinel.StopLocalTime = utility.TimeNowCrossPlatformLocal()
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/backup_restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func HandleBackupRestore(backupName string, dbnames []string, fromnames []string
return recoverSingleDatabase(ctx, db, dbname)
}
return nil
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall restore failed: %v", err)

tracelog.InfoLogger.Printf("restore finished")
Expand Down
8 changes: 8 additions & 0 deletions internal/databases/sqlserver/blob/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httputil"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -145,6 +146,13 @@ func (bs *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

func (bs *Server) ServeHTTP2(w http.ResponseWriter, req *http.Request) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
tracelog.ErrorLogger.Printf("proxy server goroutine panic: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
}()
// default headers
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", "0")
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/log_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func HandleLogPush(dbnames []string, compression bool) {
logBackupName := generateLogBackupName()
err = runParallel(func(i int) error {
return backupSingleLog(ctx, db, logBackupName, dbnames[i], compression)
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log backup failed: %v", err)

tracelog.InfoLogger.Printf("log backup finished")
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/log_restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func HandleLogRestore(backupName string, untilTS string, dbnames []string, fromn
return recoverSingleDatabase(ctx, db, dbname)
}
return nil
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log restore failed: %v", err)

tracelog.InfoLogger.Printf("log restore finished")
Expand Down
17 changes: 16 additions & 1 deletion internal/databases/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,16 @@ func getLogsSinceBackup(folder storage.Folder, backupName string, stopAt time.Ti
return logNames, nil
}

func runParallel(f func(int) error, cnt int) error {
func runParallel(f func(int) error, cnt int, concurency int) error {
if concurency == 0 {
concurency = cnt
}
sem := make(chan struct{}, concurency)
errs := make(chan error, cnt)
for i := 0; i < cnt; i++ {
go func(i int) {
sem <- struct{}{}
defer func() { <-sem }()
errs <- f(i)
}(i)
}
Expand All @@ -398,6 +404,15 @@ func runParallel(f func(int) error, cnt int) error {
return nil
}

func getDBConcurrency() int {
concurrency, err := internal.GetMaxConcurrency(internal.SQLServerDBConcurency)
if err != nil {
tracelog.WarningLogger.Printf("config error: %v", err)
return blob.DefaultConcurency
}
return concurrency
}

func exclude(src, excl []string) []string {
var res []string
SRC:
Expand Down

0 comments on commit 9be21a6

Please sign in to comment.