Skip to content

Commit

Permalink
c2c: add CreateForSpanConfigs to the stream client
Browse files Browse the repository at this point in the history
This patch adds a new `CreateForSpanConfigs` call to the stream client and the
new `crdb_internal.start_span_configs_replication_stream()` SQL builtin, which
begins a replication stream on the _system_ tenant's span configuration table
and streams only the updates that apply to the passed-in application tenant.
The ForSpanConfigs flag is now persisted to the producer job record to
indicate this flavour of replication stream.

A future PR will plumb this info through the StreamPartition spec.

Informs cockroachdb#106823

Release note: None
  • Loading branch information
msbutler committed Jul 24, 2023
1 parent 724f6b9 commit d521615
Show file tree
Hide file tree
Showing 16 changed files with 217 additions and 78 deletions.
10 changes: 8 additions & 2 deletions pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,16 @@ func (rh *ReplicationHelper) TableSpan(codec keys.SQLCodec, table string) roachp
// StartReplicationStream reaches out to the system tenant to start the
// replication stream from the source tenant.
func (rh *ReplicationHelper) StartReplicationStream(
t *testing.T, sourceTenantName roachpb.TenantName,
t *testing.T, sourceTenantName roachpb.TenantName, forSpanConfigs bool,
) streampb.ReplicationProducerSpec {
var rawReplicationProducerSpec []byte
row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.start_replication_stream($1)`, sourceTenantName)
var row *sqlutils.Row
if forSpanConfigs {
row = rh.SysSQL.QueryRow(t, `SELECT crdb_internal.start_span_configs_replication_stream($1)`,
sourceTenantName)
} else {
row = rh.SysSQL.QueryRow(t, `SELECT crdb_internal.start_replication_stream($1)`, sourceTenantName)
}
row.Scan(&rawReplicationProducerSpec)
var replicationProducerSpec streampb.ReplicationProducerSpec
err := protoutil.Unmarshal(rawReplicationProducerSpec, &replicationProducerSpec)
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type Client interface {
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)

// CreateForSpanConfigs initializes a stream to stream the span configs that apply to the
// passed in tenant.
CreateForSpanConfigs(ctx context.Context, tenant roachpb.TenantName) (streampb.
ReplicationProducerSpec, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func (sc testStreamClient) Create(
}, nil
}

// CreateForSpanConfigs implements the Client interface.
//
// The test client provides no span config stream: both arguments are ignored
// and every call panics with "not implemented".
func (sc testStreamClient) CreateForSpanConfigs(
_ context.Context, _ roachpb.TenantName,
) (streampb.ReplicationProducerSpec, error) {
panic("not implemented")
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) {
return Topology{
Expand Down
28 changes: 21 additions & 7 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,13 @@ func NewPartitionedStreamClient(

var _ Client = &partitionedStreamClient{}

// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
func (p *partitionedStreamClient) executeCreateCmd(
ctx context.Context, tenantName roachpb.TenantName, cmd string,
) (streampb.ReplicationProducerSpec, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var rawReplicationProducerSpec []byte
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
row := p.mu.srcConn.QueryRow(ctx, cmd, tenantName)
err := row.Scan(&rawReplicationProducerSpec)
if err != nil {
return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
Expand All @@ -101,6 +97,24 @@ func (p *partitionedStreamClient) Create(
return replicationProducerSpec, err
}

// Create implements Client interface.
//
// It opens a "streamclient.Client.Create" child tracing span and delegates to
// executeCreateCmd, which runs the crdb_internal.start_replication_stream
// builtin with tenantName as its only argument.
func (p *partitionedStreamClient) Create(
	ctx context.Context, tenantName roachpb.TenantName,
) (streampb.ReplicationProducerSpec, error) {
	const startStreamQuery = `SELECT crdb_internal.start_replication_stream($1)`

	ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
	defer sp.Finish()

	return p.executeCreateCmd(ctx, tenantName, startStreamQuery)
}

// CreateForSpanConfigs implements Client interface.
//
// It opens a "streamclient.Client.CreateForSpanConfigs" child tracing span and
// delegates to executeCreateCmd, which runs the
// crdb_internal.start_span_configs_replication_stream builtin with tenantName
// as its only argument.
func (p *partitionedStreamClient) CreateForSpanConfigs(
	ctx context.Context, tenantName roachpb.TenantName,
) (streampb.ReplicationProducerSpec, error) {
	const startSpanConfigsStreamQuery = `SELECT crdb_internal.start_span_configs_replication_stream($1)`

	ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.CreateForSpanConfigs")
	defer sp.Finish()

	return p.executeCreateCmd(ctx, tenantName, startSpanConfigsStreamQuery)
}

// Dial implements Client interface.
func (p *partitionedStreamClient) Dial(ctx context.Context) error {
p.mu.Lock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ func (m *RandomStreamClient) Create(
}, nil
}

// CreateForSpanConfigs implements the Client interface.
//
// The random stream client does not implement span config streams: ctx and
// tenantName are unused and every call panics with
// "CreateForSpanConfigs not implemented".
func (m *RandomStreamClient) CreateForSpanConfigs(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.ReplicationProducerSpec, error) {
panic("CreateForSpanConfigs not implemented")
}

// Heartbeat implements the Client interface.
func (m *RandomStreamClient) Heartbeat(
ctx context.Context, _ streampb.StreamID, ts hlc.Timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (m *mockStreamClient) Create(
panic("unimplemented")
}

// CreateForSpanConfigs implements the Client interface.
//
// The mock leaves this method unimplemented: both arguments are ignored and
// every call panics with "unimplemented".
func (m *mockStreamClient) CreateForSpanConfigs(
_ context.Context, _ roachpb.TenantName,
) (streampb.ReplicationProducerSpec, error) {
panic("unimplemented")
}

// Dial implements the Client interface.
func (m *mockStreamClient) Dial(_ context.Context) error {
panic("unimplemented")
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ go_library(
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand Down Expand Up @@ -96,6 +99,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/distsql",
"//pkg/sql/isql",
"//pkg/sql/sem/eval",
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,21 @@ func makeTenantSpan(tenantID uint64) roachpb.Span {
func makeProducerJobRecord(
registry *jobs.Registry,
tenantID uint64,
span roachpb.Span,
timeout time.Duration,
user username.SQLUsername,
ptsID uuid.UUID,
forSpanConfigs bool,
) jobs.Record {
return jobs.Record{
JobID: registry.MakeJobID(),
Description: fmt.Sprintf("stream replication for tenant %d", tenantID),
Username: user,
Details: jobspb.StreamReplicationDetails{
ProtectedTimestampRecordID: ptsID,
Spans: []roachpb.Span{makeTenantSpan(tenantID)},
Spans: roachpb.Spans{span},
TenantID: roachpb.MustMakeTenantID(tenantID),
ForSpanConfigs: forSpanConfigs,
},
Progress: jobspb.StreamReplicationProgress{
Expiration: timeutil.Now().Add(timeout),
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (c coordinatedResumer) OnFailOrCancel(
return err
}

// TODO(msbutler): completely rewrite this test.
func TestStreamReplicationProducerJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -185,7 +186,8 @@ func TestStreamReplicationProducerJob(t *testing.T) {
{ // Job times out at the beginning
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, 10, timeout, usr, ptsID)
jr := makeProducerJobRecord(registry, uint64(10), makeTenantSpan(10), timeout, usr, ptsID,
false)
defer jobs.ResetConstructors()()

mt, timeGiven, waitForTimeRequest, waitJobFinishReverting := registerConstructor()
Expand Down Expand Up @@ -217,8 +219,8 @@ func TestStreamReplicationProducerJob(t *testing.T) {
ptsTime := timeutil.Now()
ts := hlc.Timestamp{WallTime: ptsTime.UnixNano()}
ptsID := uuid.MakeV4()

jr := makeProducerJobRecord(registry, 20, timeout, usr, ptsID)
jr := makeProducerJobRecord(registry, uint64(20), makeTenantSpan(20), timeout, usr, ptsID,
false)
defer jobs.ResetConstructors()()
mt, timeGiven, waitForTimeRequest, waitJobFinishReverting :=
registerConstructor()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type replicationStreamManagerImpl struct {

// StartReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStream(
ctx context.Context, tenantName roachpb.TenantName,
ctx context.Context, tenantName roachpb.TenantName, forSpanConfig bool,
) (streampb.ReplicationProducerSpec, error) {
return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName)
return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName, forSpanConfig)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
Expand Down
87 changes: 45 additions & 42 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -247,54 +249,55 @@ func TestReplicationStreamInitialization(t *testing.T) {
defer cleanup()
testTenantName := roachpb.TenantName("test-tenant")
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
tenantPrefix := keys.MakeTenantPrefix(srcTenant.ID)
defer cleanupTenant()

// Makes the stream time out really soon
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'")
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '1ms'")
t.Run("failed-after-timeout", func(t *testing.T) {
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
streamID := replicationProducerSpec.StreamID

h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{"failed"}})
testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_INACTIVE)
})

// Make sure the stream does not time out within the test timeout
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'")
t.Run("continuously-running-within-timeout", func(t *testing.T) {
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
streamID := replicationProducerSpec.StreamID
testutils.RunTrueAndFalse(t, "for-span-configs", func(t *testing.T,
forSpanConfigs bool) {
t.Run("failed-after-timeout", func(t *testing.T) {
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'")
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, forSpanConfigs)
streamID := replicationProducerSpec.StreamID

h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{"running"}})
jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(streamID))
testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_INACTIVE)
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'")
})
t.Run("continuously-running-within-timeout", func(t *testing.T) {
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, forSpanConfigs)
streamID := replicationProducerSpec.StreamID

// Ensures the job is continuously running for 3 seconds.
testDuration, now := 3*time.Second, timeutil.Now()
for start, end := now, now.Add(testDuration); start.Before(end); start = start.Add(300 * time.Millisecond) {
h.SysSQL.CheckQueryResults(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{"running"}})
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_ACTIVE)
}

// Get a replication stream spec
spec, rawSpec := &streampb.ReplicationStreamSpec{}, make([]byte, 0)
row := h.SysSQL.QueryRow(t, "SELECT crdb_internal.replication_stream_spec($1)", streamID)
row.Scan(&rawSpec)
require.NoError(t, protoutil.Unmarshal(rawSpec, spec))

// Ensures the processor spec tracks the tenant span
require.Equal(t, 1, len(spec.Partitions))
require.Equal(t, 1, len(spec.Partitions[0].PartitionSpec.Spans))
tenantPrefix := keys.MakeTenantPrefix(srcTenant.ID)
require.Equal(t, roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()},
spec.Partitions[0].PartitionSpec.Spans[0])
spec, rawSpec := &streampb.ReplicationStreamSpec{}, make([]byte, 0)
row := h.SysSQL.QueryRow(t, "SELECT crdb_internal.replication_stream_spec($1)", streamID)
row.Scan(&rawSpec)
require.NoError(t, protoutil.Unmarshal(rawSpec, spec))

// Ensures the processor spec tracks the tenant span
require.Equal(t, 1, len(spec.Partitions))
require.Equal(t, 1, len(spec.Partitions[0].PartitionSpec.Spans))

expectedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}
if forSpanConfigs {
var spanConfigTableID uint32
h.SysSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`,
systemschema.SpanConfigurationsTableName.Table()).Scan(&spanConfigTableID)
codec := keys.MakeSQLCodec(roachpb.SystemTenantID)
spanConfigKey := codec.TablePrefix(spanConfigTableID)
expectedSpan = roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()}
}
require.Equal(t, expectedSpan, spec.Partitions[0].PartitionSpec.Spans[0])
})
t.Run("nonexistent-replication-stream-has-inactive-status", func(t *testing.T) {
testStreamReplicationStatus(t, h.SysSQL, streampb.StreamID(123), streampb.StreamReplicationStatus_STREAM_INACTIVE)
})
})

