-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
Neo4jTransactionalContext.java
276 lines (249 loc) · 8.87 KB
/
Neo4jTransactionalContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.query;
import java.util.function.Supplier;
import org.neo4j.graphdb.Lock;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.ExecutingQuery;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.QueryRegistryOperations;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.dbms.DbmsOperations;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.guard.Guard;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
public class Neo4jTransactionalContext implements TransactionalContext
{
private final GraphDatabaseQueryService graph;
private final ThreadToStatementContextBridge txBridge;
private final KernelTransaction.Type transactionType;
private final AccessMode mode;
private final Supplier<Statement> statementSupplier;
private final DbmsOperations dbmsOperations;
private final Guard guard;
private final ExecutingQuery executingQuery;
private final PropertyContainerLocker locker;
private InternalTransaction transaction;
private Statement statement;
private boolean isOpen = true;
public Neo4jTransactionalContext(
GraphDatabaseQueryService graph,
InternalTransaction initialTransaction,
KernelTransaction.Type transactionType,
AccessMode transactionMode,
Supplier<Statement> statementSupplier,
ExecutingQuery executingQuery,
PropertyContainerLocker locker,
ThreadToStatementContextBridge txBridge,
DbmsOperations dbmsOperations,
Guard guard
) {
this.graph = graph;
this.transaction = initialTransaction;
this.transactionType = transactionType;
this.mode = transactionMode;
this.statementSupplier = statementSupplier;
this.statement = statementSupplier.get();
this.executingQuery = executingQuery;
this.locker = locker;
this.txBridge = txBridge;
this.dbmsOperations = dbmsOperations;
this.guard = guard;
}
@Override
public ExecutingQuery executingQuery()
{
return executingQuery;
}
@Override
public ReadOperations readOperations()
{
return statement.readOperations();
}
@Override
public DbmsOperations dbmsOperations()
{
return dbmsOperations;
}
@Override
public boolean isTopLevelTx()
{
return transaction.transactionType() == KernelTransaction.Type.implicit;
}
@Override
public void close( boolean success )
{
if ( isOpen )
{
try
{
statement.queryRegistration().unregisterExecutingQuery( executingQuery );
statement.close();
if ( success )
{
transaction.success();
}
else
{
transaction.failure();
}
transaction.close();
}
finally
{
statement = null;
transaction = null;
isOpen = false;
}
}
}
@Override // TODO: Make the state of this class a state machine that is a single value and maybe CAS state
// transitions
public void terminate()
{
if ( isOpen )
{
try
{
transaction.terminate();
close( false );
}
catch ( NotInTransactionException e )
{
// Ok then. Nothing to do
}
}
}
@Override
public void commitAndRestartTx()
{
/*
* This method is use by the Cypher runtime to cater for PERIODIC COMMIT, which allows a single query to
* periodically, after x number of rows, to commit a transaction and spawn a new one.
*
* To still keep track of the running stream after switching transactions, we need to open the new transaction
* before closing the old one. This way, a query will not disappear and appear when switching transactions.
*
* Since our transactions are thread bound, we must first unbind the old transaction from the thread before
* creating a new one. And then we need to do that thread switching again to close the old transaction.
*/
// (1) Unbind current transaction
QueryRegistryOperations oldQueryRegistryOperations = statement.queryRegistration();
InternalTransaction oldTransaction = transaction;
KernelTransaction oldKernelTx = txBridge.getKernelTransactionBoundToThisThread( true );
txBridge.unbindTransactionFromCurrentThread();
// (2) Create, bind, register, and unbind new transaction
transaction = graph.beginTransaction( transactionType, mode );
statement = txBridge.get();
statement.queryRegistration().registerExecutingQuery( executingQuery );
KernelTransaction kernelTx = txBridge.getKernelTransactionBoundToThisThread( true );
txBridge.unbindTransactionFromCurrentThread();
// (3) Rebind old transaction just to commit and close it (and unregister as a side effect of that)
txBridge.bindTransactionToCurrentThread( oldKernelTx );
oldQueryRegistryOperations.unregisterExecutingQuery( executingQuery );
try
{
oldTransaction.success();
oldTransaction.close();
}
catch ( Throwable t )
{
// Corner case: The old transaction might have been terminated by the user. Now we also need to
// terminate the new transaction.
txBridge.bindTransactionToCurrentThread( kernelTx );
transaction.failure();
transaction.close();
txBridge.unbindTransactionFromCurrentThread();
throw t;
}
// (4) Unbind the now closed old transaction and rebind the new transaction for continued execution
txBridge.unbindTransactionFromCurrentThread();
txBridge.bindTransactionToCurrentThread( kernelTx );
}
@Override
public void cleanForReuse()
{
// close the old statement reference after the statement has been "upgraded"
// to either a schema data or a schema statement, so that the locks are "handed over".
statement.queryRegistration().unregisterExecutingQuery( executingQuery );
statement.close();
statement = txBridge.get();
statement.queryRegistration().registerExecutingQuery( executingQuery );
}
@Override
public TransactionalContext getOrBeginNewIfClosed()
{
if ( !isOpen )
{
transaction = graph.beginTransaction( transactionType, mode );
statement = statementSupplier.get();
statement.queryRegistration().registerExecutingQuery( executingQuery );
isOpen = true;
}
return this;
}
@Override
public boolean isOpen()
{
return isOpen;
}
@Override
public GraphDatabaseQueryService graph()
{
return graph;
}
@Override
public Statement statement()
{
return statement;
}
@Override
public void check()
{
guard.check( (KernelStatement) statement );
}
@Override
public TxStateHolder stateView()
{
return (KernelStatement) statement;
}
@Override
public Lock acquireWriteLock( PropertyContainer p )
{
return locker.exclusiveLock( statement, p );
}
@Override
public KernelTransaction.Revertable restrictCurrentTransaction( AccessMode accessMode )
{
return transaction.overrideWith( accessMode );
}
@Override
public AccessMode accessMode()
{
return mode;
}
}