From 23e494e048a63a227daaf4a84ef8e663671414f6 Mon Sep 17 00:00:00 2001 From: fickludd Date: Tue, 14 May 2019 12:09:34 +0200 Subject: [PATCH] Truncate the queryTexts and parameters in the QueryCollector... Otherwise the collector will hold references to these for a very long time, preventing them from being garbage collected. This becomes a big memory problem for e.g. import queries that might be very big, or contain some very big parameters. --- .../DataCollectorQueriesAcceptanceTest.scala | 78 ++++++- .../internal/collector/QueryCollector.java | 27 +-- .../collector/TruncatedQuerySnapshot.java | 210 ++++++++++++++++++ .../internal/collector/QueriesSection.scala | 20 +- 4 files changed, 305 insertions(+), 30 deletions(-) create mode 100644 community/data-collector/src/main/java/org/neo4j/internal/collector/TruncatedQuerySnapshot.java diff --git a/community/community-it/cypher-it/src/test/scala/org/neo4j/internal/collector/DataCollectorQueriesAcceptanceTest.scala b/community/community-it/cypher-it/src/test/scala/org/neo4j/internal/collector/DataCollectorQueriesAcceptanceTest.scala index 7508b38e4905..40fd59f0e71e 100644 --- a/community/community-it/cypher-it/src/test/scala/org/neo4j/internal/collector/DataCollectorQueriesAcceptanceTest.scala +++ b/community/community-it/cypher-it/src/test/scala/org/neo4j/internal/collector/DataCollectorQueriesAcceptanceTest.scala @@ -21,6 +21,7 @@ package org.neo4j.internal.collector import java.nio.file.Files +import org.neo4j.graphdb.{Node, Path, Relationship} import org.scalatest.matchers.{MatchResult, Matcher} import scala.collection.mutable.ArrayBuffer @@ -201,7 +202,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> "BrassLeg")) execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 2)) execute("WITH 42 AS x RETURN x") - execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> List(3.1, 3.2))) + execute("MATCH (n {p: 
$param}) RETURN count(n)", Map("param" -> 3.1)) execute("WITH 42 AS x RETURN x") // when @@ -216,7 +217,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { "invocations" -> beInvocationsInOrder( Map("param" -> "BrassLeg"), Map("param" -> Long.box(2)), - Map("param" -> List(3.1, 3.2)) + Map("param" -> Double.box(3.1)) ) ) ), @@ -235,7 +236,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> "BrassLeg")) execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 2)) execute("WITH 42 AS x RETURN x") - execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> List(3.1, 3.2))) + execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 3.1)) execute("WITH 42 AS x RETURN x") // when @@ -259,7 +260,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> "BrassLeg")) execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 2)) execute("WITH 42 AS x RETURN x") - execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> List(3.1, 3.2))) + execute("MATCH (n {p: $param}) RETURN count(n)", Map("param" -> 3.1)) execute("WITH 42 AS x RETURN x") execute("WITH 42 AS x RETURN x") execute("WITH 42 AS x RETURN x") @@ -296,6 +297,71 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { assertInvalidArgument("CALL db.stats.retrieve('QUERIES', {maxInvocations: -1})") // negative integer is not fine } + test("should limit the collected query text size") { + // given + val largeQuery = (0 until 10000).map(i => s"CREATE (n$i) ").mkString("\n") + execute(largeQuery) + + // when + val res = execute("CALL db.stats.retrieve('QUERIES')").toList + + // then + res should beListWithoutOrder( + beMapContaining( + "section" -> "QUERIES", + "data" -> beMapContaining( + "query" -> largeQuery.take(10000) + ) + ) + ) + } + + test("should limit 
the collected query parameter sizes") { + // given + val entities = execute( + """CREATE (node:A {p: 'startNode'})-[relationship:R {p: 'rel'}]->(_:B {p: 'endNode'}) + |WITH node, relationship + |MATCH path=()-->() + |RETURN node, relationship, path + """.stripMargin).single + + val node = entities("node").asInstanceOf[Node] + val relationship = entities("relationship").asInstanceOf[Relationship] + val path = entities("path").asInstanceOf[Path] + + val longString: String = "".padTo(200, 'x') + val query = "RETURN $param" + execute(query, params = Map("param" -> longString)) + execute(query, params = Map("param" -> List(1,2,3))) + execute(query, params = Map("param" -> Map("x" -> 1))) + execute(query, params = Map("param" -> node)) + execute(query, params = Map("param" -> relationship)) + execute(query, params = Map("param" -> path)) + + // when + val res = execute("CALL db.stats.retrieve('QUERIES')").toList + + println(res) + + // then + res should beListWithoutOrder( + beMapContaining( + "section" -> "QUERIES", + "data" -> beMapContaining( + "query" -> query, + "invocations" -> beInvocationsInOrder( + Map("param" -> longString.take(100)), + Map("param" -> "§LIST[3]"), + Map("param" -> "§MAP[1]"), + Map("param" -> node), + Map("param" -> relationship), + Map("param" -> "§PATH[1]") + ) + ) + ) + ) + } + test("[retrieveAllAnonymized] should anonymize tokens inside queries") { // given execute("CREATE (:User {age: 99})-[:KNOWS]->(:Buddy {p: 42})-[:WANTS]->(:Raccoon)") // create tokens @@ -413,7 +479,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { "invocations" -> beInvocationsInOrder( "4ac156c0", // Map("user" -> "BrassLeg", "name" -> "George") "b439d06c", // Map("user" -> 2, "name" -> "Glinda") - "e5adc9ad" // Map("user" -> List(3.1, 3.2), "name" -> "Kim") + "8e2eb26e" // Map("user" -> List(3.1, 3.2), "name" -> "Kim") ) ) ), @@ -422,7 +488,7 @@ class DataCollectorQueriesAcceptanceTest extends DataCollectorTestSupport { "data" -> 
beMapContaining( "query" -> "RETURN $param0, $param1, $param0 + $param1", "invocations" -> beInvocationsInOrder( - "e5adc9ad" // Map("user" -> List(3.1, 3.2), "name" -> "Kim") + "8e2eb26e" // Map("user" -> List(3.1, 3.2), "name" -> "Kim") ) ) ) diff --git a/community/data-collector/src/main/java/org/neo4j/internal/collector/QueryCollector.java b/community/data-collector/src/main/java/org/neo4j/internal/collector/QueryCollector.java index 6047a81ef4a5..7102b74c078d 100644 --- a/community/data-collector/src/main/java/org/neo4j/internal/collector/QueryCollector.java +++ b/community/data-collector/src/main/java/org/neo4j/internal/collector/QueryCollector.java @@ -33,20 +33,14 @@ import org.neo4j.scheduler.JobScheduler; /** - * Simple Thread-safe query collector. + * Thread-safe query collector. * - * Note that is has several potentially not-so-nice properties: - * - * - It buffers all query data until collection is done. On high-workload systems - * this could use substantial memory - * - * - All threads that report queries on {@link QueryCollector#endSuccess(org.neo4j.kernel.api.query.ExecutingQuery)} - * contend for writing to the queue, which might cause delays before the first result on highly concurrent systems + * Delegates to RingRecentBuffer to hard limit the number of collected queries at any point in time. */ -class QueryCollector extends CollectorStateMachine> implements QueryExecutionMonitor +class QueryCollector extends CollectorStateMachine> implements QueryExecutionMonitor { private volatile boolean isCollecting; - private final RingRecentBuffer queries; + private final RingRecentBuffer queries; private final JobScheduler jobScheduler; /** * We retain at max 2^13 = 8192 queries in memory at any given time. 
This number @@ -100,9 +94,9 @@ protected Result doClear() } @Override - protected Iterator doGetData() + protected Iterator doGetData() { - List querySnapshots = new ArrayList<>(); + List querySnapshots = new ArrayList<>(); queries.foreach( querySnapshots::add ); return querySnapshots.iterator(); } @@ -119,7 +113,14 @@ public void endSuccess( ExecutingQuery query ) { if ( isCollecting ) { - queries.produce( query.snapshot() ); + QuerySnapshot snapshot = query.snapshot(); + queries.produce( + new TruncatedQuerySnapshot( snapshot.queryText(), + snapshot.queryPlan(), + snapshot.queryParameters(), + snapshot.elapsedTimeMicros(), + snapshot.compilationTimeMicros(), + snapshot.startTimestampMillis() ) ); } } } diff --git a/community/data-collector/src/main/java/org/neo4j/internal/collector/TruncatedQuerySnapshot.java b/community/data-collector/src/main/java/org/neo4j/internal/collector/TruncatedQuerySnapshot.java new file mode 100644 index 000000000000..4aa992b6ebd7 --- /dev/null +++ b/community/data-collector/src/main/java/org/neo4j/internal/collector/TruncatedQuerySnapshot.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.neo4j.internal.collector; + +import org.neo4j.graphdb.ExecutionPlanDescription; +import org.neo4j.values.AnyValue; +import org.neo4j.values.SequenceValue; +import org.neo4j.values.ValueMapper; +import org.neo4j.values.storable.BooleanValue; +import org.neo4j.values.storable.DateTimeValue; +import org.neo4j.values.storable.DateValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.LocalTimeValue; +import org.neo4j.values.storable.NumberValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.storable.TextValue; +import org.neo4j.values.storable.TimeValue; +import org.neo4j.values.storable.Values; +import org.neo4j.values.virtual.MapValue; +import org.neo4j.values.virtual.NodeValue; +import org.neo4j.values.virtual.PathValue; +import org.neo4j.values.virtual.RelationshipValue; +import org.neo4j.values.virtual.VirtualNodeValue; +import org.neo4j.values.virtual.VirtualRelationshipValue; +import org.neo4j.values.virtual.VirtualValues; + +/** + * Variant of QuerySnapshot that truncates queryText and queryParameter data to limit the memory footprint of + * constant query collection. This is crucial to avoid bloating memory use for data import scenarios, and in general + * to avoid hogging lots of memory that will be long-lived and likely tenured. 
+ */ +class TruncatedQuerySnapshot +{ + final String queryText; + final ExecutionPlanDescription queryPlan; + final MapValue queryParameters; + final Long elapsedTimeMicros; + final Long compilationTimeMicros; + final Long startTimestampMillis; + + TruncatedQuerySnapshot( String queryText, + ExecutionPlanDescription queryPlan, + MapValue queryParameters, + Long elapsedTimeMicros, + Long compilationTimeMicros, + Long startTimestampMillis ) + { + this.queryText = truncateQueryText( queryText, 10000 ); + this.queryPlan = queryPlan; + this.queryParameters = truncateParameters( queryParameters ); + this.elapsedTimeMicros = elapsedTimeMicros; + this.compilationTimeMicros = compilationTimeMicros; + this.startTimestampMillis = startTimestampMillis; + } + + private static String truncateQueryText( String queryText, int maxLength ) + { + return queryText.length() > maxLength ? queryText.substring( 0, maxLength ) : queryText; + } + + private static MapValue truncateParameters( MapValue parameters ) + { + String[] keys = new String[parameters.size()]; + AnyValue[] values = new AnyValue[keys.length]; + + int i = 0; + for ( String key : parameters.keySet() ) + { + keys[i] = key; + values[i] = parameters.get( key ).map( VALUE_TRUNCATER ); + i++; + } + + return VirtualValues.map( keys, values ); + } + + private static ValueTruncater VALUE_TRUNCATER = new ValueTruncater(); + + static class ValueTruncater implements ValueMapper + { + + @Override + public AnyValue mapPath( PathValue value ) + { + return Values.stringValue( "§PATH[" + value.size() + "]" ); + } + + @Override + public AnyValue mapNode( VirtualNodeValue value ) + { + if ( value instanceof NodeValue ) + { + // Note: we do not want to keep a reference to the whole node value as it could contain a lot of data. 
+ return VirtualValues.node( value.id() ); + } + return value; + } + + @Override + public AnyValue mapRelationship( VirtualRelationshipValue value ) + { + if ( value instanceof RelationshipValue ) + { + // Note: we do not want to keep a reference to the whole relationship value as it could contain a lot of data. + return VirtualValues.relationship( value.id() ); + } + return value; + } + + @Override + public AnyValue mapMap( MapValue map ) + { + return Values.stringValue( "§MAP[" + map.size() + "]" ); + } + + @Override + public AnyValue mapNoValue() + { + return Values.NO_VALUE; + } + + @Override + public AnyValue mapSequence( SequenceValue value ) + { + return Values.stringValue( "§LIST[" + value.length() + "]" ); + } + + @Override + public AnyValue mapText( TextValue value ) + { + if ( value.length() > 100 ) + { + return Values.stringValue( value.stringValue().substring( 0, 100 ) ); + } + return value; + } + + @Override + public AnyValue mapBoolean( BooleanValue value ) + { + return value; + } + + @Override + public AnyValue mapNumber( NumberValue value ) + { + return value; + } + + @Override + public AnyValue mapDateTime( DateTimeValue value ) + { + return value; + } + + @Override + public AnyValue mapLocalDateTime( LocalDateTimeValue value ) + { + return value; + } + + @Override + public AnyValue mapDate( DateValue value ) + { + return value; + } + + @Override + public AnyValue mapTime( TimeValue value ) + { + return value; + } + + @Override + public AnyValue mapLocalTime( LocalTimeValue value ) + { + return value; + } + + @Override + public AnyValue mapDuration( DurationValue value ) + { + return value; + } + + @Override + public AnyValue mapPoint( PointValue value ) + { + return value; + } + } +} diff --git a/community/data-collector/src/main/scala/org/neo4j/internal/collector/QueriesSection.scala b/community/data-collector/src/main/scala/org/neo4j/internal/collector/QueriesSection.scala index 7ee33abed58a..c387ca4bec71 100644 --- 
a/community/data-collector/src/main/scala/org/neo4j/internal/collector/QueriesSection.scala +++ b/community/data-collector/src/main/scala/org/neo4j/internal/collector/QueriesSection.scala @@ -20,12 +20,10 @@ package org.neo4j.internal.collector import java.util -import java.util.{Spliterator, Spliterators} import java.util.stream.{Stream, StreamSupport} +import java.util.{Spliterator, Spliterators} import org.neo4j.graphdb.ExecutionPlanDescription -import org.neo4j.kernel.api.query.QuerySnapshot -import org.neo4j.values.ValueMapper import org.neo4j.values.virtual.MapValue import scala.collection.mutable @@ -52,21 +50,21 @@ object QueriesSection { val profiles = new ArrayBuffer[ProfileData] } - val QUERY_FILTER = "(?:(?i)call)\\s+(?:dbms\\.|db\\.stats\\.)".r + private val QUERY_FILTER = "(?:(?i)call)\\s+(?:dbms\\.|db\\.stats\\.)".r - def retrieve(querySnapshots: java.util.Iterator[QuerySnapshot], + def retrieve(querySnapshots: java.util.Iterator[TruncatedQuerySnapshot], anonymizer: QueryAnonymizer, maxInvocations: Int): Stream[RetrieveResult] = { val queries = new mutable.HashMap[QueryKey, QueryData]() while (querySnapshots.hasNext) { val snapshot = querySnapshots.next() - val queryString = snapshot.queryText() + val queryString = snapshot.queryText if (QUERY_FILTER.findFirstMatchIn(queryString).isEmpty) { - val snapshotList = queries.getOrElseUpdate(QueryKey(queryString, snapshot.queryPlan()), new QueryData()) - snapshotList.invocations += SingleInvocation(snapshot.queryParameters(), - snapshot.elapsedTimeMicros(), - snapshot.compilationTimeMicros(), - snapshot.startTimestampMillis()) + val snapshotList = queries.getOrElseUpdate(QueryKey(queryString, snapshot.queryPlan), new QueryData()) + snapshotList.invocations += SingleInvocation(snapshot.queryParameters, + snapshot.elapsedTimeMicros, + snapshot.compilationTimeMicros, + snapshot.startTimestampMillis) } }