Skip to content

Commit

Permalink
Move rrdb files in its own directory
Browse files Browse the repository at this point in the history
This change is needed since the old default directory was the store
directory, but this was causing issues when branching data, for
example we failed to delete old branched data due to rrd files still
in use.  Moving the rrd files out of that directory will solve this
problem.  The new default directory will be 'data/rrd'.

This adds also a check for the availability of database before
allowing the rrd sampler to sample nodes/relationships/properties.
This should minimize the sampling failures when the db is down due to
cluster reconfigurations.
  • Loading branch information
davidegrohmann committed Oct 23, 2015
1 parent 338f5ed commit 8510a40
Show file tree
Hide file tree
Showing 17 changed files with 346 additions and 382 deletions.
147 changes: 45 additions & 102 deletions community/server/src/main/java/org/neo4j/server/rrd/RrdFactory.java
Expand Up @@ -19,22 +19,20 @@
*/ */
package org.neo4j.server.rrd; package org.neo4j.server.rrd;


import org.rrd4j.ConsolFun;
import org.rrd4j.core.DsDef;
import org.rrd4j.core.RrdDb;
import org.rrd4j.core.RrdDef;
import org.rrd4j.core.RrdToolkit;

import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;


import org.rrd4j.ConsolFun; import org.neo4j.kernel.AvailabilityGuard;
import org.rrd4j.core.DsDef;
import org.rrd4j.core.RrdDb;
import org.rrd4j.core.RrdDef;
import org.rrd4j.core.RrdToolkit;

import org.neo4j.io.fs.FileUtils;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.InternalAbstractGraphDatabase;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.state.NeoStoreProvider; import org.neo4j.kernel.impl.transaction.state.NeoStoreProvider;
import org.neo4j.kernel.logging.ConsoleLogger; import org.neo4j.kernel.logging.ConsoleLogger;
Expand All @@ -45,14 +43,14 @@
import org.neo4j.server.rrd.sampler.NodeIdsInUseSampleable; import org.neo4j.server.rrd.sampler.NodeIdsInUseSampleable;
import org.neo4j.server.rrd.sampler.PropertyCountSampleable; import org.neo4j.server.rrd.sampler.PropertyCountSampleable;
import org.neo4j.server.rrd.sampler.RelationshipCountSampleable; import org.neo4j.server.rrd.sampler.RelationshipCountSampleable;
import org.neo4j.server.web.ServerInternalSettings;


import static java.lang.Double.NaN; import static java.lang.Double.NaN;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;

import static org.rrd4j.ConsolFun.AVERAGE; import static org.rrd4j.ConsolFun.AVERAGE;
import static org.rrd4j.ConsolFun.MAX; import static org.rrd4j.ConsolFun.MAX;
import static org.rrd4j.ConsolFun.MIN; import static org.rrd4j.ConsolFun.MIN;
Expand All @@ -71,28 +69,36 @@ public RrdFactory( Config config, Logging logging )
this.log = logging.getConsoleLog( getClass() ); this.log = logging.getConsoleLog( getClass() );
} }


