Skip to content

Commit

Permalink
CASSANDRA-13409: Support coexistting regular-row-tombstone and shadow…
Browse files Browse the repository at this point in the history
…able-row-tombstone and add support for shadowable in sstabledump
  • Loading branch information
Zhao Yang authored and jasonstack committed May 7, 2017
1 parent 65bd035 commit 26f3e7b
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 13 deletions.
107 changes: 101 additions & 6 deletions src/java/org/apache/cassandra/db/rows/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,51 @@ public interface Row extends Unfiltered, Collection<ColumnData>
*/
public void apply(Consumer<ColumnData> function, Predicate<ColumnData> stopCondition, boolean reverse);

/**
* Use to aggregate multiple row deletions and retain most recent regular-row-tombstone
* if its TS is less than most recent shadowable-row-tombstone
*/
public static class DeletionAggregator
{
private DeletionTime shadowable = DeletionTime.LIVE;
private DeletionTime regular = DeletionTime.LIVE;

/**
* Accumulate most recent regular-row-tombstone which TS is less than most recent shadowable-row-tombstone.
*
*
* @param deletion
* deletion to be added
*/
public void aggregate(Deletion deletion)
{
if (deletion.isShadowable)
{
if (deletion.supersedes(shadowable))
shadowable = deletion.time;
if (deletion.getRetainedDeletion().supersedes(regular))
regular = deletion.getRetainedDeletion();
}
else if (deletion.supersedes(regular))
regular = deletion.time;
}

public Deletion build()
{
// either no shadowable or shadowable is shadowed by regular
if (shadowable.isLive() || regular.compareTo(shadowable) >= 0)
{
return new Deletion(regular, false);
}
else
{
if (regular.isLive())
return new Deletion(shadowable, true);
else
return new Deletion(shadowable, regular);
}
}
}
/**
* A row deletion/tombstone.
* <p>
Expand All @@ -285,14 +330,26 @@ public static class Deletion

private final DeletionTime time;
private final boolean isShadowable;
// retainedDeletion can only be added if current deletion is shadowable and more recent than retainedDeletion
private final DeletionTime retainedRegularRowDeletion;

public Deletion(DeletionTime time, boolean isShadowable)
{
assert !time.isLive() || !isShadowable;
this.time = time;
this.isShadowable = isShadowable;
this.retainedRegularRowDeletion = DeletionTime.LIVE;
}

private Deletion(DeletionTime shadowable, DeletionTime regular)
{
assert shadowable.supersedes(regular);
this.time = shadowable;
this.isShadowable = true;
this.retainedRegularRowDeletion = regular;
}


public static Deletion regular(DeletionTime time)
{
return time.isLive() ? LIVE : new Deletion(time, false);
Expand All @@ -303,6 +360,12 @@ public static Deletion shadowable(DeletionTime time)
return new Deletion(time, true);
}

public static Deletion shadowableWithRetainedRegular(DeletionTime shadowable, DeletionTime regular)
{
assert !shadowable.isLive();
return new Deletion(shadowable, regular);
}

/**
* The time of the row deletion.
*
Expand All @@ -313,6 +376,25 @@ public DeletionTime time()
return time;
}

/**
* Check if there is retained regular-row-deletion
*
* @return
*/
public boolean hasRetainedDeletion()
{
return !retainedRegularRowDeletion.isLive();
}

