Skip to content

Commit

Permalink
c2c: plumb forSpanConfigs arg through streamclient.Create()
Browse files Browse the repository at this point in the history
This patch plumbs the forSpanConfigs arg through the streamclient.Create()
interface and modifies producer job creation to return a specification for
streaming the spanConfiguration table if that arg is passed.

A future PR will plumb this info through the StreamPartition spec.

It's worth noting that we don't necessarily need this plumbing: if we see that
tenant Id is for the system tenant, then we could automatically set up a span
config stream. I think this extra plumbing is worth it because it makes
everything a bit easier to read.

Informs cockroachdb#106823

Release note: None
  • Loading branch information
msbutler committed Jul 14, 2023
1 parent 9763d92 commit 298eca7
Show file tree
Hide file tree
Showing 20 changed files with 149 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/cmdccl/clusterrepl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func streamPartition(ctx context.Context, streamAddr *url.URL) error {
return err
}

replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant))
replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant), false)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,15 @@ func (rh *ReplicationHelper) TableSpan(codec keys.SQLCodec, table string) roachp

// StartReplicationStream reaches out to the system tenant to start the
// replication stream from the source tenant.
//
// If the forSpanConfigs arg is passed, the replication stream will only
// replicate the tenant's system.span_configuration table.
func (rh *ReplicationHelper) StartReplicationStream(
t *testing.T, sourceTenantName roachpb.TenantName,
t *testing.T, sourceTenantName roachpb.TenantName, forSpanConfigs bool,
) streampb.ReplicationProducerSpec {
var rawReplicationProducerSpec []byte
row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.start_replication_stream($1)`, sourceTenantName)
row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.start_replication_stream($1,$2)`,
sourceTenantName, forSpanConfigs)
row.Scan(&rawReplicationProducerSpec)
var replicationProducerSpec streampb.ReplicationProducerSpec
err := protoutil.Unmarshal(rawReplicationProducerSpec, &replicationProducerSpec)
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
//
// If the forSpanConfigs arg is passed, the replication stream will only
// replicate the tenant's system.span_configuration table.
Create(ctx context.Context, tenant roachpb.TenantName,
forSpanConfigs bool) (streampb.ReplicationProducerSpec, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (sc testStreamClient) Dial(_ context.Context) error {

// Create implements the Client interface.
func (sc testStreamClient) Create(
_ context.Context, _ roachpb.TenantName,
_ context.Context, _ roachpb.TenantName, _ bool,
) (streampb.ReplicationProducerSpec, error) {
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
Expand Down Expand Up @@ -191,7 +191,7 @@ func ExampleClient() {
_ = client.Close(ctx)
}()

prs, err := client.Create(ctx, "system")
prs, err := client.Create(ctx, "system", false)
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,16 @@ var _ Client = &partitionedStreamClient{}

// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
ctx context.Context, tenantName roachpb.TenantName, forSpanConfigs bool,
) (streampb.ReplicationProducerSpec, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var rawReplicationProducerSpec []byte
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1,$2)`,
tenantName, forSpanConfigs)
err := row.Scan(&rawReplicationProducerSpec)
if err != nil {
return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ INSERT INTO d.t2 VALUES (2);
[][]string{{string(status)}})
}

rps, err := client.Create(ctx, testTenantName)
rps, err := client.Create(ctx, testTenantName, false)
require.NoError(t, err)
streamID := rps.StreamID
// We can create multiple replication streams for the same tenant.
_, err = client.Create(ctx, testTenantName)
_, err = client.Create(ctx, testTenantName, false)
require.NoError(t, err)

top, err := client.Plan(ctx, streamID)
Expand Down Expand Up @@ -249,7 +249,7 @@ INSERT INTO d.t2 VALUES (2);
h.SysSQL.Exec(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms';
`)
rps, err = client.Create(ctx, testTenantName)
rps, err = client.Create(ctx, testTenantName, false)
require.NoError(t, err)
streamID = rps.StreamID
require.NoError(t, client.Complete(ctx, streamID, true))
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,13 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top

// Create implements the Client interface.
func (m *RandomStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
ctx context.Context, tenantName roachpb.TenantName, forSpanConfigs bool,
) (streampb.ReplicationProducerSpec, error) {
log.Infof(ctx, "creating random stream for tenant %s", tenantName)
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
ForSpanConfigs: forSpanConfigs,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func ingestionPlanHook(
// Create the producer job first for the purpose of observability, user is
// able to know the producer job id immediately after executing
// CREATE VIRTUAL CLUSTER ... FROM REPLICATION.
replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant))
replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant), false)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var _ streamclient.Client = &mockStreamClient{}

