Skip to content

Commit

Permalink
Add daemon mode for wal-push command (#1353)
Browse files Browse the repository at this point in the history
Added daemon command for archiving wal segments

Co-authored-by: usernamedt <usernamedt@yandex-team.com>
Co-authored-by: Artem Galkin <artem-galkin@yandex-team.ru>
  • Loading branch information
3 people committed Oct 28, 2022
1 parent 018b4c9 commit 925d2c4
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dockertests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ jobs:
'make TEST="pg_delete_target_delta_find_full_test" pg_integration_test',
'make TEST="pg_backup_mark_permanent_no_error_test" pg_integration_test',
'make TEST="pg_delete_garbage_test" pg_integration_test',
'make TEST="pg_daemon_test" pg_integration_test',
'make mongo_test',
'make MONGO_VERSION="5.0.10" MONGO_MAJOR="5.0" MONGO_REPO="repo.mongodb.org" MONGO_PACKAGE="mongodb-org" mongo_features',
'make MONGO_VERSION="5.0.10" MONGO_MAJOR="5.0" MONGO_REPO="repo.mongodb.com" MONGO_PACKAGE="mongodb-enterprise" mongo_features',
Expand Down
45 changes: 45 additions & 0 deletions cmd/pg/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pg

import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/utility"
)

const DaemonShortDescription = "Uploads a WAL file to storage"

// daemonCmd represents the daemon archive command
var daemonCmd = &cobra.Command{
Use: "daemon daemon_socket_path",
Short: DaemonShortDescription, // TODO : improve description
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
uploader, err := postgres.ConfigureWalUploader()
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
if err == nil {
uploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.ArchiveStatusManager = asm.NewNopASM()
}

PGArchiveStatusManager, err := internal.ConfigurePGArchiveStatusManager()
if err == nil {
uploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.PGArchiveStatusManager = asm.NewNopASM()
}
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
postgres.HandleDaemon(uploader, args[0])
},
}

func init() {
Cmd.AddCommand(daemonCmd)
}
5 changes: 4 additions & 1 deletion cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/utility"
)

const WalPushShortDescription = "Uploads a WAL file to storage"
Expand Down Expand Up @@ -35,7 +36,9 @@ var walPushCmd = &cobra.Command{
uploader.PGArchiveStatusManager = asm.NewNopASM()
}

postgres.HandleWALPush(uploader, args[0])
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
tracelog.ErrorLogger.FatalOnError(err)
},
}

Expand Down
12 changes: 12 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ services:
&& mkdir -p /export/createrestorepointbucket
&& mkdir -p /export/storagetoolsbucket
&& mkdir -p /export/walrestorebucket
&& mkdir -p /export/daemonbucket
&& /usr/bin/minio server /export'
s3-another:
Expand Down Expand Up @@ -580,6 +581,17 @@ services:
links:
- s3

pg_daemon_test:
build:
dockerfile: docker/pg_tests/Dockerfile_daemon_test
context: .
image: wal-g/daemon_test
container_name: wal-g_pg_daemon_test
depends_on:
- s3
links:
- s3

pg_pgbackrest:
build:
dockerfile: docker/pg_tests/Dockerfile_pgbackrest
Expand Down
5 changes: 5 additions & 0 deletions docker/pg_tests/Dockerfile_daemon_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM wal-g/docker_prefix:latest

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y netcat-openbsd && apt-get clean

CMD su - postgres /tmp/tests/daemon_test.sh
6 changes: 6 additions & 0 deletions docker/pg_tests/scripts/configs/daemon_test_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"WALG_DELTA_MAX_STEPS": "0",
"WALE_S3_PREFIX": "s3://daemonbucket",
"WALG_USE_WAL_DELTA": "true",
"WALG_UPLOAD_CONCURRENCY": "1",
"WALG_UPLOAD_QUEUE": "1",
"WALG_UPLOAD_DISK_CONCURRENCY": "1"
52 changes: 52 additions & 0 deletions docker/pg_tests/scripts/tests/daemon_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/bash
set -e -x

export PGDATA=/var/lib/postgresql/10/main

CONFIG_FILE="/tmp/configs/daemon_test_config.json"

COMMON_CONFIG="/tmp/configs/common_config.json"
TMP_CONFIG="/tmp/configs/tmp_config.json"
cat ${CONFIG_FILE} > ${TMP_CONFIG}

echo "," >> ${TMP_CONFIG}
cat ${COMMON_CONFIG} >> ${TMP_CONFIG}
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG}

/usr/lib/postgresql/10/bin/initdb ${PGDATA}
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start
/tmp/scripts/wait_while_pg_not_ready.sh

wal-g --config=${TMP_CONFIG} delete everything FORCE --confirm

pgbench -i -s 50 postgres
du -hs ${PGDATA}
sleep 1
WAL=$(ls -l ${PGDATA}/pg_wal | head -n2 | tail -n1 | egrep -o "[0-9A-F]{24}")

SOCKET="/tmp/configs/wal-daemon.sock"
wal-g --config=${TMP_CONFIG} daemon ${SOCKET} &

until [ -S ${SOCKET} ]
do
sleep 1
done
echo "walg-daemon is working"

