Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add daemon mode for wal-push command #1353

Merged
merged 23 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
375724c
Add hostname to backup-push output
usernamedt Sep 16, 2022
73dc6f5
Change missing AO files metadata message lvl to warn
usernamedt Sep 16, 2022
ba10323
fmt & remove unnecessary debug
usernamedt Sep 17, 2022
f1cd865
Add data catalog size to backup sentinel
usernamedt Sep 20, 2022
2d1f34c
Add data catalog size to backup sentinel (greenplum)
usernamedt Sep 20, 2022
97fcd78
working version
Oct 4, 2022
4b83717
removed daemon flag
Oct 5, 2022
7a9c2d8
communication into one socket connection
Oct 7, 2022
b716637
Merge branch 'wal-g:master' into daemon-master-dev
rogaliiik Oct 7, 2022
ee98bdd
Added daemon handler, errors handled, uploading folder initialized
Oct 7, 2022
ba3a1ad
Merge branch 'daemon-master-dev' of https://github.com/rogaliiik/wal-…
Oct 7, 2022
4da5e24
daemon test
Oct 10, 2022
0d7f569
Socket logic added to HandleDaemon, daemon cmd takes socketPath in args
Oct 10, 2022
7f05253
binary message protocol daemon handler
Oct 14, 2022
bcff731
rename functions
Oct 17, 2022
ec6b490
message type implementation
Oct 17, 2022
1f1af79
SocketMessageType fix, SocketMessageReader implementation
Oct 18, 2022
2c117fa
daemon docker test
Oct 20, 2022
6ffc97c
daemon test compose
Oct 21, 2022
766a6e2
PGDATA added, netcat-openbsd instalation, socket creating check
Oct 24, 2022
6bfd42d
daemon_handler messageHandler unexpected msg type case processing, co…
Oct 25, 2022
6fedfec
Refactoring: error handling, new msg types, rename funcs; cmd descrip…
Oct 27, 2022
e14af01
daemon test archive grep
Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 70 additions & 0 deletions cmd/pg/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package pg

import (
"net"
"os"
"path"

"github.com/wal-g/wal-g/utility"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
)

const DaemonShortDescription = "Uploads a WAL file to storage"
const DaemonSocketName = "/tmp/wal-push.sock"

// daemonCmd represents the daemon command
var daemonCmd = &cobra.Command{
Use: "daemon wal_filepath",
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
Short: DaemonShortDescription, // TODO : improve description
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
uploader, err := postgres.ConfigureWalUploader()
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
if err == nil {
uploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.ArchiveStatusManager = asm.NewNopASM()
}

PGArchiveStatusManager, err := internal.ConfigurePGArchiveStatusManager()
if err == nil {
uploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.PGArchiveStatusManager = asm.NewNopASM()
}

uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)

_ = os.Remove(DaemonSocketName)
l, err := net.Listen("unix", DaemonSocketName)
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tracelog.ErrorLogger.Fatal("listen error:", err)
}
for {
fd, err := l.Accept()
if err != nil {
tracelog.ErrorLogger.Println("accept error:", err)
}

go postgres.HandleDaemon(fd, func(walFileName string) error {
fullPath := path.Join(args[0], walFileName)
tracelog.InfoLogger.Printf("starting wal-push for %s\n", fullPath)
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
return postgres.HandleWALPush(uploader, fullPath)
})
}
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
},
}

func init() {
Cmd.AddCommand(daemonCmd)
}
7 changes: 6 additions & 1 deletion cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/utility"
)

const WalPushShortDescription = "Uploads a WAL file to storage"
Expand Down Expand Up @@ -35,7 +36,11 @@ var walPushCmd = &cobra.Command{
uploader.PGArchiveStatusManager = asm.NewNopASM()
}

postgres.HandleWALPush(uploader, args[0])
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
if err != nil {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
tracelog.ErrorLogger.FatalOnError(err)
}
},
}

Expand Down
28 changes: 28 additions & 0 deletions docker/pg_tests/scripts/tests/daemon_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/sh
set -e -x
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved

CONFIG_FILE="/tmp/configs/wal_perftest_config.json"

COMMON_CONFIG="/tmp/configs/common_config.json"
TMP_CONFIG="/tmp/configs/tmp_config.json"
cat ${CONFIG_FILE} > ${TMP_CONFIG}

