Skip to content

Commit

Permalink
Truncate the queryTexts and parameters in the QueryCollector...
Browse files Browse the repository at this point in the history
Otherwise the collector will hold references to these for a very long
time, preventing them from being garbage collected. This becomes a big
memory problem for e.g. import queries that might be very big, or
contain some very big parameters.
  • Loading branch information
fickludd committed May 20, 2019
1 parent 813b309 commit 23e494e
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 30 deletions.
Expand Up @@ -21,6 +21,7 @@ package org.neo4j.internal.collector

import java.nio.file.Files

import org.neo4j.graphdb.{Node, Path, Relationship}
import org.scalatest.matchers.{MatchResult, Matcher}

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -201,7 +202,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> "BrassLeg"))
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 2))
execute("WITH 42 AS x RETURN x")
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> List(3.1, 3.2)))
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 3.1))
execute("WITH 42 AS x RETURN x")

// when
Expand All @@ -216,7 +217,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
"invocations" -> beInvocationsInOrder(
Map("param" -> "BrassLeg"),
Map("param" -> Long.box(2)),
Map("param" -> List(3.1, 3.2))
Map("param" -> Double.box(3.1))
)
)
),
Expand All @@ -235,7 +236,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> "BrassLeg"))
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 2))
execute("WITH 42 AS x RETURN x")
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> List(3.1, 3.2)))
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 3.1))
execute("WITH 42 AS x RETURN x")

// when
Expand All @@ -259,7 +260,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> "BrassLeg"))
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 2))
execute("WITH 42 AS x RETURN x")
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> List(3.1, 3.2)))
execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 3.1))
execute("WITH 42 AS x RETURN x")
execute("WITH 42 AS x RETURN x")
execute("WITH 42 AS x RETURN x")
Expand Down Expand Up @@ -296,6 +297,71 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
assertInvalidArgument("CALL db.stats.retrieve('QUERIES', {maxInvocations: -1})") // negative integer is not fine
}

test("should limit the collected query text size") {
  // given: one query made of 10000 CREATE clauses, so its text is much longer than 10000 characters
  val createClauses = for (i <- 0 until 10000) yield s"CREATE (n$i) "
  val largeQuery = createClauses.mkString("\n")
  execute(largeQuery)

  // when
  val collected = execute("CALL db.stats.retrieve('QUERIES')").toList

  // then: only the first 10000 characters of the query text are expected to be reported
  val expectedQueryText = largeQuery.take(10000)
  collected should beListWithoutOrder(
    beMapContaining(
      "section" -> "QUERIES",
      "data" -> beMapContaining("query" -> expectedQueryText)
    )
  )
}

test("should limit the collected query parameter sizes") {
  // given: a node, a relationship and a path that can be passed in as entity parameters
  val entities = execute(
    """CREATE (node:A {p: 'startNode'})-[relationship:R {p: 'rel'}]->(_:B {p: 'endNode'})
      |WITH node, relationship
      |MATCH path=()-->()
      |RETURN node, relationship, path
      |""".stripMargin).single

  val node = entities("node").asInstanceOf[Node]
  val relationship = entities("relationship").asInstanceOf[Relationship]
  val path = entities("path").asInstanceOf[Path]

  // One invocation of the same query per kind of parameter value under test.
  val longString: String = "".padTo(200, 'x')
  val query = "RETURN $param"
  execute(query, params = Map("param" -> longString))
  execute(query, params = Map("param" -> List(1,2,3)))
  execute(query, params = Map("param" -> Map("x" -> 1)))
  execute(query, params = Map("param" -> node))
  execute(query, params = Map("param" -> relationship))
  execute(query, params = Map("param" -> path))

  // when
  val res = execute("CALL db.stats.retrieve('QUERIES')").toList

  // then: strings are cut to 100 characters, lists/maps/paths are reported as size placeholders,
  // and nodes and relationships are still reported as entities
  res should beListWithoutOrder(
    beMapContaining(
      "section" -> "QUERIES",
      "data" -> beMapContaining(
        "query" -> query,
        "invocations" -> beInvocationsInOrder(
          Map("param" -> longString.take(100)),
          Map("param" -> "§LIST[3]"),
          Map("param" -> "§MAP[1]"),
          Map("param" -> node),
          Map("param" -> relationship),
          Map("param" -> "§PATH[1]")
        )
      )
    )
  )
}

