Skip to content

Commit

Permalink
postgres: add state to wal struct
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Safonov committed Sep 16, 2022
1 parent 448c369 commit 09c42c3
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions format/postgres/flavours/postgres14/pg_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,20 @@ type walD struct {

pageRecords *decode.D

state *walState
}

type walState struct {
record *decode.D
recordRemLenBytes int64
}

func DecodePgwal(d *decode.D, maxOffset uint32) any {
pages := d.FieldArrayValue("Pages")
wal := &walD{
maxOffset: int64(maxOffset),
pages: pages,
records: d.FieldArrayValue("Records"),
recordRemLenBytes: -1, // -1 means not initialized
maxOffset: int64(maxOffset),
pages: pages,
records: d.FieldArrayValue("Records"),
}

for {
Expand Down Expand Up @@ -210,9 +213,9 @@ func decodeXLogPage(wal *walD, d *decode.D) {
})
}

if wal.recordRemLenBytes >= 0 { // check recordRemLenBytes is initialized
if wal.recordRemLenBytes != int64(remLenBytes) {
d.Fatalf("incorrect wal.recordRemLenBytes = %d, remLenBytes = %d", wal.recordRemLenBytes, remLenBytes)
if wal.state != nil { // check recordRemLenBytes is initialized
if wal.state.recordRemLenBytes != int64(remLenBytes) {
d.Fatalf("recordRemLenBytes = %d != remLenBytes = %d", wal.state.recordRemLenBytes, remLenBytes)
}
}

Expand All @@ -224,7 +227,7 @@ func decodeXLogPage(wal *walD, d *decode.D) {

// parted XLogRecord
if remLen > 0 {
if wal.record == nil {
if wal.state == nil {
// record of previous file
checkPosBytes := xLogPage.Pos() / 8
if checkPosBytes >= XLOG_BLCKSZ {
Expand All @@ -239,8 +242,8 @@ func decodeXLogPage(wal *walD, d *decode.D) {

pos2 := xLogPage.Pos()

if wal.record != nil {
wal.record.SeekAbs(pos1)
if wal.state != nil && wal.state.record != nil {
wal.state.record.SeekAbs(pos1)
}

xLogPage.SeekAbs(pos2)
Expand Down Expand Up @@ -282,15 +285,16 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
// can't read xl_tot_len on this page
// can't create row in this page
// continue on next page
wal.record = nil
wal.recordRemLenBytes = 0
wal.state = nil
return
}

d.SeekAbs(posBytes1Aligned * 8)

record := pageRecords.FieldStructValue("XLogRecord")
wal.record = record
wal.state = &walState{
record: record,
}
wal.records.AddChild(record.Value)

xlTotLen := record.FieldU32("xl_tot_len")
Expand All @@ -305,7 +309,7 @@ func decodeXLogRecords(wal *walD, d *decode.D) {
if remOnPage < int64(xlTotLen1Bytes) {
//record.FieldRawLen("xLogBody", remOnPage*8)
decodeXLogRecord(wal, remOnPage)
wal.recordRemLenBytes = int64(xlTotLen1Bytes) - remOnPage
wal.state.recordRemLenBytes = int64(xlTotLen1Bytes) - remOnPage
break
}

Expand All @@ -316,8 +320,7 @@ func decodeXLogRecords(wal *walD, d *decode.D) {

//record.FieldRawLen("xLogBody", xLogBodyLen)
decodeXLogRecord(wal, int64(xlTotLen1Bytes))
wal.record = nil
wal.recordRemLenBytes = 0
wal.state = nil
}
}

Expand Down Expand Up @@ -359,7 +362,7 @@ func fieldTryGetScalarActualU(d *decode.D, name string, posMax int64, bitsCount
}

func decodeXLogRecord(wal *walD, maxBytes int64) {
record := wal.record
record := wal.state.record

pos0 := record.Pos()
maxLen := maxBytes * 8
Expand Down Expand Up @@ -441,16 +444,6 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
return
}

if blockId == XLR_BLOCK_ID_DATA_SHORT {
//typedef struct XLogRecordDataHeaderShort
//{
// uint8 id; /* XLR_BLOCK_ID_DATA_SHORT */
// uint8 data_length; /* number of payload bytes */
//}
//
/* total size (bytes): 24 */
}

//XLR_BLOCK_ID_DATA_SHORT = 255
//XLR_BLOCK_ID_DATA_LONG = 254
//XLR_BLOCK_ID_ORIGIN = 253
Expand All @@ -460,6 +453,12 @@ func decodeXLogRecord(wal *walD, maxBytes int64) {
recordOrigin := uint64(0)
toplevelXid := uint64(0)
if blockId == XLR_BLOCK_ID_DATA_SHORT {
//typedef struct XLogRecordDataHeaderShort
//{
// uint8 id; /* XLR_BLOCK_ID_DATA_SHORT */
// uint8 data_length; /* number of payload bytes */
//}
//
// COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
mainDataLen, end = fieldTryGetScalarActualU(record, "main_data_len", posMax, 8)
if end {
Expand Down

0 comments on commit 09c42c3

Please sign in to comment.