Skip to content

Commit

Permalink
Refactoring: error handling, new msg types, rename funcs; cmd descrip…
Browse files Browse the repository at this point in the history
…tion; test listing.
  • Loading branch information
Artem Galkin committed Oct 27, 2022
1 parent 6bfd42d commit 8a191d3
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 58 deletions.
4 changes: 1 addition & 3 deletions cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ var walPushCmd = &cobra.Command{

uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
if err != nil {
tracelog.ErrorLogger.FatalOnError(err)
}
tracelog.ErrorLogger.FatalOnError(err)
},
}

Expand Down
3 changes: 2 additions & 1 deletion docker/pg_tests/scripts/tests/daemon_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ wal-g --config=${TMP_CONFIG} daemon ${SOCKET} &

until [ -S ${SOCKET} ]
do
sleep 2
sleep 1
done
echo "walg-daemon is working"

Expand All @@ -40,6 +40,7 @@ if {
echo -n "${WAL}"
} | nc -U ${SOCKET} | grep -q "OO"; then
echo "WAL-G response is correct"
wal-g st ls wal_005/${WAL}.br
else
echo "Error in WAL-G response"
exit 1
Expand Down
2 changes: 1 addition & 1 deletion docs/PostgreSQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ wal-g wal-restore path/to/target-pgdata path/to/source-pgdata

### ``daemon``

Archives all WAL segments in the background.
Archives all WAL segments in the background. Works with the PostgreSQL archive library `walg_archive`.

