Skip to content

Commit

Permalink
sqlserver compression / encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
mialinx committed Nov 29, 2021
1 parent ce0e1ad commit 84f5802
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 25 deletions.
5 changes: 1 addition & 4 deletions cmd/sqlserver/backup_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
const backupPushShortDescription = "Creates new backup and pushes it to the storage"

var backupPushDatabases []string
var backupCompression bool
var backupUpdateLatest bool

var backupPushCmd = &cobra.Command{
Use: "backup-push",
Short: backupPushShortDescription,
Run: func(cmd *cobra.Command, args []string) {
sqlserver.HandleBackupPush(backupPushDatabases, backupUpdateLatest, backupCompression)
sqlserver.HandleBackupPush(backupPushDatabases, backupUpdateLatest)
},
}

Expand All @@ -24,7 +23,5 @@ func init() {
"List of databases to backup. All not-system databases as default")
backupPushCmd.PersistentFlags().BoolVarP(&backupUpdateLatest, "update-latest", "u", false,
"Update latest backup instead of creating new one")
backupPushCmd.PersistentFlags().BoolVarP(&backupCompression, "compression", "c", true,
"Use built-in backup compression. Enabled by default")
cmd.AddCommand(backupPushCmd)
}
5 changes: 1 addition & 4 deletions cmd/sqlserver/log_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,19 @@ import (
const logPushShortDescription = "Creates new log backup and pushes it to the storage"

var logPushDatabases []string
var logCompression bool
var logNoRecovery bool

var logPushCmd = &cobra.Command{
Use: "log-push",
Short: logPushShortDescription,
Run: func(cmd *cobra.Command, args []string) {
sqlserver.HandleLogPush(logPushDatabases, logCompression, logNoRecovery)
sqlserver.HandleLogPush(logPushDatabases, logNoRecovery)
},
}

func init() {
logPushCmd.PersistentFlags().StringSliceVarP(&logPushDatabases, "databases", "d", []string{},
"List of databases to log. All not-system databases as default")
logPushCmd.PersistentFlags().BoolVarP(&logCompression, "compression", "c", true,
"Use built-in log compression. Enabled by default")
logPushCmd.PersistentFlags().BoolVarP(&logNoRecovery, "no-recovery", "n", false,
"Do a tail-log backup leaving database closed for further modifications")
cmd.AddCommand(logPushCmd)
Expand Down
9 changes: 5 additions & 4 deletions internal/databases/sqlserver/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/wal-g/wal-g/utility"
)

func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
func HandleBackupPush(dbnames []string, updateLatest bool) {
ctx, cancel := context.WithCancel(context.Background())
signalHandler := utility.NewSignalHandler(ctx, cancel, []os.Signal{syscall.SIGINT, syscall.SIGTERM})
defer func() { _ = signalHandler.Close() }()
Expand Down Expand Up @@ -52,8 +52,9 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
StartLocalTime: timeStart,
}
}
builtinCompression := UseBuiltinCompression()
err = runParallel(func(i int) error {
return backupSingleDatabase(ctx, db, backupName, dbnames[i], compression)
return backupSingleDatabase(ctx, db, backupName, dbnames[i], builtinCompression)
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall backup failed: %v", err)

Expand All @@ -66,7 +67,7 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
tracelog.InfoLogger.Printf("backup finished")
}

func backupSingleDatabase(ctx context.Context, db *sql.DB, backupName string, dbname string, compression bool) error {
func backupSingleDatabase(ctx context.Context, db *sql.DB, backupName string, dbname string, builtinCompression bool) error {
baseURL := getDatabaseBackupURL(backupName, dbname)
size, blobCount, err := estimateDBSize(db, dbname)
if err != nil {
Expand All @@ -76,7 +77,7 @@ func backupSingleDatabase(ctx context.Context, db *sql.DB, backupName string, db
urls := buildBackupUrls(baseURL, blobCount)
sql := fmt.Sprintf("BACKUP DATABASE %s TO %s", quoteName(dbname), urls)
sql += fmt.Sprintf(" WITH FORMAT, MAXTRANSFERSIZE=%d", MaxTransferSize)
if compression {
if builtinCompression {
sql += ", COMPRESSION"
}
tracelog.InfoLogger.Printf("starting backup database [%s] to %s", dbname, urls)
Expand Down
16 changes: 9 additions & 7 deletions internal/databases/sqlserver/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ const BgSaveInterval = 30 * time.Second

type Index struct {
sync.Mutex
folder storage.Folder
Size uint64 `json:"size"`
Blocks []*Block `json:"blocks"`
icache map[string]*Block // cache by id's
ocache []*Block // cache by offset, ordered, only committed
needSave bool
readCache *lru.Cache
folder storage.Folder
Size uint64 `json:"size"`
Blocks []*Block `json:"blocks"`
Compression string `json:"compression"`
Encryption string `json:"encryption"`
icache map[string]*Block // cache by id's
ocache []*Block // cache by offset, ordered, only committed
needSave bool
readCache *lru.Cache
}

type Block struct {
Expand Down
59 changes: 57 additions & 2 deletions internal/databases/sqlserver/blob/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"sync"
"time"

"github.com/wal-g/wal-g/internal/compression"
"github.com/wal-g/wal-g/internal/crypto"
"github.com/wal-g/wal-g/internal/databases/sqlserver"

"golang.org/x/xerrors"

"github.com/gofrs/flock"
Expand Down Expand Up @@ -40,6 +44,11 @@ type Server struct {
leasesMutex sync.Mutex
downloadSem chan struct{}
uploadSem chan struct{}
compression string
compressor compression.Compressor
decompressor compression.Decompressor
encryption string
crypter crypto.Crypter
}

func NewServer(folder storage.Folder) (*Server, error) {
Expand Down Expand Up @@ -72,6 +81,21 @@ func NewServer(folder storage.Folder) (*Server, error) {
bs.server = http.Server{Addr: bs.endpoint, Handler: bs}
bs.indexes = make(map[string]*Index)
bs.leases = make(map[string]Lease)
compressor, err := internal.ConfigureCompressor()
if err != nil {
if _, ok := err.(internal.UnknownCompressionMethodError); !ok || !sqlserver.UseBuiltinCompression() {
return nil, err
}
}
if compressor != nil {
bs.compression = compressor.FileExtension()
bs.compressor = compressor
bs.decompressor = compression.FindDecompressor(bs.compression)
}
bs.crypter = internal.ConfigureCrypter()
if bs.crypter != nil {
bs.encryption = bs.crypter.Name()
}
return bs, nil
}

Expand Down Expand Up @@ -372,6 +396,9 @@ func (bs *Server) HandleBlockPut(w http.ResponseWriter, req *http.Request) {
bs.returnError(w, req, err)
return
}
if err := bs.validateBlobCompressionEncryption(idx); err != nil {
bs.returnError(w, req, err)
}
blockID := strings.TrimSpace(req.Form.Get("blockid"))
blockSizeStr := req.Header.Get("Content-Length")
blockSize, err := strconv.ParseUint(blockSizeStr, 10, 64)
Expand All @@ -381,7 +408,7 @@ func (bs *Server) HandleBlockPut(w http.ResponseWriter, req *http.Request) {
}
filename := idx.PutBlock(blockID, blockSize)
bs.uploadSem <- struct{}{}
err = folder.PutObject(filename, req.Body)
err = folder.PutObject(filename, internal.CompressAndEncrypt(req.Body, bs.compressor, bs.crypter))
<-bs.uploadSem
req.Body.Close()
if err != nil {
Expand Down Expand Up @@ -502,6 +529,14 @@ func (bs *Server) HandleBlobGet(w http.ResponseWriter, req *http.Request) {
bs.returnError(w, req, err)
return
}
if idx.Compression != "" || idx.Encryption != "" {
err = bs.validateBlobCompressionEncryption(idx)
if err != nil {
tracelog.ErrorLogger.Printf("proxy: misconfiguration: %v", err)
bs.returnError(w, req, err)
return
}
}

rangeMin := uint64(0)
rangeMax := idx.Size - 1
Expand Down Expand Up @@ -537,6 +572,13 @@ func (bs *Server) HandleBlobGet(w http.ResponseWriter, req *http.Request) {
tracelog.ErrorLogger.Printf("proxy: failed to read object from storage: %v", err)
break
}
if idx.Compression != "" || idx.Encryption != "" {
r, err = internal.DecompressDecryptBytes(r, bs.decompressor)
if err != nil {
tracelog.ErrorLogger.Printf("proxy: failed to decompress / decrypt bytes: %v", err)
break
}
}
r2 := io.LimitReader(NewSkipReader(r, s.Offset), int64(s.Limit))
_, err = io.Copy(w, r2)
r.Close()
Expand All @@ -552,6 +594,8 @@ func (bs *Server) HandleBlobPut(w http.ResponseWriter, req *http.Request) {
idx, err := bs.loadBlobIndex(folder)
if err == ErrNotFound {
idx = NewIndex(folder)
idx.Compression = bs.compression
idx.Encryption = bs.encryption
} else if err != nil {
bs.returnError(w, req, err)
return
Expand All @@ -569,7 +613,8 @@ func (bs *Server) HandleBlobPut(w http.ResponseWriter, req *http.Request) {
garbage := idx.Clear()
if blobSize > 0 {
name := idx.PutBlock("data", blobSize)
err := folder.PutObject(name, req.Body)
encryptedReader := internal.CompressAndEncrypt(req.Body, bs.compressor, bs.crypter)
err := folder.PutObject(name, encryptedReader)
req.Body.Close()
if err != nil {
bs.returnError(w, req, err)
Expand Down Expand Up @@ -698,3 +743,13 @@ func (bs *Server) AcquireLock() (io.Closer, error) {
}
return lock, nil
}

func (bs *Server) validateBlobCompressionEncryption(idx *Index) error {
if idx.Encryption != bs.encryption {
return fmt.Errorf("blob encryption (%s) does not match configured (%s)", idx.Encryption, bs.encryption)
}
if idx.Compression != bs.compression {
return fmt.Errorf("blob compression (%s) does not match configured (%s)", idx.Compression, bs.compression)
}
return nil
}
9 changes: 5 additions & 4 deletions internal/databases/sqlserver/log_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/wal-g/wal-g/utility"
)

func HandleLogPush(dbnames []string, compression bool, norecovery bool) {
func HandleLogPush(dbnames []string, norecovery bool) {
ctx, cancel := context.WithCancel(context.Background())
signalHandler := utility.NewSignalHandler(ctx, cancel, []os.Signal{syscall.SIGINT, syscall.SIGTERM})
defer func() { _ = signalHandler.Close() }()
Expand All @@ -32,16 +32,17 @@ func HandleLogPush(dbnames []string, compression bool, norecovery bool) {
tracelog.ErrorLogger.FatalOnError(err)
defer lock.Close()

builtinCompression := UseBuiltinCompression()
logBackupName := generateLogBackupName()
err = runParallel(func(i int) error {
return backupSingleLog(ctx, db, logBackupName, dbnames[i], compression, norecovery)
return backupSingleLog(ctx, db, logBackupName, dbnames[i], builtinCompression, norecovery)
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log backup failed: %v", err)

tracelog.InfoLogger.Printf("log backup finished")
}

func backupSingleLog(ctx context.Context, db *sql.DB, backupName string, dbname string, compression bool, noRecovery bool) error {
func backupSingleLog(ctx context.Context, db *sql.DB, backupName string, dbname string, builtinCompression bool, noRecovery bool) error {
baseURL := getLogBackupURL(backupName, dbname)
size, blobCount, err := estimateLogSize(db, dbname)
if err != nil {
Expand All @@ -51,7 +52,7 @@ func backupSingleLog(ctx context.Context, db *sql.DB, backupName string, dbname
urls := buildBackupUrls(baseURL, blobCount)
sql := fmt.Sprintf("BACKUP LOG %s TO %s", quoteName(dbname), urls)
sql += fmt.Sprintf(" WITH FORMAT, MAXTRANSFERSIZE=%d", MaxTransferSize)
if compression {
if builtinCompression {
sql += ", COMPRESSION"
}
if noRecovery {
Expand Down
7 changes: 7 additions & 0 deletions internal/databases/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const MaxBlobSize = MaxTransferSize * MaxBlocksPerBlob

const BlobNamePrefix = "blob_"

const SQLServerCompressionMethod = "sqlserver"

var SystemDbnames = []string{
"master",
"msdb",
Expand Down Expand Up @@ -578,3 +580,8 @@ func IsLogAlreadyApplied(db *sql.DB, databaseName string, logBackupFilePropertie
}
return true, nil
}

func UseBuiltinCompression() bool {
method, _ := internal.GetSetting(internal.CompressionMethodSetting)
return strings.EqualFold(method, SQLServerCompressionMethod)
}
4 changes: 4 additions & 0 deletions internal/fetch_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func DecompressDecryptBytes(archiveReader io.Reader, decompressor compression.De
if err != nil {
return nil, err
}
if decompressor == nil {
tracelog.DebugLogger.Printf("No decompressor has been selected")
return ioutil.NopCloser(decryptReader), nil
}
return decompressor.Decompress(decryptReader)
}

Expand Down

0 comments on commit 84f5802

Please sign in to comment.