Skip to content

Commit

Permalink
This is an automated cherry-pick of #1663
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
liuzix authored and ti-chi-bot committed May 8, 2021
1 parent 82c75ff commit 7f86548
Show file tree
Hide file tree
Showing 18 changed files with 748 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ check_third_party_binary:
integration_test_build: check_failpoint_ctl
./scripts/fix_lib_zstd.sh
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covemode=atomic \
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc \
|| { $(FAILPOINT_DISABLE); exit 1; }
Expand Down
6 changes: 5 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,11 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
p.sendError(errors.Trace(err))
return nil
}
sorter = psorter.NewUnifiedSorter(p.changefeed.SortDir, p.changefeedID, tableName, tableID, util.CaptureAddrFromCtx(ctx))
sorter, err = psorter.NewUnifiedSorter(p.changefeed.SortDir, p.changefeedID, tableName, tableID, util.CaptureAddrFromCtx(ctx))
if err != nil {
p.sendError(errors.Trace(err))
return nil
}
default:
p.sendError(cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine))
return nil
Expand Down
5 changes: 4 additions & 1 deletion cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
if err != nil {
return errors.Trace(err)
}
sorter = psorter.NewUnifiedSorter(n.sortDir, n.changeFeedID, n.tableName, n.tableID, ctx.Vars().CaptureAddr)
sorter, err = psorter.NewUnifiedSorter(n.sortDir, n.changeFeedID, n.tableName, n.tableID, ctx.Vars().CaptureAddr)
if err != nil {
return errors.Trace(err)
}
default:
return cerror.ErrUnknownSortEngine.GenWithStackByArgs(n.sortEngine)
}
Expand Down
105 changes: 99 additions & 6 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ import (
"time"
"unsafe"

"github.com/pingcap/ticdc/pkg/util"

"github.com/mackerelio/go-osstat/memory"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filelock"
"github.com/pingcap/ticdc/pkg/util"
"go.uber.org/zap"
)

const (
backgroundJobInterval = time.Second * 15
backgroundJobInterval = time.Second * 15
sortDirLockFileName = "ticdc_lock"
sortDirDataFileMagicPrefix = "sort"
)

