diff --git a/pkg/container/types/txnts.go b/pkg/container/types/txnts.go index dcbf17df4c818..465c633c47fe8 100644 --- a/pkg/container/types/txnts.go +++ b/pkg/container/types/txnts.go @@ -145,6 +145,11 @@ func (ts TS) ToString() string { return fmt.Sprintf("%d-%d", ts.Physical(), ts.Logical()) } +// format interface +func (ts TS) String() string { + return ts.ToString() +} + func (ts TS) Valid() bool { return ts.Physical() >= 0 } diff --git a/pkg/container/vector/vector_test.go b/pkg/container/vector/vector_test.go index 3b0397e8146c1..2dce1d11762cd 100644 --- a/pkg/container/vector/vector_test.go +++ b/pkg/container/vector/vector_test.go @@ -1400,7 +1400,7 @@ func TestShuffle(t *testing.T) { require.NoError(t, err) v.Shuffle([]int64{1, 2}, mp) require.Equal(t, vs[1:3], MustFixedColWithTypeCheck[types.TS](v)) - require.Equal(t, "[[0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0 0 0]]", v.String()) + require.Equal(t, "[0-0 0-0]", v.String()) v.Free(mp) require.Equal(t, int64(0), mp.CurrNB()) } @@ -2893,7 +2893,7 @@ func TestRowToString(t *testing.T) { v := NewVec(types.T_TS.ToType()) err := AppendFixedList(v, vs, nil, mp) require.NoError(t, err) - require.Equal(t, "[0 0 0 0 0 0 0 0 0 0 0 0]", v.RowToString(1)) + require.Equal(t, "0-0", v.RowToString(1)) v.Free(mp) require.Equal(t, int64(0), mp.CurrNB()) } diff --git a/pkg/objectio/funcs.go b/pkg/objectio/funcs.go index 39dad720bedd3..e90f37c97f31d 100644 --- a/pkg/objectio/funcs.go +++ b/pkg/objectio/funcs.go @@ -147,38 +147,49 @@ func ReadOneBlockWithMeta( } var filledEntries []fileservice.IOEntry + putFillHolder := func(i int, seqnum uint16) { + if filledEntries == nil { + filledEntries = make([]fileservice.IOEntry, len(seqnums)) + } + filledEntries[i] = fileservice.IOEntry{ + Size: int64(seqnum), // a marker, it can not be zero + } + } + blkmeta := meta.GetBlockMeta(uint32(blk)) maxSeqnum := blkmeta.GetMaxSeqnum() for i, seqnum := range seqnums { // special columns if seqnum >= SEQNUM_UPPER { metaColCnt := blkmeta.GetMetaColumnCount() - // read appendable block file, the last columns is commits and abort - if seqnum == SEQNUM_COMMITTS { + switch seqnum { + case SEQNUM_COMMITTS: seqnum = metaColCnt - 1 - } else if seqnum == SEQNUM_ABORT { + case SEQNUM_ABORT: panic("not support") - } else { + default: panic(fmt.Sprintf("bad path to read special column %d", seqnum)) } + // if the last column is not commits, do not read it + // 1. created by cn + // 2. old version tn nonappendable block col := blkmeta.ColumnMeta(seqnum) - ext := col.Location() - ioVec.Entries = append(ioVec.Entries, fileservice.IOEntry{ - Offset: int64(ext.Offset()), - Size: int64(ext.Length()), - ToCacheData: factory(int64(ext.OriginSize()), ext.Alg()), - }) + if col.DataType() != uint8(types.T_TS) { + putFillHolder(i, seqnum) + } else { + ext := col.Location() + ioVec.Entries = append(ioVec.Entries, fileservice.IOEntry{ + Offset: int64(ext.Offset()), + Size: int64(ext.Length()), + ToCacheData: factory(int64(ext.OriginSize()), ext.Alg()), + }) + } continue } // need fill vector if seqnum > maxSeqnum || blkmeta.ColumnMeta(seqnum).DataType() == 0 { - if filledEntries == nil { - filledEntries = make([]fileservice.IOEntry, len(seqnums)) - } - filledEntries[i] = fileservice.IOEntry{ - Size: int64(seqnum), // a marker, it can not be zero - } + putFillHolder(i, seqnum) continue } diff --git a/pkg/vm/engine/tae/rpc/tool.go b/pkg/vm/engine/tae/rpc/tool.go index 36a5bd7a7b242..ccc2eebbc8e67 100644 --- a/pkg/vm/engine/tae/rpc/tool.go +++ b/pkg/vm/engine/tae/rpc/tool.go @@ -762,9 +762,12 @@ func (c *objGetArg) GetData(ctx context.Context) (res string, err error) { return } col := blk.ColumnMeta(idx) - col.ZoneMap() - idxs = append(idxs, idx) tp := types.T(col.DataType()).ToType() + if col.DataType() == uint8(types.T_TS) && i == len(c.cols)-1 { + idxs = append(idxs, objectio.SEQNUM_COMMITTS) + } else { + idxs = append(idxs, idx) + } typs = append(typs, tp) } diff --git a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go index 9b25a5274cea2..02abe0814d688 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go +++ b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go @@ -579,10 +579,8 @@ func (task *flushTableTailTask) mergeAObjs(ctx context.Context, isTombstone bool } seqnums = append(seqnums, def.SeqNum) } - if isTombstone { - readColIdxs = append(readColIdxs, objectio.SEQNUM_COMMITTS) - seqnums = append(seqnums, objectio.SEQNUM_COMMITTS) - } + readColIdxs = append(readColIdxs, objectio.SEQNUM_COMMITTS) + seqnums = append(seqnums, objectio.SEQNUM_COMMITTS) // read from aobjects readedBats := make([]*containers.Batch, 0, len(objHandles)) diff --git a/pkg/vm/engine/tae/tables/jobs/mergeobjects.go b/pkg/vm/engine/tae/tables/jobs/mergeobjects.go index 633b377166f80..e7a3ce9108850 100644 --- a/pkg/vm/engine/tae/tables/jobs/mergeobjects.go +++ b/pkg/vm/engine/tae/tables/jobs/mergeobjects.go @@ -152,10 +152,8 @@ func NewMergeObjectsTask( task.idxs = append(task.idxs, def.Idx) task.attrs = append(task.attrs, def.Name) } - if isTombstone { - task.idxs = append(task.idxs, objectio.SEQNUM_COMMITTS) - task.attrs = append(task.attrs, objectio.TombstoneAttr_CommitTs_Attr) - } + task.idxs = append(task.idxs, objectio.SEQNUM_COMMITTS) + task.attrs = append(task.attrs, objectio.TombstoneAttr_CommitTs_Attr) task.BaseTask = tasks.NewBaseTask(task, tasks.DataCompactionTask, ctx) if task.GetTotalSize() > 300*common.Const1MBytes { @@ -369,9 +367,7 @@ func (task *mergeObjectsTask) PrepareNewWriter() *ioutil.BlockWriter { } seqnums = append(seqnums, def.SeqNum) } - if task.isTombstone { - seqnums = append(seqnums, objectio.SEQNUM_COMMITTS) - } + seqnums = append(seqnums, objectio.SEQNUM_COMMITTS) sortkeyIsPK := false sortkeyPos := -1 diff --git a/pkg/vm/engine/tae/tables/pnode.go b/pkg/vm/engine/tae/tables/pnode.go index a53008a8bec9e..1a197b75f0ea8 100644 --- a/pkg/vm/engine/tae/tables/pnode.go +++ b/pkg/vm/engine/tae/tables/pnode.go @@ -21,12 +21,14 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" + "go.uber.org/zap" ) var _ NodeT = (*persistedNode)(nil) @@ -102,6 +104,20 @@ func (node *persistedNode) Scan( vecs, deletes, err := LoadPersistedColumnDatas( ctx, readSchema, node.object.rt, id, colIdxes, location, mp, tsForAppendable, ) + replaceCommitts := func(vecs []containers.Vector, i int) { + stats := node.object.meta.Load().GetObjectStats() + createTS := node.object.meta.Load().GetCreatedAt() + length := vecs[0].Length() + logutil.Info("committs replace", + zap.Bool("createdByCN", stats.GetCNCreated()), + zap.Bool("appendable", stats.GetAppendable()), + zap.String("createdTS", createTS.String()), + zap.Int("length", length), + ) + vecs[i].Close() + vecs[i] = node.object.rt.VectorPool.Transient.GetVector(&objectio.TSType) + vector.AppendMultiFixed(vecs[i].GetDownstreamVector(), createTS, false, length, mp) + } if err != nil { return err } @@ -113,12 +129,10 @@ func (node *persistedNode) Scan( var attr string if idx == objectio.SEQNUM_COMMITTS { attr = objectio.TombstoneAttr_CommitTs_Attr - if vecs[i].GetType().Oid != types.T_TS { - vecs[i].Close() - vecs[i] = node.object.rt.VectorPool.Transient.GetVector(&objectio.TSType) - createTS := node.object.meta.Load().GetCreatedAt() - vector.AppendMultiFixed(vecs[i].GetDownstreamVector(), createTS, false, vecs[0].Length(), mp) + if vecs[i].IsConstNull() { + replaceCommitts(vecs, i) } + /// TODO: Read old version of nonappendable block? } else { attr = readSchema.ColDefs[idx].Name } @@ -137,11 +151,8 @@ func (node *persistedNode) Scan( var attr string if idx == objectio.SEQNUM_COMMITTS { attr = objectio.TombstoneAttr_CommitTs_Attr - if vecs[i].GetType().Oid != types.T_TS { - vecs[i].Close() - vecs[i] = node.object.rt.VectorPool.Transient.GetVector(&objectio.TSType) - createTS := node.object.meta.Load().GetCreatedAt() - vector.AppendMultiFixed(vecs[i].GetDownstreamVector(), createTS, false, vecs[0].Length(), mp) + if vecs[i].IsConstNull() { + replaceCommitts(vecs, i) } } else { attr = readSchema.ColDefs[idx].Name diff --git a/pkg/vm/engine/tae/tables/utils.go b/pkg/vm/engine/tae/tables/utils.go index 1a1e94ab496a1..ce81ba07c84a4 100644 --- a/pkg/vm/engine/tae/tables/utils.go +++ b/pkg/vm/engine/tae/tables/utils.go @@ -88,11 +88,15 @@ func LoadPersistedColumnDatas( typs := make([]types.Type, 0) vectors := make([]containers.Vector, len(colIdxs)) phyAddIdx := -1 + committsIdx := -1 + assignedCommitts := false var deletes *nulls.Nulls for i, colIdx := range colIdxs { if colIdx == objectio.SEQNUM_COMMITTS { cols = append(cols, objectio.SEQNUM_COMMITTS) typs = append(typs, objectio.TSType) + committsIdx = i + assignedCommitts = true continue } def := schema.ColDefs[colIdx] @@ -113,10 +117,15 @@ func LoadPersistedColumnDatas( } if tsForAppendable != nil { deletes = nulls.NewWithSize(1024) - cols = append(cols, objectio.SEQNUM_COMMITTS) - defer func() { - cols = cols[:len(cols)-1] - }() + if committsIdx == -1 { + cols = append(cols, objectio.SEQNUM_COMMITTS) + typs = append(typs, types.T_TS.ToType()) + committsIdx = len(cols) - 1 + defer func() { + cols = cols[:len(cols)-1] + typs = typs[:len(typs)-1] + }() + } } var vecs []containers.Vector var err error @@ -134,14 +143,16 @@ func LoadPersistedColumnDatas( return nil, deletes, err } if tsForAppendable != nil { - commits := vector.MustFixedColNoTypeCheck[types.TS](vecs[len(vecs)-1].GetDownstreamVector()) - for i := 0; i < len(commits); i++ { + commits := vector.MustFixedColNoTypeCheck[types.TS](vecs[committsIdx].GetDownstreamVector()) + for i := range commits { if commits[i].GT(tsForAppendable) { deletes.Add(uint64(i)) } } - vecs[len(vecs)-1].Close() - vecs = vecs[:len(vecs)-1] + if !assignedCommitts { + vecs[committsIdx].Close() + vecs = vecs[:len(vecs)-1] + } } for i, vec := range vecs { idx := i