-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
SchemaIndexProvider.java
290 lines (259 loc) · 10.4 KB
/
SchemaIndexProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api.index;
import java.io.File;
import java.io.IOException;
import java.util.Objects;

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
/**
* Contract for implementing an index in Neo4j.
*
* This is a sensitive thing to implement, because it manages data that is controlled by
* Neo4j's logical log. As such, the implementation needs to behave under some rather strict rules.
*
* <h3>Populating the index</h3>
*
* When an index rule is added, the {@link IndexingService} is notified. It will, in turn, ask
* your {@link SchemaIndexProvider} for a
* {@link #getPopulator(long, IndexDescriptor, IndexSamplingConfig) batch index writer}.
*
* A background index job is triggered, and all existing data that applies to the new rule, as well as new data
* from the "outside", will be inserted using the writer. You are guaranteed that usage of this writer,
* during population, will be single threaded.
*
* These are the rules you must adhere to here:
*
* <ul>
* <li>You CANNOT say that the state of the index is {@link InternalIndexState#ONLINE}</li>
* <li>You MUST store all updates given to you</li>
* <li>You MAY persistently store the updates</li>
* </ul>
*
*
* <h3>The Flip</h3>
*
* Once population is done, the index needs to be "flipped" to an online mode of operation.
*
* The index will be notified, through the {@link org.neo4j.kernel.api.index.IndexPopulator#close(boolean)}
* method, that population is done, and that the index should turn its state to {@link InternalIndexState#ONLINE} or
* {@link InternalIndexState#FAILED} depending on the value given to the
* {@link org.neo4j.kernel.api.index.IndexPopulator#close(boolean) close method}.
*
* If the index is persisted to disk, this is a <i>vital</i> part of the index lifecycle.
* For a persisted index, the index MUST NOT store the state as online unless it first guarantees that the entire index
* is flushed to disk. Failure to do so could produce a situation where, after a crash,
* an index is believed to be online when it in fact was not yet fully populated. This would break the database
* recovery process.
*
* If you are implementing this interface, you can choose to not store index state. In that case,
* you should report index state as {@link InternalIndexState#POPULATING} upon startup.
* This will cause the database to re-create the index from scratch again.
*
* These are the rules you must adhere to here:
*
* <ul>
* <li>You MUST have flushed the index to durable storage if you are to persist index state as {@link InternalIndexState#ONLINE}</li>
* <li>You MAY decide not to store index state</li>
* <li>If you don't store index state, you MUST default to {@link InternalIndexState#POPULATING}</li>
* </ul>
*
* <h3>Online operation</h3>
*
* Once the index is online, the database will move to using the
* {@link #getOnlineAccessor(long, IndexDescriptor, IndexSamplingConfig) online accessor} to
* write to the index.
*/
public abstract class SchemaIndexProvider extends LifecycleAdapter implements Comparable<SchemaIndexProvider>
{
    /**
     * A no-op provider used where no real index provider is configured.
     * <p>
     * It hands out one shared {@link IndexAccessor.Adapter} and one shared {@link IndexPopulator.Adapter}
     * for every index. It reports every index as {@link InternalIndexState#POPULATING} and does not take
     * part in store migration. Because it never reports {@link InternalIndexState#FAILED}, it has no
     * population failure to return.
     */
    public static final SchemaIndexProvider NO_INDEX_PROVIDER =
            new SchemaIndexProvider( new Descriptor( "no-index-provider", "1.0" ), -1 )
    {
        // One instance of each is returned regardless of index id, descriptor or sampling config.
        private final IndexAccessor singleWriter = new IndexAccessor.Adapter();
        private final IndexPopulator singlePopulator = new IndexPopulator.Adapter();

        @Override
        public IndexAccessor getOnlineAccessor( long indexId, IndexDescriptor descriptor,
                IndexSamplingConfig samplingConfig )
        {
            return singleWriter;
        }

        @Override
        public IndexPopulator getPopulator( long indexId, IndexDescriptor descriptor,
                IndexSamplingConfig samplingConfig )
        {
            return singlePopulator;
        }

        @Override
        public InternalIndexState getInitialState( long indexId, IndexDescriptor descriptor )
        {
            // This provider stores no index state, so per the class contract it must report POPULATING.
            return InternalIndexState.POPULATING;
        }

        @Override
        public StoreMigrationParticipant storeMigrationParticipant( FileSystemAbstraction fs,
                PageCache pageCache )
        {
            return StoreMigrationParticipant.NOT_PARTICIPATING;
        }

        @Override
        public String getPopulationFailure( long indexId ) throws IllegalStateException
        {
            // getInitialState never returns FAILED, so there is never a recorded failure to hand back.
            throw new IllegalStateException(
                    "No population failure recorded for index " + indexId + " by provider " +
                    getProviderDescriptor() );
        }
    };

    /** Ordering priority used by {@link #compareTo(SchemaIndexProvider)}; also part of equality. */
    protected final int priority;
    private final Descriptor providerDescriptor;

    /**
     * @param descriptor identifies this provider by key and version; must not be {@code null}
     * @param priority ordering priority of this provider, see {@link #compareTo(SchemaIndexProvider)}
     */
    protected SchemaIndexProvider( Descriptor descriptor, int priority )
    {
        assert descriptor != null;
        this.priority = priority;
        this.providerDescriptor = descriptor;
    }

