-
Notifications
You must be signed in to change notification settings - Fork 435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add daemon mode for wal-push command #1353
Changes from all commits
375724c
73dc6f5
ba10323
f1cd865
2d1f34c
97fcd78
4b83717
7a9c2d8
b716637
ee98bdd
ba3a1ad
4da5e24
0d7f569
7f05253
bcff731
ec6b490
1f1af79
2c117fa
6ffc97c
766a6e2
6bfd42d
6fedfec
e14af01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package pg | ||
|
||
import ( | ||
"github.com/spf13/cobra" | ||
"github.com/wal-g/tracelog" | ||
"github.com/wal-g/wal-g/internal" | ||
"github.com/wal-g/wal-g/internal/asm" | ||
"github.com/wal-g/wal-g/internal/databases/postgres" | ||
"github.com/wal-g/wal-g/utility" | ||
) | ||
|
||
const DaemonShortDescription = "Uploads a WAL file to storage" | ||
|
||
// daemonCmd represents the daemon archive command | ||
var daemonCmd = &cobra.Command{ | ||
Use: "daemon daemon_socket_path", | ||
Short: DaemonShortDescription, // TODO : improve description | ||
Args: cobra.ExactArgs(1), | ||
Run: func(cmd *cobra.Command, args []string) { | ||
rogaliiik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
uploader, err := postgres.ConfigureWalUploader() | ||
tracelog.ErrorLogger.FatalOnError(err) | ||
|
||
archiveStatusManager, err := internal.ConfigureArchiveStatusManager() | ||
if err == nil { | ||
uploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager) | ||
} else { | ||
tracelog.ErrorLogger.PrintError(err) | ||
uploader.ArchiveStatusManager = asm.NewNopASM() | ||
} | ||
|
||
PGArchiveStatusManager, err := internal.ConfigurePGArchiveStatusManager() | ||
if err == nil { | ||
uploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager) | ||
} else { | ||
tracelog.ErrorLogger.PrintError(err) | ||
uploader.PGArchiveStatusManager = asm.NewNopASM() | ||
} | ||
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath) | ||
postgres.HandleDaemon(uploader, args[0]) | ||
}, | ||
} | ||
|
||
func init() { | ||
Cmd.AddCommand(daemonCmd) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
FROM wal-g/docker_prefix:latest | ||
|
||
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y netcat-openbsd && apt-get clean | ||
|
||
CMD su - postgres /tmp/tests/daemon_test.sh |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
"WALG_DELTA_MAX_STEPS": "0", | ||
"WALE_S3_PREFIX": "s3://daemonbucket", | ||
"WALG_USE_WAL_DELTA": "true", | ||
"WALG_UPLOAD_CONCURRENCY": "1", | ||
"WALG_UPLOAD_QUEUE": "1", | ||
"WALG_UPLOAD_DISK_CONCURRENCY": "1" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
#!/bin/bash | ||
set -e -x | ||
rogaliiik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
export PGDATA=/var/lib/postgresql/10/main | ||
|
||
CONFIG_FILE="/tmp/configs/daemon_test_config.json" | ||
|
||
COMMON_CONFIG="/tmp/configs/common_config.json" | ||
TMP_CONFIG="/tmp/configs/tmp_config.json" | ||
cat ${CONFIG_FILE} > ${TMP_CONFIG} | ||
|
||
echo "," >> ${TMP_CONFIG} | ||
cat ${COMMON_CONFIG} >> ${TMP_CONFIG} | ||
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG} | ||
|
||
/usr/lib/postgresql/10/bin/initdb ${PGDATA} | ||
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start | ||
/tmp/scripts/wait_while_pg_not_ready.sh | ||
|
||
wal-g --config=${TMP_CONFIG} delete everything FORCE --confirm | ||
|
||
pgbench -i -s 50 postgres | ||
du -hs ${PGDATA} | ||
sleep 1 | ||
WAL=$(ls -l ${PGDATA}/pg_wal | head -n2 | tail -n1 | egrep -o "[0-9A-F]{24}") | ||
|
||
SOCKET="/tmp/configs/wal-daemon.sock" | ||
wal-g --config=${TMP_CONFIG} daemon ${SOCKET} & | ||
|
||
until [ -S ${SOCKET} ] | ||
do | ||
sleep 1 | ||
done | ||
echo "walg-daemon is working" | ||
|
||
if { | ||
echo -en "C\x0\x8" | ||
echo -n "CHECK" | ||
echo -en "F\x0\x1B" | ||
echo -n "${WAL}" | ||
} | nc -U ${SOCKET} | grep -q "OO"; then | ||
echo "WAL-G response is correct" | ||
if wal-g --config=${TMP_CONFIG} st ls /wal_005 | grep ${WAL}.br ; then | ||
echo "Archive file in folder" | ||
else | ||
echo "Archive not in folder. Error." | ||
exit 1 | ||
fi | ||
else | ||
echo "Error in WAL-G response." | ||
exit 1 | ||
fi | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also need to check that WAL has been archived correctly. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
package postgres | ||
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"fmt" | ||
"io" | ||
"net" | ||
"os" | ||
"path" | ||
|
||
"github.com/wal-g/tracelog" | ||
"github.com/wal-g/wal-g/internal" | ||
"github.com/wal-g/wal-g/utility" | ||
) | ||
|
||
type SocketMessageType byte | ||
|
||
const ( | ||
CheckType SocketMessageType = 'C' | ||
FileNameType SocketMessageType = 'F' | ||
OkType SocketMessageType = 'O' | ||
ErrorType SocketMessageType = 'E' | ||
) | ||
|
||
func (msg SocketMessageType) ToBytes() []byte { | ||
return []byte{byte(msg)} | ||
} | ||
|
||
type SocketMessageHandler interface { | ||
Handle(messageBody []byte) error | ||
} | ||
|
||
type CheckMessageHandler struct { | ||
messageType SocketMessageType | ||
fd net.Conn | ||
uploader *WalUploader | ||
} | ||
|
||
func (h *CheckMessageHandler) Handle(messageBody []byte) error { | ||
rogaliiik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_, err := h.fd.Write(OkType.ToBytes()) | ||
if err != nil { | ||
return fmt.Errorf("failed to write in socket: %w", err) | ||
} | ||
tracelog.InfoLogger.Println("Successful configuration check") | ||
return nil | ||
} | ||
|
||
type ArchiveMessageHandler struct { | ||
messageType SocketMessageType | ||
fd net.Conn | ||
uploader *WalUploader | ||
} | ||
|
||
func (h *ArchiveMessageHandler) Handle(messageBody []byte) error { | ||
tracelog.InfoLogger.Printf("wal file name: %s\n", string(messageBody)) | ||
PgDataSettingString, ok := internal.GetSetting(internal.PgDataSetting) | ||
if !ok { | ||
return fmt.Errorf("PGDATA is not set in the conf") | ||
} | ||
pathToWal := path.Join(PgDataSettingString, "pg_wal") | ||
fullPath := path.Join(pathToWal, string(messageBody)) | ||
tracelog.InfoLogger.Printf("starting wal-push for %s\n", fullPath) | ||
err := HandleWALPush(h.uploader, fullPath) | ||
if err != nil { | ||
return fmt.Errorf("file archiving failed: %w", err) | ||
} | ||
_, err = h.fd.Write(OkType.ToBytes()) | ||
if err != nil { | ||
return fmt.Errorf("socket write failed: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
func NewMessageHandler(messageType SocketMessageType, c net.Conn, uploader *WalUploader) SocketMessageHandler { | ||
switch messageType { | ||
case CheckType: | ||
return &CheckMessageHandler{CheckType, c, uploader} | ||
case FileNameType: | ||
return &ArchiveMessageHandler{FileNameType, c, uploader} | ||
default: | ||
return nil | ||
} | ||
} | ||
|
||
type SocketMessageReader struct { | ||
c net.Conn | ||
} | ||
|
||
func NewMessageReader(c net.Conn) *SocketMessageReader { | ||
return &SocketMessageReader{c} | ||
} | ||
|
||
// Next method reads messages sequentially from the Reader | ||
func (r SocketMessageReader) Next() (messageType SocketMessageType, messageBody []byte, err error) { | ||
messageParameters := make([]byte, 3) | ||
_, err = io.ReadFull(r.c, messageParameters) | ||
if err != nil { | ||
return ErrorType, nil, fmt.Errorf("failed to read params: %w", err) | ||
} | ||
messageType = SocketMessageType(messageParameters[0]) | ||
var messageLength uint16 | ||
l := bytes.NewReader(messageParameters[1:3]) | ||
err = binary.Read(l, binary.BigEndian, &messageLength) | ||
if err != nil { | ||
return ErrorType, nil, fmt.Errorf("fail to read message len: %w", err) | ||
} | ||
messageBody = make([]byte, messageLength-3) | ||
_, err = io.ReadFull(r.c, messageBody) | ||
if err != nil { | ||
return ErrorType, nil, fmt.Errorf("failed to read msg body: %w", err) | ||
} | ||
return messageType, messageBody, err | ||
} | ||
|
||
// HandleDaemon is invoked to perform daemon mode | ||
func HandleDaemon(uploader *WalUploader, pathToSocket string) { | ||
if _, err := os.Stat(pathToSocket); err == nil { | ||
err = os.Remove(pathToSocket) | ||
if err != nil { | ||
tracelog.ErrorLogger.Fatal("Failed to remove socket file:", err) | ||
} | ||
} | ||
l, err := net.Listen("unix", pathToSocket) | ||
if err != nil { | ||
tracelog.ErrorLogger.Fatal("Error on listening socket:", err) | ||
} | ||
for { | ||
fd, err := l.Accept() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we would want to terminate the running daemon? I suggest to add the signal listener and listend for |
||
if err != nil { | ||
tracelog.ErrorLogger.Fatal("Failed to accept, err:", err) | ||
} | ||
go Listen(fd, uploader) | ||
} | ||
} | ||
|
||
// Listen is used for listening connection and processing messages | ||
func Listen(c net.Conn, uploader *WalUploader) { | ||
defer utility.LoggedClose(c, fmt.Sprintf("Failed to close connection with %s \n", c.RemoteAddr())) | ||
messageReader := NewMessageReader(c) | ||
for { | ||
messageType, messageBody, err := messageReader.Next() | ||
if err != nil { | ||
tracelog.ErrorLogger.Printf("Failed to read message from %s, err: %v\n", c.RemoteAddr(), err) | ||
_, err = c.Write(ErrorType.ToBytes()) | ||
tracelog.ErrorLogger.PrintOnError(err) | ||
return | ||
} | ||
messageHandler := NewMessageHandler(messageType, c, uploader) | ||
if messageHandler == nil { | ||
tracelog.ErrorLogger.Printf("Unexpected message type: %s", string(messageType)) | ||
_, err = c.Write(ErrorType.ToBytes()) | ||
tracelog.ErrorLogger.PrintOnError(err) | ||
return | ||
} | ||
err = messageHandler.Handle(messageBody) | ||
if err != nil { | ||
tracelog.ErrorLogger.Println("Failed to handle message:", err) | ||
_, err = c.Write(ErrorType.ToBytes()) | ||
tracelog.ErrorLogger.PrintOnError(err) | ||
return | ||
} | ||
if messageType == FileNameType { | ||
tracelog.InfoLogger.Printf("Successful archiving for %s\n", string(messageBody)) | ||
return | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some information about this command to the documentation.