Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions pkg/vm/engine/tae/db/gc/v3/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
)
return
}
var snapshots map[uint32]containers.Vector
var snapshots *logtail.SnapshotInfo
var pitrs *logtail.PitrInfo
pitrs, err = c.GetPITRsLocked(ctx)
if err != nil {
Expand All @@ -440,8 +440,6 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
)
return
}
accountSnapshots := TransformToTSList(snapshots)
logtail.CloseSnapshotList(snapshots)
_, sarg, _ := fault.TriggerFault("replay error UT")
if sarg != "" {
err = moerr.NewInternalErrorNoCtxf("GC-REPLAY-GET-CHECKPOINT-DATA-ERROR %s", sarg)
Expand All @@ -464,13 +462,12 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
ckpBatch,
c.mutation.snapshotMeta,
accountSnapshots,
snapshots,
pitrs,
0)
logutil.Info(
"GC-REPLAY-COLLECT-SNAPSHOT-SIZE",
zap.String("task", c.TaskNameLocked()),
zap.Int("size", len(accountSnapshots)),
zap.Duration("duration", time.Since(start)),
zap.String("checkpoint", compacted.String()),
zap.Int("count", ckpBatch.RowCount()),
Expand Down Expand Up @@ -777,7 +774,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
ctx context.Context,
checkpointLowWaterMark *types.TS,
memoryBuffer *containers.OneSchemaBatchBuffer,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
gcFileCount int,
) (err error) {
Expand Down Expand Up @@ -888,7 +885,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
newCkpData,
c.mutation.snapshotMeta,
accountSnapshots,
snapshots,
pitrs,
gcFileCount)
if newCkp == nil {
Expand Down Expand Up @@ -1069,10 +1066,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
memoryBuffer *containers.OneSchemaBatchBuffer,
) (err error) {
now := time.Now()
var snapshots map[uint32]containers.Vector
var snapshots *logtail.SnapshotInfo
var extraErrMsg string
defer func() {
logtail.CloseSnapshotList(snapshots)
logutil.Info(
"GC-TRACE-TRY-GC-AGAINST-GCKP",
zap.String("task", c.TaskNameLocked()),
Expand All @@ -1092,9 +1088,8 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
extraErrMsg = "GetSnapshot failed"
return
}
accountSnapshots := TransformToTSList(snapshots)
filesToGC, err := c.doGCAgainstGlobalCheckpointLocked(
ctx, gckp, accountSnapshots, pitrs, memoryBuffer,
ctx, gckp, snapshots, pitrs, memoryBuffer,
)
if err != nil {
extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed"
Expand Down Expand Up @@ -1132,7 +1127,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
waterMark = scanMark
}
err = c.mergeCheckpointFilesLocked(
ctx, &waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC),
ctx, &waterMark, memoryBuffer, snapshots, pitrs, len(filesToGC),
)
if err != nil {
extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString())
Expand All @@ -1145,7 +1140,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
ctx context.Context,
gckp *checkpoint.CheckpointEntry,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
memoryBuffer *containers.OneSchemaBatchBuffer,
) ([]string, error) {
Expand Down Expand Up @@ -1189,7 +1184,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
if filesToGC, metafile, err = scannedWindow.ExecuteGlobalCheckpointBasedGC(
ctx,
gckp,
accountSnapshots,
snapshots,
pitrs,
c.mutation.snapshotMeta,
iscp,
Expand Down Expand Up @@ -1232,7 +1227,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
now = time.Now()
// TODO:
c.updateGCWaterMark(gckp)
c.mutation.snapshotMeta.MergeTableInfo(accountSnapshots, pitrs)
c.mutation.snapshotMeta.MergeTableInfo(snapshots, pitrs)
mergeDuration = time.Since(now)
return filesToGC, nil
}
Expand Down Expand Up @@ -1324,7 +1319,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
// TODO
return err
}
var snapshots map[uint32]containers.Vector
var snapshots *logtail.SnapshotInfo
snapshots, err = c.GetSnapshotsLocked()
if err != nil {
logutil.Error(
Expand All @@ -1334,7 +1329,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
)
return err
}
defer logtail.CloseSnapshotList(snapshots)
var pitr *logtail.PitrInfo
pitr, err = c.GetPITRsLocked(c.ctx)
if err != nil {
Expand All @@ -1348,8 +1342,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {

mergeWindow := c.GetScannedWindowLocked().Clone()
defer mergeWindow.Close()

accoutSnapshots := TransformToTSList(snapshots)
logutil.Info(
"GC-TRACE-MERGE-WINDOW",
zap.String("task", c.TaskNameLocked()),
Expand All @@ -1362,7 +1354,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
if _, _, err = mergeWindow.ExecuteGlobalCheckpointBasedGC(
c.ctx,
gCkp,
accoutSnapshots,
snapshots,
pitr,
c.mutation.snapshotMeta,
iscp,
Expand All @@ -1386,7 +1378,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
if _, _, err = debugWindow.ExecuteGlobalCheckpointBasedGC(
c.ctx,
gCkp,
accoutSnapshots,
snapshots,
pitr,
c.mutation.snapshotMeta,
iscp,
Expand Down Expand Up @@ -1468,7 +1460,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
}
collectObjectsFromCheckpointData(c.ctx, ckpReader, cptCkpObjects)

tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(accoutSnapshots, pitr)
tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(snapshots, pitr)
for name, tables := range ickpObjects {
for _, entry := range tables {
if cptCkpObjects[name] != nil {
Expand Down Expand Up @@ -1840,12 +1832,13 @@ func (c *checkpointCleaner) mutUpdateSnapshotMetaLocked(
)
}

func (c *checkpointCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
func (c *checkpointCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
c.mutation.Lock()
defer c.mutation.Unlock()
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
}
func (c *checkpointCleaner) GetSnapshotsLocked() (map[uint32]containers.Vector, error) {

func (c *checkpointCleaner) GetSnapshotsLocked() (*logtail.SnapshotInfo, error) {
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
}
func (c *checkpointCleaner) GetTablePK(tid uint64) string {
Expand Down
52 changes: 26 additions & 26 deletions pkg/vm/engine/tae/db/gc/v3/exec_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ type CheckpointBasedGCJob struct {
coarseProbility float64
canGCCacheSize int
}
sourcer engine.BaseReader
snapshotMeta *logtail.SnapshotMeta
accountSnapshots map[uint32][]types.TS
iscpTables map[uint64]types.TS
pitr *logtail.PitrInfo
ts *types.TS
globalCkpLoc objectio.Location
globalCkpVer uint32
checkpointCli checkpoint.Runner // Added to access catalog
sourcer engine.BaseReader
snapshotMeta *logtail.SnapshotMeta
snapshots *logtail.SnapshotInfo
iscpTables map[uint64]types.TS
pitr *logtail.PitrInfo
ts *types.TS
globalCkpLoc objectio.Location
globalCkpVer uint32
checkpointCli checkpoint.Runner // Added to access catalog

result struct {
vecToGC *vector.Vector
Expand All @@ -85,7 +85,7 @@ func NewCheckpointBasedGCJob(
gckpVersion uint32,
sourcer engine.BaseReader,
pitr *logtail.PitrInfo,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
iscpTables map[uint64]types.TS,
snapshotMeta *logtail.SnapshotMeta,
checkpointCli checkpoint.Runner,
Expand All @@ -97,15 +97,15 @@ func NewCheckpointBasedGCJob(
opts ...GCJobExecutorOption,
) *CheckpointBasedGCJob {
e := &CheckpointBasedGCJob{
sourcer: sourcer,
snapshotMeta: snapshotMeta,
accountSnapshots: accountSnapshots,
pitr: pitr,
ts: ts,
globalCkpLoc: globalCkpLoc,
globalCkpVer: gckpVersion,
iscpTables: iscpTables,
checkpointCli: checkpointCli,
sourcer: sourcer,
snapshotMeta: snapshotMeta,
snapshots: snapshots,
pitr: pitr,
ts: ts,
globalCkpLoc: globalCkpLoc,
globalCkpVer: gckpVersion,
iscpTables: iscpTables,
checkpointCli: checkpointCli,
}
for _, opt := range opts {
opt(e)
Expand All @@ -121,7 +121,7 @@ func (e *CheckpointBasedGCJob) Close() error {
e.sourcer = nil
}
e.snapshotMeta = nil
e.accountSnapshots = nil
e.snapshots = nil
e.pitr = nil
e.ts = nil
e.globalCkpLoc = nil
Expand Down Expand Up @@ -171,7 +171,7 @@ func (e *CheckpointBasedGCJob) Execute(ctx context.Context) error {

fineFilter, err := MakeSnapshotAndPitrFineFilter(
e.ts,
e.accountSnapshots,
e.snapshots,
e.pitr,
e.snapshotMeta,
transObjects,
Expand Down Expand Up @@ -356,7 +356,7 @@ func buildTableExistenceMap(snapshotMeta *logtail.SnapshotMeta, checkpointCli ch

func MakeSnapshotAndPitrFineFilter(
ts *types.TS,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
snapshotMeta *logtail.SnapshotMeta,
transObjects map[string]map[uint64]*ObjectEntry,
Expand All @@ -373,7 +373,7 @@ func MakeSnapshotAndPitrFineFilter(
}

tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots(
accountSnapshots,
snapshots,
pitrs,
)
return func(
Expand All @@ -393,7 +393,7 @@ func MakeSnapshotAndPitrFineFilter(
createTS := createTSs[i]
deleteTS := deleteTSs[i]

snapshots := tableSnapshots[tableID]
sp := tableSnapshots[tableID]
pitr := tablePitrs[tableID]

if transObjects[name] != nil {
Expand All @@ -407,7 +407,7 @@ func MakeSnapshotAndPitrFineFilter(
}

if !logtail.ObjectIsSnapshotRefers(
entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots,
entry.stats, pitr, &entry.createTS, &entry.dropTS, sp,
) {
if iscpTables == nil {
bm.Add(uint64(i))
Expand Down Expand Up @@ -436,7 +436,7 @@ func MakeSnapshotAndPitrFineFilter(
continue
}
if !logtail.ObjectIsSnapshotRefers(
&stats, pitr, &createTS, &deleteTS, snapshots,
&stats, pitr, &createTS, &deleteTS, sp,
) {
if iscpTables == nil {
bm.Add(uint64(i))
Expand Down
3 changes: 1 addition & 2 deletions pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)
Expand Down Expand Up @@ -139,7 +138,7 @@ func (c *MockCleaner) GetMPool() *mpool.MPool {
return nil
}

func (c *MockCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
func (c *MockCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
return nil, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/vm/engine/tae/db/gc/v3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)
Expand Down Expand Up @@ -148,7 +147,7 @@ type Cleaner interface {
DisableGC()
GCEnabled() bool
GetMPool() *mpool.MPool
GetSnapshots() (map[uint32]containers.Vector, error)
GetSnapshots() (*logtail.SnapshotInfo, error)
GetDetails(ctx context.Context) (map[uint32]*TableStats, error)
Verify(ctx context.Context) string
ISCPTables() (map[uint64]types.TS, error)
Expand Down
12 changes: 0 additions & 12 deletions pkg/vm/engine/tae/db/gc/v3/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
Expand Down Expand Up @@ -72,16 +70,6 @@ func MakeLoadFunc(
}, releaseFn
}

func TransformToTSList(
fromKV map[uint32]containers.Vector,
) map[uint32][]types.TS {
newKV := make(map[uint32][]types.TS, len(fromKV))
for k, v := range fromKV {
newKV[k] = vector.MustFixedColWithTypeCheck[types.TS](v.GetDownstreamVector())
}
return newKV
}

func MakeGCWindowBuffer(size int) *containers.OneSchemaBatchBuffer {
return containers.NewOneSchemaBatchBuffer(
size, ObjectTableAttrs, ObjectTableTypes, false,
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/gc/v3/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (w *GCWindow) MakeFilesReader(
func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
ctx context.Context,
gCkp *checkpoint.CheckpointEntry,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
snapshotMeta *logtail.SnapshotMeta,
iscpTables map[uint64]types.TS,
Expand All @@ -142,7 +142,7 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
gCkp.GetVersion(),
sourcer,
pitrs,
accountSnapshots,
snapshots,
iscpTables,
snapshotMeta,
checkpointCli,
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/db/gc/v3/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ func NewMockSnapshotMeta() *MockSnapshotMeta {

// AccountToTableSnapshots mocks the same method in logtail.SnapshotMeta
func (m *MockSnapshotMeta) AccountToTableSnapshots(
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
) (map[uint64][]types.TS, map[uint64][]types.TS) {
) (map[uint64][]types.TS, map[uint64]*types.TS) {
tableSnapshots := make(map[uint64][]types.TS)
tablePitrs := make(map[uint64][]types.TS)
tablePitrs := make(map[uint64]*types.TS)
return tableSnapshots, tablePitrs
}

Expand Down
Loading
Loading