Skip to content

Commit

Permalink
SAMZA-2251: Minor diagnostics manager change to emit additional job details
Browse files Browse the repository at this point in the history

Author: Ray Matharu <rmatharu@linkedin.com>

Reviewers: Cameron Lee<calee@linkedin.com>

Closes apache#1083 from rmatharu/test-diagnostics-improvements
  • Loading branch information
rmatharu authored and jagadish-v0 committed Jun 25, 2019
1 parent 6e1301b commit 6b0c20a
Show file tree
Hide file tree
Showing 10 changed files with 862 additions and 51 deletions.
Expand Up @@ -146,7 +146,7 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
String jobId = new JobConfig(config).getJobId();
Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config);
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId, config);

if (diagnosticsManagerReporterPair.isPresent()) {
diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
Expand Down Expand Up @@ -308,11 +308,13 @@ public void onResourceAllocated(SamzaResource resource) {
public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
String containerId = resourceStatus.getContainerId();
String processorId = null;
String hostName = null;
for (Map.Entry<String, SamzaResource> entry: state.runningProcessors.entrySet()) {
if (entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
log.info("Container ID: {} matched running Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());

processorId = entry.getKey();
hostName = entry.getValue().getHost();
break;
}
}
Expand Down Expand Up @@ -435,6 +437,10 @@ public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
}

}

if (this.diagnosticsManager.isDefined()) {
this.diagnosticsManager.get().addProcessorStopEvent(processorId, resourceStatus.getContainerId(), hostName, exitStatus);
}
}

@Override
Expand Down
Expand Up @@ -205,4 +205,12 @@ public boolean hasDurableStores() {
Config subConfig = subset(STORE_PREFIX, true);
return subConfig.keySet().stream().anyMatch(key -> key.endsWith(CHANGELOG_SUFFIX));
}

/**
 * Helper method to get the number of stores configured with a changelog.
 *
 * @return the number of keys under the store prefix that end with the changelog suffix
 */
public int getNumStoresWithChangelog() {
  Config subConfig = subset(STORE_PREFIX, true);
  // Stream.count() yields a long; narrow it directly rather than boxing through the deprecated
  // Long(long) constructor. The (int) cast is exactly what Long.intValue() performs.
  return (int) subConfig.keySet().stream().filter(key -> key.endsWith(CHANGELOG_SUFFIX)).count();
}
}
Expand Up @@ -341,7 +341,7 @@ SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
String jobName = new JobConfig(config).getName().get();
String jobId = new JobConfig(config).getJobId();
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, processorId, Optional.empty(), config);
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(), config);
Option<DiagnosticsManager> diagnosticsManager = Option.empty();
if (diagnosticsManagerReporterPair.isPresent()) {
diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
Expand Down
Expand Up @@ -99,7 +99,7 @@ private static void run(
Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);

// Creating diagnostics manager and reporter, and wiring it respectively
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, containerId, execEnvContainerId, config);
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
Option<DiagnosticsManager> diagnosticsManager = Option.empty();
if (diagnosticsManagerReporterPair.isPresent()) {
diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
Expand Down
35 changes: 23 additions & 12 deletions samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
Expand Up @@ -24,13 +24,16 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
Expand All @@ -50,7 +53,6 @@
public class DiagnosticsUtil {
private static final Logger log = LoggerFactory.getLogger(DiagnosticsUtil.class);


// Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains
// metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others.
// The file contents are serialized using {@link JsonSerde}.
Expand All @@ -61,9 +63,9 @@ public static void writeMetadataFile(String jobName, String jobId, String contai

if (metadataFile.isDefined()) {
MetricsHeader metricsHeader =
new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
Util.getTaskClassVersion(config), Util.getSamzaVersion(), Util.getLocalHost().getHostName(),
System.currentTimeMillis(), System.currentTimeMillis());
new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""),
LocalContainerRunner.class.getName(), Util.getTaskClassVersion(config), Util.getSamzaVersion(),
Util.getLocalHost().getHostName(), System.currentTimeMillis(), System.currentTimeMillis());

class MetadataFileContents {
public final String version;
Expand All @@ -76,26 +78,30 @@ public MetadataFileContents(String version, String metricsSnapshot) {
}

MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics());
MetadataFileContents metadataFileContents = new MetadataFileContents("1", new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
MetadataFileContents metadataFileContents =
new MetadataFileContents("1", new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
FileUtil.writeToTextFile(metadataFile.get(), new String(new JsonSerde<>().toBytes(metadataFileContents)), false);
} else {
log.info("Skipping writing metadata file.");
}
}


/**
* Create a pair of DiagnosticsManager and Reporter for the given jobName, jobId, containerId, and execEnvContainerId,
* if diagnostics is enabled.
* execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN).
*/
public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName, String jobId,
String containerId, Optional<String> execEnvContainerId, Config config) {
public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName,
String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {

Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = Optional.empty();

if (new JobConfig(config).getDiagnosticsEnabled()) {

ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
int containerNumCores = clusterManagerConfig.getNumCores();

// Diagnostic stream, producer, and reporter related parameters
String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
MetricsConfig metricsConfig = new MetricsConfig(config);
Expand All @@ -111,16 +117,21 @@ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildD

SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());

Optional<String> diagnosticsSystemFactoryName = new SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
Optional<String> diagnosticsSystemFactoryName =
new SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
if (!diagnosticsSystemFactoryName.isPresent()) {
throw new SamzaException("Missing factory in config for system " + diagnosticsSystemStream.getSystem());
}

// Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager
SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
SystemProducer systemProducer = systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
samzaVersion, hostName, diagnosticsSystemStream, systemProducer, Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
SystemProducer systemProducer =
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
DiagnosticsManager diagnosticsManager =
new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores,
new StorageConfig(config).getNumStoresWithChangelog(), containerId, execEnvContainerId.orElse(""),
taskClassVersion, samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));