var (
Expand All @@ -55,20 +57,37 @@ type backEndPool struct {
dir string
filePrefix string

// to prevent `dir` from being accidentally used by another TiCDC server process.
fileLock *filelock.FileLock

// cancelCh needs to be unbuffered to prevent races
cancelCh chan struct{}
// cancelRWLock protects cache against races when the backEnd is exiting
cancelRWLock sync.RWMutex
isTerminating bool
}

func newBackEndPool(dir string, captureAddr string) *backEndPool {
func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {
ret := &backEndPool{
memoryUseEstimate: 0,
fileNameCounter: 0,
dir: dir,
cancelCh: make(chan struct{}),
filePrefix: fmt.Sprintf("%s/sort-%d-", dir, os.Getpid()),
filePrefix: fmt.Sprintf("%s/%s-%d-", dir, sortDirDataFileMagicPrefix, os.Getpid()),
}

err := ret.lockSortDir()
if err != nil {
log.Warn("failed to lock file prefix",
zap.String("prefix", ret.filePrefix),
zap.Error(err))
return nil, errors.Trace(err)
}

err = ret.cleanUpStaleFiles()
if err != nil {
log.Warn("Unified Sorter: failed to clean up stale temporary files. Report a bug if you believe this is unexpected", zap.Error(err))
return nil, errors.Trace(err)
}

go func() {
Expand Down Expand Up @@ -140,7 +159,7 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool {
}
}()

return ret
return ret, nil
}

func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
Expand Down Expand Up @@ -228,6 +247,16 @@ func (p *backEndPool) dealloc(backEnd backEnd) error {
}

func (p *backEndPool) terminate() {
defer func() {
if p.fileLock == nil {
return
}
err := p.unlockSortDir()
if err != nil {
log.Warn("failed to unlock file prefix", zap.String("prefix", p.filePrefix))
}
}()

p.cancelCh <- struct{}{}
defer close(p.cancelCh)
// the background goroutine can be considered terminated here
Expand Down Expand Up @@ -283,3 +312,67 @@ func (p *backEndPool) memoryPressure() int32 {
})
return atomic.LoadInt32(&p.memPressure)
}

// lockSortDir takes the file lock that guards p.dir, so that the directory is
// not accidentally used by another TiCDC server process at the same time.
// On success the acquired lock is stored in p.fileLock.
//
// A lock conflict (cerrors.ErrConflictingFileLocks) is logged with remediation
// hints and returned through errors.Trace; any other failure to create or take
// the lock is wrapped in cerrors.ErrSortDirLockError.
func (p *backEndPool) lockSortDir() error {
	// Use filepath.Join, as cleanUpStaleFiles does for the same directory,
	// instead of gluing the path together with a hard-coded "/".
	lockFileName := filepath.Join(p.dir, sortDirLockFileName)

	fileLock, err := filelock.NewFileLock(lockFileName)
	if err != nil {
		return cerrors.ErrSortDirLockError.Wrap(err).GenWithStackByCause()
	}

	if err := fileLock.Lock(); err != nil {
		if !cerrors.ErrConflictingFileLocks.Equal(err) {
			return cerrors.ErrSortDirLockError.Wrap(err).GenWithStackByCause()
		}
		// Somebody else holds the lock: tell the operator what to check.
		log.Warn("TiCDC failed to lock sorter temporary file directory. "+
			"Make sure that another instance of TiCDC, or any other program, is not using the directory. "+
			"If you believe you should not see this error, try deleting the lock file and resume the changefeed. "+
			"Report a bug or contact support if the problem persists.",
			zap.String("lock-file", lockFileName))
		return errors.Trace(err)
	}

	p.fileLock = fileLock
	return nil
}

// unlockSortDir releases the file lock held in p.fileLock.
// A failure to unlock is wrapped in cerrors.ErrSortDirLockError.
// The caller must make sure p.fileLock is non-nil.
func (p *backEndPool) unlockSortDir() error {
	if unlockErr := p.fileLock.Unlock(); unlockErr != nil {
		return cerrors.ErrSortDirLockError.Wrap(unlockErr).FastGenWithCause()
	}
	return nil
}

// cleanUpStaleFiles removes every file in p.dir whose name starts with
// sortDirDataFileMagicPrefix followed by "-".
//
// Removal is best-effort: a file that cannot be deleted is logged and skipped,
// and the function still returns nil. Only a failure of the glob itself is
// returned as an error.
func (p *backEndPool) cleanUpStaleFiles() error {
	// Guard against programmer error: we are about to delete files, so an
	// empty directory name must never get this far.
	if p.dir == "" {
		log.Panic("unexpected sort-dir", zap.String("sort-dir", p.dir))
	}

	pattern := filepath.Join(p.dir, sortDirDataFileMagicPrefix+"-*")
	staleFiles, err := filepath.Glob(pattern)
	if err != nil {
		return errors.Trace(err)
	}

	for _, stalePath := range staleFiles {
		log.Info("Removing stale sorter temporary file", zap.String("file", stalePath))

		removeErr := os.Remove(stalePath)
		if removeErr == nil {
			continue
		}

		// Do not let this interfere with normal operation in production. In
		// most cases a failed removal only indicates a non-fatal
		// misconfiguration such as a permission problem. If the directory is
		// truly unusable, writing to it later will raise its own error.
		log.Warn("failed to remove file",
			zap.String("file", stalePath),
			zap.Error(removeErr))
		// Fail fast in integration tests.
		failpoint.Inject("sorterDebug", func() {
			log.Panic("panicking", zap.Error(removeErr))
		})
	}

	return nil
}
154 changes: 151 additions & 3 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ package sorter

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filelock"
"github.com/pingcap/ticdc/pkg/util/testleak"
)

