Skip to content

Commit

Permalink
misc review improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd committed Mar 29, 2019
1 parent 3709ba7 commit 8320e79
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 22 deletions.
Expand Up @@ -199,7 +199,7 @@ object DataCollectorMatchers {
errors += s"Expected value '$expectedValue' at position $i, but list was too small"
}
if (values.size > expected.size)
errors += s"Expected list of maxSize ${expected.size}, but got additional elements ${values.slice(expected.size, values.size)}"
errors += s"Expected list of ${expected.size} elements, but got additional elements ${values.slice(expected.size, values.size)}"

case x =>
errors += s"Expected list but got '$x'"
Expand Down
Expand Up @@ -298,13 +298,10 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {

test("[retrieveAllAnonymized] should anonymize tokens inside queries") {
// given
execute("CALL db.stats.stop('QUERIES')").single
execute("CREATE (:User {age: 99})-[:KNOWS]->(:Buddy {p: 42})-[:WANTS]->(:Raccoon)") // create tokens

execute("CALL db.stats.collect('QUERIES')").single
execute("MATCH (:User)-[:KNOWS]->(:Buddy)-[:WANTS]->(:Raccoon) RETURN 1")
execute("MATCH ({p: 42}), ({age: 43}) RETURN 1")
execute("CALL db.stats.stop('QUERIES')").single

// when
val res = execute("CALL db.stats.retrieveAllAnonymized('myToken')")
Expand All @@ -319,10 +316,9 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
test("[retrieveAllAnonymized] should handle pre-parser options") {
// given
execute("CREATE (:User {age: 99})-[:KNOWS]->(:Buddy {p: 42})-[:WANTS]->(:Raccoon)") // create tokens
execute("CALL db.stats.collect('QUERIES')").single

execute("EXPLAIN MATCH (:User)-[:KNOWS]->(:Buddy)-[:WANTS]->(:Raccoon) RETURN 1")
execute("CYPHER 3.4 runtime=interpreted PROFILE CREATE ()")
execute("CALL db.stats.stop('QUERIES')").single

// when
val res = execute("CALL db.stats.retrieveAllAnonymized('myToken')")
Expand All @@ -338,10 +334,9 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
// given
val path = Files.createTempFile("data", ".csv")
val url = path.toUri.toURL.toString
execute("CALL db.stats.collect('QUERIES')").single

execute(s"LOAD CSV FROM '$url' AS row CREATE ({key: row[0]})")
execute(s"USING PERIODIC COMMIT 30 LOAD CSV FROM '$url' AS row CREATE ({key: row[0]})")
execute("CALL db.stats.stop('QUERIES')").single

// when
val res = execute("CALL db.stats.retrieveAllAnonymized('myToken')")
Expand Down
Expand Up @@ -19,9 +19,7 @@
*/
package org.neo4j.internal.collector;

import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

Expand Down Expand Up @@ -63,7 +61,7 @@ public Stream<RetrieveResult> retrieve( @Name( value = "section" ) String sectio
return TokensSection.retrieve( dataCollector.kernel );

case Sections.META:
return MetaSection.retrieve( null, dataCollector.kernel, dataCollector.queryCollector.nSilentQueryDrops() );
return MetaSection.retrieve( null, dataCollector.kernel, dataCollector.queryCollector.numSilentQueryDrops() );

case Sections.QUERIES:
return QueriesSection.retrieve( dataCollector.queryCollector.getData(),
Expand All @@ -87,7 +85,7 @@ public Stream<RetrieveResult> retrieveAllAnonymized( @Name( value = "graphToken"
throw new InvalidArgumentsException( "Graph token must be a non-empty string" );
}

return Stream.of( MetaSection.retrieve( graphToken, dataCollector.kernel, dataCollector.queryCollector.nSilentQueryDrops() ),
return Stream.of( MetaSection.retrieve( graphToken, dataCollector.kernel, dataCollector.queryCollector.numSilentQueryDrops() ),
GraphCountsSection.retrieve( dataCollector.kernel, Anonymizer.IDS ),
QueriesSection.retrieve( dataCollector.queryCollector.getData(),
new IdAnonymizer( transaction.tokenRead() ),
Expand Down
Expand Up @@ -45,7 +45,9 @@ private MetaSection()
{ // only static methods
}

static Stream<RetrieveResult> retrieve( String graphToken, Kernel kernel, long nSilentQueryDrops ) throws TransactionFailureException
static Stream<RetrieveResult> retrieve( String graphToken,
Kernel kernel,
long numSilentQueryDrops ) throws TransactionFailureException
{
Map<String, Object> systemData = new HashMap<>();
systemData.put( "jvmMemoryFree", Runtime.getRuntime().freeMemory() );
Expand Down Expand Up @@ -77,7 +79,7 @@ static Stream<RetrieveResult> retrieve( String graphToken, Kernel kernel, long n
systemData.put( "fileEncoding", System.getProperty( "file.encoding" ) );

Map<String, Object> internalData = new HashMap<>();
internalData.put( "numSilentQueryCollectionMisses", nSilentQueryDrops );
internalData.put( "numSilentQueryCollectionMisses", numSilentQueryDrops );

Map<String, Object> metaData = new HashMap<>();
metaData.put( "graphToken", graphToken );
Expand Down
Expand Up @@ -48,18 +48,27 @@ class QueryCollector extends CollectorStateMachine<Iterator<QuerySnapshot>> impl
private volatile boolean isCollecting;
private final RingRecentBuffer<QuerySnapshot> queries;
private final JobScheduler jobScheduler;
/**
* We retain at max 2^13 = 8192 queries in memory at any given time. This number
* was chosen as a trade-off between getting a useful amount of queries, and not
* wasting too much heap. Even with a buffer full of unique queries, the estimated
* footprint lies in tens of MBs. If the buffer is full of cached queries, the
* retained size was measured to 265 kB.
*/
private static final int QUERY_BUFFER_SIZE_IN_BITS = 13;

QueryCollector( JobScheduler jobScheduler )
{
super( true );
this.jobScheduler = jobScheduler;
isCollecting = false;
queries = new RingRecentBuffer<>( 13 );

queries = new RingRecentBuffer<>( QUERY_BUFFER_SIZE_IN_BITS );
}

long nSilentQueryDrops()
long numSilentQueryDrops()
{
return queries.nSilentQueryDrops();
return queries.numSilentQueryDrops();
}

// CollectorStateMachine
Expand Down
Expand Up @@ -53,7 +53,7 @@ public RingRecentBuffer( int bitSize )
dropEvents = new AtomicLong( 0 );
}

long nSilentQueryDrops()
long numSilentQueryDrops()
{
return dropEvents.get();
}
Expand Down
Expand Up @@ -62,7 +62,7 @@ void shouldJustWork()
}
buffer.foreach( Assertions::assertNotNull );

assertEquals( 0, buffer.nSilentQueryDrops() );
assertEquals( 0, buffer.numSilentQueryDrops() );
}

@Test
Expand Down Expand Up @@ -100,7 +100,7 @@ void shouldNotReadSameElementTwice() throws ExecutionException, InterruptedExcep
{
executor.shutdown();
}
assertEquals( 0, buffer.nSilentQueryDrops() );
assertEquals( 0, buffer.numSilentQueryDrops() );
}

@Test
Expand Down Expand Up @@ -133,7 +133,7 @@ void shouldNeverReadUnwrittenElements() throws ExecutionException, InterruptedEx
{
executor.shutdown();
}
assertEquals( 0, buffer.nSilentQueryDrops() );
assertEquals( 0, buffer.numSilentQueryDrops() );
}

@Test
Expand Down Expand Up @@ -170,7 +170,7 @@ void shouldWorkWithManyConcurrentProducers() throws ExecutionException, Interrup
{
executor.shutdown();
}
assertEquals( 0, buffer.nSilentQueryDrops() );
assertEquals( 0, buffer.numSilentQueryDrops() );
}

private <T> Runnable stress( int n, LongConsumer action )
Expand Down

0 comments on commit 8320e79

Please sign in to comment.