-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
KernelStatement.java
241 lines (215 loc) · 7.57 KB
/
KernelStatement.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;
import java.util.stream.Stream;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.kernel.api.DataWriteOperations;
import org.neo4j.kernel.api.ExecutingQuery;
import org.neo4j.kernel.api.QueryRegistryOperations;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.SchemaWriteOperations;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.storageengine.api.StorageStatement;
/**
* A resource-efficient implementation of {@link Statement}. Designed to be reused within a
* {@link KernelTransactionImplementation} instance, even across transactions, since this instance itself
* doesn't hold essential state. Usage:
*
* <ol>
* <li>Construct {@link KernelStatement} when {@link KernelTransactionImplementation} is constructed</li>
* <li>For every transaction...</li>
* <li>Call {@link #initialize(StatementLocks)} which makes this instance
* fully available and ready to use. Call when the {@link KernelTransactionImplementation} is initialized.</li>
* <li>Alternate {@link #acquire()} / {@link #close()} when acquiring / closing a statement for the transaction...
* Temporarily asymmetric number of calls to {@link #acquire()} / {@link #close()} is supported, although in
* the end an equal number of calls must have been issued.</li>
* <li>To be safe, call {@link #forceClose()} at the end of a transaction to force a close of the statement,
* even if there is more than one outstanding call to {@link #acquire()}. This instance is then ready
* to be {@link #initialize(StatementLocks) initialized} and used by the transaction
* instance again, when that is initialized.</li>
* </ol>
*/
public class KernelStatement implements TxStateHolder, Statement
{
    // Collaborators supplied through the constructor; never reassigned afterwards.
    private final KernelTransactionImplementation transaction;
    private final TxStateHolder txStateHolder;
    private final StorageStatement storeStatement;
    private final DataOperationsFacade facade;

    // Assigned by initialize(StatementLocks).
    private StatementLocks statementLocks;
    // Number of acquire() calls that have not yet been matched by close(); zero means "closed".
    private int referenceCount;
    // Swapped for a new list on every push/remove and reset to EMPTY when resources are cleaned up.
    private volatile ExecutingQueryList executingQueryList;

    /**
     * Creates a statement bound to the given transaction.
     *
     * @param transaction the owning transaction, consulted for its mode and termination reason
     * @param txStateHolder delegate for all {@link TxStateHolder} calls made on this statement
     * @param operations operation parts passed on to the {@link DataOperationsFacade}
     * @param storeStatement store-level statement acquired and released together with this statement
     * @param procedures procedures passed on to the {@link DataOperationsFacade}
     */
    public KernelStatement(
            KernelTransactionImplementation transaction,
            TxStateHolder txStateHolder,
            StatementOperationParts operations,
            StorageStatement storeStatement,
            Procedures procedures
    )
    {
        this.storeStatement = storeStatement;
        this.txStateHolder = txStateHolder;
        this.transaction = transaction;
        this.facade = new DataOperationsFacade( transaction, this, operations, procedures );
        this.executingQueryList = ExecutingQueryList.EMPTY;
    }

    /**
     * Returns the read operations, provided the transaction's mode allows reads.
     * Otherwise throws whatever the mode's {@code onViolation} produces.
     */
    @Override
    public ReadOperations readOperations()
    {
        if ( transaction.mode().allowsReads() )
        {
            return facade;
        }
        throw transaction.mode().onViolation(
                violationMessage( "Read operations are not allowed for '%s'." ) );
    }

    /**
     * Returns the token write operations. No mode check is performed here.
     */
    @Override
    public TokenWriteOperations tokenWriteOperations()
    {
        return facade;
    }

    /**
     * Returns the data write operations, provided the transaction's mode allows writes, after asking
     * the transaction to upgrade to data writes. Otherwise throws whatever the mode's
     * {@code onViolation} produces.
     *
     * @throws InvalidTransactionTypeKernelException propagated from the upgrade to data writes
     */
    @Override
    public DataWriteOperations dataWriteOperations()
            throws InvalidTransactionTypeKernelException
    {
        if ( transaction.mode().allowsWrites() )
        {
            transaction.upgradeToDataWrites();
            return facade;
        }
        throw transaction.mode().onViolation(
                violationMessage( "Write operations are not allowed for '%s'." ) );
    }

