[GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)
Report dataset metrics summary on temporal
Will-Lo committed May 23, 2024
1 parent fdfa097 commit f39228d
Showing 18 changed files with 274 additions and 72 deletions.
File: org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -18,6 +18,9 @@
package org.apache.gobblin.runtime;

import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;

import org.apache.gobblin.metrics.DatasetMetric;

@@ -27,11 +30,13 @@
* that can be reported as a single event in the commit phase.
*/
@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class DatasetTaskSummary {
-private final String datasetUrn;
-private final long recordsWritten;
-private final long bytesWritten;
-private final boolean successfullyCommitted;
+@NonNull private String datasetUrn;
+@NonNull private long recordsWritten;
+@NonNull private long bytesWritten;
+@NonNull private boolean successfullyCommitted;

/**
* Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
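For context, a minimal hand-expanded sketch of what this Lombok swap amounts to (illustrative only; the class name below is hypothetical, not generated output): Lombok includes final or @NonNull fields in the @RequiredArgsConstructor, and Jackson's default deserialization needs the no-args constructor plus the setters that @Data generates on non-final fields — which is why the fields stop being final here.

```java
// Rough equivalent of @Data + @RequiredArgsConstructor + @NoArgsConstructor
// on the new non-final, @NonNull fields -- hand-expanded for illustration.
public class DatasetTaskSummaryExpanded {
  private String datasetUrn;
  private long recordsWritten;
  private long bytesWritten;
  private boolean successfullyCommitted;

  public DatasetTaskSummaryExpanded() {}  // used by Jackson during deserialization

  public DatasetTaskSummaryExpanded(String datasetUrn, long recordsWritten,
      long bytesWritten, boolean successfullyCommitted) {
    if (datasetUrn == null) { throw new NullPointerException("datasetUrn"); }  // @NonNull check
    this.datasetUrn = datasetUrn;
    this.recordsWritten = recordsWritten;
    this.bytesWritten = bytesWritten;
    this.successfullyCommitted = successfullyCommitted;
  }

  // @Data also generates getters, setters, equals/hashCode, and toString (omitted)
}
```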
File: org/apache/gobblin/runtime/JobContext.java
@@ -93,7 +93,7 @@ public class JobContext implements Closeable {
private final String jobId;
private final String jobSequence;
private final JobState jobState;
-@Getter(AccessLevel.PACKAGE)
+@Getter
private final JobCommitPolicy jobCommitPolicy;
// A job commit semantic where we want partially successful tasks to commit their data, but still fail the job
// WARNING: this is for Gobblin jobs that do not record their watermark, hence this would not lead to duplicate work
File: org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -19,6 +19,8 @@

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;


@@ -32,5 +34,5 @@ public interface CommitActivity {
* @return number of workunits committed
*/
@ActivityMethod
-int commit(WUProcessingSpec workSpec);
+CommitStats commit(WUProcessingSpec workSpec);
}
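For orientation, a minimal sketch (not part of this commit) of how workflow code obtains and invokes this activity through the Temporal SDK; the options and timeout are hypothetical, and only the changed return type comes from the diff above.

```java
import java.time.Duration;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;

public class CommitStubSketch {
  public CommitStats runCommit(WUProcessingSpec workSpec) {
    // Activity stub created inside workflow code; options are illustrative only
    CommitActivity commitActivity = Workflow.newActivityStub(
        CommitActivity.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofHours(1))  // hypothetical timeout
            .build());
    return commitActivity.commit(workSpec);  // previously returned an int count; now per-dataset stats
  }
}
```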
File: org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.ddm.activity.impl;

import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.temporal.failure.ApplicationFailure;

@@ -44,25 +46,29 @@
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;


@Slf4j
public class CommitActivityImpl implements CommitActivity {

@@ -71,7 +77,7 @@ public class CommitActivityImpl {
static String UNDEFINED_JOB_NAME = "<job_name_stub>";

@Override
-public int commit(WUProcessingSpec workSpec) {
+public CommitStats commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
Optional<String> optJobName = Optional.empty();
@@ -84,11 +90,20 @@ public int commit(WUProcessingSpec workSpec) {
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
-if (!taskStates.isEmpty()) {
-JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
-commitTaskStates(jobState, taskStates, jobContext);
+if (taskStates.isEmpty()) {
+return CommitStats.createEmpty();
}
-return taskStates.size();

+JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
+Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
+TaskState firstTaskState = taskStates.get(0);
+log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
+commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
+return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -106,17 +121,11 @@ public int commit(WUProcessingSpec workSpec) {
/**
* Commit task states to the dataset state store.
* @param jobState
-* @param taskStates
+* @param datasetStatesByUrns
* @param jobContext
* @throws IOException
*/
-private void commitTaskStates(JobState jobState, List<TaskState> taskStates, JobContext jobContext) throws IOException {
-if (!taskStates.isEmpty()) {
-TaskState firstTaskState = taskStates.get(0);
-log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
-}
-//TODO: handle skipped tasks?
-Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
+private void commitTaskStates(JobState jobState, Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext) throws IOException {
final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
//TODO: Make this configurable
@@ -149,13 +158,11 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry

IteratorExecutor.logFailures(result, null, 10);

-Set<String> failedDatasetUrns = new HashSet<>();
-for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
-// Set the overall job state to FAILED if the job failed to process any dataset
-if (datasetState.getState() == JobState.RunningState.FAILED) {
-failedDatasetUrns.add(datasetState.getDatasetUrn());
-}
-}
+Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+.filter(datasetState -> datasetState.getState() == JobState.RunningState.FAILED)
+.map(JobState.DatasetState::getDatasetUrn)
+.collect(Collectors.toCollection(HashSet::new));

if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
log.error("Failed to commit dataset state for dataset(s) {}", allFailedDatasets);
@@ -194,6 +201,37 @@ private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs,
});
}

+private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy, boolean shouldIncludeFailedTasks) {
+Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+// Only process successful datasets, unless configured to also count metrics from failed tasks
+for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+&& commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+long totalBytesWritten = 0;
+long totalRecordsWritten = 0;
+int totalCommittedTasks = 0;
+for (TaskState taskState : datasetState.getTaskStates()) {
+// Certain writers may omit these metrics, e.g. CompactionLauncherWriter
+if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || shouldIncludeFailedTasks) {
+if (taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
+totalCommittedTasks++;
+}
+totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+}
+}
+log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+totalRecordsWritten, totalBytesWritten));
+datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(totalRecordsWritten, totalBytesWritten, true, totalCommittedTasks));
+} else if (datasetState.getState() == JobState.RunningState.FAILED && commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+// Under COMMIT_ON_FULL_SUCCESS, a failed dataset commits nothing, so report zeroed stats
+log.info("Due to task failure, will report that no records or bytes were written for " + datasetState.getDatasetUrn());
+datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(0, 0, false, 0));
+}
+}
+return datasetTaskStats;
+}

/** @return id/correlator for this particular commit activity */
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
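To make the counting rule in summarizeDatasetOutcomes concrete, here is a self-contained sketch of the same logic over toy stand-in types (not Gobblin classes): failed tasks contribute writer counters only when WRITER_COUNT_METRICS_FROM_FAILED_TASKS is enabled, while numCommittedWorkunits only ever counts tasks in the COMMITTED state.

```java
import java.util.Arrays;
import java.util.List;

public class SummaryRuleSketch {
  // Toy stand-in for a TaskState's working state plus its writer counters
  record Task(boolean committed, long records, long bytes) {}

  static long[] summarize(List<Task> tasks, boolean includeFailedTasks) {
    long records = 0, bytes = 0, committedTasks = 0;
    for (Task t : tasks) {
      if (t.committed() || includeFailedTasks) {
        if (t.committed()) {
          committedTasks++;  // only truly committed tasks count toward numCommittedWorkunits
        }
        records += t.records();
        bytes += t.bytes();
      }
    }
    return new long[] {records, bytes, committedTasks};
  }

  public static void main(String[] args) {
    List<Task> tasks = List.of(new Task(true, 10, 1024), new Task(false, 5, 512));
    // Default (includeFailedTasks=false): records=10, bytes=1024, committedTasks=1
    System.out.println(Arrays.toString(summarize(tasks, false)));
    // With WRITER_COUNT_METRICS_FROM_FAILED_TASKS=true: records=15, bytes=1536, committedTasks=1
    System.out.println(Arrays.toString(summarize(tasks, true)));
  }
}
```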
File: (name not shown)
@@ -34,6 +34,7 @@
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -85,8 +86,8 @@ public void submitJob(List<WorkUnit> workunits) {
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(this.jobProps)
.build();
-int numWorkUnits = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
-log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+ExecGobblinStats execGobblinStats = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
+log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats);
} catch (Exception e) {
throw new RuntimeException(e);
}
File: org/apache/gobblin/temporal/ddm/work/CommitStats.java (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.work;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;


/**
* Data structure representing the stats for a committed dataset, and the total number of committed workunits in the Gobblin Temporal job
* Return type of {@link org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
* and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
*/
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class CommitStats {
@NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;

public static CommitStats createEmpty() {
return new CommitStats(new HashMap<>(), 0);
}
}
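The "IMPORTANT: for jackson (de)serialization" comment is load-bearing: Temporal's default payload converter serializes workflow and activity results with Jackson, which needs the no-args constructor plus the setters @Data generates. A minimal round-trip sketch (assuming jackson-databind on the classpath; the values are made up):

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;

public class CommitStatsRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Map<String, DatasetStats> perDataset = new HashMap<>();
    perDataset.put("urn:example:dataset", new DatasetStats(10L, 1024L, true, 2));  // hypothetical values
    CommitStats original = new CommitStats(perDataset, 2);

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(original);
    // Without @NoArgsConstructor, readValue would fail: Jackson has no default constructor to use
    CommitStats restored = mapper.readValue(json, CommitStats.class);
    System.out.println(restored.equals(original));  // expected: true (equals from @Data)
  }
}
```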
File: org/apache/gobblin/temporal/ddm/work/DatasetStats.java (new file)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.work;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;


/**
* Stats for a dataset that was committed.
*/
@Data
@NonNull
@RequiredArgsConstructor
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class DatasetStats {
@NonNull private long recordsWritten;
@NonNull private long bytesWritten;
@NonNull private boolean successfullyCommitted;
@NonNull private int numCommittedWorkunits;
}
File: org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java (new file)
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.work;

import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;


/** Capture details (esp. for the temporal UI) of a {@link org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
@Data
@RequiredArgsConstructor
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class ExecGobblinStats {
@NonNull private int numWorkUnits;
@NonNull private int numCommitted;
@NonNull private String user;
@NonNull private Map<String, DatasetStats> stats;
}

File: (name not shown)
@@ -80,10 +80,10 @@ public static String qualifyNamePerExecWithoutFlowExecId(String name, Config workerConfig)
return name + "_" + calcPerExecQualifier(workerConfig);
}

-/** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
-public static String qualifyNamePerExecWithFlowExecId(String name, Config jobProps) {
-Optional<String> optFlowExecId = Optional.ofNullable(ConfigUtils.getString(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
-return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, jobProps);
+/** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `config` */
+public static String qualifyNamePerExecWithFlowExecId(String name, Config config) {
+Optional<String> optFlowExecId = Optional.ofNullable(ConfigUtils.getString(config, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, config);
}

public static String calcPerExecQualifierWithOptFlowExecId(Optional<String> optFlowExecId, Config workerConfig) {
File: org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -19,6 +19,8 @@

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;


@@ -33,5 +35,5 @@ public interface CommitStepWorkflow {
* @return number of workunits committed
*/
@WorkflowMethod
-int commit(WUProcessingSpec workSpec);
+CommitStats commit(WUProcessingSpec workSpec);
}
File: org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -23,6 +23,7 @@
import io.temporal.workflow.WorkflowMethod;

import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


@@ -37,5 +38,5 @@
public interface ExecuteGobblinWorkflow {
/** @return the number of {@link WorkUnit}s discovered and successfully processed */
@WorkflowMethod
-int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+ExecGobblinStats execute(Properties props, EventSubmitterContext eventSubmitterContext);
}
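For completeness, a sketch (not from this commit) of invoking this workflow through the Temporal client and consuming the new result type; the service/client setup, task-queue name, and argument values are assumptions for illustration.

```java
import java.util.Properties;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;

import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;

public class ExecuteGobblinClientSketch {
  public static void main(String[] args) {
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();  // assumes a local Temporal server
    WorkflowClient client = WorkflowClient.newInstance(service);

    ExecuteGobblinWorkflow workflow = client.newWorkflowStub(
        ExecuteGobblinWorkflow.class,
        WorkflowOptions.newBuilder()
            .setTaskQueue("GobblinTemporalTaskQueue")  // hypothetical task-queue name
            .build());

    Properties props = new Properties();  // job properties would be populated here
    ExecGobblinStats stats = workflow.execute(props, null);  // a real caller passes an EventSubmitterContext
    System.out.printf("workUnits=%d committed=%d user=%s%n",
        stats.getNumWorkUnits(), stats.getNumCommitted(), stats.getUser());
  }
}
```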
(The remaining 7 of the 18 changed files were not rendered in this capture.)
