Skip to content

Commit

Permalink
Enable metric extraction for batch dataflow worker (#1)
Browse files Browse the repository at this point in the history
Co-authored-by: steve <sniemitz@twitter.com>
Co-authored-by: Kanishk Karanawat <kkaranawat@twitter.com>
  • Loading branch information
3 people authored and steveniemitz committed Nov 6, 2020
1 parent 6876c27 commit af7aa71
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 3 deletions.
Expand Up @@ -29,7 +29,7 @@ plugins { id 'org.apache.beam.module' }
// by adding -Pdataflow.version=<version> in Gradle command. Otherwise,
// 'google_clients_version' defined in BeamModulePlugin will be used as default.
def DATAFLOW_VERSION = "dataflow.version"
def DATAFLOW_WORKER_REV = "20200203"
def DATAFLOW_WORKER_REV = "20200504"

// Get full dependency of 'com.google.apis:google-api-services-dataflow'
def google_api_services_dataflow = project.hasProperty(DATAFLOW_VERSION) ? "com.google.apis:google-api-services-dataflow:" + getProperty(DATAFLOW_VERSION) : library.java.google_api_services_dataflow
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.*;

import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.WorkItem;
import java.io.Closeable;
Expand All @@ -30,6 +32,8 @@
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness;
import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
import org.apache.beam.runners.dataflow.worker.graph.CreateExecutableStageNodeFunction;
Expand Down Expand Up @@ -144,9 +148,15 @@ public class BatchDataflowWorker implements Closeable {
private final SdkHarnessRegistry sdkHarnessRegistry;
private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;

private final CounterSet memoryCounter;
private final MemoryMonitor memoryMonitor;
private final Thread memoryMonitorThread;

private static final CounterName BATCH_WORK_ITEM_SUCCESS_COUNTER_NAME =
CounterName.named("work_item_success");
private static final CounterName BATCH_WORK_ITEM_FAILURE_COUNTER_NAME =
CounterName.named("work_item_failure");

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
Expand Down Expand Up @@ -209,7 +219,12 @@ protected BatchDataflowWorker(
.concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
.build();

this.memoryMonitor = MemoryMonitor.fromOptions(options);
this.memoryCounter = new CounterSet();
this.memoryMonitor =
MemoryMonitor.fromOptions(
options,
memoryCounter.longSum(MEMORY_MONITOR_NUM_PUSHBACKS.counterName()),
memoryCounter.longSum(MEMORY_MONITOR_IS_THRASHING.counterName()));
this.statusPages =
WorkerStatusPages.create(
DEFAULT_STATUS_PORT, this.memoryMonitor, sdkHarnessRegistry::sdkHarnessesAreHealthy);
Expand Down Expand Up @@ -324,6 +339,11 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr

DataflowWorkExecutor worker = null;
SdkWorkerHarness sdkWorkerHarness = sdkHarnessRegistry.getAvailableWorkerAndAssignWork();
CounterSet counterSet = new CounterSet();
Counter<Long, Long> workItemsReceived = counterSet.longSum(WORK_ITEMS_RECEIVED.counterName());
Counter<Long, Long> workItemSuccess = counterSet.longSum(BATCH_WORK_ITEM_SUCCESS_COUNTER_NAME);
Counter<Long, Long> workItemFailure = counterSet.longSum(BATCH_WORK_ITEM_FAILURE_COUNTER_NAME);

try {
// Populate PipelineOptions with data from work unit.
options.setProject(workItem.getProjectId());
Expand All @@ -337,10 +357,10 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr
throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
}

CounterSet counterSet = new CounterSet();
BatchModeExecutionContext executionContext =
BatchModeExecutionContext.create(
counterSet,
this.memoryCounter,
sideInputDataCache,
sideInputWeakReferenceCache,
readerRegistry,
Expand Down Expand Up @@ -383,11 +403,15 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr

DataflowWorkProgressUpdater progressUpdater =
new DataflowWorkProgressUpdater(workItemStatusClient, workItem, worker);

workItemsReceived.addValue(1L);
executeWork(worker, progressUpdater);
workItemSuccess.addValue(1L);
workItemStatusClient.reportSuccess();
return true;

} catch (Throwable e) {
workItemFailure.addValue(1L);
workItemStatusClient.reportError(e);
return false;

Expand Down
Expand Up @@ -34,6 +34,8 @@
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
Expand Down Expand Up @@ -61,6 +63,7 @@ public class BatchModeExecutionContext
protected final Cache<?, ?> logicalReferenceCache;
protected final PipelineOptions options;
protected final ReaderFactory readerFactory;
private final CounterSet memoryDeltaCounter;
private Object key;

private final MetricsContainerRegistry<MetricsContainerImpl> containerRegistry;
Expand All @@ -76,6 +79,7 @@ public class BatchModeExecutionContext

private BatchModeExecutionContext(
CounterFactory counterFactory,
CounterSet memoryDeltaCounter,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
Expand All @@ -88,6 +92,7 @@ private BatchModeExecutionContext(
executionStateTracker,
executionStateRegistry,
Long.MAX_VALUE);
this.memoryDeltaCounter = memoryDeltaCounter;
this.logicalReferenceCache = logicalReferenceCache;
this.readerFactory = readerFactory;
this.options = options;
Expand All @@ -110,6 +115,7 @@ public static BatchModeExecutionContext forTesting(
BatchModeExecutionStateRegistry stateRegistry = new BatchModeExecutionStateRegistry();
return new BatchModeExecutionContext(
counterFactory,
new CounterSet(),
CacheBuilder.newBuilder()
.maximumWeight(1_000_000) // weights are in bytes
.weigher(Weighers.fixedWeightKeys(8))
Expand Down Expand Up @@ -220,6 +226,7 @@ protected DataflowOperationContext.DataflowExecutionState createState(

public static BatchModeExecutionContext create(
CounterFactory counterFactory,
CounterSet memoryCounter,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
Expand All @@ -229,6 +236,7 @@ public static BatchModeExecutionContext create(
BatchModeExecutionStateRegistry executionStateRegistry = new BatchModeExecutionStateRegistry();
return new BatchModeExecutionContext(
counterFactory,
memoryCounter,
dataCache,
logicalReferenceCache,
readerFactory,
Expand Down Expand Up @@ -320,6 +328,10 @@ public <K, V> Cache<K, V> getLogicalReferenceCache() {
return rval;
}

public Iterable<CounterUpdate> extractMemoryCounters() {
return memoryDeltaCounter.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE);
}

/** {@link DataflowStepContext} used in batch mode. */
public class StepContext extends DataflowExecutionContext.DataflowStepContext {

Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.WorkerMetricsReceiver;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
Expand All @@ -48,6 +49,7 @@
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -69,6 +71,7 @@ public class WorkItemStatusClient {
private final WorkUnitClient workUnitClient;
private @Nullable DataflowWorkExecutor worker;
private Long nextReportIndex;
private final Iterable<WorkerMetricsReceiver> workerMetricReceivers;

private transient String uniqueWorkId = null;
private boolean finalStateSent = false;
Expand All @@ -88,6 +91,7 @@ public WorkItemStatusClient(WorkUnitClient workUnitClient, WorkItem workItem) {
this.workItem = workItem;
this.nextReportIndex =
checkNotNull(workItem.getInitialReportIndex(), "WorkItem missing initial report index");
this.workerMetricReceivers = ReflectHelpers.loadServicesOrdered(WorkerMetricsReceiver.class);
}

public String uniqueWorkId() {
Expand Down Expand Up @@ -302,13 +306,27 @@ synchronized void populateCounterUpdates(WorkItemStatus status) {
// MSec counters reported in worker
extractMsecCounters(isFinalUpdate).forEach(appendCounterUpdate);

// Extract memory metrics in worker
extractMemoryCounters().forEach(appendCounterUpdate);

// Metrics reported in SDK runner.
// This includes all different kinds of metrics coming from SDK.
// Keep in mind that these metrics might contain different types of counter names:
// i.e. structuredNameAndMetadata and nameAndKind
worker.extractMetricUpdates().forEach(appendCounterUpdate);

status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values()));
publishCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values()));
}

private void publishCounterUpdates(List<CounterUpdate> updates) {
try {
for (WorkerMetricsReceiver receiver : workerMetricReceivers) {
receiver.receiverCounterUpdates(updates);
}
} catch (Exception e) {
LOG.error("Error publishing counter updates", e);
}
}

private synchronized Iterable<CounterUpdate> extractCounters(@Nullable CounterSet counters) {
Expand Down Expand Up @@ -343,6 +361,12 @@ public Iterable<CounterUpdate> extractMsecCounters(boolean isFinalUpdate) {
: executionContext.extractMsecCounters(isFinalUpdate);
}

public Iterable<CounterUpdate> extractMemoryCounters() {
return executionContext == null
? Collections.emptyList()
: executionContext.extractMemoryCounters();
}

public long extractThrottleTime() {
return executionContext == null ? 0L : executionContext.extractThrottleTime();
}
Expand Down

0 comments on commit af7aa71

Please sign in to comment.