Skip to content

Commit 7331e08

Browse files
Sam TunnicliffeMarcus Eriksson
authored andcommitted
[CEP-21] Add exception code to commit result if rejected
DDL statements submitted by clients may return one of several different error responses
1 parent 7e368cf commit 7331e08

23 files changed

+168
-129
lines changed

src/java/org/apache/cassandra/schema/Schema.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@
3434
import org.apache.cassandra.db.Keyspace;
3535
import org.apache.cassandra.db.SystemKeyspace;
3636
import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
37+
import org.apache.cassandra.exceptions.ConfigurationException;
3738
import org.apache.cassandra.exceptions.InvalidRequestException;
39+
import org.apache.cassandra.exceptions.SyntaxException;
40+
import org.apache.cassandra.exceptions.UnauthorizedException;
3841
import org.apache.cassandra.io.sstable.Descriptor;
3942
import org.apache.cassandra.locator.LocalStrategy;
4043
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -289,8 +292,18 @@ public ClusterMetadata submit(SchemaTransformation transformation)
289292
return ClusterMetadataService.instance().commit(new AlterSchema(transformation, this),
290293
(metadata) -> true,
291294
(metadata) -> metadata,
292-
(metadata, reason) -> {
293-
throw new InvalidRequestException(reason);
295+
(metadata, code, reason) -> {
296+
switch (code)
297+
{
298+
case CONFIG_ERROR:
299+
throw new ConfigurationException(reason);
300+
case SYNTAX_ERROR:
301+
throw new SyntaxException(reason);
302+
case UNAUTHORIZED:
303+
throw new UnauthorizedException(reason);
304+
default:
305+
throw new InvalidRequestException(reason);
306+
}
294307
});
295308
}
296309

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ public void runMayThrow() throws InterruptedException, ExecutionException, IOExc
719719
ClusterMetadataService.instance().commit(getStartupSequence(finishJoiningRing),
720720
(metadata_) -> !metadata_.inProgressSequences.contains(self),
721721
(metadata_) -> null,
722-
(metadata_, reason) -> {
722+
(metadata_, code, reason) -> {
723723
throw new IllegalStateException(String.format("Can not commit event to metadata service: %s. Interrupting startup sequence.",
724724
reason));
725725
});
@@ -3584,7 +3584,7 @@ public void decommission(boolean force)
35843584
ClusterMetadataService.instance().placementProvider()),
35853585
(metadata_) -> !metadata_.inProgressSequences.contains(self),
35863586
(metadata_) -> null,
3587-
(metadata_, reason) -> {
3587+
(metadata_, code, reason) -> {
35883588
throw new IllegalStateException(String.format("Can not commit event to metadata service: %s. Interrupting leave sequence.",
35893589
reason));
35903590
});
@@ -3705,7 +3705,7 @@ private void move(Token newToken)
37053705
true),
37063706
(metadata_) -> !metadata_.inProgressSequences.contains(self),
37073707
(metadata_) -> null,
3708-
(metadata_, reason) -> {
3708+
(metadata_, code, reason) -> {
37093709
throw new IllegalStateException(String.format("Can not commit event to metadata service: %s. Interrupting leave sequence.",
37103710
reason));
37113711
});

src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.cassandra.tcm.log.LocalLog;
2828
import org.apache.cassandra.tcm.log.Replication;
2929
import org.apache.cassandra.utils.FBUtilities;
30+
import org.apache.cassandra.utils.JVMStabilityInspector;
31+
32+
import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
3033

3134
public abstract class AbstractLocalProcessor implements Processor
3235
{
@@ -51,7 +54,8 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
5154
catch (Throwable e)
5255
{
5356
logger.error("Caught error while trying to perform a local commit", e);
54-
return new Commit.Result.Failure(e.getMessage(), false);
57+
JVMStabilityInspector.inspectThrowable(e);
58+
return new Commit.Result.Failure(SERVER_ERROR, e.getMessage(), false);
5559
}
5660

5761
if (result.isSuccess())
@@ -72,7 +76,7 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi
7276
}
7377
else
7478
{
75-
return new Commit.Result.Failure(result.rejected().reason, true);
79+
return new Commit.Result.Failure(result.rejected().code, result.rejected().reason, true);
7680
}
7781
}
7882

src/java/org/apache/cassandra/tcm/ClusterMetadataService.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Map;
2424
import java.util.Set;
2525
import java.util.concurrent.TimeoutException;
26-
import java.util.function.BiFunction;
2726
import java.util.function.Function;
2827
import java.util.function.Predicate;
2928
import java.util.function.Supplier;
@@ -33,6 +32,7 @@
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534