    /**
     * Returns the schema write operations, provided the transaction's mode allows schema writes, after
     * asking the transaction to upgrade to schema writes. Otherwise throws whatever the mode's
     * {@code onViolation} produces.
     *
     * @throws InvalidTransactionTypeKernelException propagated from the upgrade to schema writes
     */
    @Override
    public SchemaWriteOperations schemaWriteOperations()
            throws InvalidTransactionTypeKernelException
    {
        if ( transaction.mode().allowsSchemaWrites() )
        {
            transaction.upgradeToSchemaWrites();
            return facade;
        }
        throw transaction.mode().onViolation(
                violationMessage( "Schema operations are not allowed for '%s'." ) );
    }

    /**
     * Returns the query registry operations. No mode check is performed here.
     */
    @Override
    public QueryRegistryOperations queryRegistration()
    {
        return facade;
    }

    /** Delegates to the wrapped {@link TxStateHolder}. */
    @Override
    public TransactionState txState()
    {
        return txStateHolder.txState();
    }

    /** Delegates to the wrapped {@link TxStateHolder}. */
    @Override
    public LegacyIndexTransactionState legacyIndexTxState()
    {
        return txStateHolder.legacyIndexTxState();
    }

    /** Delegates to the wrapped {@link TxStateHolder}. */
    @Override
    public boolean hasTxStateWithChanges()
    {
        return txStateHolder.hasTxStateWithChanges();
    }

    /**
     * Gives back one reference to this statement. Resources are cleaned up when the last outstanding
     * reference is returned; closing an already closed statement is a no-op.
     */
    @Override
    public void close()
    {
        if ( referenceCount <= 0 )
        {
            // Already closed: repeated close calls are tolerated and ignored.
            return;
        }

        referenceCount--;
        if ( referenceCount == 0 )
        {
            cleanupResources();
        }
    }

    /**
     * Verifies that this statement may still be used.
     *
     * @throws NotInTransactionException if there is no outstanding reference to this statement
     * @throws TransactionTerminatedException if the transaction reports a termination reason
     */
    void assertOpen()
    {
        if ( referenceCount == 0 )
        {
            throw new NotInTransactionException( "The statement has been closed." );
        }

        Status reason = transaction.getReasonIfTerminated();
        if ( reason != null )
        {
            throw new TransactionTerminatedException( reason );
        }
    }

    /**
     * Stores the locks that {@link #locks()} will hand out from now on.
     */
    void initialize( StatementLocks statementLocks )
    {
        this.statementLocks = statementLocks;
    }

    /**
     * Returns the locks most recently passed to {@link #initialize(StatementLocks)}.
     */
    public StatementLocks locks()
    {
        return statementLocks;
    }

    /**
     * Takes one more reference to this statement. The store statement is acquired only on the
     * transition from zero references to one.
     */
    final void acquire()
    {
        boolean firstReference = referenceCount == 0;
        // The count is bumped before touching the store statement, exactly as a post-increment would.
        referenceCount++;
        if ( firstReference )
        {
            storeStatement.acquire();
        }
    }

    /**
     * Tells whether at least one reference to this statement is outstanding.
     */
    final boolean isAcquired()
    {
        return referenceCount > 0;
    }

    /**
     * Drops every outstanding reference at once and cleans up resources. Does nothing if the
     * statement is already closed.
     */
    final void forceClose()
    {
        if ( referenceCount <= 0 )
        {
            return;
        }

        referenceCount = 0;
        cleanupResources();
    }

    /**
     * Returns the name of the transaction's mode.
     */
    final String authSubjectName()
    {
        return transaction.mode().name();
    }

    /**
     * Streams the queries held by the current executing-query list.
     */
    final Stream<ExecutingQuery> executingQueries()
    {
        return executingQueryList.queries();
    }

    /**
     * Records the given query as executing by replacing the current list with one that includes it.
     */
    final void startQueryExecution( ExecutingQuery query )
    {
        ExecutingQueryList withQuery = executingQueryList.push( query );
        this.executingQueryList = withQuery;
    }

    /**
     * Replaces the current list with the result of removing the given query from it.
     */
    final void stopQueryExecution( ExecutingQuery executingQuery )
    {
        ExecutingQueryList withoutQuery = executingQueryList.remove( executingQuery );
        this.executingQueryList = withoutQuery;
    }

    /**
     * Returns the underlying store statement. Only public for tests.
     */
    public final StorageStatement getStoreStatement()
    {
        return storeStatement;
    }

    /**
     * Releases the store statement and then forgets all executing queries.
     */
    private void cleanupResources()
    {
        // closing is done by KTI
        storeStatement.release();
        executingQueryList = ExecutingQueryList.EMPTY;
    }

    /**
     * Fills the given message template with the name of the transaction's current mode.
     */
    private String violationMessage( String template )
    {
        return String.format( template, transaction.mode().name() );
    }
}