public org.neo4j.server.database.RrdDbWrapper createRrdDbAndSampler( final Database db, JobScheduler scheduler ) throws IOException public org.neo4j.server.database.RrdDbWrapper createRrdDbAndSampler( final Database db, JobScheduler scheduler )
throws IOException
{ {
NeoStoreProvider neoStore = db.getGraph().getDependencyResolver().resolveDependency( NeoStoreProvider.class ); NeoStoreProvider neoStore = db.getGraph().getDependencyResolver().resolveDependency( NeoStoreProvider.class );
AvailabilityGuard guard = db.getGraph().getDependencyResolver().resolveDependency( AvailabilityGuard.class );


Sampleable[] primitives = { Sampleable[] primitives = {
new NodeIdsInUseSampleable( neoStore ), new NodeIdsInUseSampleable( neoStore, guard ),
new PropertyCountSampleable( neoStore ), new PropertyCountSampleable( neoStore, guard ),
new RelationshipCountSampleable( neoStore ) new RelationshipCountSampleable( neoStore, guard )
}; };


Sampleable[] usage = {}; Sampleable[] usage = {};


File oldRrdFile = new File( db.getGraph().getStoreDir(), "rrd" );
if ( oldRrdFile.exists() )
{
oldRrdFile.delete();
}

File rrdFile = config.get( ServerSettings.rrdb_location ); File rrdFile = config.get( ServerSettings.rrdb_location );
if( rrdFile == null || !rrdFile.exists() ) if ( rrdFile == null )
{ {
Map<String, String> params = config.getParams(); Map<String,String> params = config.getParams();
params.put( ServerSettings.rrdb_location.name(), getDefaultRrdFile( db.getGraph() ) ); params.put( ServerSettings.rrdb_location.name(), getDefaultRrdFile( config ).getAbsolutePath() );
config.applyChanges( params ); config.applyChanges( params );
rrdFile = config.get( ServerSettings.rrdb_location ); rrdFile = config.get( ServerSettings.rrdb_location );
} }

final RrdDbWrapper rrdb = createRrdb( rrdFile, isEphemereal( db.getGraph() ), join( primitives, usage ) ); final RrdDbWrapper rrdb = createRrdb( rrdFile, join( primitives, usage ) );


scheduler.scheduleAtFixedRate( scheduler.scheduleAtFixedRate(
new RrdJob( new RrdSamplerImpl( rrdb.get(), primitives ) ), new RrdJob( new RrdSamplerImpl( rrdb.get(), primitives ) ),
Expand All @@ -114,80 +120,46 @@ private Sampleable[] join( Sampleable[]... sampleables )
return result.toArray( new Sampleable[result.size()] ); return result.toArray( new Sampleable[result.size()] );
} }


private String getDefaultRrdFile( GraphDatabaseAPI db ) throws IOException private File getDefaultRrdFile( Config config ) throws IOException
{
return isEphemereal( db ) ? tempRrdFile() : new File( db.getStoreDir(), "rrd" ).getAbsolutePath();
}

protected String tempRrdFile() throws IOException
{
final File tempFile = File.createTempFile( "neo4j", "rrd" );
tempFile.delete();
tempFile.mkdir();

Runtime.getRuntime().addShutdownHook( new Thread()
{
@Override
public void run()
{
try
{
FileUtils.deleteRecursively(tempFile);
}
catch ( IOException e )
{
// Ignore
}
}
});

return tempFile.getAbsolutePath();
}

private boolean isEphemereal( GraphDatabaseAPI db )
{ {
Config config = db.getDependencyResolver().resolveDependency( Config.class ); String path = config.get( ServerInternalSettings.rrd_store ).getAbsolutePath();

File rrdStore = new File( path );
if ( config == null ) if ( !rrdStore.exists() && !rrdStore.mkdirs() )
{ {
return false; throw new IOException( "Cannot create directory " + rrdStore );
}
else
{
Boolean ephemeral = config.get( InternalAbstractGraphDatabase.Configuration.ephemeral );
return ephemeral != null && ephemeral;
} }
return rrdStore;
} }


protected RrdDbWrapper createRrdb( File rrdFile, boolean ephemeral, Sampleable... sampleables ) protected RrdDbWrapper createRrdb( File rrdFile, Sampleable... sampleables )
{ {
if ( rrdFile.exists() ) if ( rrdFile.exists() )
{ {
try try
{ {
if ( !validateStepSize( rrdFile ) ) if ( !validateStepSize( rrdFile ) )
{ {
return recreateArchive( rrdFile, ephemeral, sampleables ); return recreateArchive( rrdFile, sampleables );
} }


Sampleable[] missing = checkDataSources( rrdFile.getAbsolutePath(), sampleables ); Sampleable[] missing = checkDataSources( rrdFile.getAbsolutePath(), sampleables );
if ( missing.length > 0 ) if ( missing.length > 0 )
{ {
updateDataSources( rrdFile.getAbsolutePath(), missing ); updateDataSources( rrdFile.getAbsolutePath(), missing );
} }
return wrap( new RrdDb( rrdFile.getAbsolutePath() ), ephemeral ); return new RrdDbWrapper.Plain( new RrdDb( rrdFile.getAbsolutePath() ) );
} }
catch ( IOException e ) catch ( IOException e )
{ {
// RRD file may have become corrupt // RRD file may have become corrupt
log.error( "Unable to open rrd store, attempting to recreate it", e ); log.error( "Unable to open rrd store, attempting to recreate it", e );
return recreateArchive( rrdFile, ephemeral, sampleables ); return recreateArchive( rrdFile, sampleables );
} }
catch ( IllegalArgumentException e ) catch ( IllegalArgumentException e )
{ {
// RRD file may have become corrupt // RRD file may have become corrupt
log.error( "Unable to open rrd store, attempting to recreate it", e ); log.error( "Unable to open rrd store, attempting to recreate it", e );
return recreateArchive( rrdFile, ephemeral, sampleables ); return recreateArchive( rrdFile, sampleables );
} }
} }
else else
Expand All @@ -197,7 +169,7 @@ protected RrdDbWrapper createRrdb( File rrdFile, boolean ephemeral, Sampleable..
addArchives( rrdDef ); addArchives( rrdDef );
try try
{ {
return wrap( new RrdDb( rrdDef ), ephemeral ); return new RrdDbWrapper.Plain( new RrdDb( rrdDef ) );
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand All @@ -207,72 +179,43 @@ protected RrdDbWrapper createRrdb( File rrdFile, boolean ephemeral, Sampleable..
} }
} }


private RrdDbWrapper wrap( RrdDb db, boolean ephemeral ) throws IOException
{
return ephemeral ? cleaningRrdDb( db ) : new RrdDbWrapper.Plain( db );
}

private RrdDbWrapper cleaningRrdDb( final RrdDb db )
{
return new RrdDbWrapper()
{
@Override
public RrdDb get()
{
return db;
}

@Override
public void close() throws IOException
{
try
{
db.close();
}
finally
{
new File( db.getPath() ).delete();
}
}
};
}


private boolean validateStepSize( File rrdFile ) throws IOException private boolean validateStepSize( File rrdFile ) throws IOException
{ {
RrdDb r = null; RrdDb r = null;
try try
{ {
r = new RrdDb( rrdFile.getAbsolutePath(), true ); r = new RrdDb( rrdFile.getAbsolutePath(), true );
return ( r.getRrdDef().getStep() == STEP_SIZE ); return (r.getRrdDef().getStep() == STEP_SIZE);
} }
finally finally
{ {
if( r != null ) if ( r != null )
{ {
r.close(); r.close();
} }
} }
} }


private RrdDbWrapper recreateArchive( File rrdFile, boolean ephemeral, Sampleable[] sampleables ) private RrdDbWrapper recreateArchive( File rrdFile, Sampleable[] sampleables )
{ {
File file = new File( rrdFile.getParentFile(), File file = new File( rrdFile.getParentFile(),
rrdFile.getName() + "-invalid-" + System.currentTimeMillis() ); rrdFile.getName() + "-invalid-" + System.currentTimeMillis() );


if ( rrdFile.renameTo( file ) ) if ( rrdFile.renameTo( file ) )
{ {
log.error( "current RRDB is invalid, renamed it to %s", file.getAbsolutePath() ); log.error( "current RRDB is invalid, renamed it to %s", file.getAbsolutePath() );
return createRrdb( rrdFile, ephemeral, sampleables ); return createRrdb( rrdFile, sampleables );
} }


throw new RuntimeException( "RRD file ['" + rrdFile.getAbsolutePath() throw new RuntimeException( "RRD file ['" + rrdFile.getAbsolutePath()
+ "'] is invalid, but I do not have write permissions to recreate it." ); + "'] is invalid, but I do not have write permissions to recreate it." );
} }


private static Sampleable[] checkDataSources( String rrdPath, Sampleable[] sampleables ) throws IOException private static Sampleable[] checkDataSources( String rrdPath, Sampleable[] sampleables ) throws IOException
{ {
RrdDb rrdDb = new RrdDb( rrdPath, true ); RrdDb rrdDb = new RrdDb( rrdPath, true );
List<Sampleable> missing = new ArrayList<Sampleable>(); List<Sampleable> missing = new ArrayList<>();
for ( Sampleable sampleable : sampleables ) for ( Sampleable sampleable : sampleables )
{ {
if ( rrdDb.getDatasource( sampleable.getName() ) == null ) if ( rrdDb.getDatasource( sampleable.getName() ) == null )
Expand Down Expand Up @@ -314,8 +257,8 @@ private void addArchives( RrdDef rrdDef )
private void addArchive( RrdDef rrdDef, ConsolFun fun, long length, long resolution ) private void addArchive( RrdDef rrdDef, ConsolFun fun, long length, long resolution )
{ {
rrdDef.addArchive( fun, 0.2, rrdDef.addArchive( fun, 0.2,
(int) ( resolution * STEP_SIZE ), (int) (resolution * STEP_SIZE),
(int) ( length / ( resolution * STEP_SIZE ) ) ); (int) (length / (resolution * STEP_SIZE)) );
} }


private void defineDataSources( RrdDef rrdDef, Sampleable[] sampleables ) private void defineDataSources( RrdDef rrdDef, Sampleable[] sampleables )
Expand Down
Expand Up @@ -19,12 +19,12 @@
*/ */
package org.neo4j.server.rrd; package org.neo4j.server.rrd;


import java.io.IOException;

import org.rrd4j.core.RrdDb; import org.rrd4j.core.RrdDb;
import org.rrd4j.core.Sample; import org.rrd4j.core.Sample;
import org.rrd4j.core.Util; import org.rrd4j.core.Util;


import java.io.IOException;

/** /**
* Manages sampling the state of the database and storing the samples in a round * Manages sampling the state of the database and storing the samples in a round
* robin database instance. * robin database instance.
Expand Down Expand Up @@ -61,10 +61,6 @@ protected RrdSamplerImpl(RrdDb rrdDb, Sampleable... samplables) {


sample.update(); sample.update();
} }
catch ( UnableToSampleException e )
{
e.printStackTrace();
}
catch ( IOException e ) catch ( IOException e )
{ {
throw new RuntimeException( "IO Error trying to access round robin database path. See nested exception.", e ); throw new RuntimeException( "IO Error trying to access round robin database path. See nested exception.", e );
Expand Down
Expand Up @@ -21,29 +21,50 @@


import org.rrd4j.DsType; import org.rrd4j.DsType;


import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.impl.store.NeoStore; import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.transaction.state.NeoStoreProvider; import org.neo4j.kernel.impl.transaction.state.NeoStoreProvider;
import org.neo4j.server.rrd.Sampleable; import org.neo4j.server.rrd.Sampleable;


public abstract class DatabasePrimitivesSampleableBase implements Sampleable public abstract class DatabasePrimitivesSampleableBase implements Sampleable
{ {
private final NeoStoreProvider neoStore; private final NeoStoreProvider neoStore;
private final AvailabilityGuard guard;
private double lastReadValue = 0d;


public DatabasePrimitivesSampleableBase( NeoStoreProvider neoStore ) public DatabasePrimitivesSampleableBase( NeoStoreProvider neoStore, AvailabilityGuard guard )
{ {
if( neoStore == null ) if ( neoStore == null )
{ {
throw new RuntimeException( "Database sampler needs a NeoStore to work, was given null." ); throw new RuntimeException( "Database sampler needs a NeoStore to work, was given null." );
} }
this.neoStore = neoStore; this.neoStore = neoStore;

this.guard = guard;
} }


protected NeoStore getNeoStore() @Override
public double getValue()
{ {
return neoStore.evaluate(); if ( guard.isAvailable(0) )
{
try
{
lastReadValue = readValue( neoStore.evaluate() );
}
catch ( Exception e )
{
/*
* oh well, this is unfortunate, perhaps the db went down after the check, so let's ignore this problem
* and return the old value, we'll sample again when the db is online again
*/
}
}

return lastReadValue;
} }


protected abstract double readValue( NeoStore neoStore );

@Override @Override
public DsType getType() public DsType getType()
{ {
Expand Down

0 comments on commit 8510a40

Please sign in to comment.