Skip to content

Commit

Permalink
PostgreSQL heap page parser implememtation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Safonov committed Jul 29, 2022
1 parent bebdfa9 commit 51878dc
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 28 deletions.
1 change: 1 addition & 0 deletions format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const (
PGMULTIXACTOFF = "pgmultixact_offsets"
PGMULTIXACTMEM = "pgmultixact_members"
PG_CONTROL = "pg_control"
PGHEAP = "pgheap"
PNG = "png"
PROTOBUF = "protobuf"
PROTOBUF_WIDEVINE = "protobuf_widevine"
Expand Down
10 changes: 10 additions & 0 deletions format/postgres/common/pg_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,13 @@ func (m versionMapper) MapScalar(s scalar.S) (scalar.S, error) {
}

var VersionMapper = versionMapper{}

type hexMapper struct{}

func (m hexMapper) MapScalar(s scalar.S) (scalar.S, error) {
v := s.ActualU()
s.Sym = fmt.Sprintf("%X", v)
return s, nil
}

var HexMapper = hexMapper{}
39 changes: 39 additions & 0 deletions format/postgres/common/pgheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package common

import (
"github.com/wader/fq/pkg/scalar"
)

const (
HeapPageSize = 8192
)

type lpOffMapper struct{}

func (m lpOffMapper) MapScalar(s scalar.S) (scalar.S, error) {
v := s.ActualU() & 0x7fff
s.Actual = v
return s, nil
}

var LpOffMapper = lpOffMapper{}

type lpFlagsMapper struct{}

func (m lpFlagsMapper) MapScalar(s scalar.S) (scalar.S, error) {
v := (s.ActualU() >> 15) & 0x3
s.Actual = v
return s, nil
}

var LpFlagsMapper = lpFlagsMapper{}

type lpLenMapper struct{}

func (m lpLenMapper) MapScalar(s scalar.S) (scalar.S, error) {
v := (s.ActualU() >> 17) & 0x7fff
s.Actual = v
return s, nil
}

var LpLenMapper = lpLenMapper{}
13 changes: 13 additions & 0 deletions format/postgres/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

func TypeAlign(alignVal uint64, alignLen uint64) uint64 {
return (alignLen + alignVal - 1) & ^(alignVal - 1)
}

func TypeAlign8(alignLen uint64) uint64 {
return TypeAlign(8, alignLen)
}

func RoundDown(alignVal uint64, alignLen uint64) uint64 {
return (alignLen / alignVal) * alignVal
}
44 changes: 44 additions & 0 deletions format/postgres/common/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common_test

import (
"github.com/wader/fq/format/postgres/common"
"testing"
)

func TestTypeAlign8(t *testing.T) {
expected39 := common.TypeAlign8(39)
if expected39 != 40 {
t.Errorf("must be 40\n")
}
expected41 := common.TypeAlign8(41)
if expected41 != 48 {
t.Errorf("must be 40\n")
}
}

func TestRoundDown(t *testing.T) {
const pageSize1 = 8192
expected1 := common.RoundDown(pageSize1, 7*pageSize1+35)
if expected1 != 7*pageSize1 {
t.Errorf("must be %d\n", 7*pageSize1)
}
expected2 := common.RoundDown(pageSize1, 7*pageSize1-1)
if expected2 != 6*pageSize1 {
t.Errorf("must be %d\n", 6*pageSize1)
}

const pageSize2 = 7744
expected3 := common.RoundDown(pageSize2, 15*pageSize2+61)
if expected3 != 15*pageSize2 {
t.Errorf("must be %d\n", 15*pageSize2)
}
expected4 := common.RoundDown(pageSize2, 3*pageSize2-15)
if expected4 != 2*pageSize2 {
t.Errorf("must be %d\n", 2*pageSize2)
}

expected5 := common.RoundDown(pageSize1, 5*pageSize1)
if expected5 != 5*pageSize1 {
t.Errorf("must be %d\n", 5*pageSize1)
}
}
135 changes: 135 additions & 0 deletions format/postgres/flavours/postgres14/pgheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package postgres14

import (
"context"
"github.com/wader/fq/format/postgres/common"
"github.com/wader/fq/pkg/decode"
)

// type = struct PageHeaderData
/* 0 | 8 */ // PageXLogRecPtr pd_lsn;
/* 8 | 2 */ // uint16 pd_checksum;
/* 10 | 2 */ // uint16 pd_flags;
/* 12 | 2 */ // LocationIndex pd_lower;
/* 14 | 2 */ // LocationIndex pd_upper;
/* 16 | 2 */ // LocationIndex pd_special;
/* 18 | 2 */ // uint16 pd_pagesize_version;
/* 20 | 4 */ // TransactionId pd_prune_xid;
/* 24 | 0 */ // ItemIdData pd_linp[];
//
/* total size (bytes): 24 */

// type = struct PageXLogRecPtr {
/* 0 | 4 */ // uint32 xlogid;
/* 4 | 4 */ // uint32 xrecoff;

/* total size (bytes): 8 */

// type = struct ItemIdData {
/* 0: 0 | 4 */ // unsigned int lp_off: 15
/* 1: 7 | 4 */ // unsigned int lp_flags: 2
/* 2: 1 | 4 */ // unsigned int lp_len: 15

/* total size (bytes): 4 */

// typedef uint16 LocationIndex;
// #define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp))

type heapD struct {
pageSize uint64

// current page
page *heapPageD
}

type heapPageD struct {
pdLower uint16
pdUpper uint16
pdSpecial uint16
pd_pagesize_version uint16
}

func getHeapD(d *decode.D) *heapD {
val := d.Ctx.Value("heap")
return val.(*heapD)
}

func DecodeHeap(d *decode.D) any {
heap := &heapD{
pageSize: common.HeapPageSize,
}
parentCtx := d.Ctx
ctx := context.WithValue(parentCtx, "heap", heap)
d.Ctx = ctx

d.SeekAbs(0)
d.FieldArray("Pages", decodeHeapPages)

return nil
}

func decodeHeapPages(d *decode.D) {
heap := getHeapD(d)
page := &heapPageD{}
heap.page = page

pagePosBegin := common.RoundDown(heap.pageSize, uint64(d.Pos()/8))

for {
d.FieldStruct("HeapPage", func(d *decode.D) {
/* 0 | 8 */ // PageXLogRecPtr pd_lsn;
/* 8 | 2 */ // uint16 pd_checksum;
/* 10 | 2 */ // uint16 pd_flags;
/* 12 | 2 */ // LocationIndex pd_lower;
/* 14 | 2 */ // LocationIndex pd_upper;
/* 16 | 2 */ // LocationIndex pd_special;
/* 18 | 2 */ // uint16 pd_pagesize_version;
/* 20 | 4 */ // TransactionId pd_prune_xid;
/* 24 | 0 */ // ItemIdData pd_linp[];
d.FieldStruct("PageHeaderData", func(d *decode.D) {
d.FieldStruct("pd_lsn", func(d *decode.D) {
/* 0 | 4 */ // uint32 xlogid;
/* 4 | 4 */ // uint32 xrecoff;
d.FieldU32("xlogid", common.HexMapper)
d.FieldU32("xrecoff", common.HexMapper)
})
d.FieldU16("pd_checksum")
d.FieldU16("pd_flags")
page.pdLower = uint16(d.FieldU16("pd_lower"))
page.pdUpper = uint16(d.FieldU16("pd_upper"))
page.pdSpecial = uint16(d.FieldU16("pd_special"))
page.pd_pagesize_version = uint16(d.FieldU16("pd_pagesize_version"))
d.FieldU32("pd_prune_xid")

// ItemIdData pd_linp[];
itemsEnd := int64(pagePosBegin*8) + int64(page.pdLower*8)
d.FieldArray("pd_linp", func(d *decode.D) {
for {
checkPos := d.Pos()
if checkPos >= itemsEnd {
break
}
/* 0: 0 | 4 */ // unsigned int lp_off: 15
/* 1: 7 | 4 */ // unsigned int lp_flags: 2
/* 2: 1 | 4 */ // unsigned int lp_len: 15
d.FieldStruct("ItemIdData", func(d *decode.D) {
itemPos := d.Pos()
d.FieldU32("lp_off", common.LpOffMapper)
d.SeekAbs(itemPos)
d.FieldU32("lp_flags", common.LpFlagsMapper)
d.SeekAbs(itemPos)
d.FieldU32("lp_len", common.LpLenMapper)
})
} // for pd_linp
}) // pd_linp in PageHeaderData

}) // PageHeaderData, PageHeader

// end of page
endLen := uint64(d.Pos() / 8)
pageEnd := common.TypeAlign(heap.pageSize, endLen)
d.SeekAbs(int64(pageEnd) * 8)
}) // HeapPage

} // for Heap pages
}
15 changes: 4 additions & 11 deletions format/postgres/flavours/postgres14/pgwal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package postgres14
import (
"context"
"fmt"
"github.com/wader/fq/format/postgres/common"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)
Expand Down Expand Up @@ -151,14 +152,6 @@ func DecodePgwal(d *decode.D, in any) any {
return nil
}

func TypeAlign(alignVal uint64, alignLen uint64) uint64 {
return (alignLen + alignVal - 1) & ^(alignVal - 1)
}

func TypeAlign8(alignLen uint64) uint64 {
return TypeAlign(8, alignLen)
}

func decodeXLogPage(d *decode.D) {

wal := getWalD(d)
Expand Down Expand Up @@ -193,7 +186,7 @@ func decodeXLogPage(d *decode.D) {

record := wal.record
if record == nil {
rawLen := int64(TypeAlign8(remLen))
rawLen := int64(common.TypeAlign8(remLen))
page.FieldRawLen("prev_file_rec", rawLen*8)
}

Expand All @@ -214,7 +207,7 @@ func decodeXLogRecords(d *decode.D) {
pageRecords := wal.pageRecords

pos := d.Pos() / 8
posMaxOfPage := int64(TypeAlign(8192, uint64(pos)))
posMaxOfPage := int64(common.TypeAlign(8192, uint64(pos)))
fmt.Printf("posMaxOfPage = %d\n", posMaxOfPage)

for {
Expand Down Expand Up @@ -269,7 +262,7 @@ func decodeXLogRecords(d *decode.D) {

xLogRecordBodyLen := xlTotLen - uint64(sizeOfXLogRecord)

rawLen := int64(TypeAlign8(xLogRecordBodyLen))
rawLen := int64(common.TypeAlign8(xLogRecordBodyLen))
record.FieldRawLen("xLogBody", rawLen*8)
})

Expand Down
17 changes: 0 additions & 17 deletions format/postgres/flavours/postgres14/pgwal_test.go

This file was deleted.

33 changes: 33 additions & 0 deletions format/postgres/pgheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package postgres

import (
"github.com/wader/fq/format"
"github.com/wader/fq/format/postgres/flavours/postgres14"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/interp"
)

func init() {
interp.RegisterFormat(decode.Format{
Name: format.PGHEAP,
Description: "PostgreSQL heap file",
DecodeFn: decodePgheap,
DecodeInArg: format.PostgresIn{
Flavour: "default",
},
})
}

func decodePgheap(d *decode.D, in any) any {
d.Endian = decode.LittleEndian

flavour := in.(format.PostgresIn).Flavour
switch flavour {
case PG_FLAVOUR_POSTGRES14, PG_FLAVOUR_POSTGRES:
return postgres14.DecodeHeap(d)
default:
break
}

return postgres14.DecodeHeap(d)
}

0 comments on commit 51878dc

Please sign in to comment.