Skip to content

Commit

Permalink
sqlliveness: add timeouts to heartbeats
Browse files Browse the repository at this point in the history
Previously, sqlliveness heartbeat operations could block on the transactions
they issued. This change wraps the session create and extend (refresh)
operations in a timeout equal to the heartbeat interval.

Resolves cockroachdb#85541

Release note: None

Release justification: low-risk bugfix to existing functionality
  • Loading branch information
dhartunian authored and Yevgeniy Miretskiy committed Sep 28, 2022
1 parent dc3139f commit 4baef9c
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 19 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func TestSQLLivenessExemption(t *testing.T) {
// Make the tenant heartbeat like crazy.
ctx := context.Background()
//slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond)

_, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: tenantID,
Expand Down Expand Up @@ -960,7 +960,7 @@ func TestSQLLivenessExemption(t *testing.T) {

// Verify that heartbeats can go through and update the expiration time.
val := livenessValue()
time.Sleep(2 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
testutils.SucceedsSoon(
t,
func() error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -18,6 +19,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
55 changes: 42 additions & 13 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -151,15 +153,21 @@ func (l *Instance) setSession(s *session) {
}

// clearSession runs checkExpiry and then, while holding l.mu, drops the
// current session and installs a fresh blockCh.
func (l *Instance) clearSession(ctx context.Context) {
// checkExpiry acquires l.mu itself, so it is called before the lock is
// taken below.
l.checkExpiry(ctx)
l.mu.Lock()
defer l.mu.Unlock()
l.mu.s = nil
l.mu.blockCh = make(chan struct{})
}

// checkExpiry, while holding l.mu, compares the current session's expiration
// against l.clock.Now() and invokes the session's expiry callbacks when the
// expiration is earlier.
//
// NOTE(review): l.mu.s is used without a nil check — presumably callers only
// reach this while a session is set; confirm.
func (l *Instance) checkExpiry(ctx context.Context) {
l.mu.Lock()
defer l.mu.Unlock()
if expiration := l.mu.s.Expiration(); expiration.Less(l.clock.Now()) {
// If the session has expired, invoke the session expiry callbacks
// associated with the session.
l.mu.s.invokeSessionExpiryCallbacks(ctx)
}
// NOTE(review): the two assignments below repeat the tail of clearSession.
// In this diff rendering they look like the lines that were moved out of
// this position — verify against the actual file before relying on them.
l.mu.s = nil
l.mu.blockCh = make(chan struct{})
}

// createSession tries until it can create a new session and returns an error
Expand Down Expand Up @@ -253,8 +261,13 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Read = true
s, _ := l.getSessionOrBlockCh()
if s == nil {
newSession, err := l.createSession(ctx)
if err != nil {
var newSession *session
if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", l.hb(), func(ctx context.Context) error {
var err error
newSession, err = l.createSession(ctx)
return err
}); err != nil {
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -270,21 +283,37 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
found, err := l.extendSession(ctx, s)
if err != nil {
var found bool
err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", l.hb(), func(ctx context.Context) error {
var err error
found, err = l.extendSession(ctx, s)
return err
})
switch {
case errors.HasType(err, (*contextutil.TimeoutError)(nil)):
// Retry without clearing the session because we don't know the current status.
l.checkExpiry(ctx)
t.Reset(0)
continue
case err != nil && ctx.Err() == nil:
log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
fallthrough
case err != nil:
// TODO(ajwerner): Decide whether we actually should exit the heartbeat loop here if the context is not
// canceled. Consider the case of an ambiguous result error: shouldn't we try again?
l.clearSession(ctx)
return
}
if !found {
case !found:
// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
continue
}
if log.V(2) {
log.Infof(ctx, "extended SQL liveness session %s", s.ID())
default:
if log.V(2) {
log.Infof(ctx, "extended SQL liveness session %s", s.ID())
}
t.Reset(l.hb())
}
t.Reset(l.hb())
}
}
}
Expand Down
116 changes: 114 additions & 2 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package slinstance_test

import (
"context"
"sync/atomic"
"testing"
"time"

Expand All @@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -40,8 +42,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
Expand Down Expand Up @@ -91,3 +93,113 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines checks that the create and extend session
// operations run under a deadline. The fake storage is blocked before the
// instance starts, so every attempt is expected to time out and no session
// should ever become available.
func TestSQLInstanceDeadlines(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	manual := timeutil.NewManualTime(timeutil.Unix(0, 42))
	clock := hlc.NewClock(manual, time.Nanosecond /* maxOffset */)
	st := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		true /* initializeVersion */)
	slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond)
	slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond)

	// Block the fake storage before the instance starts; unblock it again
	// when the test returns.
	storage := slstorage.NewFakeStorage()
	storage.SetBlockCh()
	defer storage.CloseBlockCh()

	instance := slinstance.NewSQLInstance(stopper, clock, storage, st, nil)
	instance.Start(ctx)

	// A session must never be handed out while storage stays blocked.
	const (
		observeFor = 100 * time.Millisecond
		pollEvery  = 10 * time.Millisecond
	)
	sessionAvailable := func() bool {
		_, err := instance.Session(ctx)
		return err == nil
	}
	require.Never(t, sessionAvailable, observeFor, pollEvery)
}

// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the
// create and extend session operations. It covers the case where the session
// is created successfully first and the storage is blocked afterwards: once
// the manual clock is advanced, the session's expiry callback must fire
// instead of the heartbeat hanging on the blocked storage.
func TestSQLInstanceDeadlinesExtend(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx, stopper := context.Background(), stop.NewStopper()
	defer stopper.Stop(ctx)

	mt := timeutil.NewManualTime(timeutil.Unix(0, 42))
	clock := hlc.NewClock(mt, time.Nanosecond /* maxOffset */)
	settings := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		true /* initializeVersion */)
	slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
	// Short heartbeat interval so that several heartbeats happen within the
	// observation windows used below.
	slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

	fakeStorage := slstorage.NewFakeStorage()
	sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
	sqlInstance.Start(ctx)

	// Verify that eventually a session is created successfully.
	testutils.SucceedsSoon(
		t,
		func() error {
			_, err := sqlInstance.Session(ctx)
			return err
		},
	)

	// Verify that the session stays available across a few heartbeats.
	require.Never(
		t,
		func() bool {
			_, err := sqlInstance.Session(ctx)
			return err != nil
		},
		100*time.Millisecond, 10*time.Millisecond,
	)

	// Register a callback so we can observe that this session expired. The
	// error is checked explicitly: a failure here would otherwise surface as
	// a nil dereference on s.
	var sessionExpired atomic.Bool
	s, err := sqlInstance.Session(ctx)
	require.NoError(t, err)
	s.RegisterCallbackForSessionExpiry(func(ctx context.Context) {
		sessionExpired.Store(true)
	})

	// Block the fake storage; unblock it again when the test returns.
	fakeStorage.SetBlockCh()
	defer fakeStorage.CloseBlockCh()
	// Advance the manual clock so that the session expires.
	mt.Advance(20 * time.Millisecond)

	// Expect the expiry callback to run.
	require.Eventually(
		t,
		func() bool {
			return sessionExpired.Load()
		},
		testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond,
	)
}
41 changes: 39 additions & 2 deletions pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type FakeStorage struct {
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
blockCh chan struct{}
}
}

Expand All @@ -46,8 +47,16 @@ func (s *FakeStorage) IsAlive(

// Insert implements the sqlliveness.Storage interface.
func (s *FakeStorage) Insert(
_ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
) error {
if ch := s.getBlockCh(); ch != nil {
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
}
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.mu.sessions[sid]; ok {
Expand All @@ -59,8 +68,16 @@ func (s *FakeStorage) Insert(

// Update implements the sqlliveness.Storage interface.
func (s *FakeStorage) Update(
_ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
) (bool, error) {
if ch := s.getBlockCh(); ch != nil {
select {
case <-ch:
break
case <-ctx.Done():
return false, ctx.Err()
}
}
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.mu.sessions[sid]; !ok {
Expand All @@ -77,3 +94,23 @@ func (s *FakeStorage) Delete(_ context.Context, sid sqlliveness.SessionID) error
delete(s.mu.sessions, sid)
return nil
}

// SetBlockCh arms the fake storage for tests by installing a fresh, open
// block channel under the storage mutex. Insert and Update wait on this
// channel (or on context cancellation) before proceeding.
func (s *FakeStorage) SetBlockCh() {
	ch := make(chan struct{})
	s.mu.Lock()
	s.mu.blockCh = ch
	s.mu.Unlock()
}

// CloseBlockCh unblocks the storage for testing purposes by closing the block
// channel, which releases every call currently waiting on it.
//
// The call is a no-op when no block channel is installed, and the field is
// reset to nil after closing, so calling it without a prior SetBlockCh or
// calling it twice does not panic (close panics on nil and on already-closed
// channels).
func (s *FakeStorage) CloseBlockCh() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.blockCh == nil {
		return
	}
	close(s.mu.blockCh)
	// Waiters already hold a reference to the closed channel; clearing the
	// field lets later calls skip the wait entirely.
	s.mu.blockCh = nil
}

// getBlockCh returns the currently installed block channel, read under the
// storage mutex. The result is nil when no block channel is set.
func (s *FakeStorage) getBlockCh() chan struct{} {
	s.mu.Lock()
	ch := s.mu.blockCh
	s.mu.Unlock()
	return ch
}

0 comments on commit 4baef9c

Please sign in to comment.