Expand All @@ -46,7 +48,8 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

backEndPool := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool("/tmp/sorter", "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()

Expand Down Expand Up @@ -95,7 +98,37 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
c.Assert(os.IsNotExist(err), check.IsTrue)
}

func (s *backendPoolSuite) TestCleanUp(c *check.C) {
// TestDirectoryBadPermission verifies that lacking permission to list (`ls`)
// the directory does not prevent using it as a temporary file directory.
func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
	defer testleak.AfterTest(c)()

	dir := c.MkDir()
	err := os.Chmod(dir, 0o311) // no permission to `ls`
	c.Assert(err, check.IsNil)
	// Give the owner full access back once the test is done. Otherwise the
	// directory, which still contains files, cannot be listed and therefore
	// cannot be removed by the automatic temp-dir cleanup.
	defer os.Chmod(dir, 0o700) //nolint:errcheck

	// NOTE(review): conf is modified here but never stored anywhere in this
	// function. Confirm that GetDefaultServerConfig returns shared state;
	// otherwise this setting has no effect on the pool created below.
	conf := config.GetDefaultServerConfig()
	conf.Sorter.MaxMemoryPressure = 0 // force using files

	backEndPool, err := newBackEndPool(dir, "")
	c.Assert(err, check.IsNil)
	c.Assert(backEndPool, check.NotNil)
	defer backEndPool.terminate()

	backEnd, err := backEndPool.alloc(context.Background())
	c.Assert(err, check.IsNil)
	defer backEnd.free() //nolint:errcheck

	fileName := backEnd.(*fileBackEnd).fileName
	_, err = os.Stat(fileName)
	c.Assert(err, check.IsNil) // assert that the file exists

	err = backEndPool.dealloc(backEnd)
	c.Assert(err, check.IsNil)
}

