Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/master' into retention-lease-pe…
Browse files Browse the repository at this point in the history
…rsistence

* elastic/master:
  Fix PrimaryAllocationIT Race Condition (elastic#37355)
  SQL: Make `FULL` non-reserved keyword in the grammar (elastic#37377)
  SQL: [Tests] Fix and enable internalClusterTests (elastic#37300)
  ML: Fix testMigrateConfigs  (elastic#37373)
  Fix RollupDocumentation test to wait for job to stop
  • Loading branch information
jasontedor committed Jan 12, 2019
2 parents 5c226d9 + 63fe3c6 commit d516cd3
Show file tree
Hide file tree
Showing 22 changed files with 198 additions and 107 deletions.
Expand Up @@ -261,6 +261,14 @@ public void testStartRollupJob() throws Exception {
} catch (Exception e) {
// Swallow any exception, this test does not test actually cancelling.
}
// stop job to prevent spamming exceptions on next start request
StopRollupJobRequest stopRequest = new StopRollupJobRequest(id);
stopRequest.waitForCompletion();
stopRequest.timeout(TimeValue.timeValueSeconds(10));

StopRollupJobResponse response = client.rollup().stopRollupJob(stopRequest, RequestOptions.DEFAULT);
assertTrue(response.isAcknowledged());

// tag::rollup-start-job-execute-listener
ActionListener<StartRollupJobResponse> listener = new ActionListener<StartRollupJobResponse>() {
@Override
Expand All @@ -282,7 +290,8 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));

// stop job so it can correctly be deleted by the test teardown
rc.stopRollupJob(new StopRollupJobRequest(id), RequestOptions.DEFAULT);
response = rc.stopRollupJob(stopRequest, RequestOptions.DEFAULT);
assertTrue(response.isAcknowledged());
}

@SuppressWarnings("unused")
Expand Down
Expand Up @@ -113,7 +113,11 @@ private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListene
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
final String index = entry.getKey();
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
assert indexStatus != null;
if (indexStatus == null) {
// The index in the stale primary allocation request was green and hence filtered out by the store status
// request. We ignore it here since the relevant exception will be thrown by the reroute action later on.
continue;
}
for (AbstractAllocateAllocationCommand command : entry.getValue()) {
final List<IndicesShardStoresResponse.StoreStatus> shardStatus =
indexStatus.get(command.shardId());
Expand Down
Expand Up @@ -265,7 +265,6 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
assertThat(newHistoryUUIds, hasSize(1));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37345")
public void testForceStaleReplicaToBePromotedToPrimaryOnWrongNode() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);
Expand All @@ -275,7 +274,10 @@ public void testForceStaleReplicaToBePromotedToPrimaryOnWrongNode() throws Excep
.put("index.number_of_replicas", 1)).get());
ensureGreen();
createStaleReplicaScenario(master);
internalCluster().startDataOnlyNodes(2);
// Ensure the stopped primary's data is deleted so that it doesn't get picked up by the next datanode we start
internalCluster().wipePendingDataDirectories();
internalCluster().startDataOnlyNodes(1);
ensureStableCluster(3, master);
final int shardId = 0;
final List<String> nodeNames = new ArrayList<>(Arrays.asList(internalCluster().getNodeNames()));
nodeNames.remove(master);
Expand All @@ -292,6 +294,25 @@ public void testForceStaleReplicaToBePromotedToPrimaryOnWrongNode() throws Excep
equalTo("No data for shard [" + shardId + "] of index [" + idxName + "] found on node [" + nodeWithoutData + ']'));
}

public void testForceStaleReplicaToBePromotedForGreenIndex() {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
final String idxName = "test";
assertAcked(client().admin().indices().prepareCreate(idxName)
.setSettings(Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)).get());
ensureGreen();
final String nodeWithoutData = randomFrom(dataNodes);
final int shardId = 0;
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, nodeWithoutData, true)).get());
assertThat(
iae.getMessage(),
equalTo("[allocate_stale_primary] primary [" + idxName+ "][" + shardId + "] is already assigned"));
}

public void testForceStaleReplicaToBePromotedForMissingIndex() {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
final String dataNode = internalCluster().startDataOnlyNode();
Expand Down
Expand Up @@ -1398,8 +1398,7 @@ private void randomlyResetClients() {
}
}

