Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/container/types/txnts.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func (ts TS) ToString() string {
return fmt.Sprintf("%d-%d", ts.Physical(), ts.Logical())
}

// format interface
func (ts TS) String() string {
return ts.ToString()
}

func (ts TS) Valid() bool {
return ts.Physical() >= 0
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/container/vector/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ func TestShuffle(t *testing.T) {
require.NoError(t, err)
v.Shuffle([]int64{1, 2}, mp)
require.Equal(t, vs[1:3], MustFixedColWithTypeCheck[types.TS](v))
require.Equal(t, "[[0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0 0 0]]", v.String())
require.Equal(t, "[0-0 0-0]", v.String())
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
}
Expand Down Expand Up @@ -2893,7 +2893,7 @@ func TestRowToString(t *testing.T) {
v := NewVec(types.T_TS.ToType())
err := AppendFixedList(v, vs, nil, mp)
require.NoError(t, err)
require.Equal(t, "[0 0 0 0 0 0 0 0 0 0 0 0]", v.RowToString(1))
require.Equal(t, "0-0", v.RowToString(1))
v.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
}
Expand Down
43 changes: 27 additions & 16 deletions pkg/objectio/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,38 +147,49 @@ func ReadOneBlockWithMeta(
}

var filledEntries []fileservice.IOEntry
putFillHolder := func(i int, seqnum uint16) {
if filledEntries == nil {
filledEntries = make([]fileservice.IOEntry, len(seqnums))
}
filledEntries[i] = fileservice.IOEntry{
Size: int64(seqnum), // a marker, it can not be zero
}
}

blkmeta := meta.GetBlockMeta(uint32(blk))
maxSeqnum := blkmeta.GetMaxSeqnum()
for i, seqnum := range seqnums {
// special columns
if seqnum >= SEQNUM_UPPER {
metaColCnt := blkmeta.GetMetaColumnCount()
// read appendable block file, the last columns is commits and abort
if seqnum == SEQNUM_COMMITTS {
switch seqnum {
case SEQNUM_COMMITTS:
seqnum = metaColCnt - 1
} else if seqnum == SEQNUM_ABORT {
case SEQNUM_ABORT:
panic("not support")
} else {
default:
panic(fmt.Sprintf("bad path to read special column %d", seqnum))
}
// if the last column is not commits, do not read it
// 1. created by cn
// 2. old version tn nonappendable block
col := blkmeta.ColumnMeta(seqnum)
ext := col.Location()
ioVec.Entries = append(ioVec.Entries, fileservice.IOEntry{
Offset: int64(ext.Offset()),
Size: int64(ext.Length()),
ToCacheData: factory(int64(ext.OriginSize()), ext.Alg()),
})
if col.DataType() != uint8(types.T_TS) {
putFillHolder(i, seqnum)
} else {
ext := col.Location()
ioVec.Entries = append(ioVec.Entries, fileservice.IOEntry{
Offset: int64(ext.Offset()),
Size: int64(ext.Length()),
ToCacheData: factory(int64(ext.OriginSize()), ext.Alg()),
})
}
continue
}

// need fill vector
if seqnum > maxSeqnum || blkmeta.ColumnMeta(seqnum).DataType() == 0 {
if filledEntries == nil {
filledEntries = make([]fileservice.IOEntry, len(seqnums))
}
filledEntries[i] = fileservice.IOEntry{
Size: int64(seqnum), // a marker, it can not be zero
}
putFillHolder(i, seqnum)
continue
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/vm/engine/tae/rpc/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,9 +762,12 @@ func (c *objGetArg) GetData(ctx context.Context) (res string, err error) {
return
}
col := blk.ColumnMeta(idx)
col.ZoneMap()
idxs = append(idxs, idx)
tp := types.T(col.DataType()).ToType()
if col.DataType() == uint8(types.T_TS) && i == len(c.cols)-1 {
idxs = append(idxs, objectio.SEQNUM_COMMITTS)
} else {
idxs = append(idxs, idx)
}
typs = append(typs, tp)
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/vm/engine/tae/tables/jobs/flushTableTail.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,8 @@ func (task *flushTableTailTask) mergeAObjs(ctx context.Context, isTombstone bool
}
seqnums = append(seqnums, def.SeqNum)
}
if isTombstone {
readColIdxs = append(readColIdxs, objectio.SEQNUM_COMMITTS)
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
}
readColIdxs = append(readColIdxs, objectio.SEQNUM_COMMITTS)
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)

// read from aobjects
readedBats := make([]*containers.Batch, 0, len(objHandles))
Expand Down
10 changes: 3 additions & 7 deletions pkg/vm/engine/tae/tables/jobs/mergeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,8 @@ func NewMergeObjectsTask(
task.idxs = append(task.idxs, def.Idx)
task.attrs = append(task.attrs, def.Name)
}
if isTombstone {
task.idxs = append(task.idxs, objectio.SEQNUM_COMMITTS)
task.attrs = append(task.attrs, objectio.TombstoneAttr_CommitTs_Attr)
}
task.idxs = append(task.idxs, objectio.SEQNUM_COMMITTS)
task.attrs = append(task.attrs, objectio.TombstoneAttr_CommitTs_Attr)
task.BaseTask = tasks.NewBaseTask(task, tasks.DataCompactionTask, ctx)

if task.GetTotalSize() > 300*common.Const1MBytes {
Expand Down Expand Up @@ -369,9 +367,7 @@ func (task *mergeObjectsTask) PrepareNewWriter() *ioutil.BlockWriter {
}
seqnums = append(seqnums, def.SeqNum)
}
if task.isTombstone {
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
}
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
sortkeyIsPK := false
sortkeyPos := -1

Expand Down
31 changes: 21 additions & 10 deletions pkg/vm/engine/tae/tables/pnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"go.uber.org/zap"
)

var _ NodeT = (*persistedNode)(nil)
Expand Down Expand Up @@ -102,6 +104,20 @@ func (node *persistedNode) Scan(
vecs, deletes, err := LoadPersistedColumnDatas(
ctx, readSchema, node.object.rt, id, colIdxes, location, mp, tsForAppendable,
)
replaceCommitts := func(vecs []containers.Vector, i int) {
stats := node.object.meta.Load().GetObjectStats()
createTS := node.object.meta.Load().GetCreatedAt()
length := vecs[0].Length()
logutil.Info("committs replace",
zap.Bool("createdByCN", stats.GetCNCreated()),
zap.Bool("appendable", stats.GetAppendable()),
zap.String("createdTS", createTS.String()),
zap.Int("length", length),
)
vecs[i].Close()
vecs[i] = node.object.rt.VectorPool.Transient.GetVector(&objectio.TSType)
vector.AppendMultiFixed(vecs[i].GetDownstreamVector(), createTS, false, length, mp)
}
if err != nil {
return err
}
Expand All @@ -113,12 +129,10 @@ func (node *persistedNode) Scan(
var attr string
if idx == objectio.SEQNUM_COMMITTS {
attr = objectio.TombstoneAttr_CommitTs_Attr
if vecs[i].GetType().Oid != types.T_TS {
vecs[i].Close()
vecs[i] = node.object.rt.VectorPool.Transient.GetVector(&objectio.TSType)
createTS := node.object.meta.Load().GetCreatedAt()
vector.AppendMultiFixed(vecs[i].GetDownstreamVector(), createTS, false, vecs[0].Length(), mp)
if vecs[i].IsConstNull() {
replaceCommitts(vecs, i)
}
/// TODO: Read old version of nonappendable block?
} else {
attr = readSchema.ColDefs[idx].Name
}
Expand All @@ -137,11 +151,8 @@ func (node *persistedNode) Scan(
var attr string
if idx == objectio.SEQNUM_COMMITTS {
attr = objectio.TombstoneAttr_CommitTs_Attr
if vecs[i].GetType().Oid != types.T_TS {
vecs[i].Close()
vecs[i] = node.object.rt.VectorPool.Transient.GetVector(&objectio.TSType)
createTS := node.object.meta.Load().GetCreatedAt()
vector.AppendMultiFixed(vecs[i].GetDownstreamVector(), createTS, false, vecs[0].Length(), mp)
if vecs[i].IsConstNull() {
replaceCommitts(vecs, i)
}
} else {
attr = readSchema.ColDefs[idx].Name
Expand Down
27 changes: 19 additions & 8 deletions pkg/vm/engine/tae/tables/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ func LoadPersistedColumnDatas(
typs := make([]types.Type, 0)
vectors := make([]containers.Vector, len(colIdxs))
phyAddIdx := -1
committsIdx := -1
assignedCommitts := false
var deletes *nulls.Nulls
for i, colIdx := range colIdxs {
if colIdx == objectio.SEQNUM_COMMITTS {
cols = append(cols, objectio.SEQNUM_COMMITTS)
typs = append(typs, objectio.TSType)
committsIdx = i
assignedCommitts = true
continue
}
def := schema.ColDefs[colIdx]
Expand All @@ -113,10 +117,15 @@ func LoadPersistedColumnDatas(
}
if tsForAppendable != nil {
deletes = nulls.NewWithSize(1024)
cols = append(cols, objectio.SEQNUM_COMMITTS)
defer func() {
cols = cols[:len(cols)-1]
}()
if committsIdx == -1 {
cols = append(cols, objectio.SEQNUM_COMMITTS)
typs = append(typs, types.T_TS.ToType())
committsIdx = len(cols) - 1
defer func() {
cols = cols[:len(cols)-1]
typs = typs[:len(typs)-1]
}()
}
}
var vecs []containers.Vector
var err error
Expand All @@ -134,14 +143,16 @@ func LoadPersistedColumnDatas(
return nil, deletes, err
}
if tsForAppendable != nil {
commits := vector.MustFixedColNoTypeCheck[types.TS](vecs[len(vecs)-1].GetDownstreamVector())
for i := 0; i < len(commits); i++ {
commits := vector.MustFixedColNoTypeCheck[types.TS](vecs[committsIdx].GetDownstreamVector())
for i := range commits {
if commits[i].GT(tsForAppendable) {
deletes.Add(uint64(i))
}
}
vecs[len(vecs)-1].Close()
vecs = vecs[:len(vecs)-1]
if !assignedCommitts {
vecs[committsIdx].Close()
vecs = vecs[:len(vecs)-1]
}
}
for i, vec := range vecs {
idx := i
Expand Down
Loading