Skip to content

Commit

Permalink
Second part of #1644 fix: C*4 from 4.0.1 to 4.0.3 upgrade (#1647)
Browse files Browse the repository at this point in the history
Also adds work-around/fix for https://issues.apache.org/jira/browse/CASSANDRA-17401 as well as a test to verify.
  • Loading branch information
tatu-at-datastax authored and jeffreyscarpenter committed Feb 3, 2023
1 parent ffebd04 commit 66328d4
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 30 deletions.
2 changes: 1 addition & 1 deletion ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ RUN git clone --branch master --single-branch https://github.com/riptano/ccm.git
sudo python setup.py install

# Create clusters to pre-download necessary artifacts
RUN ccm create -v 4.0.1 stargate_40 && ccm remove stargate_40
RUN ccm create -v 4.0.3 stargate_40 && ccm remove stargate_40
RUN ccm create -v 3.11.12 stargate_311 && ccm remove stargate_311
RUN ccm create -v 6.8.20 --dse stargate_dse68 && ccm remove stargate_dse68

Expand Down
2 changes: 1 addition & 1 deletion cql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>4.0.1</version>
<version>4.0.3</version>
<exclusions>
<exclusion>
<groupId>org.gridkit.jvmtool</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ private RetryDecision shouldRetry(Throwable throwable, int retryCount) {
}
PersistenceException pe = cause.get();
switch (pe.code()) {
case UNPREPARED:
return RetryDecision.RETRY;
case READ_TIMEOUT:
return retryPolicy.onReadTimeout((ReadTimeoutException) pe, retryCount);
case WRITE_TIMEOUT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.cassandra.stargate.exceptions.IsBootstrappingException;
import org.apache.cassandra.stargate.exceptions.OverloadedException;
import org.apache.cassandra.stargate.exceptions.PersistenceException;
import org.apache.cassandra.stargate.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.stargate.exceptions.ReadFailureException;
import org.apache.cassandra.stargate.exceptions.ReadTimeoutException;
import org.apache.cassandra.stargate.exceptions.RequestFailureReason;
Expand All @@ -53,16 +52,13 @@
import org.apache.cassandra.stargate.locator.InetAddressAndPort;
import org.apache.cassandra.stargate.transport.ProtocolException;
import org.apache.cassandra.stargate.transport.ServerError;
import org.apache.cassandra.stargate.utils.MD5Digest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class PersistenceExceptionTest extends BaseGrpcServiceTest {

private static final MD5Digest UNPREPARED_ID = MD5Digest.compute(new byte[] {0});

@Test
public void unavailable() {
assertThatThrownBy(
Expand Down Expand Up @@ -279,10 +275,6 @@ public static Stream<Arguments> persistenceExceptionValues() {
new ProtocolException("Some protocol problem"),
Status.INTERNAL,
"Some protocol problem"),
Arguments.of(
new PreparedQueryNotFoundException(UNPREPARED_ID),
Status.INTERNAL,
String.format("Prepared query with ID %s not found", UNPREPARED_ID)),
Arguments.of(
new InvalidRequestException("Some request problem"),
Status.INVALID_ARGUMENT,
Expand Down
2 changes: 1 addition & 1 deletion persistence-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>4.0.1</version>
<version>4.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.marshal.AbstractType;
Expand Down Expand Up @@ -546,13 +544,12 @@ public static Result toResultInternal(
case PREPARED:
ResultMessage.Prepared prepared = (ResultMessage.Prepared) resultMessage;
PreparedWithInfo preparedWithInfo = (PreparedWithInfo) prepared;
ParsedStatement.Prepared preparedStatement =
QueryProcessor.instance.getPrepared(prepared.statementId);
return new Result.Prepared(
Conversion.toExternal(prepared.statementId),
null,
toResultMetadata(prepared.resultMetadata, null),
toPreparedMetadata(prepared.metadata.names, preparedStatement.partitionKeyBindIndexes),
toPreparedMetadata(
prepared.metadata.names, preparedWithInfo.getPartitionKeyBindVariableIndexes()),
preparedWithInfo.isIdempotent(),
preparedWithInfo.isUseKeyspace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class PreparedWithInfo extends ResultMessage.Prepared {
private final boolean idempotent;
private final boolean useKeyspace;
private final short[] partitionKeyBindVariableIndexes;

public PreparedWithInfo(
boolean idempotent,
Expand All @@ -30,6 +31,7 @@ public PreparedWithInfo(
super(prepare.statementId, prepared);
this.idempotent = idempotent;
this.useKeyspace = useKeyspace;
this.partitionKeyBindVariableIndexes = prepared.partitionKeyBindIndexes;
}

public boolean isIdempotent() {
Expand All @@ -39,4 +41,8 @@ public boolean isIdempotent() {
public boolean isUseKeyspace() {
return useKeyspace;
}

public short[] getPartitionKeyBindVariableIndexes() {
return partitionKeyBindVariableIndexes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ public ResultMessage process(
public Prepared prepare(String s, QueryState queryState, Map<String, ByteBuffer> customPayload)
throws RequestValidationException {
Prepared prepare = QueryProcessor.instance.prepare(s, queryState, customPayload);
ParsedStatement.Prepared prepared = QueryProcessor.instance.getPrepared(prepare.statementId);
ParsedStatement.Prepared prepared =
Optional.ofNullable(QueryProcessor.instance.getPrepared(prepare.statementId))
.orElseGet(() -> QueryProcessor.getStatement(s, queryState.getClientState()));
boolean idempotent = IdempotencyAnalyzer.isIdempotent(prepared.statement);
boolean useKeyspace = prepared.statement instanceof UseStatement;
return new PreparedWithInfo(idempotent, useKeyspace, prepare, prepared);
Expand Down
23 changes: 23 additions & 0 deletions persistence-cassandra-4.0/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Persistence Cassandra 4.0

This module represents the implementation of the [persistence-api](../persistence-api) for the Cassandra `4.0.x` version.

## Cassandra version update

The current Cassandra version this module depends on is `4.0.3`.
In order to update to a newer patch version, please follow the guidelines below:

* Update the `cassandra.version` property in the [pom.xml](pom.xml).
* Update the `ccm.version` property (`it-cassandra-4.0` profile section) in [testing/pom.xml](testing/pom.xml)
* Check the transitive dependencies of the `org.apache.cassandra:cassandra-all` for the new version.
Make sure that the version of the `com.datastax.cassandra:cassandra-driver-core` that `cassandra-all` depends on, is same as in the `cassandra.bundled-driver.version` property in the [pom.xml](pom.xml).
This dependency is set as optional in the `cassandra-all`, but we need it to correctly handle UDFs.
Note that transitive dependencies can be seen on [mvnrepository.com](https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all) or by running `./mvnw dependency:tree -pl persistence-cassandra-4.0`.
* Update the [CI Dockerfile](../ci/Dockerfile) and set the new version in the `ccm create` command related to 4.0.
Note that this will have no effect until the docker image is rebuilt and pushed to the remote repository, thus creating an issue for that would be a good idea.
* Make sure everything compiles and CI tests are green.
* Update this `README.md` file with the new or updated instructions.

It's always good to validate your work against the pull requests that bumped the version in the past:

* `4.0.1` -> `4.0.3` [stargate/stargate#1647](https://github.com/stargate/stargate/pull/1647)
3 changes: 2 additions & 1 deletion persistence-cassandra-4.0/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
<artifactId>persistence-cassandra-4.0</artifactId>
<properties>
<!-- If you update this, make sure to keep `cassandra.bundled-driver.version` in sync -->
<cassandra.version>4.0.1</cassandra.version>
<!-- 4.0.3 depends on 3.11.0: https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all/4.0.3 -->
<cassandra.version>4.0.3</cassandra.version>
<!--
The driver used internally by cassandra-all for UDFs (must match the version declared in
cassandra-all's POM).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.LivenessInfo;
Expand Down Expand Up @@ -494,15 +492,12 @@ private static Result toResultInternal(
case PREPARED:
ResultMessage.Prepared prepared = (ResultMessage.Prepared) resultMessage;
PreparedWithInfo preparedWithInfo = (PreparedWithInfo) prepared;
QueryHandler.Prepared preparedStatement =
QueryProcessor.instance.getPrepared(prepared.statementId);
return new Result.Prepared(
Conversion.toExternal(prepared.statementId),
Conversion.toExternal(prepared.resultMetadataId),
toResultMetadata(prepared.resultMetadata, null),
toPreparedMetadata(
prepared.metadata.names,
preparedStatement.statement.getPartitionKeyBindVariableIndexes()),
prepared.metadata.names, preparedWithInfo.getPartitionKeyBindVariableIndexes()),
preparedWithInfo.isIdempotent(),
preparedWithInfo.isUseKeyspace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
public class PreparedWithInfo extends ResultMessage.Prepared {
private final boolean idempotent;
private final boolean useKeyspace;
private final short[] partitionKeyBindVariableIndexes;

public PreparedWithInfo(boolean idempotent, boolean useKeyspace, Prepared prepare) {
public PreparedWithInfo(
boolean idempotent,
boolean useKeyspace,
short[] partitionKeyBindVariableIndexes,
Prepared prepare) {
super(prepare.statementId, prepare.resultMetadataId, prepare.metadata, prepare.resultMetadata);
this.idempotent = idempotent;
this.useKeyspace = useKeyspace;
this.partitionKeyBindVariableIndexes = partitionKeyBindVariableIndexes;
}

public boolean isIdempotent() {
Expand All @@ -34,4 +40,8 @@ public boolean isIdempotent() {
public boolean isUseKeyspace() {
return useKeyspace;
}

public short[] getPartitionKeyBindVariableIndexes() {
return partitionKeyBindVariableIndexes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ public ResultMessage.Prepared prepare(
String s, ClientState clientState, Map<String, ByteBuffer> map)
throws RequestValidationException {
ResultMessage.Prepared prepare = QueryProcessor.instance.prepare(s, clientState, map);
Prepared prepared = QueryProcessor.instance.getPrepared(prepare.statementId);
boolean idempotent = IdempotencyAnalyzer.isIdempotent(prepared.statement);
boolean useKeyspace = prepared.statement instanceof UseStatement;
return new PreparedWithInfo(idempotent, useKeyspace, prepare);
CQLStatement statement =
Optional.ofNullable(QueryProcessor.instance.getPrepared(prepare.statementId))
.map(p -> p.statement)
.orElseGet(() -> QueryProcessor.getStatement(s, clientState));
boolean idempotent = IdempotencyAnalyzer.isIdempotent(statement);
boolean useKeyspace = statement instanceof UseStatement;
return new PreparedWithInfo(
idempotent, useKeyspace, statement.getPartitionKeyBindVariableIndexes(), prepare);
}

@Override
Expand Down
20 changes: 20 additions & 0 deletions persistence-dse-6.8/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Persistence DSE 6.8

This module represents the implementation of the [persistence-api](../persistence-api) for
the DSE (DataStax Enterprise cassandra) `6.8.x` version.

## Cassandra version update

The current Cassandra version this module depends on is `6.8.20`.
In order to update to a newer patch version, please follow the guidelines below:

* Update the `cassandra.version` property in the [pom.xml](pom.xml).
* Update the `ccm.version` property (`it-dse-6.8` profile section) in [testing/pom.xml](testing/pom.xml)
* Update the [CI Dockerfile](../ci/Dockerfile) and set the new version in the `ccm create` command related to DSE 6.8.
Note that this will have no effect until the docker image is rebuilt and pushed to the remote repository, thus creating an issue for that would be a good idea.
* Make sure everything compiles and CI tests are green.
* Update this `README.md` file with the new or updated instructions.

It's always good to validate your work against the pull requests that bumped the version in the past:

* `6.8.16` -> `6.8.20` [stargate/stargate#1652](https://github.com/stargate/stargate/pull/1652)
2 changes: 1 addition & 1 deletion testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@
</goals>
<configuration>
<systemPropertyVariables>
<ccm.version>4.0.1</ccm.version>
<ccm.version>4.0.3</ccm.version>
<stargate.test.nodes>1</stargate.test.nodes>
</systemPropertyVariables>
<failIfNoTests>true</failIfNoTests>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -116,6 +117,53 @@ public void onCompleted() {}
rowOf(Values.of("c"), Values.of(3)))));
}

@Test
public void manyStreamingBatch(@TestKeyspace CqlIdentifier keyspace) {
List<StreamingResponse> responses = new CopyOnWriteArrayList<>();

StargateStub stub = asyncStubWithCallCredentials();
StreamObserver<StreamingResponse> responseStreamObserver =
new StreamObserver<StreamingResponse>() {
@Override
public void onNext(StreamingResponse value) {
responses.add(value);
}

@Override
public void onError(Throwable t) {}

@Override
public void onCompleted() {}
};

StreamObserver<Batch> requestObserver = stub.executeBatchStream(responseStreamObserver);

int queries = RandomUtils.nextInt(10, 50);
for (int i = 0; i < queries; i++) {
Batch batch =
Batch.newBuilder()
.addQueries(
cqlBatchQuery(
"INSERT INTO test (k, v) VALUES (?, ?)", Values.of("m"), Values.of(i)))
.setParameters(batchParameters(keyspace))
.build();
requestObserver.onNext(batch);
}
requestObserver.onCompleted();

// make sure all queries where executed, and we got not errors back
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
assertThat(responses).hasSize(queries);

assertThat(responses)
.extracting(StreamingResponse::getStatus)
.allSatisfy(status -> assertThat(status.getCode()).isZero());
});
}

@Test
public void streamingQueryWithError(@TestKeyspace CqlIdentifier keyspace)
throws InvalidProtocolBufferException {
Expand Down

0 comments on commit 66328d4

Please sign in to comment.