private void wipePendingDataDirectories() {
assert Thread.holdsLock(this);
public synchronized void wipePendingDataDirectories() {
if (!dataDirToClean.isEmpty()) {
try {
for (Path path : dataDirToClean) {
Expand Down
Expand Up @@ -137,7 +137,8 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
case INDEXING:
case STOPPING:
case ABORTING:
logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running.");
logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running " +
"(with state [" + currentState + "]");
return false;

case STOPPED:
Expand Down Expand Up @@ -381,8 +382,7 @@ private boolean checkState(IndexerState currentState) {

case STOPPING:
logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
doSaveState(finishAndSetState(), getPosition(), () -> {
});
doSaveState(finishAndSetState(), getPosition(), () -> {});
return false;

case STOPPED:
Expand Down
Expand Up @@ -8,9 +8,11 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -455,8 +457,9 @@ public void testVerifyIndicesPrimaryShardsAreActive() {

metaData = new MetaData.Builder(cs.metaData());
routingTable = new RoutingTable.Builder(cs.routingTable());

String indexToRemove = randomFrom(TransportOpenJobAction.indicesOfInterest(".ml-anomalies-shared"));
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
String indexToRemove = randomFrom(indexNameExpressionResolver.concreteIndexNames(cs, IndicesOptions.lenientExpandOpen(),
TransportOpenJobAction.indicesOfInterest(".ml-anomalies-shared")));
if (randomBoolean()) {
routingTable.remove(indexToRemove);
} else {
Expand Down
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;

public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Expand Down Expand Up @@ -399,8 +400,9 @@ public void testMlStateAndResultsIndicesNotAvailable() throws Exception {
String detailedMessage = detail.getMessage();
assertTrue(detailedMessage,
detailedMessage.startsWith("Could not open job because no suitable nodes were found, allocation explanation"));
assertTrue(detailedMessage, detailedMessage.endsWith("because not all primary shards are active for the following indices " +
"[.ml-state,.ml-anomalies-shared]]"));
assertThat(detailedMessage, containsString("because not all primary shards are active for the following indices"));
assertThat(detailedMessage, containsString(".ml-state"));
assertThat(detailedMessage, containsString(".ml-anomalies-shared"));

logger.info("Start data node");
String nonMlNode = internalCluster().startNode(Settings.builder()
Expand Down
Expand Up @@ -120,7 +120,6 @@ public void testWriteConfigToIndex() throws InterruptedException {
assertNull(alreadyMigratedJob.getCustomSettings());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37374")
public void testMigrateConfigs() throws InterruptedException, IOException {
// and jobs and datafeeds clusterstate
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Expand Down Expand Up @@ -311,7 +310,7 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int
}

public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException {
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobStateIndexPattern()).execute();
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobStateIndexPattern()).get();
SearchResponse searchResponse = client()
.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setTypes(ElasticsearchMappings.DOC_TYPE)
Expand Down
Expand Up @@ -81,10 +81,15 @@ private static ActionListener<StopRollupJobAction.Response> maybeWrapWithBlockin
listener.onResponse(response);
} else {
listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep()
+ "] while waiting for rollup job [" + request.getId() + "] to stop"));
+ "] while waiting for rollup job [" + request.getId() + "] to stop. State was ["
+ ((RollupJobStatus) jobTask.getStatus()).getIndexerState() + "]"));
}
} catch (InterruptedException e) {
listener.onFailure(e);
} catch (Exception e) {
listener.onFailure(new ElasticsearchTimeoutException("Encountered unexpected error while waiting for " +
"rollup job [" + request.getId() + "] to stop. State was ["
+ ((RollupJobStatus) jobTask.getStatus()).getIndexerState() + "].", e));
}
});

Expand Down
7 changes: 5 additions & 2 deletions x-pack/plugin/sql/build.gradle
Expand Up @@ -21,10 +21,13 @@ archivesBaseName = 'x-pack-sql'
integTest.enabled = false

task internalClusterTest(type: RandomizedTestingTask,
group: JavaBasePlugin.VERIFICATION_GROUP
) {
group: JavaBasePlugin.VERIFICATION_GROUP,
dependsOn: unitTest.dependsOn) {
include '**/*IT.class'
systemProperty 'es.set.netty.runtime.available.processors', 'false'
}
check.dependsOn internalClusterTest
internalClusterTest.mustRunAfter test

dependencies {
// "org.elasticsearch.plugin:x-pack-core:${version}" doesn't work with idea because the testArtifacts are also here
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/sql/src/main/antlr/SqlBase.g4
Expand Up @@ -342,7 +342,7 @@ nonReserved
| CATALOGS | COLUMNS | CURRENT
| DAY | DEBUG
| EXECUTABLE | EXPLAIN
| FIRST | FORMAT | FUNCTIONS
| FIRST | FORMAT | FULL | FUNCTIONS
| GRAPHVIZ
| HOUR
| INTERVAL
Expand Down

0 comments on commit d516cd3

Please sign in to comment.