Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlserver: limit concurrent database operations #1067

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/blang/semver v3.5.1+incompatible
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cyberdelia/lzo v0.0.0-20171006181345-d85071271a6f
github.com/denisenkom/go-mssqldb v0.9.0
github.com/denisenkom/go-mssqldb v0.10.0
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ github.com/cyberdelia/lzo v0.0.0-20171006181345-d85071271a6f/go.mod h1:JlLvEOSII
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.9.0 h1:RSohk2RsiZqLZ0zCjtfn3S4Gp4exhpBWHyQ7D0yGjAk=
github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denisenkom/go-mssqldb v0.10.0 h1:QykgLZBorFE95+gO3u9esLd0BmbvpWp0/waNNZfHBM8=
github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
Expand Down Expand Up @@ -227,7 +227,6 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -329,7 +328,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down Expand Up @@ -629,7 +627,6 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25 h1:OKbAoGs4fGM5cPLlVQLZGYkFC8OnOfgo6tt0Smf9XhM=
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 h1:kCCpuwSAoYJPkNc6x0xT9yTtV4oKtARo4RGBQWOfg9E=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -763,7 +760,6 @@ google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2M
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
9 changes: 9 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
SQLServerBlobKeyFile = "SQLSERVER_BLOB_KEY_FILE"
SQLServerBlobLockFile = "SQLSERVER_BLOB_LOCK_FILE"
SQLServerConnectionString = "SQLSERVER_CONNECTION_STRING"
SQLServerDBConcurrency = "SQLSERVER_DB_CONCURRENCY"

EndpointSourceSetting = "S3_ENDPOINT_SOURCE"
EndpointPortSetting = "S3_ENDPOINT_PORT"
Expand Down Expand Up @@ -154,6 +155,10 @@ var (
MongoDBLastWriteUpdateInterval: "3s",
}

SQLServerDefaultSettings = map[string]string{
SQLServerDBConcurrency: "10",
}

PGDefaultSettings = map[string]string{
PgWalSize: "16",
}
Expand Down Expand Up @@ -308,6 +313,7 @@ var (
SQLServerBlobKeyFile: true,
SQLServerBlobLockFile: true,
SQLServerConnectionString: true,
SQLServerDBConcurrency: true,
}

MysqlAllowedSettings = map[string]bool{
Expand All @@ -334,6 +340,7 @@ var (
Turbo bool
)

// nolint: gocyclo
func ConfigureSettings(currentType string) {
if len(defaultConfigValues) == 0 {
defaultConfigValues = commonDefaultConfigValues
Expand All @@ -343,6 +350,8 @@ func ConfigureSettings(currentType string) {
dbSpecificDefaultSettings = PGDefaultSettings
case MONGO:
dbSpecificDefaultSettings = MongoDefaultSettings
case SQLSERVER:
dbSpecificDefaultSettings = SQLServerDefaultSettings
}

for k, v := range dbSpecificDefaultSettings {
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
}
err = runParallel(func(i int) error {
return backupSingleDatabase(ctx, db, backupName, dbnames[i], compression)
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall backup failed: %v", err)

sentinel.StopLocalTime = utility.TimeNowCrossPlatformLocal()
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/backup_restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func HandleBackupRestore(backupName string, dbnames []string, fromnames []string
return recoverSingleDatabase(ctx, db, dbname)
}
return nil
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall restore failed: %v", err)

tracelog.InfoLogger.Printf("restore finished")
Expand Down
22 changes: 15 additions & 7 deletions internal/databases/sqlserver/blob/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httputil"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand All @@ -25,7 +26,7 @@ const ProxyStartTimeout = 10 * time.Second

const ReqIDHeader = "X-Ms-Request-Id"

const DefaultConcurency = 8
const DefaultConcurrency = 8

type Server struct {
folder storage.Folder
Expand Down Expand Up @@ -57,16 +58,16 @@ func NewServer(folder storage.Folder) (*Server, error) {
if err != nil {
return nil, err
}
downloadConcurency, err := internal.GetMaxDownloadConcurrency()
downloadConcurrency, err := internal.GetMaxDownloadConcurrency()
if err != nil {
downloadConcurency = DefaultConcurency
downloadConcurrency = DefaultConcurrency
}
bs.downloadSem = make(chan struct{}, downloadConcurency)
uploadConcurency, err := internal.GetMaxUploadConcurrency()
bs.downloadSem = make(chan struct{}, downloadConcurrency)
uploadConcurrency, err := internal.GetMaxUploadConcurrency()
if err != nil {
uploadConcurency = DefaultConcurency
uploadConcurrency = DefaultConcurrency
}
bs.uploadSem = make(chan struct{}, uploadConcurency)
bs.uploadSem = make(chan struct{}, uploadConcurrency)
bs.endpoint = fmt.Sprintf("%s:%d", hostname, 443)
bs.server = http.Server{Addr: bs.endpoint, Handler: bs}
bs.indexes = make(map[string]*Index)
Expand Down Expand Up @@ -145,6 +146,13 @@ func (bs *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

func (bs *Server) ServeHTTP2(w http.ResponseWriter, req *http.Request) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
tracelog.ErrorLogger.Printf("proxy server goroutine panic: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
}()
// default headers
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", "0")
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/log_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func HandleLogPush(dbnames []string, compression bool) {
logBackupName := generateLogBackupName()
err = runParallel(func(i int) error {
return backupSingleLog(ctx, db, logBackupName, dbnames[i], compression)
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log backup failed: %v", err)

tracelog.InfoLogger.Printf("log backup finished")
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/log_restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func HandleLogRestore(backupName string, untilTS string, dbnames []string, fromn
return recoverSingleDatabase(ctx, db, dbname)
}
return nil
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log restore failed: %v", err)

tracelog.InfoLogger.Printf("log restore finished")
Expand Down
18 changes: 17 additions & 1 deletion internal/databases/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,16 @@ func getLogsSinceBackup(folder storage.Folder, backupName string, stopAt time.Ti
return logNames, nil
}

func runParallel(f func(int) error, cnt int) error {
func runParallel(f func(int) error, cnt int, concurrency int) error {
if concurrency <= 0 {
concurrency = cnt
}
sem := make(chan struct{}, concurrency)
errs := make(chan error, cnt)
for i := 0; i < cnt; i++ {
go func(i int) {
sem <- struct{}{}
defer func() { <-sem }()
errs <- f(i)
}(i)
}
Expand All @@ -398,6 +404,16 @@ func runParallel(f func(int) error, cnt int) error {
return nil
}

func getDBConcurrency() int {
concurrency, err := internal.GetMaxConcurrency(internal.SQLServerDBConcurrency)
if err != nil {
tracelog.WarningLogger.Printf("config error: %v", err)
mialinx marked this conversation as resolved.
Show resolved Hide resolved
tracelog.WarningLogger.Printf("using default db concurrency: %d", blob.DefaultConcurrency)
return blob.DefaultConcurrency
}
return concurrency
}

func exclude(src, excl []string) []string {
var res []string
SRC:
Expand Down