forked from wal-g/wal-g
/
wal_part_recorder.go
75 lines (66 loc) · 2.07 KB
/
wal_part_recorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package internal
import (
"fmt"
"github.com/pkg/errors"
"github.com/tinsane/tracelog"
)
type NotWalFilenameError struct {
error
}
func NewNotWalFilenameError(filename string) NotWalFilenameError {
return NotWalFilenameError{errors.Errorf("expected to get wal filename, but found: '%s'", filename)}
}
func (err NotWalFilenameError) Error() string {
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
}
type WalPartRecorder struct {
manager *DeltaFileManager
walFilename string
}
func NewWalPartRecorder(walFilename string, manager *DeltaFileManager) (*WalPartRecorder, error) {
if !isWalFilename(walFilename) {
return nil, NewNotWalFilenameError(walFilename)
}
return &WalPartRecorder{manager, walFilename}, nil
}
func (recorder *WalPartRecorder) SavePreviousWalTail(tailData []byte) error {
if tailData == nil {
tailData = make([]byte, 0)
}
deltaFilename, err := GetDeltaFilenameFor(recorder.walFilename)
if err != nil {
return err
}
partFile, err := recorder.manager.GetPartFile(deltaFilename)
if err != nil {
return err
}
partFile.WalTails[GetPositionInDelta(recorder.walFilename)] = tailData
return nil
}
func (recorder *WalPartRecorder) SaveNextWalHead(head []byte) error {
if head == nil {
head = make([]byte, 0)
}
deltaFilename, _ := GetDeltaFilenameFor(recorder.walFilename)
partFile, err := recorder.manager.GetPartFile(deltaFilename)
if err != nil {
return err
}
positionInDelta := GetPositionInDelta(recorder.walFilename)
partFile.WalHeads[positionInDelta] = head
if positionInDelta == int(WalFileInDelta)-1 {
nextWalFilename, _ := GetNextWalFilename(recorder.walFilename)
nextDeltaFilename, _ := GetDeltaFilenameFor(nextWalFilename)
nextPartFile, err := recorder.manager.GetPartFile(nextDeltaFilename)
if err != nil {
return err
}
nextPartFile.PreviousWalHead = head
}
return nil
}
func (recorder *WalPartRecorder) cancelRecordingWithErr(err error) {
tracelog.WarningLogger.Printf("Stopped wal file: '%s' recording because of error: '%v'\n", recorder.walFilename, err)
recorder.manager.CancelRecording(recorder.walFilename)
}