if {
echo -en "C\x0\x8"
echo -n "CHECK"
echo -en "F\x0\x1B"
echo -n "${WAL}"
} | nc -U ${SOCKET} | grep -q "OO"; then
echo "WAL-G response is correct"
if wal-g --config=${TMP_CONFIG} st ls /wal_005 | grep ${WAL}.br ; then
echo "Archive file in folder"
else
echo "Archive not in folder. Error."
exit 1
fi
else
echo "Error in WAL-G response."
exit 1
fi
9 changes: 9 additions & 0 deletions docs/PostgreSQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,15 @@ Usage:
wal-g wal-restore path/to/target-pgdata path/to/source-pgdata
```

### ``daemon``

Archives all WAL segments in the background. Works with the PostgreSQL archive library `walg_archive`.

Usage:
```bash
wal-g daemon path/to/socket-descriptor
```

pgBackRest backups support (beta version)
-----------
### ``pgbackrest backup-list``
Expand Down
168 changes: 168 additions & 0 deletions internal/databases/postgres/daemon_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package postgres

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path"

"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/utility"
)

type SocketMessageType byte

const (
CheckType SocketMessageType = 'C'
FileNameType SocketMessageType = 'F'
OkType SocketMessageType = 'O'
ErrorType SocketMessageType = 'E'
)

func (msg SocketMessageType) ToBytes() []byte {
return []byte{byte(msg)}
}

type SocketMessageHandler interface {
Handle(messageBody []byte) error
}

type CheckMessageHandler struct {
messageType SocketMessageType
fd net.Conn
uploader *WalUploader
}

func (h *CheckMessageHandler) Handle(messageBody []byte) error {
_, err := h.fd.Write(OkType.ToBytes())
if err != nil {
return fmt.Errorf("failed to write in socket: %w", err)
}
tracelog.InfoLogger.Println("Successful configuration check")
return nil
}

type ArchiveMessageHandler struct {
messageType SocketMessageType
fd net.Conn
uploader *WalUploader
}

func (h *ArchiveMessageHandler) Handle(messageBody []byte) error {
tracelog.InfoLogger.Printf("wal file name: %s\n", string(messageBody))
PgDataSettingString, ok := internal.GetSetting(internal.PgDataSetting)
if !ok {
return fmt.Errorf("PGDATA is not set in the conf")
}
pathToWal := path.Join(PgDataSettingString, "pg_wal")
fullPath := path.Join(pathToWal, string(messageBody))
tracelog.InfoLogger.Printf("starting wal-push for %s\n", fullPath)
err := HandleWALPush(h.uploader, fullPath)
if err != nil {
return fmt.Errorf("file archiving failed: %w", err)
}
_, err = h.fd.Write(OkType.ToBytes())
if err != nil {
return fmt.Errorf("socket write failed: %w", err)
}
return nil
}

func NewMessageHandler(messageType SocketMessageType, c net.Conn, uploader *WalUploader) SocketMessageHandler {
switch messageType {
case CheckType:
return &CheckMessageHandler{CheckType, c, uploader}
case FileNameType:
return &ArchiveMessageHandler{FileNameType, c, uploader}
default:
return nil
}
}

type SocketMessageReader struct {
c net.Conn
}

func NewMessageReader(c net.Conn) *SocketMessageReader {
return &SocketMessageReader{c}
}

// Next method reads messages sequentially from the Reader
func (r SocketMessageReader) Next() (messageType SocketMessageType, messageBody []byte, err error) {
messageParameters := make([]byte, 3)
_, err = io.ReadFull(r.c, messageParameters)
if err != nil {
return ErrorType, nil, fmt.Errorf("failed to read params: %w", err)
}
messageType = SocketMessageType(messageParameters[0])
var messageLength uint16
l := bytes.NewReader(messageParameters[1:3])
err = binary.Read(l, binary.BigEndian, &messageLength)
if err != nil {
return ErrorType, nil, fmt.Errorf("fail to read message len: %w", err)
}
messageBody = make([]byte, messageLength-3)
_, err = io.ReadFull(r.c, messageBody)
if err != nil {
return ErrorType, nil, fmt.Errorf("failed to read msg body: %w", err)
}
return messageType, messageBody, err
}

// HandleDaemon is invoked to perform daemon mode
func HandleDaemon(uploader *WalUploader, pathToSocket string) {
if _, err := os.Stat(pathToSocket); err == nil {
err = os.Remove(pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Failed to remove socket file:", err)
}
}
l, err := net.Listen("unix", pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Error on listening socket:", err)
}
for {
fd, err := l.Accept()
if err != nil {
tracelog.ErrorLogger.Fatal("Failed to accept, err:", err)
}
go Listen(fd, uploader)
}
}

// Listen is used for listening connection and processing messages
func Listen(c net.Conn, uploader *WalUploader) {
defer utility.LoggedClose(c, fmt.Sprintf("Failed to close connection with %s \n", c.RemoteAddr()))
messageReader := NewMessageReader(c)
for {
messageType, messageBody, err := messageReader.Next()
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message from %s, err: %v\n", c.RemoteAddr(), err)
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
messageHandler := NewMessageHandler(messageType, c, uploader)
if messageHandler == nil {
tracelog.ErrorLogger.Printf("Unexpected message type: %s", string(messageType))
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
err = messageHandler.Handle(messageBody)
if err != nil {
tracelog.ErrorLogger.Println("Failed to handle message:", err)
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
if messageType == FileNameType {
tracelog.InfoLogger.Printf("Successful archiving for %s\n", string(messageBody))
return
}
}
}

0 comments on commit 925d2c4

Please sign in to comment.