// TestCleanUpSelf verifies that the backendPool correctly cleans up files used by itself on exit.
func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
Expand All @@ -109,7 +142,8 @@ func (s *backendPoolSuite) TestCleanUp(c *check.C) {
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint", "return(100)")
c.Assert(err, check.IsNil)

backEndPool := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool("/tmp/sorter", "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
Expand Down Expand Up @@ -148,3 +182,117 @@ func (s *backendPoolSuite) TestCleanUp(c *check.C) {
c.Assert(os.IsNotExist(err), check.IsTrue)
}
}

// mockOtherProcess stands in for another process that uses the same sort
// directory: it holds the directory's file lock and can create data files.
type mockOtherProcess struct {
	// dir is the sort directory the mock operates on.
	dir string
	// prefix is the path prefix used for every file written by writeMockFiles.
	prefix string
	// flock is the lock on the directory's lock file.
	flock *filelock.FileLock
	// files records the paths created by writeMockFiles.
	files []string
}

// newMockOtherProcess creates a mockOtherProcess for dir and takes the file
// lock on the directory's lock file. It asserts that both creating and
// acquiring the lock succeed. prefix is stored for later use by writeMockFiles.
func newMockOtherProcess(c *check.C, dir string, prefix string) *mockOtherProcess {
	lockPath := fmt.Sprintf("%s/%s", dir, sortDirLockFileName)

	lock, err := filelock.NewFileLock(lockPath)
	c.Assert(err, check.IsNil)
	c.Assert(lock.Lock(), check.IsNil)

	mock := &mockOtherProcess{dir: dir, prefix: prefix}
	mock.flock = lock
	return mock
}

// writeMockFiles creates num empty files named p.prefix followed by the
// indexes 0..num-1, and records each path in p.files.
func (p *mockOtherProcess) writeMockFiles(c *check.C, num int) {
	for idx := 0; idx < num; idx++ {
		path := fmt.Sprintf("%s%d", p.prefix, idx)

		created, err := os.Create(path)
		c.Assert(err, check.IsNil)
		// Nothing was written to the file, so the close error is ignored.
		_ = created.Close()

		p.files = append(p.files, path)
	}
}

// changeLockPermission sets the mode of the lock file in p.dir to mode and
// asserts that the chmod succeeds.
func (p *mockOtherProcess) changeLockPermission(c *check.C, mode os.FileMode) {
	lockPath := fmt.Sprintf("%s/%s", p.dir, sortDirLockFileName)
	c.Assert(os.Chmod(lockPath, mode), check.IsNil)
}

// unlock releases the file lock held by the mock and asserts that doing so
// succeeds.
func (p *mockOtherProcess) unlock(c *check.C) {
	c.Assert(p.flock.Unlock(), check.IsNil)
}

// assertFilesExist asserts that every file recorded in p.files can be
// stat'ed without error.
func (p *mockOtherProcess) assertFilesExist(c *check.C) {
	for idx := range p.files {
		_, statErr := os.Stat(p.files[idx])
		c.Assert(statErr, check.IsNil)
	}
}

// assertFilesNotExist asserts that stat'ing each file recorded in p.files
// fails with a "does not exist" error.
func (p *mockOtherProcess) assertFilesNotExist(c *check.C) {
	for idx := range p.files {
		_, statErr := os.Stat(p.files[idx])
		missing := os.IsNotExist(statErr)
		c.Assert(missing, check.IsTrue)
	}
}

// TestCleanUpStaleBasic verifies that creating a backEndPool removes the stale
// temporary files left behind in the sort-dir by another CDC process that has
// already released the directory lock.
func (s *backendPoolSuite) TestCleanUpStaleBasic(c *check.C) {
	defer testleak.AfterTest(c)()

	dir := c.MkDir()

	// Simulate a previous process: take the lock, write files, let go.
	other := newMockOtherProcess(c, dir, dir+"/sort-1-")
	other.writeMockFiles(c, 100)
	other.unlock(c)
	other.assertFilesExist(c)

	pool, err := newBackEndPool(dir, "")
	c.Assert(err, check.IsNil)
	c.Assert(pool, check.NotNil)
	defer pool.terminate()

	// Constructing the pool must have swept the stale files away.
	other.assertFilesNotExist(c)
}

// TestFileLockConflict tests that if two backEndPools were to use the same
// sort-dir, an error would be returned by one of them.
func (s *backendPoolSuite) TestFileLockConflict(c *check.C) {
	defer testleak.AfterTest(c)()

	dir := c.MkDir()

	// The first pool takes the directory lock.
	first, err := newBackEndPool(dir, "")
	c.Assert(err, check.IsNil)
	c.Assert(first, check.NotNil)
	defer first.terminate()

	// The second pool must be rejected while the first one is alive.
	second, err := newBackEndPool(dir, "")
	c.Assert(err, check.ErrorMatches, ".*file lock conflict.*")
	c.Assert(second, check.IsNil)
}

// TestCleanUpStaleLockNoPermission verifies that newBackEndPool fails with a
// permission error when the lock file in the sort-dir has no access
// permissions, and that the temporary files already present in the directory
// are left untouched in that case.
func (s *backendPoolSuite) TestCleanUpStaleLockNoPermission(c *check.C) {
	defer testleak.AfterTest(c)()

	// A privileged user is not subject to file permission checks, so the
	// "permission denied" error this test expects cannot occur.
	if os.Geteuid() == 0 {
		c.Skip("file permission checks are bypassed when running as root")
	}

	dir := c.MkDir()
	prefix := dir + "/sort-1-"

	mockP := newMockOtherProcess(c, dir, prefix)
	mockP.writeMockFiles(c, 100)
	// set a bad permission
	mockP.changeLockPermission(c, 0o000)

	backEndPool, err := newBackEndPool(dir, "")
	c.Assert(err, check.ErrorMatches, ".*permission denied.*")
	c.Assert(backEndPool, check.IsNil)

	mockP.assertFilesExist(c)
}
Loading

0 comments on commit 7f86548

Please sign in to comment.