Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add daemon mode for wal-push command #1353

Merged
merged 23 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
375724c
Add hostname to backup-push output
usernamedt Sep 16, 2022
73dc6f5
Change missing AO files metadata message lvl to warn
usernamedt Sep 16, 2022
ba10323
fmt & remove unnecessary debug
usernamedt Sep 17, 2022
f1cd865
Add data catalog size to backup sentinel
usernamedt Sep 20, 2022
2d1f34c
Add data catalog size to backup sentinel (greenplum)
usernamedt Sep 20, 2022
97fcd78
working version
Oct 4, 2022
4b83717
removed daemon flag
Oct 5, 2022
7a9c2d8
communication into one socket connection
Oct 7, 2022
b716637
Merge branch 'wal-g:master' into daemon-master-dev
rogaliiik Oct 7, 2022
ee98bdd
Added daemon handler, errors handled, uploading folder initialized
Oct 7, 2022
ba3a1ad
Merge branch 'daemon-master-dev' of https://github.com/rogaliiik/wal-…
Oct 7, 2022
4da5e24
daemon test
Oct 10, 2022
0d7f569
Socket logic added to HandleDaemon, daemon cmd takes socketPath in args
Oct 10, 2022
7f05253
binary message protocol daemon handler
Oct 14, 2022
bcff731
rename functions
Oct 17, 2022
ec6b490
message type implementation
Oct 17, 2022
1f1af79
SocketMessageType fix, SocketMessageReader implementation
Oct 18, 2022
2c117fa
daemon docker test
Oct 20, 2022
6ffc97c
daemon test compose
Oct 21, 2022
766a6e2
PGDATA added, netcat-openbsd instalation, socket creating check
Oct 24, 2022
6bfd42d
daemon_handler messageHandler unexpected msg type case processing, co…
Oct 25, 2022
6fedfec
Refactoring: error handling, new msg types, rename funcs; cmd descrip…
Oct 27, 2022
e14af01
daemon test archive grep
Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ var walPushCmd = &cobra.Command{

uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
if err != nil {
tracelog.ErrorLogger.FatalOnError(err)
}
tracelog.ErrorLogger.FatalOnError(err)
},
}

Expand Down
3 changes: 2 additions & 1 deletion docker/pg_tests/scripts/tests/daemon_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ wal-g --config=${TMP_CONFIG} daemon ${SOCKET} &

until [ -S ${SOCKET} ]
do
sleep 2
sleep 1
done
echo "walg-daemon is working"

Expand All @@ -40,6 +40,7 @@ if {
echo -n "${WAL}"
} | nc -U ${SOCKET} | grep -q "OO"; then
echo "WAL-G response is correct"
wal-g --config=${TMP_CONFIG} st ls wal_005/${WAL}.br
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
else
echo "Error in WAL-G response"
exit 1
Expand Down
2 changes: 1 addition & 1 deletion docs/PostgreSQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ wal-g wal-restore path/to/target-pgdata path/to/source-pgdata

### ``daemon``

Archives all WAL segments in the background.
Archives all WAL segments in the background. Works with the PostgreSQL archive library `walg_archive`.