    /**
     * Used for initially populating a created index, using batch insertion.
     *
     * @param indexId id of the index to populate
     * @param descriptor description of the index
     * @param samplingConfig index sampling configuration
     * @return a populator for the index
     */
    public abstract IndexPopulator getPopulator( long indexId, IndexDescriptor descriptor,
            IndexSamplingConfig samplingConfig );

    /**
     * Used for updating an index once initial population has completed.
     *
     * @param indexId id of the index to access
     * @param descriptor description of the index
     * @param samplingConfig index sampling configuration
     * @return an accessor for the online index
     * @throws IOException if the accessor cannot be opened
     */
    public abstract IndexAccessor getOnlineAccessor( long indexId, IndexDescriptor descriptor,
            IndexSamplingConfig samplingConfig ) throws IOException;

    /**
     * Returns a failure previously gotten from {@link IndexPopulator#markAsFailed(String)}.
     * <p>
     * Implementations are expected to persist this failure.
     *
     * @param indexId id of the failed index
     * @return the failure message recorded for the index
     * @throws IllegalStateException if no failure can be provided for the index
     *         (the no-op provider always throws)
     */
    public abstract String getPopulationFailure( long indexId ) throws IllegalStateException;

    /**
     * Called during startup to find out which state an index is in. If {@link InternalIndexState#FAILED}
     * is returned, a further call to {@link #getPopulationFailure(long)} is expected. That call should
     * return the failure accepted by {@link IndexPopulator#markAsFailed(String)} at the time of failure.
     *
     * @param indexId id of the index
     * @param descriptor description of the index
     * @return the state the index is in
     */
    public abstract InternalIndexState getInitialState( long indexId, IndexDescriptor descriptor );

    /**
     * @return a description of this index provider
     */
    public Descriptor getProviderDescriptor()
    {
        return providerDescriptor;
    }

    /**
     * Orders providers by ascending {@link #priority}.
     * <p>
     * Note: this ordering is not consistent with {@link #equals(Object)}. Two providers with the same
     * priority but different descriptors compare as {@code 0} yet are not equal.
     */
    @Override
    public int compareTo( SchemaIndexProvider o )
    {
        // Integer.compare rather than subtraction: priority - o.priority overflows (and flips sign)
        // when the priorities are far apart, e.g. Integer.MIN_VALUE vs. a positive value.
        return Integer.compare( this.priority, o.priority );
    }

    /**
     * Two providers are equal when they are of the exact same class and have the same priority and
     * provider descriptor.
     */
    @Override
    public boolean equals( Object o )
    {
        if ( this == o )
        {
            return true;
        }
        if ( o == null || getClass() != o.getClass() )
        {
            return false;
        }
        SchemaIndexProvider other = (SchemaIndexProvider) o;
        // Null-safe to match hashCode(): the constructor only asserts non-null, which is skipped without -ea.
        return priority == other.priority &&
                Objects.equals( providerDescriptor, other.providerDescriptor );
    }

    @Override
    public int hashCode()
    {
        // Same value as before: 31 * priority + (descriptor hash, or 0 when null).
        return 31 * priority + Objects.hashCode( providerDescriptor );
    }

    /**
     * Get schema index store root directory in specified store.
     *
     * @param storeDir store root directory
     * @return schema index store root directory, i.e. {@code <storeDir>/schema/index/<provider key>}
     */
    public File getSchemaIndexStoreDirectory( File storeDir )
    {
        return new File( new File( new File( storeDir, "schema" ), "index" ), getProviderDescriptor().getKey() );
    }

    /**
     * @param fs file system abstraction to use
     * @param pageCache page cache to use
     * @return the store migration participant for this provider
     *         (the no-op provider returns {@link StoreMigrationParticipant#NOT_PARTICIPATING})
     */
    public abstract StoreMigrationParticipant storeMigrationParticipant( FileSystemAbstraction fs, PageCache pageCache );

    /**
     * Provides a snapshot of meta files about this index provider, not the indexes themselves.
     *
     * @return an iterator over the provider's meta files; this default implementation returns an empty iterator
     */
    public ResourceIterator<File> snapshotMetaFiles()
    {
        return Iterators.emptyResourceIterator();
    }

    /**
     * Identifies an index provider by a non-empty key and a non-null version.
     */
    public static class Descriptor
    {
        private final String key;
        private final String version;

        /**
         * @param key provider key; must be non-null and non-empty
         * @param version provider version; must be non-null
         * @throws IllegalArgumentException if {@code key} is null or empty, or {@code version} is null
         */
        public Descriptor( String key, String version )
        {
            if ( key == null )
            {
                throw new IllegalArgumentException( "null provider key prohibited" );
            }
            if ( key.isEmpty() )
            {
                throw new IllegalArgumentException( "empty provider key prohibited" );
            }
            if ( version == null )
            {
                throw new IllegalArgumentException( "null provider version prohibited" );
            }
            this.key = key;
            this.version = version;
        }

        public String getKey()
        {
            return key;
        }

        public String getVersion()
        {
            return version;
        }

        @Override
        public int hashCode()
        {
            return ( 23 + key.hashCode() ) ^ version.hashCode();
        }

        @Override
        public boolean equals( Object obj )
        {
            // instanceof is false for null, so no separate null check is needed.
            if ( obj instanceof Descriptor )
            {
                Descriptor otherDescriptor = (Descriptor) obj;
                return key.equals( otherDescriptor.getKey() ) && version.equals( otherDescriptor.getVersion() );
            }
            return false;
        }

        @Override
        public String toString()
        {
            return "{key=" + key + ", version=" + version + "}";
        }
    }
}