t.Run("nonexistent-replication-stream-has-inactive-status", func(t *testing.T) {
testStreamReplicationStatus(t, h.SysSQL, streampb.StreamID(123), streampb.StreamReplicationStatus_STREAM_INACTIVE)
})
}

func spansForTables(db *kv.DB, codec keys.SQLCodec, tables ...string) []roachpb.Span {
Expand Down Expand Up @@ -361,7 +364,7 @@ USE d;
`)

ctx := context.Background()
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID
initialScanTimestamp := replicationProducerSpec.ReplicationStartTime

Expand Down Expand Up @@ -509,7 +512,7 @@ USE d;
`)

ctx := context.Background()
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
Expand Down Expand Up @@ -597,7 +600,7 @@ func TestCompleteStreamReplication(t *testing.T) {
"SET CLUSTER SETTING stream_replication.job_liveness_timeout = '2s';",
"SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '2s';")

replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
timedOutStreamID := replicationProducerSpec.StreamID
jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID))

Expand All @@ -609,7 +612,7 @@ func TestCompleteStreamReplication(t *testing.T) {
timedOutStreamID, successfulIngestion)

// Create a new replication stream and complete it.
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
Expand Down Expand Up @@ -673,7 +676,7 @@ USE d;
`)

ctx := context.Background()
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID
initialScanTimestamp := replicationProducerSpec.ReplicationStartTime
streamResumeTimestamp := h.SysServer.Clock().Now()
Expand Down
Loading

0 comments on commit d521615

Please sign in to comment.