-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
KillQueryTest.scala
132 lines (113 loc) · 5.14 KB
/
KillQueryTest.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher
import java.util
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import org.neo4j.cypher.internal.{CommunityCompatibilityFactory, ExecutionEngine}
import org.neo4j.graphdb.{TransactionTerminatedException, TransientTransactionFailureException}
import org.neo4j.kernel.api.KernelTransaction.Type
import org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker
import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo
import org.neo4j.kernel.impl.query.{Neo4jTransactionalContextFactory, TransactionalContext, TransactionalContextFactory}
import org.neo4j.logging.NullLogProvider
import org.neo4j.values.virtual.VirtualValues.EMPTY_MAP
class KillQueryTest extends ExecutionEngineFunSuite {
/*
This test creates 10 threads that run a Cypher query over and over again for 10 seconds.
Concurrently, another thread tries to terminate all running queries. This should not lead to weird behaviour - only
well known and expected exceptions should be produced.
*/
val emptyMap = new util.HashMap[String, AnyRef]
val NODE_COUNT = 1000
val THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2
val SECONDS_TO_RUN = 5
test("run queries and kill them left and right") {
val locker = new PropertyContainerLocker()
val contextFactory = Neo4jTransactionalContextFactory.create(graph, locker)
(1 to NODE_COUNT) foreach { x =>
createLabeledNode(Map("x" -> x, "name" -> ("apa" + x)), "Label")
}
val logProvider = NullLogProvider.getInstance()
val compatibilityFactory = new CommunityCompatibilityFactory(graph, kernelMonitors, logProvider)
val engine = new ExecutionEngine(graph, logProvider, compatibilityFactory)
val query = "MATCH (n:Label) WHERE n.x > 12 RETURN n.name"
val continue = new AtomicBoolean(true)
var exceptionsThrown = List.empty[Throwable]
val tcs = new ArrayBlockingQueue[TransactionalContext](1000)
val queryRunner = createQueryRunner(continue, contextFactory, query, tcs, engine, e => exceptionsThrown = exceptionsThrown :+ e)
val queryKiller = createQueryKiller(continue, tcs, e => exceptionsThrown = exceptionsThrown :+ e)
val threads: Seq[Thread] = (0 until THREAD_COUNT map (x => new Thread(queryRunner))) :+ new Thread(queryKiller)
threads.foreach(_.start())
threads.foreach(_.join())
exceptionsThrown.foreach(throw _)
}
private val connectionInfo = new ClientConnectionInfo {
override def asConnectionDetails(): String = ???
override def protocol(): String = ???
}
private def createQueryKiller(continue: AtomicBoolean, tcs: ArrayBlockingQueue[TransactionalContext], exLogger: Throwable => Unit) = {
new Runnable {
override def run(): Unit =
try {
val start = System.currentTimeMillis()
while ((System.currentTimeMillis() - start) < SECONDS_TO_RUN * 1000 && continue.get()) {
val transactionalContext = tcs.poll()
if (transactionalContext != null)
try {
transactionalContext.terminate()
} catch {
case e: Throwable =>
exLogger(e)
continue.set(false)
}
}
} finally {
continue.set(false)
}
}
}
private def createQueryRunner(continue: AtomicBoolean, contextFactory: TransactionalContextFactory, query: String, tcs: ArrayBlockingQueue[TransactionalContext], engine: ExecutionEngine, exLogger: Throwable => Unit) = {
new Runnable {
def run() {
while (continue.get()) {
val tx = graph.beginTransaction(Type.`implicit`, AUTH_DISABLED)
try {
val transactionalContext: TransactionalContext = contextFactory.newContext(connectionInfo, tx, query, EMPTY_MAP)
tcs.put(transactionalContext)
val result = engine.execute(query, Map.empty[String, AnyRef], transactionalContext)
result.resultAsString()
tx.success()
}
catch {
// These are the acceptable exceptions
case _: TransactionTerminatedException =>
case _: TransientTransactionFailureException =>
case e: Throwable =>
tx.close()
continue.set(false)
exLogger(e)
}
}
}
}
}
}