Usage:
```bash
Expand Down
98 changes: 45 additions & 53 deletions internal/databases/postgres/daemon_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,64 @@ type SocketMessageType byte
const (
CheckType SocketMessageType = 'C'
FileNameType SocketMessageType = 'F'
OkType SocketMessageType = 'O'
ErrorType SocketMessageType = 'E'
)

func (msg SocketMessageType) ToBytes() []byte {
return []byte{byte(msg)}
}

type SocketMessageHandler interface {
Handle(messageBody []byte) error
}

type ArchiveMessageHandler struct {
type CheckMessageHandler struct {
messageType SocketMessageType
fd net.Conn
uploader *WalUploader
}

func (h *CheckMessageHandler) Handle(messageBody []byte) error {
_, err := h.fd.Write([]byte{'O'})
_, err := h.fd.Write(OkType.ToBytes())
if err != nil {
tracelog.ErrorLogger.Printf("Error on writing in socket: %v \n", err)
return err
return fmt.Errorf("failed to write in socket: %w", err)
}
tracelog.InfoLogger.Println("Successful configuration check")
return nil
}

type CheckMessageHandler struct {
type ArchiveMessageHandler struct {
messageType SocketMessageType
fd net.Conn
uploader *WalUploader
}

func (h *ArchiveMessageHandler) Handle(messageBody []byte) error {
err := messageValidation(messageBody)
if err != nil {
tracelog.ErrorLogger.Printf("Incorrect message: %v\n", err)
return err
}
tracelog.InfoLogger.Printf("wal file name: %s\n", string(messageBody))
PgDataSettingString, ok := internal.GetSetting(internal.PgDataSetting)
if !ok {
tracelog.ErrorLogger.Print("\nPGDATA is not set in the conf.\n")
return fmt.Errorf("PGDATA is not set in the conf")
}
pathToWal := path.Join(PgDataSettingString, "pg_wal")
fullPath := path.Join(pathToWal, string(messageBody))
tracelog.InfoLogger.Printf("starting wal-push for %s\n", fullPath)
err = HandleWALPush(h.uploader, fullPath)
err := HandleWALPush(h.uploader, fullPath)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to archive file: %s, err: %v \n", string(messageBody), err)
return err
return fmt.Errorf("file archiving failed: %w", err)
}
_, err = h.fd.Write([]byte{'O'})
_, err = h.fd.Write(OkType.ToBytes())
if err != nil {
tracelog.ErrorLogger.Printf("failed to write in socket: %v\n", err)
return err
return fmt.Errorf("socket write failed: %w", err)
}
return nil
}

func messageValidation(messageBody []byte) error {
if len(messageBody) < 24 {
if len(messageBody) > 0 {
tracelog.ErrorLogger.Println("incorrect message accepted")
return fmt.Errorf("incorrect message accepted: %s", string(messageBody))
}
return fmt.Errorf("empty message accepted")
}
return nil
}

func GetMessageHandler(messageType byte, c net.Conn, uploader *WalUploader) SocketMessageHandler {
func NewMessageHandler(messageType SocketMessageType, c net.Conn, uploader *WalUploader) SocketMessageHandler {
switch messageType {
case 'C':
case CheckType:
return &CheckMessageHandler{CheckType, c, uploader}
case 'F':
case FileNameType:
return &ArchiveMessageHandler{FileNameType, c, uploader}
default:
return nil
Expand All @@ -101,75 +87,81 @@ type SocketMessageReader struct {
c net.Conn
}

func GetMessageReader(c net.Conn) *SocketMessageReader {
func NewMessageReader(c net.Conn) *SocketMessageReader {
return &SocketMessageReader{c}
}

// Next method reads messages sequentially from the Reader
func (r SocketMessageReader) Next() (messageType byte, messageBody []byte, err error) {
func (r SocketMessageReader) Next() (messageType SocketMessageType, messageBody []byte, err error) {
messageParameters := make([]byte, 3)
_, err = io.ReadFull(r.c, messageParameters)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read from socket, err: %v \n", err)
return 'E', nil, err
return ErrorType, nil, fmt.Errorf("failed to read params: %w", err)
}
messageType = messageParameters[0]
messageType = SocketMessageType(messageParameters[0])
var messageLength uint16
l := bytes.NewReader(messageParameters[1:3])
err = binary.Read(l, binary.BigEndian, &messageLength)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message length, err: %v \n", err)
return 'E', nil, err
return ErrorType, nil, fmt.Errorf("fail to read message len: %w", err)
}
messageBody = make([]byte, messageLength-3)
_, err = io.ReadFull(r.c, messageBody)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read from socket, err: %v \n", err)
return 'E', nil, err
return ErrorType, nil, fmt.Errorf("failed to read msg body: %w", err)
}
return messageType, messageBody, err
}

// HandleDaemon is invoked to perform daemon mode
func HandleDaemon(uploader *WalUploader, pathToSocket string) {
_ = os.Remove(pathToSocket)
if _, err := os.Stat(pathToSocket); err == nil {
err = os.Remove(pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Failed to remove socket file:", err)
}
}
l, err := net.Listen("unix", pathToSocket)
if err != nil {
tracelog.ErrorLogger.Fatal("Error on listening socket:", err)
}
for {
fd, err := l.Accept()
if err != nil {
tracelog.ErrorLogger.Println("Failed to accept, err:", err)
tracelog.ErrorLogger.Fatal("Failed to accept, err:", err)
}
go DaemonProcess(fd, uploader)
go Listen(fd, uploader)
}
}

// DaemonProcess reads data from connection and processes it
func DaemonProcess(c net.Conn, uploader *WalUploader) {
// Listen is used for listening connection and processing messages
func Listen(c net.Conn, uploader *WalUploader) {
defer utility.LoggedClose(c, fmt.Sprintf("Failed to close connection with %s \n", c.RemoteAddr()))
messageReader := GetMessageReader(c)
messageReader := NewMessageReader(c)
for {
messageType, messageBody, err := messageReader.Next()
if err != nil {
tracelog.ErrorLogger.Printf("Failed to read message from client %s, err: %v\n", c.RemoteAddr(), err)
_, _ = c.Write([]byte{'E'})
tracelog.ErrorLogger.Printf("Failed to read message from %s, err: %v\n", c.RemoteAddr(), err)
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
messageHandler := GetMessageHandler(messageType, c, uploader)
messageHandler := NewMessageHandler(messageType, c, uploader)
if messageHandler == nil {
tracelog.ErrorLogger.Printf("Unexpected message type: %s", string(messageType))
_, _ = c.Write([]byte{'E'})
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
err = messageHandler.Handle(messageBody)
if err != nil {
tracelog.ErrorLogger.Println("Failed to handle message, err:", err)
_, _ = c.Write([]byte{'E'})
tracelog.ErrorLogger.Println("Failed to handle message:", err)
_, err = c.Write(ErrorType.ToBytes())
tracelog.ErrorLogger.PrintOnError(err)
return
}
if messageType == 'F' {
if messageType == FileNameType {
tracelog.InfoLogger.Printf("Successful archiving for %s\n", string(messageBody))
return
}
}
Expand Down

0 comments on commit 8a191d3

Please sign in to comment.