Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc/hamcrest 2.0 #2

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ httpcore = 4.4.5
httpasyncclient = 4.1.2
commonslogging = 1.1.3
commonscodec = 1.10
hamcrest = 1.3
hamcrest = 2.0.0.0
securemock = 1.2
# When updating mocksocket, please also update server/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy
mocksocket = 1.2
Expand Down
4 changes: 3 additions & 1 deletion client/rest-high-level/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ dependencies {
testCompile "org.elasticsearch.test:framework:${version}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"
testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}"
//this is needed to make RestHighLevelClientTests#testApiNamingConventions work from IDEs
testCompile "org.elasticsearch:rest-api-spec:${version}"
}
Expand All @@ -70,6 +70,8 @@ dependencyLicenses {
}
}

compileTestJava.options.compilerArgs << "-Xlint:-unchecked"

forbiddenApisMain {
// core does not depend on the httpclient for compile so we add the signatures here. We don't add them for test as they are already
// specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
Params parameters = new Params(request);
parameters.withTimeout(bulkRequest.timeout());
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());

parameters.withPipeline(bulkRequest.pipeline());
// Bulk API only supports newline delimited JSON or Smile. Before executing
// the bulk, we need to check that all requests have the same content-type
// and this content-type is supported by the Bulk API.
Expand Down Expand Up @@ -237,6 +237,13 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
return request;
}

private static String orDefaultToGlobal(String value, String globalDefault) {
if(Strings.isNullOrEmpty(value)){
return globalDefault;
}
return value;
}

static Request exists(GetRequest getRequest) {
return getStyleRequest(HttpHead.METHOD_NAME, getRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -44,10 +48,19 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.stream.IntStream;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
Expand All @@ -57,7 +70,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {

private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener);
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener);
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
Expand All @@ -66,10 +79,10 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception {

int numDocs = randomIntBetween(10, 100);
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.build()) {
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.build()) {

MultiGetRequest multiGetRequest = indexDocs(processor, numDocs);

Expand All @@ -90,9 +103,9 @@ public void testBulkProcessorFlush() throws Exception {
int numDocs = randomIntBetween(10, 100);

try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that this bulk won't be automatically flushed
.setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
//let's make sure that this bulk won't be automatically flushed
.setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {

MultiGetRequest multiGetRequest = indexDocs(processor, numDocs);

Expand Down Expand Up @@ -125,9 +138,9 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
MultiGetRequest multiGetRequest;

try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {

multiGetRequest = indexDocs(processor, numDocs);

Expand Down Expand Up @@ -165,11 +178,11 @@ public void testBulkProcessorWaitOnClose() throws Exception {

int numDocs = randomIntBetween(10, 100);
BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
.build();
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
.build();

MultiGetRequest multiGetRequest = indexDocs(processor, numDocs);
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
Expand Down Expand Up @@ -220,20 +233,20 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);

try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {

for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
testDocs++;
processor.add(new IndexRequest("test", "test", Integer.toString(testDocs))
.source(XContentType.JSON, "field", "value"));
.source(XContentType.JSON, "field", "value"));
multiGetRequest.add("test", "test", Integer.toString(testDocs));
} else {
testReadOnlyDocs++;
processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs))
.source(XContentType.JSON, "field", "value"));
.source(XContentType.JSON, "field", "value"));
}
}
}
Expand Down Expand Up @@ -268,23 +281,86 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
public void testGlobalParametersAndBulkProcessor() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");

int numDocs = randomIntBetween(10, 10);
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setDefaultIndex("test")
.setDefaultType("test")
.setDefaultRouting("routing")
.setDefaultPipeline("pipeline_id")
.build()) {

indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test"))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}

@SuppressWarnings("unchecked")
private Matcher<SearchHit>[] expectedIds(int numDocs) {
return IntStream.rangeClosed(1, numDocs)
.boxed()
.map(n -> hasId(n.toString()))
.<Matcher<SearchHit>>toArray(Matcher[]::new);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
processor.add(new IndexRequest("test", "test", Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
+ Strings.toString(JsonXContent.contentBuilder()
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n";
processor.add(new BytesArray(source), null, null, XContentType.JSON);
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
}
multiGetRequest.add("test", "test", Integer.toString(i));
multiGetRequest.add(localIndex, localType, Integer.toString(i));
}
return multiGetRequest;
}

private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
String action = Strings.toString(jsonBuilder()
.startObject()
.startObject("index")
.field("_index", localIndex)
.field("_type", localType)
.field("_id", Integer.toString(id))
.endObject()
.endObject()
);
String source = Strings.toString(jsonBuilder()
.startObject()
.field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
.endObject()
);

String request = action + "\n" + source + "\n";
return new BytesArray(request);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", "test", null, null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
Expand All @@ -293,7 +369,7 @@ private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses
assertThat(bulkItemResponse.getType(), equalTo("test"));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
bulkItemResponse.isFailed(), equalTo(false));
bulkItemResponse.isFailed(), equalTo(false));
}
}

Expand Down
Loading