Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add daemon mode for wal-push command #1353

Merged
merged 23 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
375724c
Add hostname to backup-push output
usernamedt Sep 16, 2022
73dc6f5
Change missing AO files metadata message lvl to warn
usernamedt Sep 16, 2022
ba10323
fmt & remove unnecessary debug
usernamedt Sep 17, 2022
f1cd865
Add data catalog size to backup sentinel
usernamedt Sep 20, 2022
2d1f34c
Add data catalog size to backup sentinel (greenplum)
usernamedt Sep 20, 2022
97fcd78
working version
Oct 4, 2022
4b83717
removed daemon flag
Oct 5, 2022
7a9c2d8
communication into one socket connection
Oct 7, 2022
b716637
Merge branch 'wal-g:master' into daemon-master-dev
rogaliiik Oct 7, 2022
ee98bdd
Added daemon handler, errors handled, uploading folder initialized
Oct 7, 2022
ba3a1ad
Merge branch 'daemon-master-dev' of https://github.com/rogaliiik/wal-…
Oct 7, 2022
4da5e24
daemon test
Oct 10, 2022
0d7f569
Socket logic added to HandleDaemon, daemon cmd takes socketPath in args
Oct 10, 2022
7f05253
binary message protocol daemon handler
Oct 14, 2022
bcff731
rename functions
Oct 17, 2022
ec6b490
message type implementation
Oct 17, 2022
1f1af79
SocketMessageType fix, SocketMessageReader implementation
Oct 18, 2022
2c117fa
daemon docker test
Oct 20, 2022
6ffc97c
daemon test compose
Oct 21, 2022
766a6e2
PGDATA added, netcat-openbsd instalation, socket creating check
Oct 24, 2022
6bfd42d
daemon_handler messageHandler unexpected msg type case processing, co…
Oct 25, 2022
6fedfec
Refactoring: error handling, new msg types, rename funcs; cmd descrip…
Oct 27, 2022
e14af01
daemon test archive grep
Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 55 additions & 0 deletions cmd/pg/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pg

import (
"os"
"path"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/utility"
)

const DaemonShortDescription = "Uploads a WAL file to storage"

// daemonCmd represents the daemon archive command
var daemonCmd = &cobra.Command{
Use: "daemon daemon_socket_path",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some information about this command to the documentation.

Short: DaemonShortDescription, // TODO : improve description
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
uploader, err := postgres.ConfigureWalUploader()
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
if err == nil {
uploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.ArchiveStatusManager = asm.NewNopASM()
}

PGArchiveStatusManager, err := internal.ConfigurePGArchiveStatusManager()
if err == nil {
uploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.PGArchiveStatusManager = asm.NewNopASM()
}

uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
pathToData, exist := os.LookupEnv("PGDATA")
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
if !exist {
tracelog.ErrorLogger.Fatal("'PGDATA' variable is not defined or does not exist")
}
pathToWal := path.Join(pathToData, "pg_wal")

postgres.HandleDaemon(uploader, args[0], pathToWal)
},
}

func init() {
Cmd.AddCommand(daemonCmd)
}
7 changes: 6 additions & 1 deletion cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/utility"
)

const WalPushShortDescription = "Uploads a WAL file to storage"
Expand Down Expand Up @@ -35,7 +36,11 @@ var walPushCmd = &cobra.Command{
uploader.PGArchiveStatusManager = asm.NewNopASM()
}

postgres.HandleWALPush(uploader, args[0])
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
if err != nil {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
tracelog.ErrorLogger.FatalOnError(err)
}
},
}

Expand Down
28 changes: 28 additions & 0 deletions docker/pg_tests/scripts/tests/daemon_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/sh
set -e -x
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved

CONFIG_FILE="/tmp/configs/wal_perftest_config.json"

COMMON_CONFIG="/tmp/configs/common_config.json"
TMP_CONFIG="/tmp/configs/tmp_config.json"
cat ${CONFIG_FILE} > ${TMP_CONFIG}

echo "," >> ${TMP_CONFIG}
cat ${COMMON_CONFIG} >> ${TMP_CONFIG}
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG}

WAL=$(ls -l ${PGDATA}/pg_wal | head -n2 | tail -n1 | egrep -o "[0-9A-F]{24}")

wal-g --config=${TMP_CONFIG} daemon "${PGDATA}"/pg_wal

# shellcheck disable=SC2039
nc -U /tmp/wal-push.sock <<< 'CHECK'
sleep 1
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved

# shellcheck disable=SC2039
nc -U /tmp/wal-push.sock <<< "${WAL}"
sleep 1

wal-g st ls WALG_S3_PREFIX/wal_005/"${WAL}".br


132 changes: 132 additions & 0 deletions internal/databases/postgres/daemon_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package postgres

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"path"

"github.com/wal-g/tracelog"
)

type CheckType byte
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
type FileType byte

type MessageSocketHandler interface {
Handle(messageBody []byte, c net.Conn, f func(string) error) error
}

func MessageTypeConstruct(messageType byte) MessageSocketHandler {
switch messageType {
case 'C':
return CheckType('C')
case 'F':
return FileType('F')
default:
return nil
}
}