echo "," >> ${TMP_CONFIG}
cat ${COMMON_CONFIG} >> ${TMP_CONFIG}
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG}

WAL=$(ls -l ${PGDATA}/pg_wal | head -n2 | tail -n1 | egrep -o "[0-9A-F]{24}")

wal-g --config=${TMP_CONFIG} daemon "${PGDATA}"/pg_wal

# shellcheck disable=SC2039
nc -U /tmp/wal-push.sock <<< 'CHECK'
sleep 1
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved

# shellcheck disable=SC2039
nc -U /tmp/wal-push.sock <<< "${WAL}"
sleep 1

wal-g st ls WALG_S3_PREFIX/wal_005/"${WAL}".br


62 changes: 62 additions & 0 deletions internal/databases/postgres/daemon_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package postgres

import (
"net"

"github.com/wal-g/tracelog"
)

// HandleDaemon is invoked to perform daemon mode
func HandleDaemon(c net.Conn, f func(string) error) {
defer func(c net.Conn) {
err := c.Close()
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tracelog.ErrorLogger.Printf("Failed to close connection with %s, err: %v\n", c.RemoteAddr(), err)
}
}(c)
buf := make([]byte, 512)
nr, err := c.Read(buf)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read checking message from client %s, err: %v\n", c.RemoteAddr(), err)
_, _ = c.Write([]byte("READ_FAILED"))
return
}
if nr == 5 && string(buf[0:5]) == "CHECK" {
_, _ = c.Write([]byte("CHECKED"))
tracelog.InfoLogger.Printf("Successful configuration check")
} else {
tracelog.ErrorLogger.Printf("Error on configuration check")
return
}
n, err := c.Read(buf)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message with file from client %s, err: %v\n", c.RemoteAddr(), err)
_, _ = c.Write([]byte("READ_FAILED"))
return
}

if n < 24 {
if n > 0 {
tracelog.ErrorLogger.Printf("Received incorrect message %s from %s", buf[0:n], c.RemoteAddr())
} else {
tracelog.ErrorLogger.Printf("Received empty message from %s", c.RemoteAddr())
}
_, _ = c.Write([]byte("BAD_MSG"))
return
}

data := buf[0:24]
tracelog.InfoLogger.Printf("wal file name: %s\n", string(data))

err = f(string(data))
if err != nil {
tracelog.ErrorLogger.Printf("wal-push failed: %v\n", err)
_, _ = c.Write([]byte("FAIL"))
return
}

_, err = c.Write([]byte("OK"))
if err != nil {
tracelog.ErrorLogger.Println("OK write fail: ", err)
}
}
25 changes: 15 additions & 10 deletions internal/databases/postgres/wal_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/pkg/errors"
"github.com/spf13/viper"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/utility"
)

type CantOverwriteWalFileError struct {
Expand All @@ -32,21 +31,20 @@ func (err CantOverwriteWalFileError) Error() string {

// TODO : unit tests
// HandleWALPush is invoked to perform wal-g wal-push
func HandleWALPush(uploader *WalUploader, walFilePath string) {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
func HandleWALPush(uploader *WalUploader, walFilePath string) error {
if uploader.ArchiveStatusManager.IsWalAlreadyUploaded(walFilePath) {
err := uploader.ArchiveStatusManager.UnmarkWalFile(walFilePath)

if err != nil {
tracelog.ErrorLogger.Printf("unmark wal-g status for %s file failed due following error %+v", walFilePath, err)
}
err = uploadLocalWalMetadata(walFilePath, uploader.Uploader)
tracelog.ErrorLogger.FatalOnError(err)
return
return uploadLocalWalMetadata(walFilePath, uploader.Uploader)
}

concurrency, err := internal.GetMaxUploadConcurrency()
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}

totalBgUploadedLimit := viper.GetInt32(internal.TotalBgUploadedLimit)
// .history files must not be overwritten, see https://github.com/wal-g/wal-g/issues/420
Expand All @@ -58,16 +56,23 @@ func HandleWALPush(uploader *WalUploader, walFilePath string) {
bgUploader.Start()

err = uploadWALFile(uploader, walFilePath, bgUploader.preventWalOverwrite)
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}
err = uploadLocalWalMetadata(walFilePath, uploader.Uploader)
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}

err = bgUploader.Stop()
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}

if uploader.getUseWalDelta() {
uploader.FlushFiles()
}
return nil
}

// TODO : unit tests
Expand Down