-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
SwapperSet.java
197 lines (183 loc) · 7.92 KB
/
SwapperSet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.pagecache.impl.muninn;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.IntPredicate;
import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.io.pagecache.PageSwapper;
/**
* The SwapperSet maintains the set of allocated {@link PageSwapper}s, and their mapping to swapper ids.
* These swapper ids are a limited resource, so they must eventually be reused as files are mapped and unmapped.
* Before a swapper id can be reused, we have to make sure that there are no pages in the page cache, that
* are bound to the old swapper id. To ensure this, we have to periodically {@link MuninnPageCache#vacuum(SwapperSet)}
* the page cache. The vacuum process will then fully evict all pages that are bound to a page swapper id that
* was freed before the start of the vacuum process.
*/
final class SwapperSet
{
// The sentinel is used to reserve swapper id 0 as a special value.
private static final SwapperMapping SENTINEL = new SwapperMapping( 0, null );
// The tombstone is used as a marker to reserve allocation entries that have been freed, but not yet vacuumed.
// An allocation cannot be reused until it has been vacuumed.
private static final SwapperMapping TOMBSTONE = new SwapperMapping( 0, null );
private static final int MAX_SWAPPER_ID = Short.MAX_VALUE;
private volatile SwapperMapping[] swapperMappings = new SwapperMapping[] { SENTINEL };
private final PrimitiveIntSet free = Primitive.intSet();
private final Object vacuumLock = new Object();
private int freeCounter; // Used in `free`; Guarded by `this`
/**
* The mapping entry between a {@link PageSwapper} and its swapper id.
*/
static final class SwapperMapping
{
public final int id;
public final PageSwapper swapper;
private SwapperMapping( int id, PageSwapper swapper )
{
this.id = id;
this.swapper = swapper;
}
}
/**
* Get the {@link SwapperMapping} for the given swapper id.
*/
SwapperMapping getAllocation( int id )
{
checkId( id );
SwapperMapping swapperMapping = swapperMappings[id];
if ( swapperMapping == null || swapperMapping == TOMBSTONE )
{
return null;
}
return swapperMapping;
}
private void checkId( int id )
{
if ( id == 0 )
{
throw new IllegalArgumentException( "0 is an invalid swapper id" );
}
}
/**
* Allocate a new swapper id for the given {@link PageSwapper}.
*/
synchronized int allocate( PageSwapper swapper )
{
SwapperMapping[] swapperMappings = this.swapperMappings;
// First look for an available freed slot.
synchronized ( free )
{
if ( !free.isEmpty() )
{
int id = free.iterator().next();
free.remove( id );
swapperMappings[id] = new SwapperMapping( id, swapper );
this.swapperMappings = swapperMappings; // Volatile store synchronizes-with loads in getters.
return id;
}
}
// No free slot was found above, so we extend the array to make room for a new slot.
int id = swapperMappings.length;
if ( id + 1 > MAX_SWAPPER_ID )
{
throw new IllegalStateException( "All swapper ids are allocated: " + MAX_SWAPPER_ID );
}
swapperMappings = Arrays.copyOf( swapperMappings, id + 1 );
swapperMappings[id] = new SwapperMapping( id, swapper );
this.swapperMappings = swapperMappings; // Volatile store synchronizes-with loads in getters.
return id;
}
/**
* Free the given swapper id, and return {@code true} if it is time for a
* {@link MuninnPageCache#vacuum(SwapperSet)}, otherwise it returns {@code false}.
*/
synchronized boolean free( int id )
{
checkId( id );
SwapperMapping[] swapperMappings = this.swapperMappings;
SwapperMapping current = swapperMappings[id];
if ( current == null || current == TOMBSTONE )
{
throw new IllegalStateException(
"PageSwapper allocation id " + id + " is currently not allocated. Likely a double free bug." );
}
swapperMappings[id] = TOMBSTONE;
this.swapperMappings = swapperMappings; // Volatile store synchronizes-with loads in getters.
freeCounter++;
if ( freeCounter == 20 )
{
freeCounter = 0;
return true;
}
return false;
}
/**
* Collect all freed page swapper ids, and pass them to the given callback, after which the freed ids will be
* elegible for reuse.
* This is done with careful synchronisation such that allocating and freeing of ids is allowed to mostly proceed
* concurrently.
*/
void vacuum( Consumer<IntPredicate> evictAllLoadedPagesCallback )
{
// We do this complicated locking to avoid blocking allocate() and free() as much as possible, while still only
// allow a single thread to do vacuum at a time, and at the same time have consistent locking around the
// set of free ids.
synchronized ( vacuumLock )
{
// Collect currently free ids.
PrimitiveIntSet freeIds = Primitive.intSet();
SwapperMapping[] swapperMappings = this.swapperMappings;
for ( int id = 0; id < swapperMappings.length; id++ )
{
SwapperMapping swapperMapping = swapperMappings[id];
if ( swapperMapping == TOMBSTONE )
{
freeIds.add( id );
}
}
// Evict all of them without holding up the lock on the free id set. This allows allocate() and free() to
// proceed concurrently with our eviction. This is safe because we know that the ids we are evicting cannot
// possibly be reused until we remove them from the free id set, which we won't do until after we've evicted
// all of their loaded pages.
evictAllLoadedPagesCallback.accept( freeIds );
// Finally, all of the pages that remained in memory with an unmapped swapper id have been evicted. We can
// now safely allow those ids to be reused. Note, however, that free() might have been called while we were
// doing this, so we can't just free.clear() the set; no, we have to explicitly remove only those specific
// ids whose pages we evicted.
synchronized ( this )
{
PrimitiveIntIterator itr = freeIds.iterator();
while ( itr.hasNext() )
{
int freeId = itr.next();
swapperMappings[freeId] = null;
}
this.swapperMappings = swapperMappings; // Volatile store synchronizes-with loads in getters.
}
synchronized ( free )
{
free.addAll( freeIds.iterator() );
}
}
}
}