Option<String> blacklist = ScalaJavaUtil.JavaOptionals$.MODULE$.toRichOptional(
metricsConfig.getMetricsSnapshotReporterBlacklist(diagnosticsReporterName)).toOption();
Expand Down
Expand Up @@ -19,25 +19,23 @@

package org.apache.samza.diagnostics;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,8 +53,6 @@ public class DiagnosticsManager {
// Period size for pushing data to the diagnostic stream

private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d";
private static final String METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics";
// Using SamzaContainerMetrics as the group name to maintain compatibility with existing diagnostics

// Parameters used for populating the MetricHeader when sending diagnostic-stream messages
private final String jobName;
Expand All @@ -68,31 +64,58 @@ public class DiagnosticsManager {
private final String hostname;
private final Instant resetTime;

private SystemProducer systemProducer; // SystemProducer for writing diagnostics data
private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent
// Job-related params
private final Integer containerMemoryMb;
private final Integer containerNumCores;
private final Integer numStoresWithChangelog;
private final Map<String, ContainerModel> containerModels;
private boolean jobParamsEmitted = false;

private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
private final BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent
private final ConcurrentLinkedQueue<ProcessorStopEvent> processorStopEvents; // Queue of processor stop events awaiting publish to the diagnostic stream
private final ScheduledExecutorService scheduler; // Scheduler for pushing data to the diagnostic stream
private final Duration terminationDuration; // duration to wait when terminating the scheduler
private final SystemStream diagnosticSystemStream;

public DiagnosticsManager(String jobName, String jobId, String containerId, String executionEnvContainerId,
String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream,
SystemProducer systemProducer, Duration terminationDuration) {
/**
 * Creates a DiagnosticsManager that publishes on its own single-threaded scheduled executor.
 * The executor's thread is a daemon thread named using {@code PUBLISH_THREAD_NAME}; all other
 * arguments are forwarded unchanged to the constructor that accepts an explicit executor.
 *
 * NOTE(review): the Integer arguments are auto-unboxed when forwarded to the int-typed delegate
 * constructor, so a null value would presumably fail with a NullPointerException; confirm that
 * callers never pass null.
 *
 * @param jobName name of the job
 * @param jobId id of the job
 * @param containerModels container models of the job, keyed by String
 * @param containerMemoryMb container memory in MB
 * @param containerNumCores number of cores per container
 * @param numStoresWithChangelog number of stores configured with a changelog
 * @param containerId id of the container
 * @param executionEnvContainerId container id in the execution environment
 * @param taskClassVersion version of the task class
 * @param samzaVersion version of Samza
 * @param hostname name of the host
 * @param diagnosticSystemStream system stream for the diagnostics data
 * @param systemProducer producer for writing diagnostics data
 * @param terminationDuration duration to wait when terminating the scheduler
 */
public DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> containerModels,
Integer containerMemoryMb, Integer containerNumCores, Integer numStoresWithChangelog, String containerId,
String executionEnvContainerId, String taskClassVersion, String samzaVersion, String hostname,
SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration) {

// Delegate to the executor-accepting constructor, supplying the default daemon scheduler.
this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numStoresWithChangelog, containerId,
executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
terminationDuration, Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
}

@VisibleForTesting
DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> containerModels,
int containerMemoryMb, int containerNumCores, int numStoresWithChangelog, String containerId,
String executionEnvContainerId, String taskClassVersion, String samzaVersion, String hostname,
SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration,
ScheduledExecutorService executorService) {
this.jobName = jobName;
this.jobId = jobId;
this.containerModels = containerModels;
this.containerMemoryMb = containerMemoryMb;
this.containerNumCores = containerNumCores;
this.numStoresWithChangelog = numStoresWithChangelog;
this.containerId = containerId;
this.executionEnvContainerId = executionEnvContainerId;
this.taskClassVersion = taskClassVersion;
this.samzaVersion = samzaVersion;
this.hostname = hostname;
resetTime = Instant.now();

this.systemProducer = systemProducer;
this.diagnosticSystemStream = diagnosticSystemStream;
this.systemProducer = systemProducer;
this.terminationDuration = terminationDuration;

this.processorStopEvents = new ConcurrentLinkedQueue<>();
this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build());
this.terminationDuration = terminationDuration;
this.scheduler = executorService;

resetTime = Instant.now();

try {

Expand Down Expand Up @@ -141,36 +164,59 @@ public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEven
this.exceptions.add(diagnosticsExceptionEvent);
}

/**
 * Queues a stop event for a processor and logs the recorded details.
 *
 * @param processorId id of the processor that stopped
 * @param resourceId id of the resource the processor was associated with
 * @param host host the processor was associated with
 * @param exitStatus exit status reported for the processor
 */
public void addProcessorStopEvent(String processorId, String resourceId, String host, int exitStatus) {
  final ProcessorStopEvent stopEvent = new ProcessorStopEvent(processorId, resourceId, host, exitStatus);
  processorStopEvents.add(stopEvent);
  LOG.info("Added stop event for Container Id: {}, resource Id: {}, host: {}, exitStatus: {}",
      processorId, resourceId, host, exitStatus);
}

private class DiagnosticsStreamPublisher implements Runnable {

@Override
public void run() {
// Publish exception events if there are any
Collection<DiagnosticsExceptionEvent> exceptionList = exceptions.getValues();
try {
DiagnosticsStreamMessage diagnosticsStreamMessage =
new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli());

// Add job-related params to the message (if not already published)
if (!jobParamsEmitted) {
diagnosticsStreamMessage.addContainerMb(containerMemoryMb);
diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
diagnosticsStreamMessage.addContainerModels(containerModels);
}

if (!exceptionList.isEmpty()) {
// Add stop event list to the message
diagnosticsStreamMessage.addProcessorStopEvents(new ArrayList(processorStopEvents));

// Create the metricHeader
MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
DiagnosticsUtil.class.getName(), taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(),
resetTime.toEpochMilli());
// Add exception events to the message
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptions.getValues());

Map<String, Map<String, Object>> metricsMessage = new HashMap<>();
metricsMessage.putIfAbsent(METRICS_GROUP_NAME, new HashMap<>());
metricsMessage.get(METRICS_GROUP_NAME).put(exceptions.getName(), exceptionList);
MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricsMessage));
if (!diagnosticsStreamMessage.isEmpty()) {

try {
systemProducer.send(DiagnosticsManager.class.getName(),
new OutgoingMessageEnvelope(diagnosticSystemStream, metricsHeader.getHost(), null,
new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, null,
new MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot())));
systemProducer.flush(DiagnosticsManager.class.getName());

// Remove stop events from list after successful publish
if (diagnosticsStreamMessage.getProcessorStopEvents() != null) {
processorStopEvents.removeAll(diagnosticsStreamMessage.getProcessorStopEvents());
}

// Remove exceptions from list after successful publish to diagnostics stream
exceptions.remove(exceptionList);
} catch (Exception e) {
LOG.error("Exception when flushing exceptions", e);
if (diagnosticsStreamMessage.getExceptionEvents() != null) {
exceptions.remove(diagnosticsStreamMessage.getExceptionEvents());
}

// Emit jobParams once
jobParamsEmitted = true;
}
} catch (Exception e) {
LOG.error("Exception when flushing diagnosticsStreamMessage", e);
}
}
}

}

0 comments on commit 6b0c20a

Please sign in to comment.