From bf73671e11f1486ffb84e6a01075ad7e61fc4b83 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 6 Feb 2019 13:14:41 +0000 Subject: [PATCH 1/5] Fix the clock resolution to millis in ScheduledEventTests (#38506) The clock resolution changed from jdk8 to jdk10, hence the test passes in jdk8 but fails in jdk10. Scheduled events are serialised and deserialised with millisecond precision, making the test fail in jdk10 and higher. Fixes a problem introduced by #38415; the fix is identical to the one made in #38405. --- .../xpack/core/ml/calendars/ScheduledEventTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEventTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEventTests.java index 05209628542fe..6508ee5cb2054 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEventTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEventTests.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.time.Clock; +import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.EnumSet; @@ -28,7 +29,7 @@ public class ScheduledEventTests extends AbstractSerializingTestCase<ScheduledEvent> { public static ScheduledEvent createScheduledEvent(String calendarId) { - ZonedDateTime start = Clock.systemUTC().instant().atZone(ZoneOffset.UTC); + ZonedDateTime start = nowWithMillisResolution(); return new ScheduledEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)), calendarId, null); } @@ -119,4 +120,8 @@ public void testLenientParser() throws IOException { ScheduledEvent.LENIENT_PARSER.apply(parser, null); } } + + private static ZonedDateTime nowWithMillisResolution() { + return Instant.ofEpochMilli(Clock.systemUTC().millis()).atZone(ZoneOffset.UTC); + } } From 861eee7ad2492e3694f6a718238866bd2e085d27 Mon Sep 17 00:00:00 2001 From: Marios Trivyzas Date: Wed, 6 Feb 2019 16:19:21 +0200 Subject: [PATCH 2/5] SQL: Fix issue with IN not resolving to underlying keyword field (#38440) - Add resolution to the exact keyword field (if it exists) for text fields. - Add proper verification and error message if the underlying keyword doesn't exist (see the example queries below). - Move the check for a field attribute in the comparison list to the `resolveType()` method of `IN`. 
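For illustration, this is how the resolution behaves after this change, using queries and error messages taken from the updated tests below (where `some.string` is a text field with an exact keyword sub-field `some.string.typical`, and `text` is a text field with no keyword sub-field, per the test mappings):

    -- resolves to the underlying keyword sub-field; translated to a terms query on "some.string.typical"
    SELECT * FROM test WHERE some.string IN ('foo', 'bar', 'lala');

    -- no exact keyword sub-field exists, so verification now fails with:
    --   [IN] cannot operate on field of data type [text]: No keyword/multi-field defined exact matches
    --   for [text]; define one or use MATCH/QUERY instead
    SELECT * FROM test WHERE text IN ('foo', 'bar');

    -- a field reference in the value list is now rejected at verification time:
    --   Comparisons against variables are not (currently) supported; offender [int] in [int IN (1, int)]
    SELECT * FROM test WHERE int IN (1, int);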
Fixes: #38424 --- .../predicate/operator/comparison/In.java | 23 ++++++++++++++++ .../xpack/sql/planner/QueryTranslator.java | 15 +++-------- .../analyzer/VerifierErrorMessagesTests.java | 27 +++++++++++++------ .../sql/planner/QueryTranslatorTests.java | 21 ++++++++------- 4 files changed, 56 insertions(+), 30 deletions(-) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/predicate/operator/comparison/In.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/predicate/operator/comparison/In.java index f8f0bb35b504e..f76523eaf0cd9 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/predicate/operator/comparison/In.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/predicate/operator/comparison/In.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.sql.expression.predicate.operator.comparison; +import org.elasticsearch.xpack.sql.analysis.index.MappingException; import org.elasticsearch.xpack.sql.expression.Expression; import org.elasticsearch.xpack.sql.expression.Expressions; +import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.expression.Foldables; import org.elasticsearch.xpack.sql.expression.Nullability; import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunction; @@ -105,6 +107,27 @@ protected Pipe makePipe() { return new InPipe(source(), this, children().stream().map(Expressions::pipe).collect(Collectors.toList())); } + @Override + protected TypeResolution resolveType() { + if (value instanceof FieldAttribute) { + try { + ((FieldAttribute) value).exactAttribute(); + } catch (MappingException e) { + return new TypeResolution(format(null, "[{}] cannot operate on field of data type [{}]: {}", + functionName(), value().dataType().esType, e.getMessage())); + } + } + + for (Expression ex : list) { + if (ex.foldable() == false) { + return new TypeResolution(format(null, "Comparisons against variables are not (currently) supported; offender [{}] in [{}]", + Expressions.name(ex), + name())); + } + } + return super.resolveType(); + } + @Override public int hashCode() { return Objects.hash(value, list); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryTranslator.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryTranslator.java index de529b2e4ca61..73e9ff57f379e 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryTranslator.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryTranslator.java @@ -105,7 +105,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.function.Supplier; import static java.util.Collections.singletonList; @@ -708,16 +707,6 @@ static class InComparisons extends ExpressionTranslator { @Override protected QueryTranslation asQuery(In in, boolean onAggs) { - Optional firstNotFoldable = in.list().stream().filter(expression -> !expression.foldable()).findFirst(); - - if (firstNotFoldable.isPresent()) { - throw new SqlIllegalArgumentException( - "Line {}:{}: Comparisons against variables are not (currently) supported; offender [{}] in [{}]", - firstNotFoldable.get().sourceLocation().getLineNumber(), - firstNotFoldable.get().sourceLocation().getColumnNumber(), - Expressions.name(firstNotFoldable.get()), - in.name()); - } if (in.value() instanceof NamedExpression) { NamedExpression ne = (NamedExpression) 
in.value(); @@ -735,7 +724,9 @@ protected QueryTranslation asQuery(In in, boolean onAggs) { else { Query q = null; if (in.value() instanceof FieldAttribute) { - q = new TermsQuery(in.source(), ne.name(), in.list()); + FieldAttribute fa = (FieldAttribute) in.value(); + // equality should always be against an exact match (which is important for strings) + q = new TermsQuery(in.source(), fa.isInexact() ? fa.exactAttribute().name() : fa.name(), in.list()); } else { q = new ScriptQuery(in.source(), in.asScript()); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java index 415472bfe3521..558d92351b069 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/analyzer/VerifierErrorMessagesTests.java @@ -385,23 +385,34 @@ public void testInNestedWithDifferentDataTypesFromLeftValue_SelectClause() { } public void testInWithDifferentDataTypes_WhereClause() { - assertEquals("1:49: expected data type [text], value provided is of type [integer]", - error("SELECT * FROM test WHERE text IN ('foo', 'bar', 4)")); + assertEquals("1:52: expected data type [keyword], value provided is of type [integer]", + error("SELECT * FROM test WHERE keyword IN ('foo', 'bar', 4)")); } public void testInNestedWithDifferentDataTypes_WhereClause() { - assertEquals("1:60: expected data type [text], value provided is of type [integer]", - error("SELECT * FROM test WHERE int = 1 OR text IN ('foo', 'bar', 2)")); + assertEquals("1:63: expected data type [keyword], value provided is of type [integer]", + error("SELECT * FROM test WHERE int = 1 OR keyword IN ('foo', 'bar', 2)")); } public void testInWithDifferentDataTypesFromLeftValue_WhereClause() { - assertEquals("1:35: expected data type [text], value provided is of type [integer]", - error("SELECT * FROM test WHERE text IN (1, 2)")); + assertEquals("1:38: expected data type [keyword], value provided is of type [integer]", + error("SELECT * FROM test WHERE keyword IN (1, 2)")); } public void testInNestedWithDifferentDataTypesFromLeftValue_WhereClause() { - assertEquals("1:46: expected data type [text], value provided is of type [integer]", - error("SELECT * FROM test WHERE int = 1 OR text IN (1, 2)")); + assertEquals("1:49: expected data type [keyword], value provided is of type [integer]", + error("SELECT * FROM test WHERE int = 1 OR keyword IN (1, 2)")); + } + + public void testInWithFieldInListOfValues() { + assertEquals("1:26: Comparisons against variables are not (currently) supported; offender [int] in [int IN (1, int)]", + error("SELECT * FROM test WHERE int IN (1, int)")); + } + + public void testInOnFieldTextWithNoKeyword() { + assertEquals("1:26: [IN] cannot operate on field of data type [text]: " + + "No keyword/multi-field defined exact matches for [text]; define one or use MATCH/QUERY instead", + error("SELECT * FROM test WHERE text IN ('foo', 'bar')")); } public void testNotSupportedAggregateOnDate() { diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/planner/QueryTranslatorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/planner/QueryTranslatorTests.java index ef7cdf54b89ab..2d94e7660e122 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/planner/QueryTranslatorTests.java +++ 
b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/planner/QueryTranslatorTests.java @@ -309,8 +309,8 @@ public void testTranslateInExpression_WhereClause() { tq.asBuilder().toString().replaceAll("\\s", "")); } - public void testTranslateInExpression_WhereClauseAndNullHandling() { - LogicalPlan p = plan("SELECT * FROM test WHERE keyword IN ('foo', null, 'lala', null, 'foo', concat('la', 'la'))"); + public void testTranslateInExpression_WhereClause_TextFieldWithKeyword() { + LogicalPlan p = plan("SELECT * FROM test WHERE some.string IN ('foo', 'bar', 'lala', 'foo', concat('la', 'la'))"); assertTrue(p instanceof Project); assertTrue(p.children().get(0) instanceof Filter); Expression condition = ((Filter) p.children().get(0)).condition(); @@ -319,21 +319,22 @@ public void testTranslateInExpression_WhereClauseAndNullHandling() { Query query = translation.query; assertTrue(query instanceof TermsQuery); TermsQuery tq = (TermsQuery) query; - assertEquals("{\"terms\":{\"keyword\":[\"foo\",\"lala\"],\"boost\":1.0}}", + assertEquals("{\"terms\":{\"some.string.typical\":[\"foo\",\"bar\",\"lala\"],\"boost\":1.0}}", tq.asBuilder().toString().replaceAll("\\s", "")); } - public void testTranslateInExpressionInvalidValues_WhereClause() { - LogicalPlan p = plan("SELECT * FROM test WHERE keyword IN ('foo', 'bar', keyword)"); + public void testTranslateInExpression_WhereClauseAndNullHandling() { + LogicalPlan p = plan("SELECT * FROM test WHERE keyword IN ('foo', null, 'lala', null, 'foo', concat('la', 'la'))"); assertTrue(p instanceof Project); assertTrue(p.children().get(0) instanceof Filter); Expression condition = ((Filter) p.children().get(0)).condition(); assertFalse(condition.foldable()); - SqlIllegalArgumentException ex = expectThrows(SqlIllegalArgumentException.class, () -> QueryTranslator.toQuery(condition, false)); - assertEquals( - "Line 1:52: Comparisons against variables are not (currently) supported; " - + "offender [keyword] in [keyword IN ('foo', 'bar', keyword)]", - ex.getMessage()); + QueryTranslation translation = QueryTranslator.toQuery(condition, false); + Query query = translation.query; + assertTrue(query instanceof TermsQuery); + TermsQuery tq = (TermsQuery) query; + assertEquals("{\"terms\":{\"keyword\":[\"foo\",\"lala\"],\"boost\":1.0}}", + tq.asBuilder().toString().replaceAll("\\s", "")); } public void testTranslateInExpression_WhereClause_Painless() { From a5c35f9da9773da3c30b06166904e49dbe93c28f Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Wed, 6 Feb 2019 15:44:43 +0100 Subject: [PATCH 3/5] Fix HistoryIntegrationTests timestamp comparison (#38505) When the millisecond part of a timestamp is 0, the toString representation in java-time omits it (joda did not). 
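A minimal sketch of the difference (illustrative only, not part of this change; assumes java.time and Joda-Time are on the classpath):

    import java.time.Instant;
    import java.time.ZoneOffset;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    // java-time drops the millisecond part of the ISO string when it is 0
    String javaTime = Instant.ofEpochMilli(1000L).atZone(ZoneOffset.UTC).toString(); // "1970-01-01T00:00:01Z"
    // Joda always printed it, which is what the old string comparisons relied on
    String joda = new DateTime(1000L, DateTimeZone.UTC).toString();                  // "1970-01-01T00:00:01.000Z"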
The search response returns timestamps formatted with WatcherDateTimeUtils, therefore string comparisons should be done with the same formatter. Relates #27330 --- .../integration/HistoryIntegrationTests.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HistoryIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HistoryIntegrationTests.java index 5c9dafeaca001..947bf2e210081 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HistoryIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HistoryIntegrationTests.java @@ -17,12 +17,15 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionStatus; import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder; import org.elasticsearch.xpack.core.watcher.input.Input; +import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; +import org.hamcrest.Matcher; +import java.time.ZonedDateTime; import java.util.Locale; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -172,10 +175,10 @@ public void testThatHistoryContainsStatus() throws Exception { assertThat(active, is(status.state().isActive())); String timestamp = source.getValue("status.state.timestamp"); - assertThat(timestamp, is(status.state().getTimestamp().toString())); + assertThat(timestamp, isSameDate(status.state().getTimestamp())); String lastChecked = source.getValue("status.last_checked"); - assertThat(lastChecked, is(status.lastChecked().toString())); + assertThat(lastChecked, isSameDate(status.lastChecked())); Integer version = source.getValue("status.version"); int expectedVersion = (int) (status.version() - 1); @@ -196,4 +199,14 @@ public void testThatHistoryContainsStatus() throws Exception { assertThat(mappingSource.getValue("doc.properties.status.properties.status"), is(nullValue())); assertThat(mappingSource.getValue("doc.properties.status.properties.status.properties.active"), is(nullValue())); } + + + private Matcher<String> isSameDate(ZonedDateTime zonedDateTime) { + /* + When comparing timestamps returned from _search/.watcher-history* the same date format has to be used + as during serialisation to json at index time. + The toString of ZonedDateTime omits the millisecond part when it is 0. This was not the case in joda. 
+ */ + return is(WatcherDateTimeUtils.formatDate(zonedDateTime)); + } } From 645db34e0e30d9616d3300a5e0acc27de922dd6c Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 6 Feb 2019 13:02:02 -0700 Subject: [PATCH 4/5] Fix badly formatted JSON object (#38515) (#38525) It just needs to replace the wrong " , " with " : ". Backport of #38515 --- docs/reference/ingest/processors/foreach.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/ingest/processors/foreach.asciidoc b/docs/reference/ingest/processors/foreach.asciidoc index 3a341f60470bb..69bf2d85f0d8e 100644 --- a/docs/reference/ingest/processors/foreach.asciidoc +++ b/docs/reference/ingest/processors/foreach.asciidoc @@ -141,8 +141,8 @@ block to send the document to the 'failure_index' index for later inspection: "on_failure" : [ { "set" : { - "field", "_index", - "value", "failure_index" + "field": "_index", + "value": "failure_index" } } ] From 3bf8e2400e291a540bd656abfeff49a8ff94ec71 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 6 Feb 2019 14:39:24 -0600 Subject: [PATCH 5/5] ML: update set_upgrade_mode, add logging (#38372) * ML: update set_upgrade_mode, add logging * Attempt to fix datafeed isolation Also renamed a few methods/variables for clarity and added some comments. --- .../xpack/core/ml/job/messages/Messages.java | 1 + .../xpack/ml/MlLifeCycleService.java | 2 +- .../action/TransportSetUpgradeModeAction.java | 4 ++ .../xpack/ml/datafeed/DatafeedManager.java | 32 +++++++------ .../test/ml/set_upgrade_mode.yml | 47 +++++++++++++++++-- 5 files changed, 67 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index a877b72bee0da..77ae8cb26eae9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -79,6 +79,7 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; + public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated"; public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''"; public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}"; public static final String JOB_AUDIT_DELETED = "Job deleted"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java index 302d9a7611d96..8005912107ad9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java @@ -45,7 +45,7 @@ public synchronized void stop() { // datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds // could fail if they send data to a dead autodetect process. 
if (datafeedManager != null) { - datafeedManager.isolateAllDatafeedsOnThisNode(); + datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown(); } NativeController nativeController = NativeControllerHolder.getNativeController(environment); if (nativeController != null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index edc31f1e896b7..d16f9e18421d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -263,6 +263,9 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe .sorted(Comparator.comparing(PersistentTask::getTaskName)) .collect(Collectors.toList()); + logger.info("Un-assigning persistent tasks : " + + datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]"))); + TypedChainTaskExecutor> chainTaskExecutor = new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, @@ -287,6 +290,7 @@ private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData, ActionListener> listener) { Set datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData); + logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString()); TypedChainTaskExecutor isolateDatafeedsExecutor = new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 409d15182d96a..53568c3705a8d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -64,7 +64,6 @@ public class DatafeedManager { private final DatafeedJobBuilder datafeedJobBuilder; private final TaskRunner taskRunner = new TaskRunner(); private final AutodetectProcessManager autodetectProcessManager; - private volatile boolean isolated; public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, Supplier currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { @@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) { * This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves * the datafeed tasks in the "started" state, so that they get restarted on a different node. 
*/ - public void isolateAllDatafeedsOnThisNode() { - isolated = true; + public void isolateAllDatafeedsOnThisNodeBeforeShutdown() { Iterator iter = runningDatafeedsOnThisNode.values().iterator(); while (iter.hasNext()) { Holder next = iter.next(); next.isolateDatafeed(); - next.setRelocating(); + // TODO: it's not ideal that this "isolate" method does something a bit different to the one below + next.setNodeIsShuttingDown(); iter.remove(); } } public void isolateDatafeed(long allocationId) { + // This calls get() rather than remove() because we expect that the persistent task will + // be removed shortly afterwards and that operation needs to be able to find the holder Holder holder = runningDatafeedsOnThisNode.get(allocationId); if (holder != null) { holder.isolateDatafeed(); @@ -195,7 +196,7 @@ protected void doRun() { holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); return; } - if (isolated == false) { + if (holder.isIsolated() == false) { if (next != null) { doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); } else { @@ -298,7 +299,7 @@ public class Holder { private final ProblemTracker problemTracker; private final Consumer finishHandler; volatile Scheduler.Cancellable cancellable; - private volatile boolean isRelocating; + private volatile boolean isNodeShuttingDown; Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer finishHandler) { @@ -324,7 +325,7 @@ boolean isIsolated() { } public void stop(String source, TimeValue timeout, Exception e) { - if (isRelocating) { + if (isNodeShuttingDown) { return; } @@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) { if (cancellable != null) { cancellable.cancel(); } - auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + auditor.info(datafeedJob.getJobId(), + Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED)); finishHandler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); - if (autoCloseJob) { + if (autoCloseJob && isIsolated() == false) { closeJob(); } if (acquired) { @@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) { } /** - * This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called - * immediately prior to shutting down a node. Then the datafeed task can remain "started", and be - * relocated to a different node. Calling this method at any other time will ruin the datafeed. + * This stops a datafeed WITHOUT updating the corresponding persistent task. When called it + * will stop the datafeed from sending data to its job as quickly as possible. The caller + * must do something sensible with the corresponding persistent task. If the node is shutting + * down the task will automatically get reassigned. Otherwise the caller must take action to + * remove or reassign the persistent task, or the datafeed will be left in limbo. 
*/ public void isolateDatafeed() { datafeedJob.isolate(); } - public void setRelocating() { - isRelocating = true; + public void setNodeIsShuttingDown() { + isNodeShuttingDown = true; } private Long executeLookBack(long startTime, Long endTime) throws Exception { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml index be1e0203a92c7..9b33af5f48bb0 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml @@ -6,6 +6,10 @@ setup: indices.create: index: airline-data body: + settings: + index: + number_of_replicas: 0 + number_of_shards: 1 mappings: properties: time: @@ -53,10 +57,9 @@ setup: job_id: set-upgrade-mode-job - do: - headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser - ml.start_datafeed: - datafeed_id: set-upgrade-mode-job-datafeed + cluster.health: + index: airline-data + wait_for_status: green --- teardown: @@ -70,6 +73,10 @@ teardown: --- "Test setting upgrade_mode to false when it is already false": + - do: + ml.start_datafeed: + datafeed_id: set-upgrade-mode-job-datafeed + - do: ml.set_upgrade_mode: enabled: false @@ -92,6 +99,22 @@ teardown: --- "Setting upgrade_mode to enabled": + - do: + ml.start_datafeed: + datafeed_id: set-upgrade-mode-job-datafeed + + - do: + cat.tasks: {} + - match: + $body: | + /.+job.+/ + + - do: + cat.tasks: {} + - match: + $body: | + /.+datafeed.+/ + - do: ml.info: {} - match: { upgrade_mode: false } @@ -125,6 +148,22 @@ teardown: --- "Setting upgrade mode to disabled from enabled": + - do: + ml.start_datafeed: + datafeed_id: set-upgrade-mode-job-datafeed + + - do: + cat.tasks: {} + - match: + $body: | + /.+job.+/ + + - do: + cat.tasks: {} + - match: + $body: | + /.+datafeed.+/ + - do: ml.set_upgrade_mode: enabled: true