sqlserver compression / encryption #1161

Merged · 1 commit · Nov 29, 2021
5 changes: 1 addition & 4 deletions cmd/sqlserver/backup_push.go
@@ -8,14 +8,13 @@ import (
const backupPushShortDescription = "Creates new backup and pushes it to the storage"

var backupPushDatabases []string
var backupCompression bool
var backupUpdateLatest bool

var backupPushCmd = &cobra.Command{
Use: "backup-push",
Short: backupPushShortDescription,
Run: func(cmd *cobra.Command, args []string) {
sqlserver.HandleBackupPush(backupPushDatabases, backupUpdateLatest, backupCompression)
sqlserver.HandleBackupPush(backupPushDatabases, backupUpdateLatest)
},
}

@@ -24,7 +23,5 @@ func init() {
"List of databases to backup. All not-system databases as default")
backupPushCmd.PersistentFlags().BoolVarP(&backupUpdateLatest, "update-latest", "u", false,
"Update latest backup instead of creating new one")
backupPushCmd.PersistentFlags().BoolVarP(&backupCompression, "compression", "c", true,
"Use built-in backup compression. Enabled by default")
cmd.AddCommand(backupPushCmd)
}
5 changes: 1 addition & 4 deletions cmd/sqlserver/log_push.go
@@ -8,22 +8,19 @@ import (
const logPushShortDescription = "Creates new log backup and pushes it to the storage"

var logPushDatabases []string
var logCompression bool
var logNoRecovery bool

var logPushCmd = &cobra.Command{
Use: "log-push",
Short: logPushShortDescription,
Run: func(cmd *cobra.Command, args []string) {
sqlserver.HandleLogPush(logPushDatabases, logCompression, logNoRecovery)
sqlserver.HandleLogPush(logPushDatabases, logNoRecovery)
},
}

func init() {
logPushCmd.PersistentFlags().StringSliceVarP(&logPushDatabases, "databases", "d", []string{},
"List of databases to log. All not-system databases as default")
logPushCmd.PersistentFlags().BoolVarP(&logCompression, "compression", "c", true,
"Use built-in log compression. Enabled by default")
logPushCmd.PersistentFlags().BoolVarP(&logNoRecovery, "no-recovery", "n", false,
"Do a tail-log backup leaving database closed for further modifications")
cmd.AddCommand(logPushCmd)
11 changes: 7 additions & 4 deletions internal/databases/sqlserver/backup_push_handler.go
@@ -7,12 +7,14 @@ import (
"os"
"syscall"

"github.com/wal-g/wal-g/internal/databases/sqlserver/blob"

"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/utility"
)

func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
func HandleBackupPush(dbnames []string, updateLatest bool) {
ctx, cancel := context.WithCancel(context.Background())
signalHandler := utility.NewSignalHandler(ctx, cancel, []os.Signal{syscall.SIGINT, syscall.SIGTERM})
defer func() { _ = signalHandler.Close() }()
@@ -52,8 +54,9 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
StartLocalTime: timeStart,
}
}
builtinCompression := blob.UseBuiltinCompression()
err = runParallel(func(i int) error {
return backupSingleDatabase(ctx, db, backupName, dbnames[i], compression)
return backupSingleDatabase(ctx, db, backupName, dbnames[i], builtinCompression)
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall backup failed: %v", err)

@@ -66,7 +69,7 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
tracelog.InfoLogger.Printf("backup finished")
}

func backupSingleDatabase(ctx context.Context, db *sql.DB, backupName string, dbname string, compression bool) error {
func backupSingleDatabase(ctx context.Context, db *sql.DB, backupName string, dbname string, builtinCompression bool) error {
baseURL := getDatabaseBackupURL(backupName, dbname)
size, blobCount, err := estimateDBSize(db, dbname)
if err != nil {
@@ -76,7 +79,7 @@ func backupSingleDatabase(ctx context.Context, db *sql.DB, backupName string, db
urls := buildBackupUrls(baseURL, blobCount)
sql := fmt.Sprintf("BACKUP DATABASE %s TO %s", quoteName(dbname), urls)
sql += fmt.Sprintf(" WITH FORMAT, MAXTRANSFERSIZE=%d", MaxTransferSize)
if compression {
if builtinCompression {
sql += ", COMPRESSION"
}
tracelog.InfoLogger.Printf("starting backup database [%s] to %s", dbname, urls)
16 changes: 9 additions & 7 deletions internal/databases/sqlserver/blob/blob.go
@@ -26,13 +26,15 @@ const BgSaveInterval = 30 * time.Second

type Index struct {
sync.Mutex
folder storage.Folder
Size uint64 `json:"size"`
Blocks []*Block `json:"blocks"`
icache map[string]*Block // cache by id's
ocache []*Block // cache by offset, ordered, only committed
needSave bool
readCache *lru.Cache
folder storage.Folder
Size uint64 `json:"size"`
Blocks []*Block `json:"blocks"`
Compression string `json:"compression"`
Encryption string `json:"encryption"`
icache map[string]*Block // cache by id's
ocache []*Block // cache by offset, ordered, only committed
needSave bool
readCache *lru.Cache
}

type Block struct {
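Since the index is persisted as JSON, the two new fields travel with the blob's metadata and let the proxy detect a configuration mismatch later. A minimal sketch of the serialized form, written as a Go example test inside the blob package; the concrete values ("br" as a compressor file extension, "openpgp" as a crypter name) are assumptions for illustration only:

package blob

import (
	"encoding/json"
	"fmt"
)

// ExampleIndex_json prints an index carrying the new compression/encryption metadata.
func ExampleIndex_json() {
	idx := &Index{
		Size:        4 << 20,
		Compression: "br",      // compressor file extension (assumed value)
		Encryption:  "openpgp", // crypter name as returned by crypto.Crypter.Name() (assumed value)
	}
	data, _ := json.Marshal(idx)
	fmt.Println(string(data))
	// Output: {"size":4194304,"blocks":null,"compression":"br","encryption":"openpgp"}
}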
57 changes: 55 additions & 2 deletions internal/databases/sqlserver/blob/server.go
@@ -13,6 +13,8 @@ import (
"sync"
"time"

"github.com/wal-g/wal-g/internal/compression"
"github.com/wal-g/wal-g/internal/crypto"
"golang.org/x/xerrors"

"github.com/gofrs/flock"
@@ -40,6 +42,11 @@ type Server struct {
leasesMutex sync.Mutex
downloadSem chan struct{}
uploadSem chan struct{}
compression string
compressor compression.Compressor
decompressor compression.Decompressor
encryption string
crypter crypto.Crypter
}

func NewServer(folder storage.Folder) (*Server, error) {
@@ -72,6 +79,21 @@ func NewServer(folder storage.Folder) (*Server, error) {
bs.server = http.Server{Addr: bs.endpoint, Handler: bs}
bs.indexes = make(map[string]*Index)
bs.leases = make(map[string]Lease)
compressor, err := internal.ConfigureCompressor()
if err != nil {
if _, ok := err.(internal.UnknownCompressionMethodError); !ok || !UseBuiltinCompression() {
return nil, err
}
}
if compressor != nil {
bs.compression = compressor.FileExtension()
bs.compressor = compressor
bs.decompressor = compression.FindDecompressor(bs.compression)
}
bs.crypter = internal.ConfigureCrypter()
if bs.crypter != nil {
bs.encryption = bs.crypter.Name()
}
return bs, nil
}

@@ -372,6 +394,9 @@ func (bs *Server) HandleBlockPut(w http.ResponseWriter, req *http.Request) {
bs.returnError(w, req, err)
return
}
if err := bs.validateBlobCompressionEncryption(idx); err != nil {
bs.returnError(w, req, err)
}
blockID := strings.TrimSpace(req.Form.Get("blockid"))
blockSizeStr := req.Header.Get("Content-Length")
blockSize, err := strconv.ParseUint(blockSizeStr, 10, 64)
@@ -381,7 +406,7 @@ func (bs *Server) HandleBlockPut(w http.ResponseWriter, req *http.Request) {
}
filename := idx.PutBlock(blockID, blockSize)
bs.uploadSem <- struct{}{}
err = folder.PutObject(filename, req.Body)
err = folder.PutObject(filename, internal.CompressAndEncrypt(req.Body, bs.compressor, bs.crypter))
<-bs.uploadSem
req.Body.Close()
if err != nil {
@@ -502,6 +527,14 @@ func (bs *Server) HandleBlobGet(w http.ResponseWriter, req *http.Request) {
bs.returnError(w, req, err)
return
}
if idx.Compression != "" || idx.Encryption != "" {
err = bs.validateBlobCompressionEncryption(idx)
if err != nil {
tracelog.ErrorLogger.Printf("proxy: misconfiguration: %v", err)
bs.returnError(w, req, err)
return
}
}

rangeMin := uint64(0)
rangeMax := idx.Size - 1
@@ -537,6 +570,13 @@ func (bs *Server) HandleBlobGet(w http.ResponseWriter, req *http.Request) {
tracelog.ErrorLogger.Printf("proxy: failed to read object from storage: %v", err)
break
}
if idx.Compression != "" || idx.Encryption != "" {
r, err = internal.DecompressDecryptBytes(r, bs.decompressor)
if err != nil {
tracelog.ErrorLogger.Printf("proxy: failed to decompress / decrypt bytes: %v", err)
break
}
}
r2 := io.LimitReader(NewSkipReader(r, s.Offset), int64(s.Limit))
_, err = io.Copy(w, r2)
r.Close()
@@ -552,6 +592,8 @@ func (bs *Server) HandleBlobPut(w http.ResponseWriter, req *http.Request) {
idx, err := bs.loadBlobIndex(folder)
if err == ErrNotFound {
idx = NewIndex(folder)
idx.Compression = bs.compression
idx.Encryption = bs.encryption
} else if err != nil {
bs.returnError(w, req, err)
return
@@ -569,7 +611,8 @@ func (bs *Server) HandleBlobPut(w http.ResponseWriter, req *http.Request) {
garbage := idx.Clear()
if blobSize > 0 {
name := idx.PutBlock("data", blobSize)
err := folder.PutObject(name, req.Body)
encryptedReader := internal.CompressAndEncrypt(req.Body, bs.compressor, bs.crypter)
err := folder.PutObject(name, encryptedReader)
req.Body.Close()
if err != nil {
bs.returnError(w, req, err)
@@ -698,3 +741,13 @@ func (bs *Server) AcquireLock() (io.Closer, error) {
}
return lock, nil
}

func (bs *Server) validateBlobCompressionEncryption(idx *Index) error {
if idx.Encryption != bs.encryption {
return fmt.Errorf("blob encryption (%s) does not match configured (%s)", idx.Encryption, bs.encryption)
}
if idx.Compression != bs.compression {
Review comment (Member):

Let's assume the following case: an index with encryption configured but without compression, while the server has both encryption and compression enabled. The encryption check passes, but the compressors are obviously mismatched, so WAL-G will fail to download the uncompressed, encrypted file. Maybe allow this scenario (with some warnings)?

Also, maybe go even further and allow the file to be downloaded with any of the currently supported compressors, without requiring a match with the currently configured one? WAL-G already supports this behavior, at least for Postgres (see DownloadAndDecompressStorageFile).

Reply (Contributor Author):

Changing the compression algorithm is one part of the problem; the other is changing the encryption.
The proper solution is to keep metainfo alongside the files (as a file extension or in some JSON) and to choose the decompressor/crypter by name, but that is not currently possible for the crypter.

I wouldn't want to solve only half of the problem, especially since it is fairly hypothetical for SQL Server.

return fmt.Errorf("blob compression (%s) does not match configured (%s)", idx.Compression, bs.compression)
}
return nil
}
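Referring back to the review thread above: a hypothetical sketch of the more lenient approach the reviewer describes, in which the decompressor is derived from per-object metadata (here assumed to be a file extension on the block name, which this PR does not actually store) instead of being required to match the server's configuration:

package blob

import (
	"io"
	"path"
	"strings"

	"github.com/wal-g/wal-g/internal"
	"github.com/wal-g/wal-g/internal/compression"
)

// openBlockByExtension is a hypothetical alternative to the strict
// validateBlobCompressionEncryption check: it picks the decompressor from the
// stored object's name (assumed to carry the compressor's extension, e.g.
// "block_000.br") rather than from the currently configured method.
func openBlockByExtension(raw io.Reader, objectName string) (io.ReadCloser, error) {
	ext := strings.TrimPrefix(path.Ext(objectName), ".")
	decompressor := compression.FindDecompressor(ext) // nil for an unknown or missing extension
	// DecompressDecryptBytes tolerates a nil decompressor after this PR, so an
	// uncompressed (but possibly encrypted) object still comes back readable.
	return internal.DecompressDecryptBytes(raw, decompressor)
}

As the author notes, the same trick is not yet available for the crypter, which is why the PR records the compression and encryption names in the index and validates them instead.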
9 changes: 9 additions & 0 deletions internal/databases/sqlserver/blob/util.go
@@ -6,9 +6,11 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
)

var ErrNoLease = errors.New("no lease")
@@ -64,3 +66,10 @@ func (r *SkipReader) Read(s []byte) (int, error) {
}
return r.reader.Read(s)
}

const SQLServerCompressionMethod = "sqlserver"

func UseBuiltinCompression() bool {
method, _ := internal.GetSetting(internal.CompressionMethodSetting)
return strings.EqualFold(method, SQLServerCompressionMethod)
}
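For readers tracking the removed --compression flags in backup_push.go and log_push.go above: built-in SQL Server compression is now selected through the compression-method setting rather than a per-command switch. A rough sketch of the resulting behavior as a hypothetical helper (not part of the PR), assuming the setting is backed by the WALG_COMPRESSION_METHOD environment variable:

package sqlserver

import (
	"fmt"

	"github.com/wal-g/wal-g/internal/databases/sqlserver/blob"
)

// withClause mirrors how the push handlers assemble the T-SQL WITH clause:
//
//	WALG_COMPRESSION_METHOD=sqlserver        -> COMPRESSION added; the proxy applies no compression of its own
//	any other configured method (lz4, ...)   -> no COMPRESSION clause; the proxy compresses each block it uploads
//
// Encryption, if a crypter is configured, is applied by the proxy in either case.
func withClause(maxTransferSize int) string {
	clause := fmt.Sprintf("WITH FORMAT, MAXTRANSFERSIZE=%d", maxTransferSize)
	if blob.UseBuiltinCompression() {
		clause += ", COMPRESSION"
	}
	return clause
}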
11 changes: 7 additions & 4 deletions internal/databases/sqlserver/log_push_handler.go
@@ -7,12 +7,14 @@ import (
"os"
"syscall"

"github.com/wal-g/wal-g/internal/databases/sqlserver/blob"

"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/utility"
)

func HandleLogPush(dbnames []string, compression bool, norecovery bool) {
func HandleLogPush(dbnames []string, norecovery bool) {
ctx, cancel := context.WithCancel(context.Background())
signalHandler := utility.NewSignalHandler(ctx, cancel, []os.Signal{syscall.SIGINT, syscall.SIGTERM})
defer func() { _ = signalHandler.Close() }()
@@ -32,16 +34,17 @@ func HandleLogPush(dbnames []string, compression bool, norecovery bool) {
tracelog.ErrorLogger.FatalOnError(err)
defer lock.Close()

builtinCompression := blob.UseBuiltinCompression()
logBackupName := generateLogBackupName()
err = runParallel(func(i int) error {
return backupSingleLog(ctx, db, logBackupName, dbnames[i], compression, norecovery)
return backupSingleLog(ctx, db, logBackupName, dbnames[i], builtinCompression, norecovery)
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log backup failed: %v", err)

tracelog.InfoLogger.Printf("log backup finished")
}

func backupSingleLog(ctx context.Context, db *sql.DB, backupName string, dbname string, compression bool, noRecovery bool) error {
func backupSingleLog(ctx context.Context, db *sql.DB, backupName string, dbname string, builtinCompression bool, noRecovery bool) error {
baseURL := getLogBackupURL(backupName, dbname)
size, blobCount, err := estimateLogSize(db, dbname)
if err != nil {
@@ -51,7 +54,7 @@ func backupSingleLog(ctx context.Context, db *sql.DB, backupName string, dbname
urls := buildBackupUrls(baseURL, blobCount)
sql := fmt.Sprintf("BACKUP LOG %s TO %s", quoteName(dbname), urls)
sql += fmt.Sprintf(" WITH FORMAT, MAXTRANSFERSIZE=%d", MaxTransferSize)
if compression {
if builtinCompression {
sql += ", COMPRESSION"
}
if noRecovery {
4 changes: 4 additions & 0 deletions internal/fetch_helper.go
@@ -75,6 +75,10 @@ func DecompressDecryptBytes(archiveReader io.Reader, decompressor compression.De
if err != nil {
return nil, err
}
if decompressor == nil {
tracelog.DebugLogger.Printf("No decompressor has been selected")
return ioutil.NopCloser(decryptReader), nil
}
return decompressor.Decompress(decryptReader)
}