// Create implements the Client interface.
func (m *mockStreamClient) Create(
_ context.Context, _ roachpb.TenantName,
_ context.Context, _ roachpb.TenantName, _ bool,
) (streampb.ReplicationProducerSpec, error) {
panic("unimplemented")
}
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestRandomClientGeneration(t *testing.T) {

randomStreamClient, ok := streamClient.(*streamclient.RandomStreamClient)
require.True(t, ok)
rps, err := randomStreamClient.Create(ctx, tenantName)
rps, err := randomStreamClient.Create(ctx, tenantName, false)
require.NoError(t, err)

topo, err := randomStreamClient.Plan(ctx, rps.StreamID)
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ go_library(
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand Down Expand Up @@ -94,6 +97,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/distsql",
"//pkg/sql/isql",
"//pkg/sql/sem/eval",
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,21 @@ func makeTenantSpan(tenantID uint64) roachpb.Span {
func makeProducerJobRecord(
registry *jobs.Registry,
tenantID uint64,
span roachpb.Span,
timeout time.Duration,
user username.SQLUsername,
ptsID uuid.UUID,
forSpanConfigs bool,
) jobs.Record {
return jobs.Record{
JobID: registry.MakeJobID(),
Description: fmt.Sprintf("stream replication for tenant %d", tenantID),
Username: user,
Details: jobspb.StreamReplicationDetails{
ProtectedTimestampRecordID: ptsID,
Spans: []roachpb.Span{makeTenantSpan(tenantID)},
Spans: roachpb.Spans{span},
TenantID: roachpb.MustMakeTenantID(tenantID),
ForSpanConfigs: forSpanConfigs,
},
Progress: jobspb.StreamReplicationProgress{
Expiration: timeutil.Now().Add(timeout),
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (c coordinatedResumer) OnFailOrCancel(
return err
}

// TODO(msbutler): completely rewrite this test.
func TestStreamReplicationProducerJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -185,7 +186,8 @@ func TestStreamReplicationProducerJob(t *testing.T) {
{ // Job times out at the beginning
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, 10, timeout, usr, ptsID)
jr := makeProducerJobRecord(registry, uint64(10), makeTenantSpan(10), timeout, usr, ptsID,
false)
defer jobs.ResetConstructors()()

mt, timeGiven, waitForTimeRequest, waitJobFinishReverting := registerConstructor()
Expand Down Expand Up @@ -217,8 +219,8 @@ func TestStreamReplicationProducerJob(t *testing.T) {
ptsTime := timeutil.Now()
ts := hlc.Timestamp{WallTime: ptsTime.UnixNano()}
ptsID := uuid.MakeV4()

jr := makeProducerJobRecord(registry, 20, timeout, usr, ptsID)
jr := makeProducerJobRecord(registry, uint64(20), makeTenantSpan(20), timeout, usr, ptsID,
false)
defer jobs.ResetConstructors()()
mt, timeGiven, waitForTimeRequest, waitJobFinishReverting :=
registerConstructor()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type replicationStreamManagerImpl struct {

// StartReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStream(
ctx context.Context, tenantName roachpb.TenantName,
ctx context.Context, tenantName roachpb.TenantName, forSpanConfig bool,
) (streampb.ReplicationProducerSpec, error) {
return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName)
return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName, forSpanConfig)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
Expand Down
93 changes: 51 additions & 42 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -247,54 +249,61 @@ func TestReplicationStreamInitialization(t *testing.T) {
defer cleanup()
testTenantName := roachpb.TenantName("test-tenant")
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
tenantPrefix := keys.MakeTenantPrefix(srcTenant.ID)
defer cleanupTenant()

// Makes the stream time out really soon
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'")
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '1ms'")
t.Run("failed-after-timeout", func(t *testing.T) {
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
streamID := replicationProducerSpec.StreamID

h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{"failed"}})
testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_INACTIVE)
})

// Make sure the stream does not time out within the test timeout
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'")
t.Run("continuously-running-within-timeout", func(t *testing.T) {
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
streamID := replicationProducerSpec.StreamID
testutils.RunTrueAndFalse(t, "for-span-configs", func(t *testing.T,
forSpanConfigs bool) {
if forSpanConfigs {
testTenantName = "system"
tenantPrefix = keys.MakeTenantPrefix(roachpb.SystemTenantID)
}
t.Run("failed-after-timeout", func(t *testing.T) {
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'")
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, forSpanConfigs)
streamID := replicationProducerSpec.StreamID
require.Equal(t, forSpanConfigs, replicationProducerSpec.ForSpanConfigs)

h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{"running"}})
jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(streamID))
testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_INACTIVE)
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'")
})
t.Run("continuously-running-within-timeout", func(t *testing.T) {
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, forSpanConfigs)
streamID := replicationProducerSpec.StreamID
require.Equal(t, forSpanConfigs, replicationProducerSpec.ForSpanConfigs)

// Ensures the job is continuously running for 3 seconds.
testDuration, now := 3*time.Second, timeutil.Now()
for start, end := now, now.Add(testDuration); start.Before(end); start = start.Add(300 * time.Millisecond) {
h.SysSQL.CheckQueryResults(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{"running"}})
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_ACTIVE)
}

// Get a replication stream spec
spec, rawSpec := &streampb.ReplicationStreamSpec{}, make([]byte, 0)
row := h.SysSQL.QueryRow(t, "SELECT crdb_internal.replication_stream_spec($1)", streamID)
row.Scan(&rawSpec)
require.NoError(t, protoutil.Unmarshal(rawSpec, spec))

// Ensures the processor spec tracks the tenant span
require.Equal(t, 1, len(spec.Partitions))
require.Equal(t, 1, len(spec.Partitions[0].PartitionSpec.Spans))
tenantPrefix := keys.MakeTenantPrefix(srcTenant.ID)
require.Equal(t, roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()},
spec.Partitions[0].PartitionSpec.Spans[0])
spec, rawSpec := &streampb.ReplicationStreamSpec{}, make([]byte, 0)
row := h.SysSQL.QueryRow(t, "SELECT crdb_internal.replication_stream_spec($1)", streamID)
row.Scan(&rawSpec)
require.NoError(t, protoutil.Unmarshal(rawSpec, spec))

// Ensures the processor spec tracks the tenant span
require.Equal(t, 1, len(spec.Partitions))
require.Equal(t, 1, len(spec.Partitions[0].PartitionSpec.Spans))

expectedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}
if forSpanConfigs {
var spanConfigTableID uint32
h.SysSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`,
systemschema.SpanConfigurationsTableName.Table()).Scan(&spanConfigTableID)
codec := keys.MakeSQLCodec(roachpb.SystemTenantID)
spanConfigKey := codec.TablePrefix(spanConfigTableID)
expectedSpan = roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()}
}
require.Equal(t, expectedSpan, spec.Partitions[0].PartitionSpec.Spans[0])
})
t.Run("nonexistent-replication-stream-has-inactive-status", func(t *testing.T) {
testStreamReplicationStatus(t, h.SysSQL, streampb.StreamID(123), streampb.StreamReplicationStatus_STREAM_INACTIVE)
})
})

t.Run("nonexistent-replication-stream-has-inactive-status", func(t *testing.T) {
testStreamReplicationStatus(t, h.SysSQL, streampb.StreamID(123), streampb.StreamReplicationStatus_STREAM_INACTIVE)
})
}

func encodeSpec(
Expand Down Expand Up @@ -348,7 +357,7 @@ USE d;
`)

ctx := context.Background()
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID
initialScanTimestamp := replicationProducerSpec.ReplicationStartTime

Expand Down Expand Up @@ -496,7 +505,7 @@ USE d;
`)

ctx := context.Background()
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
Expand Down Expand Up @@ -584,7 +593,7 @@ func TestCompleteStreamReplication(t *testing.T) {
"SET CLUSTER SETTING stream_replication.job_liveness_timeout = '2s';",
"SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '2s';")

replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
timedOutStreamID := replicationProducerSpec.StreamID
jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID))

Expand All @@ -596,7 +605,7 @@ func TestCompleteStreamReplication(t *testing.T) {
timedOutStreamID, successfulIngestion)

// Create a new replication stream and complete it.
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
Expand Down Expand Up @@ -673,7 +682,7 @@ USE d;
`)

ctx := context.Background()
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
replicationProducerSpec := h.StartReplicationStream(t, testTenantName, false)
streamID := replicationProducerSpec.StreamID
initialScanTimestamp := replicationProducerSpec.ReplicationStartTime

Expand Down
Loading

0 comments on commit 298eca7

Please sign in to comment.