/**
* The retained regular-row-deletion
*
* @return retained regular-row-deletion
*/
public DeletionTime getRetainedDeletion()
{
return retainedRegularRowDeletion;
}
/**
* Whether the deletion is a shadowable one or not.
*
Expand Down Expand Up @@ -364,6 +446,10 @@ public void digest(MessageDigest digest)
{
time.digest(digest);
FBUtilities.updateWithBoolean(digest, isShadowable);
if (!retainedRegularRowDeletion.isLive())
{
retainedRegularRowDeletion.digest(digest);
}
}

public int dataSize()
Expand All @@ -389,7 +475,12 @@ public final int hashCode()
@Override
public String toString()
{
return String.format("%s%s", time, isShadowable ? "(shadowable)" : "");
boolean hasRetained = hasRetainedDeletion();
if (!hasRetained)
return String.format("%s%s", time, isShadowable ? "(shadowable)" : "");
else
return String.format("%s%s %s%s", time, isShadowable ? "(shadowable)" : "",
retainedRegularRowDeletion, "(retained)");
}
}

Expand Down Expand Up @@ -641,20 +732,24 @@ public Row merge(DeletionTime activeDeletion)
}

LivenessInfo rowInfo = LivenessInfo.EMPTY;
Deletion rowDeletion = Deletion.LIVE;
DeletionAggregator accumulator = new DeletionAggregator();
for (Row row : rows)
{
if (row == null)
continue;

if (row.primaryKeyLivenessInfo().supersedes(rowInfo))
rowInfo = row.primaryKeyLivenessInfo();
if (row.deletion().supersedes(rowDeletion))
rowDeletion = row.deletion();
accumulator.aggregate(row.deletion());
}

Deletion rowDeletion = accumulator.build();
if (rowDeletion.isShadowedBy(rowInfo))
rowDeletion = Deletion.LIVE;
{
// if shadowable-row-tombstone is shadowed by new rowInfo,
// it will be removed and use retained regular-row-tombstone.
// if there is not retained regular-row-tombstone, it will be LIVE
rowDeletion = Deletion.regular(rowDeletion.getRetainedDeletion());
}

if (rowDeletion.supersedes(activeDeletion))
activeDeletion = rowDeletion.time();
Expand Down
40 changes: 34 additions & 6 deletions src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public class UnfilteredSerializer
*/
private final static int IS_STATIC = 0x01; // Whether the encoded row is a static. If there is no extended flag, the row is assumed not static.
private final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the row deletion is shadowable. If there is no extended flag (or no row deletion), the deletion is assumed not shadowable.
private final static int HAS_RETAINED_DELETION = 0x04; // Whether the row deletion contains retained regular deletion.
// If there is no extended flag (or no shadowable row deletion),
// the deletion is assumed not containing ratained regular row deletion.

public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
throws IOException
Expand Down Expand Up @@ -164,8 +167,11 @@ private void serialize(Row row, SerializationHeader header, DataOutputPlus out,
if (!deletion.isLive())
{
flags |= HAS_DELETION;
if (deletion.isShadowable())
if (deletion.isShadowable()) {
extendedFlags |= HAS_SHADOWABLE_DELETION;
if (deletion.hasRetainedDeletion())
extendedFlags |= HAS_RETAINED_DELETION;
}
}
if (hasComplexDeletion)
flags |= HAS_COMPLEX_DELETION;
Expand Down Expand Up @@ -219,7 +225,7 @@ private void serializeRowBody(Row row, int flags, SerializationHeader header, Da
header.writeLocalDeletionTime(pkLiveness.localExpirationTime(), out);
}
if ((flags & HAS_DELETION) != 0)
header.writeDeletionTime(deletion.time(), out);
serializeRowDeletion(out, header, deletion);

if ((flags & HAS_ALL_COLUMNS) == 0)
Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
Expand Down Expand Up @@ -259,6 +265,14 @@ private void serializeRowBody(Row row, int flags, SerializationHeader header, Da
}
}

private void serializeRowDeletion(DataOutputPlus out, SerializationHeader header, Deletion deletion) throws IOException
{
header.writeDeletionTime(deletion.time(), out);
if (deletion.hasRetainedDeletion()) {
header.writeDeletionTime(deletion.getRetainedDeletion(), out);
}
}

