Commit

Merge branch '1.2-dev' into 1.2-fix_force_flush_timeout
mergify[bot] committed May 24, 2024
2 parents 307954d + 918a356 commit 29f96fc
Showing 23 changed files with 670 additions and 1,761 deletions.
6 changes: 6 additions & 0 deletions pkg/fileservice/io_entry.go
@@ -18,11 +18,17 @@ import (
"bytes"
"io"
"os"
"time"

"github.com/matrixorigin/matrixone/pkg/fileservice/memorycache"
metric "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
)

func (i *IOEntry) setCachedData() error {
t0 := time.Now()
defer func() {
metric.FSReadDurationSetCachedData.Observe(time.Since(t0).Seconds())
}()
if i.ToCacheData == nil {
return nil
}
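This is the instrumentation pattern the whole commit applies: capture a start time, then observe the elapsed seconds in a defer so every exit path is measured, early returns included. A minimal, self-contained sketch (illustrative only; the Prometheus histogram is a hypothetical stand-in for the unexported metric.FSReadDurationSetCachedData collector):

package timing

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// readDuration is a hypothetical stand-in for metric.FSReadDurationSetCachedData.
var readDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "fs_read_duration_seconds",
	Help: "time spent in one stage of a read",
})

func setCachedData() error {
	t0 := time.Now()
	defer func() {
		// runs on every return path, so early exits are measured too
		readDuration.Observe(time.Since(t0).Seconds())
	}()
	// ... stage body ...
	return nil
}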
27 changes: 23 additions & 4 deletions pkg/fileservice/local_fs.go
@@ -327,26 +327,37 @@ func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error) {

for _, cache := range vector.Caches {
cache := cache
if err := readCache(ctx, cache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, cache, vector)
metric.FSReadDurationReadVectorCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}

defer func() {
if err != nil {
return
}
t0 := time.Now()
err = cache.Update(ctx, vector, false)
metric.FSReadDurationUpdateVectorCache.Observe(time.Since(t0).Seconds())
}()
}

if l.memCache != nil {
if err := readCache(ctx, l.memCache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, l.memCache, vector)
metric.FSReadDurationReadMemoryCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
defer func() {
if err != nil {
return
}
t0 := time.Now()
err = l.memCache.Update(ctx, vector, l.asyncUpdate)
metric.FSReadDurationUpdateMemoryCache.Observe(time.Since(t0).Seconds())
}()
}

@@ -356,19 +367,27 @@ func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error) {
}()

if l.diskCache != nil {
if err := readCache(ctx, l.diskCache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, l.diskCache, vector)
metric.FSReadDurationReadDiskCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
defer func() {
if err != nil {
return
}
t0 := time.Now()
err = l.diskCache.Update(ctx, vector, l.asyncUpdate)
metric.FSReadDurationUpdateDiskCache.Observe(time.Since(t0).Seconds())
}()
}

if l.remoteCache != nil {
if err := readCache(ctx, l.remoteCache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, l.remoteCache, vector)
metric.FSReadDurationReadRemoteCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
}
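The recurring edit in this file splits `if err := readCache(...); err != nil { ... }` into a separate call and check so the elapsed time can be observed between the call and the error test. A generic helper with the same shape (an illustrative sketch, not code from this commit; `hist` and `readCache` in the usage line are hypothetical):

package timing

import "time"

// timed runs fn, reports its duration via observe, and returns fn's error.
// This is the shape the diff gives each readCache and Update call site.
func timed(observe func(seconds float64), fn func() error) error {
	t0 := time.Now()
	err := fn()
	observe(time.Since(t0).Seconds())
	return err
}

A call site would then read: err := timed(hist.Observe, func() error { return readCache(ctx, cache, vector) }).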
51 changes: 44 additions & 7 deletions pkg/fileservice/s3_fs.go
@@ -422,26 +422,38 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error) {

for _, cache := range vector.Caches {
cache := cache
if err := readCache(ctx, cache, vector); err != nil {

t0 := time.Now()
err := readCache(ctx, cache, vector)
metric.FSReadDurationReadVectorCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}

defer func() {
if err != nil {
return
}
t0 := time.Now()
err = cache.Update(ctx, vector, false)
metric.FSReadDurationUpdateVectorCache.Observe(time.Since(t0).Seconds())
}()
}

if s.memCache != nil {
if err := readCache(ctx, s.memCache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, s.memCache, vector)
metric.FSReadDurationReadMemoryCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
defer func() {
if err != nil {
return
}
t0 := time.Now()
err = s.memCache.Update(ctx, vector, s.asyncUpdate)
metric.FSReadDurationUpdateMemoryCache.Observe(time.Since(t0).Seconds())
}()
}

@@ -451,7 +463,10 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error) {
}()

if s.diskCache != nil {
if err := readCache(ctx, s.diskCache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, s.diskCache, vector)
metric.FSReadDurationReadDiskCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
// try to cache IOEntry if not caching the full file
@@ -460,13 +475,18 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error) {
if err != nil {
return
}
t0 := time.Now()
err = s.diskCache.Update(ctx, vector, s.asyncUpdate)
metric.FSReadDurationUpdateDiskCache.Observe(time.Since(t0).Seconds())
}()
}
}

if s.remoteCache != nil {
if err := readCache(ctx, s.remoteCache, vector); err != nil {
t0 := time.Now()
err := readCache(ctx, s.remoteCache, vector)
metric.FSReadDurationReadRemoteCache.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
}
@@ -530,6 +550,9 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {
// a function to get an io.ReadCloser
getReader := func(ctx context.Context, min *int64, max *int64) (io.ReadCloser, error) {
t0 := time.Now()
defer func() {
metric.FSReadDurationGetReader.Observe(time.Since(t0).Seconds())
}()
bytesCounter := new(atomic.Int64)
ctx, spanR := trace.Start(ctx, "S3FS.read.getReader")
defer spanR.End()
@@ -545,7 +568,6 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {
},
closeFunc: func() error {
s3ReadIODuration := time.Since(t0)

metric.S3ReadIODurationHistogram.Observe(s3ReadIODuration.Seconds())
metric.S3ReadIOBytesHistogram.Observe(float64(bytesCounter.Load()))
return r.Close()
@@ -558,6 +580,10 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {
var contentErr error
var getContentDone bool
getContent := func(ctx context.Context) (bs []byte, err error) {
t0 := time.Now()
defer func() {
metric.FSReadDurationGetContent.Observe(time.Since(t0).Seconds())
}()
ctx, spanC := trace.Start(ctx, "S3FS.read.getContent")
defer spanC.End()
if getContentDone {
@@ -601,6 +627,10 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {

// a function to get entry data lazily
getData := func(ctx context.Context) ([]byte, error) {
t0 := time.Now()
defer func() {
metric.FSReadDurationGetEntryData.Observe(time.Since(t0).Seconds())
}()
ctx, spanD := trace.Start(ctx, "S3FS.reader.getData")
defer spanD.End()
if entry.Size < 0 {
@@ -639,7 +669,9 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {
if err != nil {
return err
}
t0 := time.Now()
_, err = w.Write(data)
metric.FSReadDurationWriteToWriter.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
@@ -659,7 +691,9 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {
var buf []byte
put := ioBufferPool.Get(&buf)
defer put.Put()
t0 := time.Now()
_, err = io.CopyBuffer(w, reader, buf)
metric.FSReadDurationWriteToWriter.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
@@ -723,9 +757,12 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) (err error) {
len(contentBytes) > 0 &&
s.diskCache != nil &&
!vector.Policy.Any(SkipDiskCacheWrites) {
if err := s.diskCache.SetFile(ctx, vector.FilePath, func(context.Context) (io.ReadCloser, error) {
t0 := time.Now()
err := s.diskCache.SetFile(ctx, vector.FilePath, func(context.Context) (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(contentBytes)), nil
}); err != nil {
})
metric.FSReadDurationSetCachedData.Observe(time.Since(t0).Seconds())
if err != nil {
return err
}
}
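getReader now also observes its own duration via a defer, while the existing closeFunc keeps reporting the whole read's duration and byte count when the caller closes the reader. A self-contained sketch of that wrapper shape (illustrative only; the real readCloser type and metric names in s3_fs.go differ):

package timing

import (
	"io"
	"sync/atomic"
	"time"
)

// measuredReadCloser mirrors the closeFunc pattern in S3FS.read.getReader:
// count bytes as they stream, then report totals when the reader is closed.
type measuredReadCloser struct {
	r       io.ReadCloser
	bytes   atomic.Int64
	start   time.Time
	onClose func(d time.Duration, n int64)
}

func (m *measuredReadCloser) Read(p []byte) (int, error) {
	n, err := m.r.Read(p)
	m.bytes.Add(int64(n))
	return n, err
}

func (m *measuredReadCloser) Close() error {
	m.onClose(time.Since(m.start), m.bytes.Load())
	return m.r.Close()
}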
12 changes: 2 additions & 10 deletions pkg/lockservice/lock_table_local.go
@@ -90,15 +90,6 @@ func (l *localLockTable) lock(
func (l *localLockTable) doLock(
c *lockContext,
blocked bool) {
// deadlock detected, return
if c.txn.deadlockFound {
if c.w != nil {
c.w.disableNotify()
c.w.close()
}
c.done(ErrDeadLockDetected)
return
}
var old *waiter
var err error
table := l.bind.Table
@@ -156,7 +147,8 @@ func (l *localLockTable) doLock(
!bytes.Equal(c.w.txn.TxnID, oldTxnID)) {
e = ErrTxnNotFound
}
if e != nil {
if e != nil ||
c.txn.deadlockFound {
c.closed = true
if e != ErrTxnNotFound {
c.txn.closeBlockWaiters()
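The fix above removes the early return at the top of doLock when c.txn.deadlockFound is set and instead folds the deadlock check into the shared error path, so a deadlocked transaction's waiter is closed through the same bookkeeping as any other failure and waiters queued behind it still get woken. A toy sketch of that invariant (not the lockservice API; every name here is invented):

package timing

import "errors"

// ErrDeadlockToy stands in for lockservice's ErrDeadLockDetected.
var ErrDeadlockToy = errors.New("deadlock detected")

// waiter is a toy, not the real waiter type.
type waiter struct{ done chan error }

func newWaiter() *waiter { return &waiter{done: make(chan error, 1)} }

// fail notifies the failed waiter and wakes the next one in line; routing
// deadlock through this same path, instead of returning early, is what
// keeps the queue moving.
func fail(w, next *waiter, err error) {
	w.done <- err
	close(w.done)
	if next != nil {
		next.done <- nil // grant the lock to the next waiter
	}
}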
108 changes: 108 additions & 0 deletions pkg/lockservice/service_test.go
@@ -1234,6 +1234,114 @@ func TestDeadLockWithIndirectDependsOn(t *testing.T) {
}
}

func TestWaiterAwakeOnDeadLock(t *testing.T) {
for name, runner := range runners {
if name == "local" {
continue
}
t.Run(name, func(t *testing.T) {
table := uint64(10)
runner(
t,
table,
func(
ctx context.Context,
s *service,
lt *localLockTable) {
// row1: txn1 : txn2, txn3

row1 := newTestRows(1)
txn1 := newTestTxnID(1)
txn2 := newTestTxnID(2)
txn3 := newTestTxnID(3)

s.cfg.TxnIterFunc = func(f func([]byte) bool) {
f(txn1)
f(txn2)
f(txn3)
}

mustAddTestLock(t, ctx, s, table, txn1, row1, pb.Granularity_Row)

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()

// row1 wait list: txn2
maybeAddTestLockWithDeadlockWithWaitRetry(
t,
ctx,
s,
table,
txn2,
row1,
pb.Granularity_Row,
time.Second*5,
)
}()

go func() {
defer wg.Done()

// row1 wait list: txn2
waitLocalWaiters(lt, row1[0], 1)

// row1 wait list: txn2, txn3
maybeAddTestLockWithDeadlockWithWaitRetry(
t,
ctx,
s,
table,
txn3,
row1,
pb.Granularity_Row,
time.Second*5,
)

}()

// row1: txn1 : txn2, txn3
waitLocalWaiters(lt, row1[0], 2)

t2 := lt.txnHolder.getActiveTxn(txn2, false, "")
t2.Lock()
t2.deadlockFound = true
t2.Unlock()

require.NoError(t, s.Unlock(ctx, txn1, timestamp.Timestamp{}))

t2.Lock()
for _, w := range t2.blockedWaiters {
w.notify(notifyValue{err: ErrDeadLockDetected})
}
t2.Unlock()

require.NoError(t, s.Unlock(ctx, txn1, timestamp.Timestamp{}))
for {
t3 := lt.txnHolder.getActiveTxn(txn3, false, "")
t3.Lock()
if len(t3.getHoldLocksLocked(0).tableBinds) > 0 {
break
}
t3.Unlock()

select {
case <-ctx.Done():
t3.Lock()
require.True(t, len(t3.getHoldLocksLocked(0).tableBinds) > 0)
t3.Unlock()
return
default:
}
}

wg.Wait()
})
})
}
}

func TestLockSuccWithKeepBindTimeout(t *testing.T) {
runLockServiceTestsWithLevel(
t,
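The closing loop of TestWaiterAwakeOnDeadLock is a poll-until-condition pattern guarded by the test context. A generic sketch of that shape (illustrative only):

package timing

import (
	"context"
	"time"
)

// waitFor polls cond until it returns true or ctx is done, the same
// structure as the test's final for/select loop.
func waitFor(ctx context.Context, cond func() bool) bool {
	for {
		if cond() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(time.Millisecond):
		}
	}
}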
