Skip to content

Commit

Permalink
Don't warn on big batches if everything is in the same partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Zhou committed Apr 21, 2017
1 parent 91661ec commit 2c61388
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 36 deletions.
85 changes: 53 additions & 32 deletions src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
Expand Up @@ -34,6 +34,8 @@
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.*;
import org.apache.cassandra.tracing.Tracing;
Expand Down Expand Up @@ -258,24 +260,34 @@ private int updatedRows()
/**
* Checks batch size to ensure threshold is met. If not, a warning is logged.
*
* @param cfs ColumnFamilies that will store the batch's mutations.
* @param mutations The batched mutations to be applied.
*/
public static void verifyBatchSize(Iterable<PartitionUpdate> updates) throws InvalidRequestException
private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
// We only warn for batch spanning multiple mutations (#10876)
if (mutations.size() <= 1)
return;

long size = 0;
long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold();

for (PartitionUpdate update : updates)
size += update.dataSize();
for (IMutation mutation : mutations)
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
size += update.dataSize();
}

if (size > warnThreshold)
{
Set<String> tableNames = new HashSet<>();
for (PartitionUpdate update : updates)
tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
for (IMutation mutation : mutations)
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
}

String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}";
String format = "Batch for {} is of size {}, exceeding specified threshold of {} by {}.{}";
if (size > failThreshold)
{
Tracing.trace(format, tableNames, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
Expand All @@ -290,34 +302,52 @@ else if (logger.isWarnEnabled())
}
}

private void verifyBatchType(Iterable<PartitionUpdate> updates)
private void verifyBatchType(Collection<? extends IMutation> mutations)
{
if (!isLogged() && Iterables.size(updates) > 1)
if (!isLogged() && mutations.size() > 1)
{
Set<DecoratedKey> keySet = new HashSet<>();
Set<String> tableNames = new HashSet<>();

for (PartitionUpdate update : updates)
Map<String, Collection<Range<Token>>> localTokensByKs = new HashMap<>();
boolean localPartitionsOnly = true;
for (IMutation mutation : mutations)
{
keySet.add(update.partitionKey());
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
keySet.add(update.partitionKey());
tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
}

tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
if (localPartitionsOnly)
localPartitionsOnly &= isPartitionLocal(localTokensByKs, mutation);
}

// CASSANDRA-11529: log only if we have more than a threshold of keys, this was also suggested in the
// original ticket that introduced this warning, CASSANDRA-9282
if (keySet.size() > DatabaseDescriptor.getUnloggedBatchAcrossPartitionsWarnThreshold())
{
// CASSANDRA-9303: If we only have local mutations we do not warn.
if (localPartitionsOnly)
return;

NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
keySet.size(), tableNames.size() == 1 ? "" : "s", tableNames);
NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
keySet.size(), keySet.size() == 1 ? "" : "s",
tableNames.size() == 1 ? "" : "s", tableNames);

ClientWarn.instance.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(),
tableNames.size() == 1 ? "" : "s", tableNames}).getMessage());
}
ClientWarn.instance.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(),
tableNames.size() == 1 ? "" : "s", tableNames}).getMessage());
}
}

private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, IMutation mutation)
{
String ksName = mutation.getKeyspaceName();
Collection<Range<Token>> localRanges = localTokensByKs.get(ksName);
if (localRanges == null)
{
localRanges = StorageService.instance.getLocalRanges(ksName);
localTokensByKs.put(ksName, localRanges);
}

return Range.isInRanges(mutation.key().getToken(), localRanges);
}

public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
Expand Down Expand Up @@ -349,17 +379,8 @@ private void executeWithoutConditions(Collection<? extends IMutation> mutations,
if (mutations.isEmpty())
return;

// Extract each collection of updates from it's IMutation and then lazily concatenate all of them into a single Iterable.
Iterable<PartitionUpdate> updates = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<PartitionUpdate>>()
{
public Collection<PartitionUpdate> apply(IMutation im)
{
return im.getPartitionUpdates();
}
}));

verifyBatchSize(updates);
verifyBatchType(updates);
verifyBatchSize(mutations);
verifyBatchType(mutations);

boolean mutateAtomic = (isLogged() && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
Expand Down
Expand Up @@ -228,10 +228,6 @@ public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequ
upd.applyUpdates(current, update);

Keyspace.openAndGetStore(cfm).indexManager.validate(update);

if (isBatch)
BatchStatement.verifyBatchSize(Collections.singleton(update));

return update;
}

Expand Down

0 comments on commit 2c61388

Please sign in to comment.