Skip to content

Commit

Permalink
Resolve concurrency perf issue with RecentK
Browse files Browse the repository at this point in the history
  • Loading branch information
jakewins committed Jun 16, 2015
1 parent bbd40d7 commit b6c5b2b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 90 deletions.
Expand Up @@ -20,31 +20,28 @@
package org.neo4j.concurrent;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* Tracks an (approximate) set of recently seen unique elements in a stream, based on a concurrent LRU implementation.
*
* This class is thread safe. For the common case of items that are recently seen being seen again, this class is
* lock free.
*
* @param <Type> the entry type stored
*/
public class RecentK<Type> implements Iterable<Type>
{
private final int maxItems;

/**
* Maps items to their slots in the LRU queue; the keys in here are what we use as the source of truth
* for which items are the most recent.
* Source of truth - the keys in this map are the "recent set". For each item, we also track a counter for
* the number of times we've seen it, which is used to evict older and less used items.
*/
private final ConcurrentHashMap<Type, Slot> index = new ConcurrentHashMap<>();

/** Most recently seen item */
// guarded by synchronized(this)
private Slot head;

/** Least recently used item */
// guarded by synchronized(this)
private Slot tail;
private final ConcurrentHashMap<Type, AtomicLong> recentItems = new ConcurrentHashMap<>();

/**
* @param maxItems is the size of the item set to track
Expand All @@ -57,112 +54,87 @@ public RecentK( int maxItems )
/**
* @param item a new item to the tracked set.
*/
public synchronized void add( Type item )
public void add( Type item )
{
Slot slot = index.get( item );
if(slot != null)
{
// Known item, move to head of queue
markAsMostRecent( slot );
}
else if( index.size() >= maxItems )
AtomicLong counter = recentItems.get( item );
if(counter != null)
{
// New item, queue full
removeLeastRecent();
addNewItem( item );
counter.incrementAndGet();
}
else
{
// New item, not yet reached max size
addNewItem( item );
// Double-checked locking ahead: Check if there is space for our item
if( recentItems.size() >= maxItems )
{
// If not, synchronize and check again (this will happen if there are >= maxItems items in the current set)
synchronized ( recentItems )
{
// Proper check under lock, make space in the set for our new item
while( recentItems.size() >= maxItems )
{
removeItemWithLowestCount();
}

halveCounts();
recentItems.putIfAbsent( item, new AtomicLong( 1 ) );
}
}
else
{
// There were < maxItems in the set. This is racy as multiple clients may have hit this branch
// simultaneously. We accept going above max items here. The set will recover next time an item
// is added, since the synchronized block above will bring the set to maxItems items again.
recentItems.putIfAbsent( item, new AtomicLong( 1 ) );
}
}
}

private void removeLeastRecent()
{
// assumes synchronized(this)
index.remove( tail.item );

if( head == tail ) // can happen if maxSize = 1
{
head = tail = null;
}
else
{
tail = tail.prev;
tail.next = null;
}
}

private void addNewItem( Type item )
/**
* In order to give lower-and-lower priority to keys we've seen a lot in the past, but don't see much anymore,
* we cut all key counts in half after we've run an eviction cycle.
*/
private void halveCounts()
{
// assumes synchronized(this)

// Create a new slot
Slot slot = new Slot( item );
index.put( item, slot );

// Add it as head
if( head != null)
{
slot.next = head;
head.prev = slot;
head = slot;
}
else
for ( AtomicLong count : recentItems.values() )
{
head = slot;
tail = slot;
long prev, next;
do {
prev = count.get();
next = Math.max( prev / 2, 1 );
} while (!count.compareAndSet(prev, next));

}
}

private void markAsMostRecent( Slot slot )
private void removeItemWithLowestCount()
{
// assumes synchronized(this)
if(slot == head)
Type lowestCountKey = null;
long lowestCount = Long.MAX_VALUE;
for ( Map.Entry<Type,AtomicLong> entry : recentItems.entrySet() )
{
return;
long currentCount = entry.getValue().get();
if( currentCount < lowestCount)
{
lowestCount = currentCount;
lowestCountKey = entry.getKey();
}
}

// Mend the hole we're about to make in the chain
if(slot == tail)
{
tail = slot.prev;
slot.prev.next = slot.next;
}
else
if( lowestCountKey != null )
{
slot.prev.next = slot.next;
slot.next.prev = slot.prev;
recentItems.remove( lowestCountKey );
}

// Make head entry
slot.prev = null;
slot.next = head;
head = slot;
}

public Set<Type> recentItems()
{
return index.keySet();
return recentItems.keySet();
}

/**
 * Iterates over the items currently in the recent set.
 *
 * @return an iterator obtained from the set returned by {@link #recentItems()}
 */
@Override
public Iterator<Type> iterator()
{
    final Set<Type> recent = recentItems();
    return recent.iterator();
}

/**
* A node in the doubly linked LRU queue that runs from {@code head} (most recently seen) to
* {@code tail} (least recently used). Each slot holds exactly one tracked item.
*/
class Slot
{
// The tracked item; used as the key when the slot is evicted from the index.
private final Type item;

// Neighbour towards the head of the queue (more recently seen); null for the head itself.
private Slot prev;
// Neighbour towards the tail of the queue (less recently seen); null for the tail itself.
private Slot next;

/**
* @param item the item this slot tracks
*/
Slot( Type item )
{
this.item = item;
}
}
}
Expand Up @@ -26,7 +26,6 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;

public class RecentKTest
Expand All @@ -37,7 +36,7 @@ public void shouldEvictOnOverflow() throws Throwable
// When & Then
assertThat( appendSequence( 1, 1, 1, 1, 1, 1, 1 ), yieldsSet( 1 ));
assertThat( appendSequence( 1, 2, 3, 4, 1, 1, 1 ), yieldsSet( 1, 3, 4 ));
assertThat( appendSequence( 1, 2, 6, 4, 1, 2, 2, 2, 5, 5 ), yieldsSet( 1, 2, 5 ));
assertThat( appendSequence( 1, 1, 1, 2, 2, 6, 4, 4, 1, 1, 2, 2, 2, 5, 5 ), yieldsSet( 1, 2, 5 ));
}

private Matcher<RecentK<Integer>> yieldsSet( final Integer ... expectedItems )
Expand Down

0 comments on commit b6c5b2b

Please sign in to comment.