test("[retrieveAllAnonymized] should anonymize tokens inside queries") {
// given
execute("CREATE (:User {age: 99})-[:KNOWS]->(:Buddy {p: 42})-[:WANTS]->(:Raccoon)") // create tokens
Expand Down Expand Up @@ -413,7 +479,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
"invocations" -> beInvocationsInOrder(
"4ac156c0", // Map("user" -> "BrassLeg", "name" -> "George")
"b439d06c", // Map("user" -> 2, "name" -> "Glinda")
"e5adc9ad" // Map("user" -> List(3.1, 3.2), "name" -> "Kim")
"8e2eb26e" // Map("user" -> List(3.1, 3.2), "name" -> "Kim")
)
)
),
Expand All @@ -422,7 +488,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport {
"data" -> beMapContaining(
"query" -> "RETURN $param0, $param1, $param0 + $param1",
"invocations" -> beInvocationsInOrder(
"e5adc9ad" // Map("user" -> List(3.1, 3.2), "name" -> "Kim")
"8e2eb26e" // Map("user" -> List(3.1, 3.2), "name" -> "Kim")
)
)
)
Expand Down
Expand Up @@ -33,20 +33,14 @@
import org.neo4j.scheduler.JobScheduler;

/**
* Simple Thread-safe query collector.
* Thread-safe query collector.
*
* Note that is has several potentially not-so-nice properties:
*
* - It buffers all query data until collection is done. On high-workload systems
* this could use substantial memory
*
* - All threads that report queries on {@link QueryCollector#endSuccess(org.neo4j.kernel.api.query.ExecutingQuery)}
* contend for writing to the queue, which might cause delays before the first result on highly concurrent systems
* Delegates to RingRecentBuffer to hard limit the number of collected queries at any point in time.
*/
class QueryCollector extends CollectorStateMachine<Iterator<QuerySnapshot>> implements QueryExecutionMonitor
class QueryCollector extends CollectorStateMachine<Iterator<TruncatedQuerySnapshot>> implements QueryExecutionMonitor
{
private volatile boolean isCollecting;
private final RingRecentBuffer<QuerySnapshot> queries;
private final RingRecentBuffer<TruncatedQuerySnapshot> queries;
private final JobScheduler jobScheduler;
/**
* We retain at max 2^13 = 8192 queries in memory at any given time. This number
Expand Down Expand Up @@ -100,9 +94,9 @@ protected Result doClear()
}

@Override
protected Iterator<QuerySnapshot> doGetData()
protected Iterator<TruncatedQuerySnapshot> doGetData()
{
List<QuerySnapshot> querySnapshots = new ArrayList<>();
List<TruncatedQuerySnapshot> querySnapshots = new ArrayList<>();
queries.foreach( querySnapshots::add );
return querySnapshots.iterator();
}
Expand All @@ -119,7 +113,14 @@ public void endSuccess( ExecutingQuery query )
{
if ( isCollecting )
{
queries.produce( query.snapshot() );
QuerySnapshot snapshot = query.snapshot();
queries.produce(
new TruncatedQuerySnapshot( snapshot.queryText(),
snapshot.queryPlan(),
snapshot.queryParameters(),
snapshot.elapsedTimeMicros(),
snapshot.compilationTimeMicros(),
snapshot.startTimestampMillis() ) );
}
}
}
@@ -0,0 +1,210 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.collector;

import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.values.AnyValue;
import org.neo4j.values.SequenceValue;
import org.neo4j.values.ValueMapper;
import org.neo4j.values.storable.BooleanValue;
import org.neo4j.values.storable.DateTimeValue;
import org.neo4j.values.storable.DateValue;
import org.neo4j.values.storable.DurationValue;
import org.neo4j.values.storable.LocalDateTimeValue;
import org.neo4j.values.storable.LocalTimeValue;
import org.neo4j.values.storable.NumberValue;
import org.neo4j.values.storable.PointValue;
import org.neo4j.values.storable.TextValue;
import org.neo4j.values.storable.TimeValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;
import org.neo4j.values.virtual.NodeValue;
import org.neo4j.values.virtual.PathValue;
import org.neo4j.values.virtual.RelationshipValue;
import org.neo4j.values.virtual.VirtualNodeValue;
import org.neo4j.values.virtual.VirtualRelationshipValue;
import org.neo4j.values.virtual.VirtualValues;

/**
* Variant of QuerySnapshot that truncates queryText and queryParameter data to limit the memory footprint of
* constant query collection. This is crucial to avoid bloating memory use for data import scenarios, and in general
* to avoid hogging lot's of memory that will be long-lived and likely tenured.
*/
class TruncatedQuerySnapshot
{
final String queryText;
final ExecutionPlanDescription queryPlan;
final MapValue queryParameters;
final Long elapsedTimeMicros;
final Long compilationTimeMicros;
final Long startTimestampMillis;

TruncatedQuerySnapshot( String queryText,
ExecutionPlanDescription queryPlan,
MapValue queryParameters,
Long elapsedTimeMicros,
Long compilationTimeMicros,
Long startTimestampMillis )
{
this.queryText = truncateQueryText( queryText, 10000 );
this.queryPlan = queryPlan;
this.queryParameters = truncateParameters( queryParameters );
this.elapsedTimeMicros = elapsedTimeMicros;
this.compilationTimeMicros = compilationTimeMicros;
this.startTimestampMillis = startTimestampMillis;
}

private static String truncateQueryText( String queryText, int maxLength )
{
return queryText.length() > maxLength ? queryText.substring( 0, maxLength ) : queryText;
}

private static MapValue truncateParameters( MapValue parameters )
{
String[] keys = new String[parameters.size()];
AnyValue[] values = new AnyValue[keys.length];

int i = 0;
for ( String key : parameters.keySet() )
{
keys[i] = key;
values[i] = parameters.get( key ).map( VALUE_TRUNCATER );
i++;
}

return VirtualValues.map( keys, values );
}

private static ValueTruncater VALUE_TRUNCATER = new ValueTruncater();

static class ValueTruncater implements ValueMapper<AnyValue>
{

@Override
public AnyValue mapPath( PathValue value )
{
return Values.stringValue( "§PATH[" + value.size() + "]" );
}

@Override
public AnyValue mapNode( VirtualNodeValue value )
{
if ( value instanceof NodeValue )
{
// Note: we do not want to keep a reference to the whole node value as it could contains a lot of data.
return VirtualValues.node( value.id() );
}
return value;
}

@Override
public AnyValue mapRelationship( VirtualRelationshipValue value )
{
if ( value instanceof RelationshipValue )
{
// Note: we do not want to keep a reference to the whole relationship value as it could contains a lot of data.
return VirtualValues.relationship( value.id() );
}
return value;
}

@Override
public AnyValue mapMap( MapValue map )
{
return Values.stringValue( "§MAP[" + map.size() + "]" );
}

@Override
public AnyValue mapNoValue()
{
return Values.NO_VALUE;
}

@Override
public AnyValue mapSequence( SequenceValue value )
{
return Values.stringValue( "§LIST[" + value.length() + "]" );
}

@Override
public AnyValue mapText( TextValue value )
{
if ( value.length() > 100 )
{
return Values.stringValue( value.stringValue().substring( 0, 100 ) );
}
return value;
}

@Override
public AnyValue mapBoolean( BooleanValue value )
{
return value;
}

@Override
public AnyValue mapNumber( NumberValue value )
{
return value;
}

@Override
public AnyValue mapDateTime( DateTimeValue value )
{
return value;
}

@Override
public AnyValue mapLocalDateTime( LocalDateTimeValue value )
{
return value;
}

@Override
public AnyValue mapDate( DateValue value )
{
return value;
}

@Override
public AnyValue mapTime( TimeValue value )
{
return value;
}

@Override
public AnyValue mapLocalTime( LocalTimeValue value )
{
return value;
}

@Override
public AnyValue mapDuration( DurationValue value )
{
return value;
}

@Override
public AnyValue mapPoint( PointValue value )
{
return value;
}
}
}

0 comments on commit 23e494e

Please sign in to comment.