Skip to content

Commit

Permalink
add cqld4 prepared statement binding diagnostics for errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jshook committed Jun 2, 2022
1 parent 07339c4 commit 5aaae15
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 18 deletions.
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.adapter.cqld4.opdispensers;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.data.UdtValue;
import com.datastax.oss.driver.api.core.type.*;
import com.datastax.oss.driver.internal.core.type.PrimitiveType;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.*;
import java.util.function.LongFunction;

import static com.datastax.oss.protocol.internal.ProtocolConstants.DataType.*;

/**
* This should only be used when there is an exception thrown by some higher level logic.
* The purpose of this class is to do a more thorough job of checking each step of binding
* values to a prepared statement, and to provide useful feedback to the user
* explaining more specifically what the problem was that caused the original error to be thrown.
*/
public class CQLD4PreparedStmtDiagnostics {
private final static Logger logger = LogManager.getLogger(CQLD4PreparedStmtDiagnostics.class);

public static BoundStatement bindStatement(BoundStatement bound, CqlIdentifier colname, Object colval, DataType coltype) {

return switch (coltype) {
case PrimitiveType pt -> switch (pt.getProtocolCode()) {
case CUSTOM -> throw new OpConfigError("Error with Custom DataType");
case ASCII, VARCHAR -> bound.setString(colname, (String) colval);
case BIGINT, COUNTER ->bound.setLong(colname, (long) colval);
case BLOB -> bound.setByteBuffer(colname, (ByteBuffer) colval);
case BOOLEAN -> bound.setBoolean(colname, (boolean) colval);
case DECIMAL ->bound.setBigDecimal(colname, (BigDecimal) colval);
case DOUBLE ->bound.setDouble(colname, (double) colval);
case FLOAT ->bound.setFloat(colname, (float) colval);
case INT, SMALLINT, TINYINT -> bound.setInt(colname, (int) colval);
case TIMESTAMP -> bound.setInstant(colname, (Instant) colval);
case TIMEUUID, UUID ->bound.setUuid(colname, (UUID) colval);
case VARINT ->bound.setBigInteger(colname, (BigInteger) colval);
case INET ->bound.setInetAddress(colname, (InetAddress) colval);
case DATE ->bound.setLocalDate(colname, (LocalDate) colval);
case TIME -> bound.setLocalTime(colname, (LocalTime) colval);
case DURATION ->bound.setCqlDuration(colname, (CqlDuration) colval);
case LIST -> bound.setList(colname,(List)colval,((List)colval).get(0).getClass());
case MAP -> {
Map map = (Map) colval;
Set<Map.Entry> entries = map.entrySet();
Optional<Map.Entry> first = entries.stream().findFirst();
if (first.isPresent()) {
yield bound.setMap(colname,map,first.get().getKey().getClass(),first.get().getValue().getClass());
} else {
yield bound.setMap(colname,map,Object.class,Object.class);
}
}
case SET -> {
Set set = (Set)colval;
Optional first = set.stream().findFirst();
if (first.isPresent()) {
yield bound.setSet(colname,set,first.get().getClass());
} else {
yield bound.setSet(colname,Set.of(),Object.class);
}
}
case UDT -> {
UdtValue udt = (UdtValue) colval;
yield bound.setUdtValue(colname, udt);
}
case TUPLE -> {
TupleValue tuple = (TupleValue) colval;
yield bound.setTupleValue(colname,tuple);
}
default-> throw new RuntimeException("Unknown CQL type for diagnostic (type:'" + coltype +"',code:'" + coltype.getProtocolCode()+"'");
};

default -> throw new IllegalStateException("Unexpected value: " + coltype);
};

}

public static Cqld4CqlOp rebindWithDiagnostics(
PreparedStatement preparedStmt,
LongFunction<Object[]> fieldsF,
long cycle,
Exception exception
) {
logger.error(exception);
ColumnDefinitions defs = preparedStmt.getVariableDefinitions();
Object[] values = fieldsF.apply(cycle);
if (defs.size() != values.length) {
throw new OpConfigError("There are " + defs.size() + " anchors in statement '" + preparedStmt.getQuery() + "'" +
"but only " + values.length + " values were provided.");
}

BoundStatement bound = preparedStmt.bind();
int idx = 0;
for (int i = 0; i < defs.size(); i++) {
Object value = values[i];
ColumnDefinition def = defs.get(i);
CqlIdentifier defname = def.getName();
DataType type = def.getType();
try {
bound = CQLD4PreparedStmtDiagnostics.bindStatement(bound, defname, value, type);
} catch (ClassCastException cce) {
String errormsg = String.format(
"Unable to bind column '%s' to cql type '%s' with value '%s' (class '%s')",
defname,
type.asCql(false, false),
value,
value.getClass().getCanonicalName()
);
logger.error(errormsg);
throw new OpConfigError(errormsg, cce);

}
}

// If we got here, then either someone used the diagnostic binder where they shouldn't (It's SLOW,
// and there was no exception which prompted a retry with this diagnostic) OR
// There was an error detected in the caller and it was not seen here where it should have been
// reproduced.
throw new OpConfigError("The diagnostic binder was called but no error was found. This is a logic error.");
}

}
Expand Up @@ -25,53 +25,72 @@
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.LongFunction;

public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser {
private final static Logger logger = LogManager.getLogger(Cqld4PreparedStmtDispenser.class);

private final RSProcessors processors;
private final LongFunction<Statement> stmtFunc;
private final ParsedTemplate stmtTpl;
private final LongFunction<Object[]> fieldsF;
private PreparedStatement preparedStmt;
private CqlSession boundSession;

public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp cmd, ParsedTemplate stmtTpl, RSProcessors processors) {
super(sessionFunc, cmd);
if (cmd.isDynamic("space")) {
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op, ParsedTemplate stmtTpl, RSProcessors processors) {
super(sessionFunc, op);
if (op.isDynamic("space")) {
throw new RuntimeException("Prepared statements and dynamic space values are not supported." +
" This would churn the prepared statement cache, defeating the purpose of prepared statements.");
}
this.processors = processors;
this.stmtTpl = stmtTpl;
stmtFunc = createStmtFunc(cmd);
this.fieldsF = getFieldsFunction(op);
stmtFunc = createStmtFunc(fieldsF, op);
}

protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {

private LongFunction<Object[]> getFieldsFunction(ParsedOp op) {
LongFunction<Object[]> varbinder;
varbinder = cmd.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
varbinder = op.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
return varbinder;
}

protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF, ParsedOp op) {

String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
boundSession = getSessionFunc().apply(0);
preparedStmt = boundSession.prepare(preparedQueryString);

LongFunction<Statement> boundStmtFunc = c -> {
Object[] apply = varbinder.apply(c);
Object[] apply = fieldsF.apply(c);
return preparedStmt.bind(apply);
};
return super.getEnhancedStmtFunc(boundStmtFunc, cmd);
return super.getEnhancedStmtFunc(boundStmtFunc, op);
}

@Override
public Cqld4CqlOp apply(long value) {
public Cqld4CqlOp apply(long cycle) {

return new Cqld4CqlPreparedStatement(
boundSession,
(BoundStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace(),
processors
);
BoundStatement boundStatement;
try {
boundStatement = (BoundStatement) stmtFunc.apply(cycle);
return new Cqld4CqlPreparedStatement(
boundSession,
boundStatement,
getMaxPages(),
isRetryReplace(),
processors
);
} catch (Exception exception) {
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(
preparedStmt,
fieldsF,
cycle,
exception
);
}
}

}

0 comments on commit 5aaae15

Please sign in to comment.