Commit 6964aaa

Improvements to LinearHistoryPageCacheTracer

* Significantly improve the print-out speed of large histories by buffering
  output to reduce the system call overhead on non-buffered PrintStreams.
* Fix a couple of formatting errors where two events could end up on the same
  line.
* Always record the PageSwapper in eviction events, even when the flush fails.
chrisvest committed Mar 22, 2015
1 parent a6dcf7b commit 6964aaa
Showing 3 changed files with 159 additions and 21 deletions.
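
For context on the first bullet: on a PrintStream with no buffering underneath, each print call can turn into its own write() system call, which dominates the cost of dumping a large history. A minimal, hypothetical sketch of the buffering idea (not the actual patch; the file name, class name, and buffer size below are made up):

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

public class BufferedPrintSketch
{
    public static void main( String[] args ) throws IOException
    {
        // An unbuffered PrintStream would issue roughly one write() per print
        // call below; the BufferedOutputStream batches them into 8 KiB writes.
        try ( PrintStream out = new PrintStream(
                new BufferedOutputStream( new FileOutputStream( "history.txt" ), 8192 ) ) )
        {
            for ( int i = 0; i < 100_000; i++ )
            {
                out.print( "event #" );
                out.println( i );
            }
            // Closing the PrintStream flushes the buffer; an explicit flush()
            // would be needed if the stream stayed open, as in the tracer.
        }
    }
}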
@@ -455,20 +455,19 @@ public void evict( EvictionEvent evictionEvent ) throws IOException
long filePageId = this.filePageId;
evictionEvent.setCachePageId( getCachePageId() );
evictionEvent.setFilePageId( filePageId );
PageSwapper swapper = this.swapper;
evictionEvent.setSwapper( swapper );

flush( evictionEvent.flushEventOpportunity() );
UnsafeUtil.setMemory( pointer, getCachePageSize(), (byte) 0 );
this.filePageId = PageCursor.UNBOUND_PAGE_ID;

PageSwapper swapper = this.swapper;
this.swapper = null;

if ( swapper != null )
{
// The swapper can be null if the last page fault
// for that page threw an exception.
swapper.evicted( filePageId, this );
evictionEvent.setSwapper( swapper );
}
}
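
This hunk is the third bullet of the commit message: evictionEvent.setSwapper( swapper ) now runs before flush(), so an exception thrown by the flush no longer prevents the eviction event from recording which swapper was involved. The same ordering in isolation, with hypothetical names (EvictionRecord and the failure flag are stand-ins, not Neo4j API):

import java.io.IOException;

public class RecordBeforeFlushSketch
{
    // Hypothetical stand-in for the eviction event.
    static class EvictionRecord
    {
        String swapper; // stays null if never recorded
    }

    static void evict( EvictionRecord record, String swapper, boolean flushFails ) throws IOException
    {
        record.swapper = swapper; // record first...
        if ( flushFails )
        {
            throw new IOException( "flush failed" ); // ...so a throw here no longer loses it
        }
    }

    public static void main( String[] args )
    {
        EvictionRecord record = new EvictionRecord();
        try
        {
            evict( record, "swapper for file 'a'", true );
        }
        catch ( IOException e )
        {
            System.out.println( "flush failed; swapper recorded as: " + record.swapper );
        }
    }
}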

@@ -19,14 +19,18 @@
*/
package org.neo4j.test;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.io.pagecache.PageSwapper;
@@ -50,6 +54,28 @@ public final class LinearHistoryPageCacheTracer implements PageCacheTracer
{
private final AtomicReference<HEvent> history = new AtomicReference<>();

// The output buffering mechanics are pre-allocated in case we have to deal with low-memory situations.
// The output switching is guarded by the monitor lock on the LinearHistoryPageCacheTracer instance.
// The class name cache is similarly guarded by the monitor lock. In short, only a single thread can print history
// at a time.
private final SwitchableBufferedOutputStream bufferOut = new SwitchableBufferedOutputStream();
private final PrintStream out = new PrintStream( bufferOut );
private final Map<Class<?>, String> classSimpleNameCache = new IdentityHashMap<>();

private static class SwitchableBufferedOutputStream extends BufferedOutputStream
{

public SwitchableBufferedOutputStream()
{
super( null ); // No output target by default. This is changed in printHistory.
}

public void setOut( OutputStream out )
{
super.out = out;
}
}

private final HEvent end = new HEvent()
{
@Override
@@ -78,7 +104,7 @@ public final void print( PrintStream out, String exceptionLinePrefix )
{
if ( getClass() == EndHEvent.class )
{
out.append( '-' );
out.print( '-' );
}
out.print( getClass().getSimpleName() );
out.print( '[' );
@@ -143,7 +169,14 @@ void printBody( PrintStream out, String exceptionLinePrefix )
out.print( ", elapsedMicros:" );
out.print( (time - event.time) / 1000 );
out.print( ", endOf:" );
out.print( event.getClass().getSimpleName() );
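// Cache the simple class name per event type so it is not recomputed for every printed event.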
Class<? extends IntervalHEven> eventClass = event.getClass();
String className = classSimpleNameCache.get( eventClass );
if ( className == null )
{
className = eventClass.getSimpleName();
classSimpleNameCache.put( eventClass, className );
}
out.print( className );
}
}

@@ -216,7 +249,7 @@ public void setFilePageId( long filePageId )
@Override
public void setSwapper( PageSwapper swapper )
{
file = swapper.file();
file = swapper == null? null : swapper.file();
}

@Override
@@ -240,7 +273,7 @@ public void setCachePageId( int cachePageId )
@Override
public FlushEvent beginFlush( long filePageId, int cachePageId, PageSwapper swapper )
{
return add( new FlushHEvent( filePageId, cachePageId, swapper, this ) );
return add( new FlushHEvent( filePageId, cachePageId, swapper ) );
}

@Override
@@ -260,16 +293,14 @@ private class FlushHEvent extends IntervalHEven implements FlushEvent
private long filePageId;
private int cachePageId;
private File file;
private HEvent cause;
private int bytesWritten;
private IOException exception;

public FlushHEvent( long filePageId, int cachePageId, PageSwapper swapper, HEvent cause )
public FlushHEvent( long filePageId, int cachePageId, PageSwapper swapper )
{
this.filePageId = filePageId;
this.cachePageId = cachePageId;
this.file = swapper.file();
this.cause = cause;
}

@Override
@@ -328,7 +359,7 @@ public void setCachePageId( int cachePageId )
@Override
public PageFaultEvent beginPageFault()
{
return add( new PageFaultHEvent( this ) );
return add( new PageFaultHEvent() );
}

@Override
@@ -352,17 +383,11 @@ void printBody( PrintStream out, String exceptionLinePrefix )

private class PageFaultHEvent extends IntervalHEven implements PageFaultEvent
{
private PinHEvent cause;
private int bytesRead;
private int cachePageId;
private boolean parked;
private Throwable exception;

public PageFaultHEvent( PinHEvent cause )
{
this.cause = cause;
}

@Override
public void addBytesRead( int bytes )
{
@@ -425,7 +450,7 @@ public FlushEventOpportunity flushEventOpportunity()
@Override
public FlushEvent beginFlush( long filePageId, int cachePageId, PageSwapper swapper )
{
return add( new FlushHEvent( filePageId, cachePageId, swapper, this ) );
return add( new FlushHEvent( filePageId, cachePageId, swapper ) );
}

@Override
@@ -442,8 +467,9 @@ <E extends HEvent> E add( E event )
return event;
}

public void printHistory( PrintStream out )
public synchronized void printHistory( PrintStream outputStream )
{
bufferOut.setOut( outputStream );
HEvent events = history.getAndSet( null );
if ( events == null )
{
@@ -469,9 +495,9 @@ public void printHistory( PrintStream out )
concurrentIntervals.remove( idx );
if ( left > 0 )
{
out.println();
putcs( out, '|', idx );
putcs( out, '/', left );
out.println();
}
}
else if ( events instanceof IntervalHEven )
Expand All @@ -480,16 +506,17 @@ else if ( events instanceof IntervalHEven )
out.print( "+ " );
events.print( out, exceptionLinePrefix );
concurrentIntervals.add( events );
out.println();
}
else
{
putcs( out, '|', concurrentIntervals.size() );
out.print( "> " );
events.print( out, exceptionLinePrefix );
}
out.println();
events = events.prev;
}
out.flush();
}

private String exceptionLinePrefix( int size )
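
Taken together, the pieces above implement the scheme described in the buffering comment near the top of the tracer: one BufferedOutputStream is allocated up front, printHistory() retargets it at the caller's stream under the tracer's monitor lock, and everything is flushed at the end. A stripped-down, hypothetical version of that pattern on its own (class and method names are illustrative, not the Neo4j API):

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;

public class RetargetableBufferSketch
{
    // Subclassing only to reach the protected 'out' field of FilterOutputStream,
    // so a single pre-allocated buffer can be pointed at different targets.
    private static class SwitchableBuffer extends BufferedOutputStream
    {
        SwitchableBuffer()
        {
            super( null ); // no target yet; must be set before the buffer fills
        }

        void setTarget( OutputStream target )
        {
            super.out = target;
        }
    }

    private final SwitchableBuffer buffer = new SwitchableBuffer();
    private final PrintStream out = new PrintStream( buffer );

    // synchronized provides the monitor-lock guard described in the comment:
    // only one thread may retarget the buffer and print at a time.
    public synchronized void printTo( OutputStream target )
    {
        buffer.setTarget( target );
        out.println( "history goes here" );
        out.flush(); // drain the buffer into the current target
    }

    public static void main( String[] args )
    {
        new RetargetableBufferSketch().printTo( System.out );
    }
}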
@@ -0,0 +1,112 @@
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.test;

import org.junit.Ignore;
import org.junit.Test;

import java.io.File;
import java.util.concurrent.ThreadLocalRandom;

import org.neo4j.adversaries.RandomAdversary;
import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PageSwapperFactory;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.impl.SingleFilePageSwapperFactory;
import org.neo4j.io.pagecache.impl.muninn.MuninnPageCache;

import static org.neo4j.io.pagecache.PagedFile.PF_EXCLUSIVE_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_LOCK;

public class LinearHistoryPageCacheTracerTest
{
@Ignore( "This test is only here for checking that the output from the LinearHistoryPageCacheTracer looks good. " +
"This is pretty subjective and requires manual inspection. Therefore there's no point in running it " +
"automatically in all our builds. Instead, run it as needed when you make changes to the printout code." )
@Test
public void makeSomeTestOutput() throws Exception
{
RandomAdversary adversary = new RandomAdversary( 0.1, 0.1, 0.0 );
adversary.setProbabilityFactor( 0.0 );
EphemeralFileSystemAbstraction fs = new EphemeralFileSystemAbstraction();
AdversarialFileSystemAbstraction afs = new AdversarialFileSystemAbstraction( adversary, fs );
PageSwapperFactory swapperFactory = new SingleFilePageSwapperFactory( afs );
int maxPages = 10;
int cachePageSize = 8192;
LinearHistoryPageCacheTracer tracer = new LinearHistoryPageCacheTracer();

File fileA = new File( "a" );
File fileB = new File( "b" );
fs.open( fileA, "rw" ).close();
fs.open( fileB, "rw" ).close();

try ( MuninnPageCache cache = new MuninnPageCache( swapperFactory, maxPages, cachePageSize, tracer );
PagedFile pfA = cache.map( fileA, 8192 );
PagedFile pfB = cache.map( fileB, 8192 ) )
{
adversary.setProbabilityFactor( 1.0 );
ThreadLocalRandom rng = ThreadLocalRandom.current();
for ( int i = 0; i < 200; i++ )
{
try
{
boolean readOnly = rng.nextBoolean();
int flags = readOnly ? PF_SHARED_LOCK : PF_EXCLUSIVE_LOCK;
int startPage = rng.nextInt( 0, 10 );
int iterations = rng.nextInt( 1, 10 );
PagedFile file = rng.nextBoolean()? pfA : pfB;
try ( PageCursor cursor = file.io( startPage, flags ) )
{
for ( int j = 0; j < iterations; j++ )
{
cursor.next();
Thread.sleep( 1 );
if ( !readOnly )
{
for ( int k = 0; k < 8192 / 4; k++ )
{
cursor.putInt( rng.nextInt() );
}
}
}
}
if ( rng.nextDouble() < 0.1 )
{
file.flush();
}
else if ( rng.nextBoolean() )
{
cache.flush();
}
}
catch ( Throwable ignore )
{
}
}

// Don't fail when we close or unmap.
adversary.setProbabilityFactor( 0.0 );
}

tracer.printHistory( System.out );
}
}