35+
import org.apache.cassandra.exceptions.ExceptionCode;
3636
import org.apache.cassandra.locator.InetAddressAndPort;
3737
import org.apache.cassandra.net.IVerbHandler;
3838
import org.apache.cassandra.net.Message;
@@ -307,12 +307,22 @@ public ClusterMetadata commit(Transformation transform)
307307
return commit(transform,
308308
(metadata) -> false,
309309
(metadata) -> metadata,
310-
(metadata, reason) -> {
310+
(metadata, code, reason) -> {
311311
throw new IllegalStateException(reason);
312312
});
313313
}
314314

315-
public <T1> T1 commit(Transformation transform, Predicate<ClusterMetadata> retry, Function<ClusterMetadata, T1> onSuccess, BiFunction<ClusterMetadata, String, T1> onReject)
315+
public interface CommitSuccessHandler<T>
316+
{
317+
T accept(ClusterMetadata latest);
318+
}
319+
320+
public interface CommitRejectionHandler<T>
321+
{
322+
T accept(ClusterMetadata latest, ExceptionCode code, String message);
323+
}
324+
325+
public <T1> T1 commit(Transformation transform, Predicate<ClusterMetadata> retry, CommitSuccessHandler<T1> onSuccess, CommitRejectionHandler<T1> onReject)
316326
{
317327
Retry.Backoff backoff = new Retry.Backoff();
318328
while (!backoff.reachedMax())
@@ -325,7 +335,7 @@ public <T1> T1 commit(Transformation transform, Predicate<ClusterMetadata> retry
325335
{
326336
try
327337
{
328-
return onSuccess.apply(awaitAtLeast(result.success().epoch));
338+
return onSuccess.accept(awaitAtLeast(result.success().epoch));
329339
}
330340
catch (TimeoutException t)
331341
{
@@ -343,7 +353,7 @@ public <T1> T1 commit(Transformation transform, Predicate<ClusterMetadata> retry
343353
ClusterMetadata metadata = replayAndWait();
344354

345355
if (result.failure().rejected)
346-
return onReject.apply(metadata, result.failure().message);
356+
return onReject.accept(metadata, result.failure().code, result.failure().message);
347357

348358
if (!retry.test(metadata))
349359
throw new IllegalStateException(String.format("Committing transformation %s failed and retry criteria was not satisfied. Current tries: %s", transform, backoff.tries + 1));
@@ -485,7 +495,7 @@ public ClusterMetadata sealPeriod()
485495
return ClusterMetadataService.instance.commit(SealPeriod.instance,
486496
(ClusterMetadata metadata) -> metadata.lastInPeriod,
487497
(ClusterMetadata metadata) -> metadata,
488-
(metadata, reason) -> {
498+
(metadata, code, reason) -> {
489499
// If the transformation got rejected, someone else has beat us to seal this period
490500
return metadata;
491501
});

src/java/org/apache/cassandra/tcm/Commit.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import org.apache.cassandra.db.TypeSizes;
30+
import org.apache.cassandra.exceptions.ExceptionCode;
3031
import org.apache.cassandra.io.IVersionedSerializer;
3132
import org.apache.cassandra.io.util.DataInputPlus;
3233
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -144,13 +145,15 @@ public boolean isFailure()
144145

145146
final class Failure implements Result
146147
{
148+
public final ExceptionCode code;
147149
public final String message;
148150
// Rejection means that we were able to linearize the operation,
149151
// but it was rejected by the internal logic of the transformation.
150152
public final boolean rejected;
151153

152-
public Failure(String message, boolean rejected)
154+
public Failure(ExceptionCode code, String message, boolean rejected)
153155
{
156+
this.code = code;
154157
this.message = message;
155158
this.rejected = rejected;
156159
}
@@ -159,6 +162,7 @@ public Failure(String message, boolean rejected)
159162
public String toString()
160163
{
161164
return "Failure{" +
165+
"code='" + code + '\'' +
162166
"message='" + message + '\'' +
163167
"rejected=" + rejected +
164168
'}';
@@ -177,13 +181,15 @@ public boolean isFailure()
177181

178182
class Serializer implements IVersionedSerializer<Result>
179183
{
180-
184+
private static final byte SUCCESS = 1;
185+
private static final byte REJECTED = 2;
186+
private static final byte FAILED = 3;
181187
@Override
182188
public void serialize(Result t, DataOutputPlus out, int version) throws IOException
183189
{
184190
if (t instanceof Success)
185191
{
186-
out.writeByte(1);
192+
out.writeByte(SUCCESS);
187193
Version metadataVersion = t.success().metadataVersion;
188194
out.writeUnsignedVInt32(metadataVersion.asInt());
189195
Replication.serializer.serialize(t.success().replication, out, metadataVersion);
@@ -193,7 +199,8 @@ public void serialize(Result t, DataOutputPlus out, int version) throws IOExcept
193199
{
194200
assert t instanceof Failure;
195201
Failure failure = (Failure) t;
196-
out.writeByte(failure.rejected ? 2 : 3);
202+
out.writeByte(failure.rejected ? REJECTED : FAILED);
203+
out.writeUnsignedVInt32(failure.code.value);
197204
out.writeUTF(failure.message);
198205
}
199206
}
@@ -202,7 +209,7 @@ public void serialize(Result t, DataOutputPlus out, int version) throws IOExcept
202209
public Result deserialize(DataInputPlus in, int version) throws IOException
203210
{
204211
int b = in.readByte();
205-
if (b == 1)
212+
if (b == SUCCESS)
206213
{
207214
Version metadataVersion = Version.fromInt(in.readUnsignedVInt32());
208215
Replication delta = Replication.serializer.deserialize(in, metadataVersion);
@@ -211,7 +218,9 @@ public Result deserialize(DataInputPlus in, int version) throws IOException
211218
}
212219
else
213220
{
214-
return new Failure(in.readUTF(), b == 2);
221+
return new Failure(ExceptionCode.fromValue(in.readUnsignedVInt32()),
222+
in.readUTF(),
223+
b == REJECTED);
215224
}
216225
}
217226

@@ -222,13 +231,14 @@ public long serializedSize(Result t, int version)
222231
if (t instanceof Success)
223232
{
224233
Version metadataVersion = t.success().metadataVersion;
225-
size += VIntCoding.computeVIntSize(metadataVersion.asInt());
234+
size += VIntCoding.computeUnsignedVIntSize(metadataVersion.asInt());
226235
size += Replication.serializer.serializedSize(t.success().replication, metadataVersion);
227236
size += Epoch.serializer.serializedSize(t.success().epoch, metadataVersion);
228237
}
229238
else
230239
{
231240
assert t instanceof Failure;
241+
size += VIntCoding.computeUnsignedVIntSize(((Failure) t).code.value);
232242
size += TypeSizes.sizeof(((Failure)t).message);
233243
}
234244
return size;

src/java/org/apache/cassandra/tcm/RemoteProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.cassandra.utils.concurrent.Future;
5757
import org.apache.cassandra.utils.concurrent.Promise;
5858

59+
import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR;
5960
import static org.apache.cassandra.net.NoPayload.noPayload;
6061
import static org.apache.cassandra.tcm.ClusterMetadataService.State.REMOTE;
6162

@@ -93,7 +94,9 @@ public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch la
9394
}
9495
catch (Exception e)
9596
{
96-
return new Commit.Result.Failure(e.getMessage() == null ? e.getClass().toString() : e.getMessage(), false);
97+
return new Commit.Result.Failure(SERVER_ERROR, e.getMessage() == null
98+
? e.getClass().toString()
99+
: e.getMessage(), false);
97100
}
98101
}
99102

src/java/org/apache/cassandra/tcm/Transformation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.primitives.Ints;
2727

2828
import org.apache.cassandra.db.TypeSizes;
29+
import org.apache.cassandra.exceptions.ExceptionCode;
2930
import org.apache.cassandra.io.util.DataInputBuffer;
3031
import org.apache.cassandra.io.util.DataInputPlus;
3132
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -108,10 +109,12 @@ public String toString()
108109

109110
final class Rejected implements Result
110111
{
112+
public final ExceptionCode code;
111113
public final String reason;
112114

113-
public Rejected(String reason)
115+
public Rejected(ExceptionCode code, String reason)
114116
{
117+
this.code = code;
115118
this.reason = reason;
116119
}
117120

src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public static void initiate(NodeId nodeId, InetAddressAndPort addr)
8080
.commit(new StartAddToCMS(addr),
8181
(metadata) -> !metadata.inProgressSequences.contains(nodeId),
8282
(metadata) -> metadata.inProgressSequences.get(nodeId),
83-
(metadata, reason) -> {
83+
(metadata, code, reason) -> {
8484
throw new IllegalStateException("Can't join ownership group: " + reason);
8585
});
8686
if (continuation.kind() != JOIN_OWNERSHIP_GROUP)
@@ -137,11 +137,11 @@ private void finishJoin()
137137
{
138138
NodeId self = ClusterMetadata.current().myNodeId();
139139
ClusterMetadataService.instance().commit(finishJoin,
140-
(ClusterMetadata metadata) -> metadata.inProgressSequences.contains(self),
141-
(ClusterMetadata metadata) -> null,
142-
(ClusterMetadata metadata, String error) -> {
143-
throw new IllegalStateException(String.format("Could not finish join due to \"%s\". Next transformation in sequence: %s.",
144-
error,
140+
(metadata) -> metadata.inProgressSequences.contains(self),
141+
(metadata) -> null,
142+
(metadata, code, error) -> {
143+
throw new IllegalStateException(String.format("Could not finish join due to [%s]: \"%s\". Next transformation in sequence: %s.",
144+
code, error,
145145
metadata.inProgressSequences.contains(self) ? metadata.inProgressSequences.get(self).nextStep() : null));
146146
});
147147
}

0 commit comments

Comments
 (0)