func (msg CheckType) Handle(messageBody []byte, c net.Conn, f func(string) error) error {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
_, err := c.Write([]byte{'O', 0, 3})
if err != nil {
tracelog.ErrorLogger.Printf("Error on writing in socket: %v", err)
return err
}
return nil
}

func (msg FileType) Handle(messageBody []byte, c net.Conn, f func(string) error) error {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
if len(messageBody) < 24 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we archive not only WALs but other files with length other than 24.

I think we can remove this validation.

if len(messageBody) > 0 {
tracelog.ErrorLogger.Printf("Received incorrect message %s", messageBody)
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
} else {
tracelog.ErrorLogger.Println("Received empty message")
}
return errors.New(fmt.Sprint("Incorrect message accepted"))
}
tracelog.InfoLogger.Printf("wal file name: %s\n", messageBody)
err := f(string(messageBody))
if err != nil {
tracelog.ErrorLogger.Printf("wal-push failed: %v\n", err)
return err
}
_, err = c.Write([]byte{'O', 0, 3})
if err != nil {
return err
}
return nil
}

// HandleDaemon is invoked to perform daemon mode
func HandleDaemon(uploader *WalUploader, pathToSocket string, pathToWal string) {
_ = os.Remove(pathToSocket)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not ignore the errors

l, err := net.Listen("unix", pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Error on listening socket:", err)
}
for {
fd, err := l.Accept()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we would want to terminate the running daemon?

I suggest to add the signal listener and listend for SIGTERM to perform a graceful shutdown. But looks like it is a feature for the next PR, not for the MVP.

if err != nil {
tracelog.ErrorLogger.Println("Failed to accept, err:", err)
}
go DaemonProcess(fd, func(walFileName string) error {
fullPath := path.Join(pathToWal, walFileName)
tracelog.InfoLogger.Printf("starting wal-push for %s\n", fullPath)
return HandleWALPush(uploader, fullPath)
})
}
}

func DaemonProcess(c net.Conn, f func(string) error) {
defer func(c net.Conn) {
err := c.Close()
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tracelog.ErrorLogger.Printf("Failed to close connection with %s, err: %v\n", c.RemoteAddr(), err)
}
}(c)
buf := make([]byte, 512)
for {
nr, err := c.Read(buf)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read checking message from client %s, err: %v\n", c.RemoteAddr(), err)
_, _ = c.Write([]byte{'E', 0, 3})
return
}

byteType, byteLength, byteBody := MessageParser(buf)
requestType := MessageTypeConstruct(byteType)
err = requestType.Handle(byteBody, c, f)
if err != nil {
tracelog.ErrorLogger.Println("Failed to handle message, err:", err)
}
if byteType == 'F' {
return
}
if int(byteLength) < nr {
tracelog.InfoLogger.Println("Read remaining buffer...")
byteType, byteLength, byteBody = MessageParser(buf[byteLength:])
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
err = requestType.Handle(byteBody, c, f)
if err != nil {
tracelog.ErrorLogger.Println("Failed to handle message, err:", err)
}
return
}
}
}

// MessageParser is invoked to read bytes from buffer
func MessageParser(buf []byte) (byte, uint16, []byte) {
messageType := buf[0]
var messageLength uint16
l := bytes.NewReader(buf[1:3])
err := binary.Read(l, binary.BigEndian, &messageLength)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message length, err: %v", err)
}
messageBody := buf[3 : messageLength-1]
return messageType, messageLength, messageBody
}
25 changes: 15 additions & 10 deletions internal/databases/postgres/wal_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/pkg/errors"
"github.com/spf13/viper"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/utility"
)

type CantOverwriteWalFileError struct {
Expand All @@ -32,21 +31,20 @@ func (err CantOverwriteWalFileError) Error() string {

// TODO : unit tests
// HandleWALPush is invoked to perform wal-g wal-push
func HandleWALPush(uploader *WalUploader, walFilePath string) {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
func HandleWALPush(uploader *WalUploader, walFilePath string) error {
if uploader.ArchiveStatusManager.IsWalAlreadyUploaded(walFilePath) {
err := uploader.ArchiveStatusManager.UnmarkWalFile(walFilePath)

if err != nil {
tracelog.ErrorLogger.Printf("unmark wal-g status for %s file failed due following error %+v", walFilePath, err)
}
err = uploadLocalWalMetadata(walFilePath, uploader.Uploader)
tracelog.ErrorLogger.FatalOnError(err)
return
return uploadLocalWalMetadata(walFilePath, uploader.Uploader)
}

concurrency, err := internal.GetMaxUploadConcurrency()
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}

totalBgUploadedLimit := viper.GetInt32(internal.TotalBgUploadedLimit)
// .history files must not be overwritten, see https://github.com/wal-g/wal-g/issues/420
Expand All @@ -58,16 +56,23 @@ func HandleWALPush(uploader *WalUploader, walFilePath string) {
bgUploader.Start()

err = uploadWALFile(uploader, walFilePath, bgUploader.preventWalOverwrite)
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}
err = uploadLocalWalMetadata(walFilePath, uploader.Uploader)
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}

err = bgUploader.Stop()
tracelog.ErrorLogger.FatalOnError(err)
if err != nil {
return err
}

if uploader.getUseWalDelta() {
uploader.FlushFiles()
}
return nil
}

// TODO : unit tests
Expand Down