Skip to content

Commit

Permalink
Merge pull request apache#17422 from [BEAM-14344]: remove tracing fro…
Browse files Browse the repository at this point in the history
…m spannerio change streams

Removes distributed tracing from spannerio change streams. The
distributed tracing is not currently adding any value to the debugging
or understanding of the process execution. We remove tracing here in
order to simplify the code.
  • Loading branch information
thiagotnunes authored and jrmccluskey committed May 3, 2022
1 parent 63be015 commit b823e5d
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 455 deletions.
1 change: 0 additions & 1 deletion sdks/java/io/google-cloud-platform/build.gradle
Expand Up @@ -143,7 +143,6 @@ dependencies {
implementation library.java.arrow_vector

implementation "org.threeten:threetenbp:1.4.4"
implementation "io.opencensus:opencensus-api:0.31.0"

testImplementation library.java.arrow_memory_netty
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
Expand Down
Expand Up @@ -49,13 +49,6 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import io.opencensus.common.Scope;
import io.opencensus.trace.Sampler;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.config.TraceConfig;
import io.opencensus.trace.config.TraceParams;
import io.opencensus.trace.samplers.Samplers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -1388,6 +1381,8 @@ public abstract static class ReadChangeStream

abstract @Nullable RpcPriority getRpcPriority();

/** @deprecated This configuration has no effect, as tracing is not available */
@Deprecated
abstract @Nullable Double getTraceSampleProbability();

abstract Builder toBuilder();
Expand Down Expand Up @@ -1489,7 +1484,12 @@ public ReadChangeStream withRpcPriority(RpcPriority rpcPriority) {
return toBuilder().setRpcPriority(rpcPriority).build();
}

/** Specifies the sample probability of tracing requests. */
/**
* Specifies the sample probability of tracing requests.
*
* @deprecated This configuration has no effect, as tracing is not available.
*/
@Deprecated
public ReadChangeStream withTraceSampleProbability(Double probability) {
return toBuilder().setTraceSampleProbability(probability).build();
}
Expand Down Expand Up @@ -1544,102 +1544,84 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));

if (getTraceSampleProbability() != null) {
TraceConfig globalTraceConfig = Tracing.getTraceConfig();
final Sampler sampler = Samplers.probabilitySampler(getTraceSampleProbability());
globalTraceConfig.updateActiveTraceParams(
TraceParams.DEFAULT.toBuilder().setSampler(sampler).build());
SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
// Set default retryable errors for ReadChangeStream
if (changeStreamSpannerConfig.getRetryableCodes() == null) {
ImmutableSet<Code> defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED);
changeStreamSpannerConfig =
changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build();
}
Tracer tracer = Tracing.getTracer();
try (Scope scope =
tracer
.spanBuilder("SpannerIO.ReadChangeStream.expand")
.setRecordEvents(true)
.startScopedSpan()) {
SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
// Set default retryable errors for ReadChangeStream
if (changeStreamSpannerConfig.getRetryableCodes() == null) {
ImmutableSet<Code> defaultRetryableCodes =
ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED);
changeStreamSpannerConfig =
changeStreamSpannerConfig
.toBuilder()
.setRetryableCodes(defaultRetryableCodes)
.build();
}
// Set default retry timeouts for ReadChangeStream
if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) {
changeStreamSpannerConfig =
changeStreamSpannerConfig
.toBuilder()
.setExecuteStreamingSqlRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
.build())
.build();
}
final SpannerConfig partitionMetadataSpannerConfig =
// Set default retry timeouts for ReadChangeStream
if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) {
changeStreamSpannerConfig =
changeStreamSpannerConfig
.toBuilder()
.setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
.setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
.setExecuteStreamingSqlRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
.build())
.build();
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
// interval into a closed-open in the read change stream restriction (prevents overflow)
final Timestamp endTimestamp =
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
? MAX_INCLUSIVE_END_AT
: getInclusiveEndAt();
final MapperFactory mapperFactory = new MapperFactory();
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
final RpcPriority rpcPriority =
MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
rpcPriority,
input.getPipeline().getOptions().getJobName());
final ActionFactory actionFactory = new ActionFactory();

final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(
daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
input
.getPipeline()
.getOptions()
.as(SpannerChangeStreamOptions.class)
.setMetadataTable(partitionMetadataTableName);

PCollection<byte[]> impulseOut = input.apply(Impulse.create());
PCollection<DataChangeRecord> results =
impulseOut
.apply("Initialize the connector", ParDo.of(initializeDoFn))
.apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
.apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
.apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));

impulseOut
.apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
.apply(Wait.on(results))
.apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
return results;
}
final SpannerConfig partitionMetadataSpannerConfig =
changeStreamSpannerConfig
.toBuilder()
.setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
.setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
.build();
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
// interval into a closed-open in the read change stream restriction (prevents overflow)
final Timestamp endTimestamp =
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
? MAX_INCLUSIVE_END_AT
: getInclusiveEndAt();
final MapperFactory mapperFactory = new MapperFactory();
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
rpcPriority,
input.getPipeline().getOptions().getJobName());
final ActionFactory actionFactory = new ActionFactory();

final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(
daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
input
.getPipeline()
.getOptions()
.as(SpannerChangeStreamOptions.class)
.setMetadataTable(partitionMetadataTableName);

PCollection<byte[]> impulseOut = input.apply(Impulse.create());
PCollection<DataChangeRecord> results =
impulseOut
.apply("Initialize the connector", ParDo.of(initializeDoFn))
.apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
.apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
.apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));

impulseOut
.apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
.apply(Wait.on(results))
.apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
return results;
}
}

Expand Down
Expand Up @@ -32,12 +32,6 @@ public class ChangeStreamMetrics implements Serializable {

private static final long serialVersionUID = 8187140831756972470L;

// ----
// Tracing Labels

/** Cloud Tracing label for Partition Tokens. */
public static final String PARTITION_ID_ATTRIBUTE_LABEL = "PartitionID";

// ------------------------
// Partition record metrics

Expand Down

0 comments on commit b823e5d

Please sign in to comment.