Skip to content

Commit

Permalink
Fix metadata commit sequence bug (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH committed Nov 4, 2021
1 parent 459d923 commit d8f0ebf
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 28 deletions.
8 changes: 2 additions & 6 deletions pkg/vm/engine/aoe/storage/db/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,9 @@ func TestDBReplay(t *testing.T) {
testutils.WaitExpect(200, func() bool {
return uint64(insertCnt) == inst.GetShardCheckpointId(0)
})
segmentedIdx, err := inst.GetSegmentedId(*dbi.NewTabletSegmentedIdCtx(tableInfo.Name))
assert.Nil(t, err)
segmentedIdx := inst.GetShardCheckpointId(0)
t.Logf("SegmentedIdx: %d", segmentedIdx)
assert.Equal(t, uint64(insertCnt), segmentedIdx)
assert.Equal(t, uint64(insertCnt), inst.GetShardCheckpointId(0))

t.Logf("Row count: %d", tbl.GetRowCount())
assert.Equal(t, rows*uint64(insertCnt), tbl.GetRowCount())
Expand Down Expand Up @@ -168,11 +166,9 @@ func TestDBReplay(t *testing.T) {
return preSegmentedIdx+uint64(insertCnt)-1 == inst.GetShardCheckpointId(0)
})

segmentedIdx, err = inst.GetSegmentedId(*dbi.NewTabletSegmentedIdCtx(tableInfo.Name))
assert.Nil(t, err)
segmentedIdx = inst.GetShardCheckpointId(0)
t.Logf("SegmentedIdx: %d", segmentedIdx)
assert.Equal(t, preSegmentedIdx+uint64(insertCnt)-1, segmentedIdx)
assert.Equal(t, preSegmentedIdx+uint64(insertCnt)-1, inst.GetShardCheckpointId(0))

inst.Close()
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/vm/engine/aoe/storage/db/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ func TestReplay1(t *testing.T) {
inst = initDB(wal.BrokerRole)

t.Log(inst.Store.Catalog.PString(metadata.PPL1))
segmentedIdx, err := inst.GetSegmentedId(*dbi.NewTabletSegmentedIdCtx(meta.Schema.Name))
assert.Nil(t, err)
meta = inst.Opts.Meta.Catalog.SimpleGetTable(tid)
segmentedIdx := inst.GetShardCheckpointId(0)
assert.Equal(t, common.GetGlobalSeqNum(), segmentedIdx)
assert.Equal(t, common.GetGlobalSeqNum(), inst.GetShardCheckpointId(0))

meta = inst.Opts.Meta.Catalog.SimpleGetTable(tid)

rel, err = inst.Relation(meta.Schema.Name)
assert.Nil(t, err)
Expand Down Expand Up @@ -941,10 +940,8 @@ func TestReplay12(t *testing.T) {
assert.Equal(t, stat1.GetSize(), stat2.GetSize())
assert.Equal(t, stat1.GetCount(), stat2.GetCount())

segmentedIdx, err := inst.GetSegmentedId(*dbi.NewTabletSegmentedIdCtx(meta.Schema.Name))
assert.Nil(t, err)
segmentedIdx := inst.GetShardCheckpointId(0)
assert.Equal(t, common.GetGlobalSeqNum()-1, segmentedIdx)
assert.Equal(t, common.GetGlobalSeqNum()-1, inst.GetShardCheckpointId(0))

rel, err = inst.Relation(meta.Schema.Name)
t.Log(rel.Rows())
Expand Down
5 changes: 3 additions & 2 deletions pkg/vm/engine/aoe/storage/logstore/rotational.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
"fmt"
"io"
"io/ioutil"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/common"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/common"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion pkg/vm/engine/aoe/storage/logstore/syncbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package logstore

import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/common"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/vm/engine/aoe/storage/common"
)

type syncBase struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/vm/engine/aoe/storage/metadata/v1/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,6 @@ func (catalog *Catalog) prepareCreateTable(ctx *createTableCtx) (LogEntry, error
var err error
entry := NewTableEntry(catalog, ctx.schema, ctx.tranId, ctx.exIndex)
logEntry := entry.ToLogEntry(ETCreateTable)
catalog.commitMu.Lock()
defer catalog.commitMu.Unlock()
catalog.Lock()
if err = catalog.onNewTable(entry); err != nil {
catalog.Unlock()
Expand Down Expand Up @@ -622,6 +620,8 @@ func (catalog *Catalog) onCommitRequest(ctx interface{}) error {
}

func (catalog *Catalog) prepareCommitLog(entry IEntry, logEntry LogEntry) {
catalog.commitMu.Lock()
defer catalog.commitMu.Unlock()
commitId := catalog.NextCommitId()
entry.Lock()
entry.CommitLocked(commitId)
Expand All @@ -634,6 +634,8 @@ func (catalog *Catalog) prepareCommitLog(entry IEntry, logEntry LogEntry) {
}

func (catalog *Catalog) prepareCommitEntry(entry IEntry, eType LogEntryType, locker sync.Locker) LogEntry {
catalog.commitMu.Lock()
defer catalog.commitMu.Unlock()
commitId := catalog.NextCommitId()
var logEntry LogEntry
if locker == nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/vm/engine/aoe/storage/metadata/v1/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ func (e *Segment) prepareCreateBlock(ctx *createBlockCtx) (LogEntry, error) {
e.Lock()
e.onNewBlock(be)
e.Unlock()
e.Table.Catalog.commitMu.Lock()
defer e.Table.Catalog.commitMu.Unlock()
e.Table.Catalog.prepareCommitLog(be, logEntry)
ctx.block = be
return logEntry, nil
Expand Down Expand Up @@ -396,8 +394,6 @@ func (e *Segment) prepareUpgrade(ctx *upgradeSegmentCtx) (LogEntry, error) {
}
}
e.onNewCommit(cInfo)
e.Table.Catalog.commitMu.Lock()
defer e.Table.Catalog.commitMu.Unlock()
logEntry := e.Table.Catalog.prepareCommitEntry(e, ETUpgradeSegment, e)
return logEntry, nil
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/vm/engine/aoe/storage/metadata/v1/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ func (e *Table) prepareHardDelete(ctx *deleteTableCtx) (LogEntry, error) {
Op: OpHardDelete,
SSLLNode: *common.NewSSLLNode(),
}
e.Catalog.commitMu.Lock()
defer e.Catalog.commitMu.Unlock()
e.Lock()
defer e.Unlock()
if e.IsHardDeletedLocked() {
Expand Down Expand Up @@ -267,8 +265,6 @@ func (e *Table) prepareSoftDelete(ctx *dropTableCtx) (LogEntry, error) {
Op: OpSoftDelete,
SSLLNode: *common.NewSSLLNode(),
}
e.Catalog.commitMu.Lock()
defer e.Catalog.commitMu.Unlock()
e.Lock()
defer e.Unlock()
if e.IsSoftDeletedLocked() {
Expand Down Expand Up @@ -478,8 +474,6 @@ func (e *Table) SimpleGetSegmentCount() int {
func (e *Table) prepareCreateSegment(ctx *createSegmentCtx) (LogEntry, error) {
se := newSegmentEntry(e.Catalog, e, ctx.tranId, ctx.exIndex)
logEntry := se.ToLogEntry(ETCreateSegment)
e.Catalog.commitMu.Lock()
defer e.Catalog.commitMu.Unlock()
e.Lock()
e.onNewSegment(se)
e.Unlock()
Expand Down

0 comments on commit d8f0ebf

Please sign in to comment.