Skip to content

Commit

Permalink
Implement new bounded buffer for constant query collection
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd committed Mar 29, 2019
1 parent 37f8e0c commit a9b1a0d
Show file tree
Hide file tree
Showing 4 changed files with 414 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.collector;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
* Implementation of {@link RecentBuffer} using {@link ConcurrentLinkedQueue}.
*/
public class ConcurrentLinkedQueueRecentBuffer<T> implements RecentBuffer<T>
{
private final ConcurrentLinkedQueue<T> queue;
private final int maxSize;
private final AtomicInteger size;

public ConcurrentLinkedQueueRecentBuffer( int bitSize )
{
maxSize = 1 << bitSize;
queue = new ConcurrentLinkedQueue<>();
size = new AtomicInteger( 0 );
}

/* ---- many producers ---- */

@Override
public void produce( T t )
{
queue.add( t );
int newSize = size.incrementAndGet();
if ( newSize > maxSize )
{
queue.poll();
size.decrementAndGet();
}
}

/* ---- single consumer ---- */

@Override
public void clear()
{
queue.clear();
// might go out of sync with queue here, but should be minor slippage.
// Will not accumulate leaks either, but reset on every clear.
size.set( 0 );
}

@Override
public void foreach( Consumer<T> consumer )
{
queue.forEach( consumer );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.collector;

import java.util.function.Consumer;

/**
* Bounded buffer which holds the last n elements. When the buffer is full, each
* produce will replace the elements in the buffer that was added first.
*
* This collection thread-safely allows
* - multiple threads concurrently calling `produce`
* - serialized calling of `clear` and `foreach`
*
* @param <T> type of elements in this buffer.
*/
public interface RecentBuffer<T>
{
/**
* Produce element into the buffer.
*
* @param t element to produce
*/
void produce( T t );

/**
* Clear all elements from the buffer.
*/
void clear();

/**
* Iterate over all elements in the buffer. No elements are removed from the buffer.
*
* @param consumer consumer to apply on each element
*/
void foreach( Consumer<T> consumer );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.collector;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
* Implementation of {@link RecentBuffer} using ring buffer.
*/
public class RingRecentBuffer<T> implements RecentBuffer<T>
{
private final int size;
private final int mask;
private final VolatileRef<T>[] data;

private final AtomicLong produceCount;
private final AtomicLong consumeCount;

public RingRecentBuffer( int bitSize )
{
size = 1 << bitSize;
mask = size - 1;

//noinspection unchecked
data = new VolatileRef[size];
for ( int i = 0; i < size; i++ )
{
data[i] = new VolatileRef<>();
}

produceCount = new AtomicLong( 0 );
consumeCount = new AtomicLong( 0 );
}

/* ---- many producers ---- */

@Override
public void produce( T t )
{
long produceNumber = produceCount.getAndIncrement();
int offset = (int) (produceNumber & mask);
VolatileRef<T> volatileRef = data[offset];
volatileRef.ref = t;
volatileRef.produceNumber = produceNumber;
}

/* ---- single consumer ---- */

@Override
public void clear()
{
for ( VolatileRef<T> volatileRef : data )
{
volatileRef.ref = null;
}
long snapshotProduce = produceCount.get();
consumeCount.set( snapshotProduce );
}

@Override
public void foreach( Consumer<T> consumer )
{
long snapshotProduce = produceCount.get();
long snapshotConsume = Math.max( consumeCount.get(), snapshotProduce - size );
for ( long i = snapshotConsume; i < snapshotProduce; i++ )
{
int offset = (int) (i & mask);
VolatileRef<T> volatileRef = data[offset];
if ( volatileRef.produceNumber < i )
{
return;
}
consumer.accept( volatileRef.ref );
}
}

private static class VolatileRef<T>
{
private volatile T ref;
private volatile long produceNumber;
}
}

0 comments on commit a9b1a0d

Please sign in to comment.