Skip to content

Commit

Permalink
refactor: allow for more specific error messages to be sent when the…
Browse files Browse the repository at this point in the history
… server is unavailable (confluentinc#3154)
  • Loading branch information
stevenpyzhang committed Aug 6, 2019
1 parent 6244dfb commit 9d4ce6d
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 31 deletions.
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
Expand Down Expand Up @@ -215,13 +216,13 @@ private KsqlFailedPrecondition(final String message) {

private void checkPreconditions() {
for (final KsqlServerPrecondition precondition : preconditions) {
final Optional<String> error = precondition.checkPrecondition(
final Optional<KsqlErrorMessage> error = precondition.checkPrecondition(
config,
serviceContext
);
if (error.isPresent()) {
serverState.setInitializingReason(error.get());
throw new KsqlFailedPrecondition(error.get());
throw new KsqlFailedPrecondition(error.get().toString());
}
}
}
Expand Down
Expand Up @@ -15,7 +15,9 @@

package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.services.ServiceContext;

import java.util.Optional;

public interface KsqlServerPrecondition {
Expand All @@ -24,10 +26,10 @@ public interface KsqlServerPrecondition {
*
* @param config The KSQL server config
* @param serviceContext The KSQL server context for accessing external serivces
* @return Optional.empty() if precondition check passes, non-empty reason string if the
* @return Optional.empty() if precondition check passes, non-empty KsqlErrorMessage object if the
* check does not pass.
*/
Optional<String> checkPrecondition(
Optional<KsqlErrorMessage> checkPrecondition(
KsqlRestConfig config,
ServiceContext serviceContext
);
Expand Down
Expand Up @@ -164,10 +164,10 @@ public static Response serverShuttingDown() {
.build();
}

public static Response serverNotReady(final String reason) {
public static Response serverNotReady(final KsqlErrorMessage error) {
return Response
.status(SERVICE_UNAVAILABLE)
.entity(new KsqlErrorMessage(ERROR_CODE_SERVER_NOT_READY, reason))
.entity(error)
.build();
}
}
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.server.state;

import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.resources.Errors;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -35,23 +36,28 @@ public enum State {
TERMINATING
}

private static final class StateWithReason {
private static final class StateWithErrorMessage {
final State state;
final String reason;
final KsqlErrorMessage errorMessage;

private StateWithReason(final State state, final String reason) {
private StateWithErrorMessage(final State state, final KsqlErrorMessage error) {
this.state = state;
this.reason = reason;
this.errorMessage = error;
}

private StateWithReason(final State state) {
private StateWithErrorMessage(final State state) {
this.state = state;
this.reason = null;
this.errorMessage = null;
}
}

private final AtomicReference<StateWithReason> state = new AtomicReference<>(
new StateWithReason(State.INITIALIZING, "KSQL is not yet ready to serve requests.")
private final AtomicReference<StateWithErrorMessage> state = new AtomicReference<>(
new StateWithErrorMessage(State.INITIALIZING,
new KsqlErrorMessage(
Errors.ERROR_CODE_SERVER_NOT_READY,
"KSQL is not yet ready to serve requests."
)
)
);

/**
Expand All @@ -61,28 +67,29 @@ public ServerState() {
}

/**
* Set a reason string explaining why the server is still in the INITIALIZING state.
* This reason string will be used to add context to the API error indicating that the server
* is not yet ready to serve requests.
* Set a KsqlErrorMessage object containing a reason string explaining why the server
* is still in the INITIALIZING state and an error code. This reason string and corresponding
* error code will be used to add context to the API error indicating that the server is not
* yet ready to serve requests.
*
* @param initializingReason The reason string indicating why the server is still initializing.
* @param error KsqlErrorMessage object containing the error code and the error string.
*/
public void setInitializingReason(final String initializingReason) {
this.state.set(new StateWithReason(State.INITIALIZING, initializingReason));
public void setInitializingReason(final KsqlErrorMessage error) {
this.state.set(new StateWithErrorMessage(State.INITIALIZING, error));
}

/**
* Sets the server state to READY.
*/
public void setReady() {
this.state.set(new StateWithReason(State.READY));
this.state.set(new StateWithErrorMessage(State.READY));
}

/**
* Sets the server state to TERMINATING.
*/
public void setTerminating() {
this.state.set(new StateWithReason(State.TERMINATING));
this.state.set(new StateWithErrorMessage(State.TERMINATING));
}

/**
Expand All @@ -93,10 +100,10 @@ public void setTerminating() {
* to be returned back to the user.
*/
public Optional<Response> checkReady() {
final StateWithReason state = this.state.get();
final StateWithErrorMessage state = this.state.get();
switch (state.state) {
case INITIALIZING:
return Optional.of(Errors.serverNotReady(state.reason));
return Optional.of(Errors.serverNotReady(state.errorMessage));
case TERMINATING:
return Optional.of(Errors.serverShuttingDown());
default:
Expand Down
Expand Up @@ -37,6 +37,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.QueuedCommandStatus;
Expand Down Expand Up @@ -345,9 +346,11 @@ public void shouldCheckPreconditionsBeforeUsingServiceContext() {
@Test
public void shouldNotInitializeUntilPreconditionsChecked() {
// Given:
final Queue<String> errors = new LinkedList<>();
errors.add("error1");
errors.add("error2");
KsqlErrorMessage error1 = new KsqlErrorMessage(50000, "error1");
KsqlErrorMessage error2 = new KsqlErrorMessage(50000, "error2");
final Queue<KsqlErrorMessage> errors = new LinkedList<>();
errors.add(error1);
errors.add(error2);
when(precondition2.checkPrecondition(any(), any())).then(a -> {
verifyZeroInteractions(serviceContext);
return Optional.ofNullable(errors.isEmpty() ? null : errors.remove());
Expand All @@ -360,10 +363,10 @@ public void shouldNotInitializeUntilPreconditionsChecked() {
final InOrder inOrder = Mockito.inOrder(precondition1, precondition2, serverState);
inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext);
inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext);
inOrder.verify(serverState).setInitializingReason("error1");
inOrder.verify(serverState).setInitializingReason(error1);
inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext);
inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext);
inOrder.verify(serverState).setInitializingReason("error2");
inOrder.verify(serverState).setInitializingReason(error2);
inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext);
inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext);
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.resources.Errors;
import java.util.Optional;
import javax.ws.rs.core.Response;
Expand All @@ -30,15 +31,16 @@ public class ServerStateTest {
@Test
public void shouldReturnErrorWhenInitializing() {
// Given:
serverState.setInitializingReason("bad stuff is happening");
KsqlErrorMessage error = new KsqlErrorMessage(12345, "bad stuff is happening");
serverState.setInitializingReason(error);

// When:
final Optional<Response> result = serverState.checkReady();

// Then:
assertThat(result.isPresent(), is(true));
final Response response = result.get();
final Response expected = Errors.serverNotReady("bad stuff is happening");
final Response expected = Errors.serverNotReady(error);
assertThat(response.getStatus(), equalTo(expected.getStatus()));
assertThat(response.getEntity(), equalTo(expected.getEntity()));
}
Expand Down

0 comments on commit 9d4ce6d

Please sign in to comment.