Skip to content

Commit

Permalink
[ML] improve reliability of job stats in larger clusters (elastic#86305
Browse files Browse the repository at this point in the history
…) (elastic#86310)

When gathering job stats for closed jobs, we may be inadvertently executing on a transport thread. Typically, this is acceptable. However, when there are many jobs and many indices, this has a cascading effect and may cause the cluster to enter a troubling state.

This is mainly due to how slow security checks can be for search requests when the cluster has many indices.

To alleviate this, gathering information about closed jobs is forked to the ML utility thread pool.

related: elastic#82255
(cherry picked from commit 4e481c3)
  • Loading branch information
benwtrent committed Apr 29, 2022
1 parent 16b10b9 commit cca353b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 51 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86305.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86305
summary: Improve reliability of job stats in larger clusters
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
Expand Down Expand Up @@ -64,6 +64,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<
private final AutodetectProcessManager processManager;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final ThreadPool threadPool;

@Inject
public TransportGetJobsStatsAction(
Expand All @@ -72,7 +73,8 @@ public TransportGetJobsStatsAction(
ClusterService clusterService,
AutodetectProcessManager processManager,
JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider
JobConfigProvider jobConfigProvider,
ThreadPool threadPool
) {
super(
GetJobsStatsAction.NAME,
Expand All @@ -88,6 +90,7 @@ public TransportGetJobsStatsAction(
this.processManager = processManager;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.threadPool = threadPool;
}

@Override
Expand Down Expand Up @@ -141,7 +144,7 @@ protected void taskOperation(GetJobsStatsAction.Request request, JobTask task, A
JobState jobState = MlTasks.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
gatherForecastStats(jobId, forecastStats -> {
jobResultsProvider.getForecastStats(jobId, forecastStats -> {
JobStats jobStats = new JobStats(
jobId,
dataCounts,
Expand Down Expand Up @@ -187,55 +190,54 @@ void gatherStatsForClosedJobs(
};

PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
String jobId = closedJobIds.get(i);
gatherForecastStats(jobId, forecastStats -> {
gatherDataCountsModelSizeStatsAndTimingStats(jobId, (dataCounts, modelSizeStats, timingStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
PersistentTasksCustomMetadata.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(
slot,
new JobStats(
jobId,
dataCounts,
modelSizeStats,
forecastStats,
jobState,
null,
assignmentExplanation,
null,
timingStats
)
);
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(
new GetJobsStatsAction.Response(
response.getTaskFailures(),
response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)
)
);
}
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
String jobId = closedJobIds.get(i);
jobResultsProvider.getForecastStats(jobId, forecastStats -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
.execute(() -> gatherDataCountsModelSizeStatsAndTimingStats(jobId, (dataCounts, modelSizeStats, timingStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
PersistentTasksCustomMetadata.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(
slot,
new JobStats(
jobId,
dataCounts,
modelSizeStats,
forecastStats,
jobState,
null,
assignmentExplanation,
null,
timingStats
)
);
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(
new GetJobsStatsAction.Response(
response.getTaskFailures(),
response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)
)
);
}
}, errorHandler));
}, errorHandler);
}, errorHandler);
}
}

void gatherForecastStats(String jobId, Consumer<ForecastStats> handler, Consumer<Exception> errorHandler) {
jobResultsProvider.getForecastStats(jobId, handler, errorHandler);
}
});
}

void gatherDataCountsModelSizeStatsAndTimingStats(
Expand Down

0 comments on commit cca353b

Please sign in to comment.