Skip to content

Commit

Permalink
Merge branch 'master' into eight-dot-oh
Browse files Browse the repository at this point in the history
* master:
  ML: update set_upgrade_mode, add logging (elastic#38372)
  bad formatted JSON object (elastic#38515) (elastic#38525)
  Fix HistoryIntegrationTests timestamp comparsion (elastic#38505)
  SQL: Fix issue with IN not resolving to underlying keyword field (elastic#38440)
  Fix the clock resolution to millis in ScheduledEventTests (elastic#38506)
  • Loading branch information
jasontedor committed Feb 6, 2019
2 parents 95c2796 + 3bf8e24 commit bde5654
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 54 deletions.
4 changes: 2 additions & 2 deletions docs/reference/ingest/processors/foreach.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ block to send the document to the 'failure_index' index for later inspection:
"on_failure" : [
{
"set" : {
"field", "_index",
"value", "failure_index"
"field": "_index",
"value": "failure_index"
}
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class Messages {
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated";
public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";
public static final String JOB_AUDIT_DELETED = "Job deleted";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.EnumSet;
Expand All @@ -28,7 +29,7 @@
public class ScheduledEventTests extends AbstractSerializingTestCase<ScheduledEvent> {

public static ScheduledEvent createScheduledEvent(String calendarId) {
ZonedDateTime start = Clock.systemUTC().instant().atZone(ZoneOffset.UTC);
ZonedDateTime start = nowWithMillisResolution();
return new ScheduledEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)),
calendarId, null);
}
Expand Down Expand Up @@ -119,4 +120,8 @@ public void testLenientParser() throws IOException {
ScheduledEvent.LENIENT_PARSER.apply(parser, null);
}
}

private static ZonedDateTime nowWithMillisResolution() {
return Instant.ofEpochMilli(Clock.systemUTC().millis()).atZone(ZoneOffset.UTC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public synchronized void stop() {
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
// could fail if they send data to a dead autodetect process.
if (datafeedManager != null) {
datafeedManager.isolateAllDatafeedsOnThisNode();
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
}
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
if (nativeController != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
.sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList());

logger.info("Un-assigning persistent tasks : " +
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));

TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
r -> true,
Expand All @@ -287,6 +290,7 @@ private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData,
ActionListener<List<IsolateDatafeedAction.Response>> listener) {
Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData);

logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString());
TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class DatafeedManager {
private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner();
private final AutodetectProcessManager autodetectProcessManager;
private volatile boolean isolated;

public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
Expand Down Expand Up @@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) {
* This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves
* the datafeed tasks in the "started" state, so that they get restarted on a different node.
*/
public void isolateAllDatafeedsOnThisNode() {
isolated = true;
public void isolateAllDatafeedsOnThisNodeBeforeShutdown() {
Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator();
while (iter.hasNext()) {
Holder next = iter.next();
next.isolateDatafeed();
next.setRelocating();
// TODO: it's not ideal that this "isolate" method does something a bit different to the one below
next.setNodeIsShuttingDown();
iter.remove();
}
}

public void isolateDatafeed(long allocationId) {
// This calls get() rather than remove() because we expect that the persistent task will
// be removed shortly afterwards and that operation needs to be able to find the holder
Holder holder = runningDatafeedsOnThisNode.get(allocationId);
if (holder != null) {
holder.isolateDatafeed();
Expand Down Expand Up @@ -195,7 +196,7 @@ protected void doRun() {
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return;
}
if (isolated == false) {
if (holder.isIsolated() == false) {
if (next != null) {
doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder);
} else {
Expand Down Expand Up @@ -298,7 +299,7 @@ public class Holder {
private final ProblemTracker problemTracker;
private final Consumer<Exception> finishHandler;
volatile Scheduler.Cancellable cancellable;
private volatile boolean isRelocating;
private volatile boolean isNodeShuttingDown;

Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob,
ProblemTracker problemTracker, Consumer<Exception> finishHandler) {
Expand All @@ -324,7 +325,7 @@ boolean isIsolated() {
}

public void stop(String source, TimeValue timeout, Exception e) {
if (isRelocating) {
if (isNodeShuttingDown) {
return;
}

Expand All @@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) {
if (cancellable != null) {
cancellable.cancel();
}
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
auditor.info(datafeedJob.getJobId(),
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
if (autoCloseJob) {
if (autoCloseJob && isIsolated() == false) {
closeJob();
}
if (acquired) {
Expand All @@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) {
}

/**
* This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called
* immediately prior to shutting down a node. Then the datafeed task can remain "started", and be
* relocated to a different node. Calling this method at any other time will ruin the datafeed.
* This stops a datafeed WITHOUT updating the corresponding persistent task. When called it
* will stop the datafeed from sending data to its job as quickly as possible. The caller
* must do something sensible with the corresponding persistent task. If the node is shutting
* down the task will automatically get reassigned. Otherwise the caller must take action to
* remove or reassign the persistent task, or the datafeed will be left in limbo.
*/
public void isolateDatafeed() {
datafeedJob.isolate();
}

public void setRelocating() {
isRelocating = true;
public void setNodeIsShuttingDown() {
isNodeShuttingDown = true;
}

private Long executeLookBack(long startTime, Long endTime) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.sql.expression.predicate.operator.comparison;

import org.elasticsearch.xpack.sql.analysis.index.MappingException;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.Expressions;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.expression.Foldables;
import org.elasticsearch.xpack.sql.expression.Nullability;
import org.elasticsearch.xpack.sql.expression.function.scalar.ScalarFunction;
Expand Down Expand Up @@ -105,6 +107,27 @@ protected Pipe makePipe() {
return new InPipe(source(), this, children().stream().map(Expressions::pipe).collect(Collectors.toList()));
}

@Override
protected TypeResolution resolveType() {
if (value instanceof FieldAttribute) {
try {
((FieldAttribute) value).exactAttribute();
} catch (MappingException e) {
return new TypeResolution(format(null, "[{}] cannot operate on field of data type [{}]: {}",
functionName(), value().dataType().esType, e.getMessage()));
}
}

for (Expression ex : list) {
if (ex.foldable() == false) {
return new TypeResolution(format(null, "Comparisons against variables are not (currently) supported; offender [{}] in [{}]",
Expressions.name(ex),
name()));
}
}
return super.resolveType();
}

@Override
public int hashCode() {
return Objects.hash(value, list);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -708,16 +707,6 @@ static class InComparisons extends ExpressionTranslator<In> {

@Override
protected QueryTranslation asQuery(In in, boolean onAggs) {
Optional<Expression> firstNotFoldable = in.list().stream().filter(expression -> !expression.foldable()).findFirst();

if (firstNotFoldable.isPresent()) {
throw new SqlIllegalArgumentException(
"Line {}:{}: Comparisons against variables are not (currently) supported; offender [{}] in [{}]",
firstNotFoldable.get().sourceLocation().getLineNumber(),
firstNotFoldable.get().sourceLocation().getColumnNumber(),
Expressions.name(firstNotFoldable.get()),
in.name());
}

if (in.value() instanceof NamedExpression) {
NamedExpression ne = (NamedExpression) in.value();
Expand All @@ -735,7 +724,9 @@ protected QueryTranslation asQuery(In in, boolean onAggs) {
else {
Query q = null;
if (in.value() instanceof FieldAttribute) {
q = new TermsQuery(in.source(), ne.name(), in.list());
FieldAttribute fa = (FieldAttribute) in.value();
// equality should always be against an exact match (which is important for strings)
q = new TermsQuery(in.source(), fa.isInexact() ? fa.exactAttribute().name() : fa.name(), in.list());
} else {
q = new ScriptQuery(in.source(), in.asScript());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,23 +385,34 @@ public void testInNestedWithDifferentDataTypesFromLeftValue_SelectClause() {
}

public void testInWithDifferentDataTypes_WhereClause() {
assertEquals("1:49: expected data type [text], value provided is of type [integer]",
error("SELECT * FROM test WHERE text IN ('foo', 'bar', 4)"));
assertEquals("1:52: expected data type [keyword], value provided is of type [integer]",
error("SELECT * FROM test WHERE keyword IN ('foo', 'bar', 4)"));
}

public void testInNestedWithDifferentDataTypes_WhereClause() {
assertEquals("1:60: expected data type [text], value provided is of type [integer]",
error("SELECT * FROM test WHERE int = 1 OR text IN ('foo', 'bar', 2)"));
assertEquals("1:63: expected data type [keyword], value provided is of type [integer]",
error("SELECT * FROM test WHERE int = 1 OR keyword IN ('foo', 'bar', 2)"));
}

public void testInWithDifferentDataTypesFromLeftValue_WhereClause() {
assertEquals("1:35: expected data type [text], value provided is of type [integer]",
error("SELECT * FROM test WHERE text IN (1, 2)"));
assertEquals("1:38: expected data type [keyword], value provided is of type [integer]",
error("SELECT * FROM test WHERE keyword IN (1, 2)"));
}

public void testInNestedWithDifferentDataTypesFromLeftValue_WhereClause() {
assertEquals("1:46: expected data type [text], value provided is of type [integer]",
error("SELECT * FROM test WHERE int = 1 OR text IN (1, 2)"));
assertEquals("1:49: expected data type [keyword], value provided is of type [integer]",
error("SELECT * FROM test WHERE int = 1 OR keyword IN (1, 2)"));
}

public void testInWithFieldInListOfValues() {
assertEquals("1:26: Comparisons against variables are not (currently) supported; offender [int] in [int IN (1, int)]",
error("SELECT * FROM test WHERE int IN (1, int)"));
}

public void testInOnFieldTextWithNoKeyword() {
assertEquals("1:26: [IN] cannot operate on field of data type [text]: " +
"No keyword/multi-field defined exact matches for [text]; define one or use MATCH/QUERY instead",
error("SELECT * FROM test WHERE text IN ('foo', 'bar')"));
}

public void testNotSupportedAggregateOnDate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ public void testTranslateInExpression_WhereClause() {
tq.asBuilder().toString().replaceAll("\\s", ""));
}

public void testTranslateInExpression_WhereClauseAndNullHandling() {
LogicalPlan p = plan("SELECT * FROM test WHERE keyword IN ('foo', null, 'lala', null, 'foo', concat('la', 'la'))");
public void testTranslateInExpression_WhereClause_TextFieldWithKeyword() {
LogicalPlan p = plan("SELECT * FROM test WHERE some.string IN ('foo', 'bar', 'lala', 'foo', concat('la', 'la'))");
assertTrue(p instanceof Project);
assertTrue(p.children().get(0) instanceof Filter);
Expression condition = ((Filter) p.children().get(0)).condition();
Expand All @@ -319,21 +319,22 @@ public void testTranslateInExpression_WhereClauseAndNullHandling() {
Query query = translation.query;
assertTrue(query instanceof TermsQuery);
TermsQuery tq = (TermsQuery) query;
assertEquals("{\"terms\":{\"keyword\":[\"foo\",\"lala\"],\"boost\":1.0}}",
assertEquals("{\"terms\":{\"some.string.typical\":[\"foo\",\"bar\",\"lala\"],\"boost\":1.0}}",
tq.asBuilder().toString().replaceAll("\\s", ""));
}

public void testTranslateInExpressionInvalidValues_WhereClause() {
LogicalPlan p = plan("SELECT * FROM test WHERE keyword IN ('foo', 'bar', keyword)");
public void testTranslateInExpression_WhereClauseAndNullHandling() {
LogicalPlan p = plan("SELECT * FROM test WHERE keyword IN ('foo', null, 'lala', null, 'foo', concat('la', 'la'))");
assertTrue(p instanceof Project);
assertTrue(p.children().get(0) instanceof Filter);
Expression condition = ((Filter) p.children().get(0)).condition();
assertFalse(condition.foldable());
SqlIllegalArgumentException ex = expectThrows(SqlIllegalArgumentException.class, () -> QueryTranslator.toQuery(condition, false));
assertEquals(
"Line 1:52: Comparisons against variables are not (currently) supported; "
+ "offender [keyword] in [keyword IN ('foo', 'bar', keyword)]",
ex.getMessage());
QueryTranslation translation = QueryTranslator.toQuery(condition, false);
Query query = translation.query;
assertTrue(query instanceof TermsQuery);
TermsQuery tq = (TermsQuery) query;
assertEquals("{\"terms\":{\"keyword\":[\"foo\",\"lala\"],\"boost\":1.0}}",
tq.asBuilder().toString().replaceAll("\\s", ""));
}

public void testTranslateInExpression_WhereClause_Painless() {
Expand Down
Loading

0 comments on commit bde5654

Please sign in to comment.