Skip to content

Commit

Permalink
couchbase: wait until query engine knows about bucket before creating… (
Browse files Browse the repository at this point in the history
#2662)

The motivation for this change is that in slow environments, it could be that the
query engine is not aware of the just created bucket yet and so the create primary
index will take longer than the 10s read timeout for the subsequent http request.

As a result, this is just another precaution to minimize the chances that in
slow environments the create primary index call times out with a read timeout.
  • Loading branch information
daschl committed May 29, 2020
1 parent 6e1f8a0 commit 835ac71
Showing 1 changed file with 30 additions and 15 deletions.
Expand Up @@ -19,12 +19,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import lombok.Cleanup;
import okhttp3.Credentials;
import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
Expand All @@ -36,6 +38,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -224,7 +227,7 @@ private void waitUntilNodeIsOnline() {
private void renameNode() {
logger().debug("Renaming Couchbase Node from localhost to {}", getHost());

Response response = doHttpRequest(MGMT_PORT, "/node/controller/rename", "POST", new FormBody.Builder()
@Cleanup Response response = doHttpRequest(MGMT_PORT, "/node/controller/rename", "POST", new FormBody.Builder()
.add("hostname", getInternalIpAddress())
.build(), false
);
Expand All @@ -248,7 +251,7 @@ private void initializeServices() {
}
}).collect(Collectors.joining(","));

Response response = doHttpRequest(MGMT_PORT, "/node/controller/setupServices", "POST", new FormBody.Builder()
@Cleanup Response response = doHttpRequest(MGMT_PORT, "/node/controller/setupServices", "POST", new FormBody.Builder()
.add("services", services)
.build(), false
);
Expand All @@ -264,7 +267,7 @@ private void initializeServices() {
private void configureAdminUser() {
logger().debug("Configuring couchbase admin user with username: \"{}\"", username);

Response response = doHttpRequest(MGMT_PORT, "/settings/web", "POST", new FormBody.Builder()
@Cleanup Response response = doHttpRequest(MGMT_PORT, "/settings/web", "POST", new FormBody.Builder()
.add("username", username)
.add("password", password)
.add("port", Integer.toString(MGMT_PORT))
Expand Down Expand Up @@ -305,7 +308,7 @@ private void configureExternalPorts() {
builder.add("ftsSSL", Integer.toString(getMappedPort(SEARCH_SSL_PORT)));
}

final Response response = doHttpRequest(
@Cleanup Response response = doHttpRequest(
MGMT_PORT,
"/node/controller/setupAlternateAddresses/external",
"PUT",
Expand All @@ -322,7 +325,7 @@ private void configureExternalPorts() {
private void configureIndexer() {
logger().debug("Configuring the indexer service");

Response response = doHttpRequest(MGMT_PORT, "/settings/indexes", "POST", new FormBody.Builder()
@Cleanup Response response = doHttpRequest(MGMT_PORT, "/settings/indexes", "POST", new FormBody.Builder()
.add("storageMode", "memory_optimized")
.build(), true
);
Expand All @@ -339,7 +342,7 @@ private void createBuckets() {
for (BucketDefinition bucket : buckets) {
logger().debug("Creating bucket \"" + bucket.getName() + "\"");

Response response = doHttpRequest(MGMT_PORT, "/pools/default/buckets", "POST", new FormBody.Builder()
@Cleanup Response response = doHttpRequest(MGMT_PORT, "/pools/default/buckets", "POST", new FormBody.Builder()
.add("name", bucket.getName())
.add("ramQuotaMB", Integer.toString(bucket.getQuota()))
.build(), true);
Expand All @@ -353,9 +356,27 @@ private void createBuckets() {
.forStatusCode(200)
.waitUntilReady(this);

if (enabledServices.contains(CouchbaseService.QUERY)) {
// If the query service is enabled, make sure that we only proceed if the query engine also
// knows about the bucket in its metadata configuration.
Unreliables.retryUntilTrue(1, TimeUnit.MINUTES, () -> {
@Cleanup Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "SELECT COUNT(*) > 0 as present FROM system:keyspaces WHERE name = \"" + bucket.getName() + "\"")
.build(), true);

String body = queryResponse.body() != null ? queryResponse.body().string() : null;
checkSuccessfulResponse(queryResponse, "Could not poll query service state for bucket: " + bucket.getName());

return Optional.of(MAPPER.readTree(body))
.map(n -> n.at("/results/0/present"))
.map(JsonNode::asBoolean)
.orElse(false);
});
}

if (bucket.hasPrimaryIndex()) {
if (enabledServices.contains(CouchbaseService.QUERY)) {
Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
@Cleanup Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder()
.add("statement", "CREATE PRIMARY INDEX on `" + bucket.getName() + "`")
.build(), true);

Expand Down Expand Up @@ -384,14 +405,8 @@ private String getInternalIpAddress() {
* @param message the message that should be part of the exception of not successful.
*/
private void checkSuccessfulResponse(final Response response, final String message) {
try {
if (!response.isSuccessful()) {
throw new IllegalStateException(message + ": " + response.toString());
}
} finally {
if (response.body() != null) {
response.body().close();
}
if (!response.isSuccessful()) {
throw new IllegalStateException(message + ": " + response.toString());
}
}

Expand Down

0 comments on commit 835ac71

Please sign in to comment.