private void writeComplexColumn(ComplexColumnData data, ColumnDefinition column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out)
throws IOException
{
Expand Down Expand Up @@ -342,7 +356,11 @@ private long serializedRowBodySize(Row row, SerializationHeader header, long pre
size += header.localDeletionTimeSerializedSize(pkLiveness.localExpirationTime());
}
if (!deletion.isLive())
{
size += header.deletionTimeSerializedSize(deletion.time());
if (deletion.hasRetainedDeletion())
size += header.deletionTimeSerializedSize(deletion.getRetainedDeletion());
}

if (!hasAllColumns)
size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic));
Expand Down Expand Up @@ -500,7 +518,6 @@ public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeade
assert header.isForSSTable();
boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
boolean hasTTL = (flags & HAS_TTL) != 0;
boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
Clustering clustering = Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes());
long nextPosition = in.readUnsignedVInt() + in.getFilePointer();
in.readUnsignedVInt(); // skip previous unfiltered size
Expand All @@ -514,7 +531,7 @@ public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeade
}
}

Deletion deletion = new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable);
Deletion deletion = deserializeRowDeletion(in, header, extendedFlags);
in.seek(nextPosition);
return BTreeRow.emptyDeletedRow(clustering, deletion);
}
Expand Down Expand Up @@ -568,7 +585,6 @@ public Row deserializeRowBody(DataInputPlus in,
boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
boolean hasTTL = (flags & HAS_TTL) != 0;
boolean hasDeletion = (flags & HAS_DELETION) != 0;
boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
Columns headerColumns = header.columns(isStatic);
Expand All @@ -589,7 +605,7 @@ public Row deserializeRowBody(DataInputPlus in,
}

builder.addPrimaryKeyLivenessInfo(rowLiveness);
builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE);
builder.addRowDeletion(hasDeletion ? deserializeRowDeletion(in, header, extendedFlags) : Row.Deletion.LIVE);

Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);

Expand Down Expand Up @@ -631,6 +647,18 @@ public Row deserializeRowBody(DataInputPlus in,
}
}

private Deletion deserializeRowDeletion(DataInputPlus in, SerializationHeader header, int extendedFlags) throws IOException
{
boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
boolean deletionHasRetainedRegular = (extendedFlags & HAS_RETAINED_DELETION) != 0;
if (deletionIsShadowable && deletionHasRetainedRegular)
{
return Row.Deletion.shadowableWithRetainedRegular(header.readDeletionTime(in), header.readDeletionTime(in));
}
return new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable);
}


private void readSimpleColumn(ColumnDefinition column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness)
throws IOException
{
Expand Down
31 changes: 30 additions & 1 deletion src/java/org/apache/cassandra/tools/JsonTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.Row.Deletion;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.impl.Indenter;
import org.codehaus.jackson.util.DefaultPrettyPrinter;
Expand Down Expand Up @@ -266,7 +268,7 @@ private void serializeRow(Row row)
// If this is a deletion, indicate that, otherwise write cells.
if (!row.deletion().isLive())
{
serializeDeletion(row.deletion().time());
serializeRowDeletion(row.deletion());
}
json.writeFieldName("cells");
json.writeStartArray();
Expand Down Expand Up @@ -351,6 +353,33 @@ private void serializeClustering(ClusteringPrefix clustering) throws IOException
}
}

private void serializeRowDeletion(Deletion deletion) throws IOException
{
json.writeFieldName("deletion_info");
json.writeStartObject();

serializeDeletionInfo(deletion.isShadowable() ? "shadowable" : "regular", deletion.time());
if (deletion.hasRetainedDeletion())
{
serializeDeletionInfo("regular", deletion.getRetainedDeletion());
}
json.writeEndObject();
}

private void serializeDeletionInfo(String variant, DeletionTime deletion)
throws IOException
{
json.writeFieldName(variant); // regular or shadowable
objectIndenter.setCompact(true);
json.writeStartObject();
json.writeFieldName("marked_deleted");
json.writeString(dateString(TimeUnit.MICROSECONDS, deletion.markedForDeleteAt()));
json.writeFieldName("local_delete_time");
json.writeString(dateString(TimeUnit.SECONDS, deletion.localDeletionTime()));
json.writeEndObject();
objectIndenter.setCompact(false);
}

private void serializeDeletion(DeletionTime deletion) throws IOException
{
json.writeFieldName("deletion_info");
Expand Down

0 comments on commit 26f3e7b

Please sign in to comment.