Usage:
```bash
Expand Down
98 changes: 45 additions & 53 deletions internal/databases/postgres/daemon_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,64 @@ type SocketMessageType byte
const (
CheckType SocketMessageType = 'C'
FileNameType SocketMessageType = 'F'
OkType SocketMessageType = 'O'
ErrorType SocketMessageType = 'E'
)

func (msg SocketMessageType) ToBytes() []byte {
return []byte{byte(msg)}
}

type SocketMessageHandler interface {
Handle(messageBody []byte) error
}

type ArchiveMessageHandler struct {
type CheckMessageHandler struct {
messageType SocketMessageType
fd net.Conn
uploader *WalUploader
}

func (h *CheckMessageHandler) Handle(messageBody []byte) error {
rogaliiik marked this conversation as resolved.
Show resolved Hide resolved
_, err := h.fd.Write([]byte{'O'})
_, err := h.fd.Write(OkType.ToBytes())
if err != nil {
tracelog.ErrorLogger.Printf("Error on writing in socket: %v \n", err)
return err
return fmt.Errorf("failed to write in socket: %w", err)
}
tracelog.InfoLogger.Println("Successful configuration check")
return nil
}

type CheckMessageHandler struct {
type ArchiveMessageHandler struct {
messageType SocketMessageType
fd net.Conn
uploader *WalUploader
}

func (h *ArchiveMessageHandler) Handle(messageBody []byte) error {
err := messageValidation(messageBody)
if err != nil {
tracelog.ErrorLogger.Printf("Incorrect message: %v\n", err)
return err
}
tracelog.InfoLogger.Printf("wal file name: %s\n", string(messageBody))
PgDataSettingString, ok := internal.GetSetting(internal.PgDataSetting)
if !ok {
tracelog.ErrorLogger.Print("\nPGDATA is not set in the conf.\n")
return fmt.Errorf("PGDATA is not set in the conf")
}
pathToWal := path.Join(PgDataSettingString, "pg_wal")
fullPath := path.Join(pathToWal, string(messageBody))
tracelog.InfoLogger.Printf("starting wal-push for %s\n", fullPath)
err = HandleWALPush(h.uploader, fullPath)
err := HandleWALPush(h.uploader, fullPath)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to archive file: %s, err: %v \n", string(messageBody), err)
return err
return fmt.Errorf("file archiving failed: %w", err)
}
_, err = h.fd.Write([]byte{'O'})
_, err = h.fd.Write(OkType.ToBytes())
if err != nil {
tracelog.ErrorLogger.Printf("failed to write in socket: %v\n", err)
return err
return fmt.Errorf("socket write failed: %w", err)
}
return nil
}

func messageValidation(messageBody []byte) error {
if len(messageBody) < 24 {
if len(messageBody) > 0 {
tracelog.ErrorLogger.Println("incorrect message accepted")
return fmt.Errorf("incorrect message accepted: %s", string(messageBody))
}
return fmt.Errorf("empty message accepted")
}
return nil
}

func GetMessageHandler(messageType byte, c net.Conn, uploader *WalUploader) SocketMessageHandler {
func NewMessageHandler(messageType SocketMessageType, c net.Conn, uploader *WalUploader) SocketMessageHandler {
switch messageType {
case 'C':
case CheckType:
return &CheckMessageHandler{CheckType, c, uploader}
case 'F':
case FileNameType:
return &ArchiveMessageHandler{FileNameType, c, uploader}
default:
return nil
Expand All @@ -101,75 +87,81 @@ type SocketMessageReader struct {
c net.Conn
}

func GetMessageReader(c net.Conn) *SocketMessageReader {
func NewMessageReader(c net.Conn) *SocketMessageReader {
return &SocketMessageReader{c}
}

// Next method reads messages sequentially from the Reader
func (r SocketMessageReader) Next() (messageType byte, messageBody []byte, err error) {
func (r SocketMessageReader) Next() (messageType SocketMessageType, messageBody []byte, err error) {
messageParameters := make([]byte, 3)
_, err = io.ReadFull(r.c, messageParameters)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read from socket, err: %v \n", err)
return 'E', nil, err
return ErrorType, nil, fmt.Errorf("failed to read params: %w", err)
}
messageType = messageParameters[0]
messageType = SocketMessageType(messageParameters[0])
var messageLength uint16
l := bytes.NewReader(messageParameters[1:3])
err = binary.Read(l, binary.BigEndian, &messageLength)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message length, err: %v \n", err)
return 'E', nil, err
return ErrorType, nil, fmt.Errorf("fail to read message len: %w", err)
}
messageBody = make([]byte, messageLength-3)
_, err = io.ReadFull(r.c, messageBody)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read from socket, err: %v \n", err)
return 'E', nil, err
return ErrorType, nil, fmt.Errorf("failed to read msg body: %w", err)
}
return messageType, messageBody, err
}

// HandleDaemon is invoked to perform daemon mode
func HandleDaemon(uploader *WalUploader, pathToSocket string) {
_ = os.Remove(pathToSocket)
if _, err := os.Stat(pathToSocket); err == nil {
err = os.Remove(pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Failed to remove socket file:", err)
}
}
l, err := net.Listen("unix", pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Error on listening socket:", err)
}
for {
fd, err := l.Accept()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we would want to terminate the running daemon?

I suggest to add the signal listener and listend for SIGTERM to perform a graceful shutdown. But looks like it is a feature for the next PR, not for the MVP.

if err != nil {
tracelog.ErrorLogger.Println("Failed to accept, err:", err)
tracelog.ErrorLogger.Fatal("Failed to accept, err:", err)
}
go DaemonProcess(fd, uploader)
go Listen(fd, uploader)
}
}

// DaemonProcess reads data from connection and processes it
func DaemonProcess(c net.Conn, uploader *WalUploader) {
// Listen is used for listening connection and processing messages
func Listen(c net.Conn, uploader *WalUploader) {
defer utility.LoggedClose(c, fmt.Sprintf("Failed to close connection with %s \n", c.RemoteAddr()))
messageReader := GetMessageReader(c)
messageReader := NewMessageReader(c)
for {
messageType, messageBody, err := messageReader.Next()
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message from client %s, err: %v\n", c.RemoteAddr(), err)
_, _ = c.Write([]byte{'E'})
tracelog.ErrorLogger.Printf("Failed to read message from %s, err: %v\n", c.RemoteAddr(), err)
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
messageHandler := GetMessageHandler(messageType, c, uploader)
messageHandler := NewMessageHandler(messageType, c, uploader)
if messageHandler == nil {
tracelog.ErrorLogger.Printf("Unexpected message type: %s", string(messageType))
_, _ = c.Write([]byte{'E'})
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
err = messageHandler.Handle(messageBody)
if err != nil {
tracelog.ErrorLogger.Println("Failed to handle message, err:", err)
_, _ = c.Write([]byte{'E'})
tracelog.ErrorLogger.Println("Failed to handle message:", err)
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
if messageType == 'F' {
if messageType == FileNameType {
tracelog.InfoLogger.Printf("Successful archiving for %s\n", string(messageBody))
return
}
}
Expand Down