Skip to content

Commit

Permalink
merge from 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jbellis committed Jul 3, 2012
2 parents 76ada11 + 67dec69 commit 602e383
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 19 deletions.
8 changes: 5 additions & 3 deletions src/java/org/apache/cassandra/db/AbstractColumnContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ public void maybeResetDeletionTimes(int gcBefore)
columns.maybeResetDeletionTimes(gcBefore);
}

/**
 * Merges every column of the given container into this one, delegating to the
 * underlying column map so that each incoming column is resolved (reconciled)
 * against any existing column with the same name before being added.
 *
 * @param cc             the container whose columns are merged into this one
 * @param allocator      allocator used when reconciliation produces new columns
 * @param transformation applied to each incoming column before it is added
 * @return the difference in data size produced by the merge
 */
public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
{
return columns.addAllWithSizeDelta(cc.columns, allocator, transformation);
}

public void addAll(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation)
{
columns.addAll(cc.columns, allocator, transformation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ else if (c < 0)
// having to care about the deletion infos
protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation);

/**
 * Unsupported for this implementation: the size delta is only needed by
 * memtable updates, and memtables must not use thread-unsafe containers,
 * so there is no legitimate caller here.
 *
 * @throws UnsupportedOperationException always
 */
public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
{
// sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers
throw new UnsupportedOperationException();
}

public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation)
{
addAllColumns(columns, allocator, transformation);
Expand Down
31 changes: 28 additions & 3 deletions src/java/org/apache/cassandra/db/AtomicSortedColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public void addColumn(IColumn column, Allocator allocator)
}

/**
 * Adds all of the given columns to this container; simply delegates to
 * {@link #addAllWithSizeDelta} and discards the returned size delta.
 */
public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
{
addAllWithSizeDelta(cm, allocator, transformation);
}

/**
 * Atomically and in isolation merges the given columns into this container.
 *
 * The merge runs in a compare-and-set retry loop: snapshot the current
 * holder, apply every addition to a clone, then publish the clone only if no
 * concurrent writer replaced the holder in the meantime. If we detect
 * mid-iteration that we have been beaten, we bail early and restart,
 * avoiding unnecessary work where possible.
 *
 * @return the difference in data size produced by the (successful) merge
 */
public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
{
Holder current, modified;
long sizeDelta;

main_loop:
do
{
// recomputed from scratch on every retry so a failed CAS never double-counts
sizeDelta = 0;
current = ref.get();
DeletionInfo newDelInfo = current.deletionInfo.add(cm.getDeletionInfo());
modified = new Holder(current.map.clone(), newDelInfo);

for (IColumn column : cm.getSortedColumns())
{
sizeDelta += modified.addColumn(transformation.apply(column), allocator);
// bail early if we know we've been beaten
if (ref.get() != current)
continue main_loop;
}
}
while (!ref.compareAndSet(current, modified));

return sizeDelta;
}

public boolean replace(IColumn oldColumn, IColumn newColumn)
Expand Down Expand Up @@ -325,29 +335,44 @@ Holder clear()
return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
}

void addColumn(IColumn column, Allocator allocator)
long addColumn(IColumn column, Allocator allocator)
{
ByteBuffer name = column.name();
IColumn oldColumn;
while ((oldColumn = map.putIfAbsent(name, column)) != null)
long sizeDelta = 0;
while (true)
{
oldColumn = map.putIfAbsent(name, column);
if (oldColumn == null)
{
sizeDelta += column.dataSize();
break;
}

if (oldColumn instanceof SuperColumn)
{
assert column instanceof SuperColumn;
long previousSize = oldColumn.dataSize();
((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
sizeDelta += oldColumn.dataSize() - previousSize;
break; // Delegated to SuperColumn
}
else
{
// calculate reconciled col from old (existing) col and new col
IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
if (map.replace(name, oldColumn, reconciledColumn))
{
sizeDelta += reconciledColumn.dataSize() - oldColumn.dataSize();
break;
}

// We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
// (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
}
}

return sizeDelta;
}

void retainAll(ISortedColumns columns)
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/db/ISortedColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public interface ISortedColumns extends IIterableColumns
* add(c);
* </code>
* but is potentially faster.
*
* @return the difference in size seen after merging the given columns
*/
public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);

/**
 * Adds all the columns of the given column map to this one, without
 * necessarily computing the size delta (unlike {@code addAllWithSizeDelta}).
 */
public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);

Expand Down
24 changes: 11 additions & 13 deletions src/java/org/apache/cassandra/db/Memtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected void afterExecute(Runnable r, Throwable t)
volatile static Memtable activelyMeasuring;

private volatile boolean isFrozen;
private final AtomicLong currentThroughput = new AtomicLong(0);
private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);

// We index the memtable by RowPosition only for the purpose of being able
Expand Down Expand Up @@ -120,12 +120,12 @@ public Set<Object> call() throws Exception

/**
 * Estimated live (in-memory) size of this memtable: the tracked serialized
 * size scaled by the column family's measured live ratio.
 */
public long getLiveSize()
{
return (long) (currentSize.get() * cfs.liveRatio);
}

/**
 * Total serialized byte size of the data merged into this memtable,
 * as accumulated in {@code currentSize}.
 */
public long getSerializedSize()
{
return currentSize.get();
}

public long getOperations()
Expand Down Expand Up @@ -188,7 +188,7 @@ public void run()
deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
objects += entry.getValue().getColumnCount();
}
double newRatio = (double) deepSize / currentThroughput.get();
double newRatio = (double) deepSize / currentSize.get();

if (newRatio < MIN_SANE_LIVE_RATIO)
{
Expand Down Expand Up @@ -224,12 +224,6 @@ public void run()

private void resolve(DecoratedKey key, ColumnFamily cf)
{
currentThroughput.addAndGet(cf.dataSize());
currentOperations.addAndGet((cf.getColumnCount() == 0)
? cf.isMarkedForDelete() ? 1 : 0
: cf.getColumnCount());


ColumnFamily previous = columnFamilies.get(key);

if (previous == null)
Expand All @@ -242,7 +236,11 @@ private void resolve(DecoratedKey key, ColumnFamily cf)
previous = empty;
}

previous.addAll(cf, allocator, localCopyFunction);
long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction);
currentSize.addAndGet(sizeDelta);
currentOperations.addAndGet((cf.getColumnCount() == 0)
? cf.isMarkedForDelete() ? 1 : 0
: cf.getColumnCount());
}

// for debugging
Expand Down Expand Up @@ -272,7 +270,7 @@ private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws
}
long estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
+ currentThroughput.get()) // data
+ currentSize.get()) // data
* 1.2); // bloom filter and row index overhead
SSTableReader ssTable;
// errors when creating the writer that may leave empty temp files.
Expand Down Expand Up @@ -323,7 +321,7 @@ public void runMayThrow() throws Exception
public String toString()
{
return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
cfs.getColumnFamilyName(), hashCode(), currentSize, getLiveSize(), currentOperations);
}

/**
Expand Down

0 comments on commit 602e383

Please sign in to comment.