/
MemoryCleaner.java
355 lines (310 loc) · 11.8 KB
/
MemoryCleaner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
/*
*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package ai.rapids.cudf;
import ai.rapids.cudf.ast.CompiledExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* ColumnVectors may store data off heap, and because of complicated processing the life time of
* an individual vector can vary a lot. Typically a java finalizer could be used for this but
* they can cause a number of performance issues related to gc, and in some cases may effectively
* leak resources if the heap is large and GC's end up being delayed.
* <p>
* To address these issues the primary way of releasing the resources of a ColumnVector that is
* stored off of the java heap should be through reference counting. Because memory leaks are
* really bad for long lived daemons this class is intended to be a backup.
* <p>
* When a ColumnVector first allocates off heap resources it should register itself with this
* along with a Cleaner instance. The Cleaner instance should have no direct links to the
* ColumnVector that would prevent the ColumnVector from being garbage collected. This will
* use WeakReferences internally to know when the resources have been leaked.
* A ColumnVector may keep a reference to the Cleaner instance and either update it as new
* resources are allocated or use it to release the resources it is holding. Once the
* ColumnVector's reference count reaches 0 and the resources are released, at some point
* later the Cleaner itself will be released.
*/
public final class MemoryCleaner {
/**
* Read once from the {@code ai.rapids.refcount.debug} system property. When true every Cleaner
* records a stack trace for each reference count change and a leak checking shutdown hook is
* registered when this class is loaded.
*/
private static final boolean REF_COUNT_DEBUG = Boolean.getBoolean("ai.rapids.refcount.debug");
/** Logger used for reference count debug dumps and for errors hit while cleaning. */
private static final Logger log = LoggerFactory.getLogger(MemoryCleaner.class);
/** Generates the unique id given to each Cleaner; ids start at 1. */
private static final AtomicLong idGen = new AtomicLong(0);
/**
* Check whether the default shutdown hook, which checks for leaks at shutdown time, was
* configured. This reflects the {@code ai.rapids.refcount.debug} setting read when the class
* was loaded and does not change if the hook is later removed.
*
* @return true if configured, false otherwise.
*/
public static boolean configuredDefaultShutdownHook() {
return REF_COUNT_DEBUG;
}
/**
 * API that can be used to clean up the resources for a vector, even if there was a leak.
 * <p>
 * Each instance gets a unique {@link #id}. When reference count debugging is enabled it also
 * keeps a history of every {@link #addRef()} and {@link #delRef()} call so that a leak can be
 * traced back to the code that took or dropped the reference.
 */
public static abstract class Cleaner {
  /** History of reference count changes, or null when reference count debugging is off. */
  private final List<RefCountDebugItem> refCountDebug;

  /** Unique id of this cleaner; also the key it is tracked under once it is registered. */
  public final long id = idGen.incrementAndGet();

  /**
   * Set by whichever thread calls {@link #noWarnLeakExpected()} and read by whichever thread
   * ends up calling {@link #clean(boolean)} (for example the background cleaner thread), so it
   * is volatile to guarantee the reader sees the update.
   */
  private volatile boolean leakExpected = false;

  /**
   * Create a cleaner, allocating the reference count history only when debugging is enabled.
   */
  public Cleaner() {
    if (REF_COUNT_DEBUG) {
      refCountDebug = new LinkedList<>();
    } else {
      refCountDebug = null;
    }
  }

  /**
   * Record that a reference was added. Does nothing unless reference count debugging is on.
   */
  public final void addRef() {
    if (REF_COUNT_DEBUG && refCountDebug != null) {
      synchronized (this) {
        refCountDebug.add(new MemoryCleaner.RefCountDebugItem("INC"));
      }
    }
  }

  /**
   * Record that a reference was dropped. Does nothing unless reference count debugging is on.
   */
  public final void delRef() {
    if (REF_COUNT_DEBUG && refCountDebug != null) {
      synchronized (this) {
        refCountDebug.add(new MemoryCleaner.RefCountDebugItem("DEC"));
      }
    }
  }

  /**
   * Log, at error level, the recorded history of reference count changes for this cleaner.
   * Does nothing unless reference count debugging is on.
   * @param message text to put in front of the history.
   */
  public final void logRefCountDebug(String message) {
    if (REF_COUNT_DEBUG && refCountDebug != null) {
      synchronized (this) {
        log.error("{} (ID: {}): {}", message, id, MemoryCleaner.stringJoin("\n", refCountDebug));
      }
    }
  }

  /**
   * Clean up any resources not previously released. If something was cleaned up this cleaner
   * is also removed from the set of tracked cleaners.
   * @param logErrorIfNotClean if true we should log a leak unless it is expected.
   * @return true if resources were cleaned up else false.
   */
  public final boolean clean(boolean logErrorIfNotClean) {
    boolean cleaned = cleanImpl(logErrorIfNotClean && !leakExpected);
    if (cleaned) {
      all.remove(id);
    }
    return cleaned;
  }

  /**
   * Return true if a leak is expected for this object else false.
   */
  public final boolean isLeakExpected() {
    return leakExpected;
  }

  /**
   * Clean up any resources not previously released.
   * @param logErrorIfNotClean if true and there are resources to clean up a leak has happened
   *                           so log it.
   * @return true if resources were cleaned up else false.
   */
  protected abstract boolean cleanImpl(boolean logErrorIfNotClean);

  /**
   * Mark a leak of this object as expected so that {@link #clean(boolean)} never asks the
   * implementation to log it as an error.
   */
  public void noWarnLeakExpected() {
    leakExpected = true;
  }

  /**
   * Check if the underlying memory has been cleaned up or not.
   * @return true this is clean else false.
   */
  public abstract boolean isClean();
}
/** Counts the weak references whose cleaner still had resources to release when cleaned. */
static final AtomicLong leakCount = new AtomicLong();

/**
 * Every registered cleaner that has not been cleaned yet, keyed by the Cleaner id. Entries are
 * added by the register methods and removed once the cleaner has released its resources or
 * its weak reference has been processed by the cleaner thread.
 */
private static final Map<Long, CleanerWeakReference> all =
    new ConcurrentHashMap<>(); // We want to be thread safe

/** Queue the weak references are created with; the cleaner thread polls it for work. */
private static final ReferenceQueue<?> collected = new ReferenceQueue<>();
/**
 * Weak reference to a registered object that also carries the Cleaner able to release that
 * object's resources, plus a flag recording whether the object counts as an RMM blocker.
 */
private static class CleanerWeakReference<T> extends WeakReference<T> {
  final boolean isRmmBlocker;
  private final Cleaner cleaner;

  /**
   * @param orig the object to reference weakly.
   * @param cleaner releases the resources of {@code orig}.
   * @param collected the queue this reference is registered with.
   * @param isRmmBlocker value stored in {@link #isRmmBlocker}.
   */
  public CleanerWeakReference(T orig, Cleaner cleaner, ReferenceQueue collected, boolean isRmmBlocker) {
    super(orig, collected);
    this.isRmmBlocker = isRmmBlocker;
    this.cleaner = cleaner;
  }

  /**
   * Ask the cleaner to release whatever is left, logging if it is not already clean, and
   * bump the leak counter when there really was something to release.
   */
  public void clean() {
    final boolean releasedSomething = cleaner.clean(true);
    if (!releasedSomething) {
      return;
    }
    leakCount.incrementAndGet();
  }
}
/**
* The default GPU as set by user threads. It starts out as -1, which the shutdown runnable
* treats as "no device to switch to". It is volatile because it is written through
* setDefaultGpu and read from the cleaner thread and the shutdown runnable.
*/
private static volatile int defaultGpu = -1;
/**
* This should be called from RMM when it is initialized.
*
* @param defaultGpuId the device id to make current before leaked resources are cleaned.
*/
static void setDefaultGpu(int defaultGpuId) {
defaultGpu = defaultGpuId;
}
/**
 * Background thread that waits for registered objects to be garbage collected and then runs
 * their cleaners. Before cleaning it makes sure the thread is on the default GPU. Failures to
 * set the device or to clean are logged and never stop the loop; the thread only exits when
 * it is interrupted.
 */
private static final Thread t = new Thread(() -> {
  try {
    int currentGpuId = -1;
    while (true) {
      // Wait up to 100 ms for the next collected reference; null means the wait timed out.
      CleanerWeakReference next = (CleanerWeakReference) collected.remove(100);
      if (next == null) {
        continue;
      }
      // Read the volatile exactly once so that the device passed to setDevice, the value
      // remembered as current and the value in the error message can never disagree if
      // another thread changes the default GPU in the middle of this iteration.
      final int targetGpuId = defaultGpu;
      try {
        if (currentGpuId != targetGpuId) {
          Cuda.setDevice(targetGpuId);
          // Only remembered after a successful switch so a failure is retried next time.
          currentGpuId = targetGpuId;
        }
      } catch (Throwable error) {
        log.error("ERROR TRYING TO SET GPU ID TO " + targetGpuId, error);
      }
      try {
        next.clean();
      } catch (Throwable error) {
        log.error("CAUGHT EXCEPTION WHILE TRYING TO CLEAN " + next, error);
      }
      // The referent is gone, so stop tracking it whether or not the clean succeeded.
      all.remove(next.cleaner.id);
    }
  } catch (InterruptedException e) {
    // Ignored just exit
  }
}, "Cleaner Thread");
/**
 * Default shutdown runnable used to be added to Java default shutdown hook.
 * It checks the leaks at shutdown time: it requests a GC, stops the cleaner thread, switches
 * to the default GPU if one was set and then cleans every cleaner that is still tracked.
 * This is a best effort, so a failure to set the device or to clean one entry is logged and
 * the remaining entries are still processed.
 */
private static final Runnable DEFAULT_SHUTDOWN_RUNNABLE = () -> {
  // If we are debugging things do a best effort to check for leaks at the end
  System.gc();
  // Avoid issues on shutdown with the cleaner thread.
  t.interrupt();
  try {
    t.join(1000);
  } catch (InterruptedException ignored) {
    // Ignored: we are shutting down and still want to attempt the leak check below.
  }
  // Read the volatile once so the check, the call and the log message agree.
  final int gpuId = defaultGpu;
  if (gpuId >= 0) {
    try {
      Cuda.setDevice(gpuId);
    } catch (Throwable error) {
      log.error("ERROR TRYING TO SET GPU ID TO " + gpuId, error);
    }
  }
  for (CleanerWeakReference cwr : all.values()) {
    try {
      cwr.clean();
    } catch (Throwable error) {
      // One bad cleaner must not hide the leaks held by the ones after it.
      log.error("CAUGHT EXCEPTION WHILE TRYING TO CLEAN " + cwr, error);
    }
  }
};
// Thread wrapping the default shutdown runnable so that it can be added to, and later removed
// from, the Java Runtime shutdown hooks.
private static final Thread DEFAULT_SHUTDOWN_THREAD = new Thread(DEFAULT_SHUTDOWN_RUNNABLE);
// Start the background cleaner thread as soon as the class is loaded.
static {
// The daemon flag has to be set before the thread is started.
t.setDaemon(true);
t.start();
// The leak check at shutdown is only installed when reference count debugging is enabled.
if (REF_COUNT_DEBUG) {
Runtime.getRuntime().addShutdownHook(DEFAULT_SHUTDOWN_THREAD);
}
}
/**
 * De-register the default shutdown hook from the Java default Runtime and hand back the
 * shutdown runnable that hook would have executed.
 * <p>
 * If the default shutdown runnable should run from a custom shutdown hook manager instead of
 * the Java default Runtime, first remove it with this method and then add the returned
 * runnable to that manager.
 *
 * @return the default shutdown runnable
 */
public static Runnable removeDefaultShutdownHook() {
  final Runtime runtime = Runtime.getRuntime();
  runtime.removeShutdownHook(DEFAULT_SHUTDOWN_THREAD);
  return DEFAULT_SHUTDOWN_RUNNABLE;
}
/**
 * Shared implementation of every register overload: start tracking {@code cleaner} under its
 * id with a weak reference to {@code resource}.
 * @param resource the object whose collection should trigger the cleaner.
 * @param cleaner releases the resources of {@code resource}.
 * @param isRmmBlocker whether this entry is reported by bestEffortHasRmmBlockers while it is
 *                     not clean.
 */
private static void track(Object resource, Cleaner cleaner, boolean isRmmBlocker) {
  // It is now registered...
  all.put(cleaner.id, new CleanerWeakReference<>(resource, cleaner, collected, isRmmBlocker));
}

/** Register a ColumnVector; it counts as an RMM blocker. */
static void register(ColumnVector vec, Cleaner cleaner) {
  track(vec, cleaner, true);
}

/** Register a Scalar; it counts as an RMM blocker. */
static void register(Scalar s, Cleaner cleaner) {
  track(s, cleaner, true);
}

/** Register a HostColumnVectorCore; it does not count as an RMM blocker. */
static void register(HostColumnVectorCore vec, Cleaner cleaner) {
  track(vec, cleaner, false);
}

/** Register a MemoryBuffer; only a BaseDeviceMemoryBuffer counts as an RMM blocker. */
static void register(MemoryBuffer buf, Cleaner cleaner) {
  track(buf, cleaner, buf instanceof BaseDeviceMemoryBuffer);
}

/** Register a Cuda.Stream; it does not count as an RMM blocker. */
static void register(Cuda.Stream stream, Cleaner cleaner) {
  track(stream, cleaner, false);
}

/** Register a Cuda.Event; it does not count as an RMM blocker. */
static void register(Cuda.Event event, Cleaner cleaner) {
  track(event, cleaner, false);
}

/** Register a CuFileDriver; it does not count as an RMM blocker. */
static void register(CuFileDriver driver, Cleaner cleaner) {
  track(driver, cleaner, false);
}

/** Register a CuFileBuffer; it does not count as an RMM blocker. */
static void register(CuFileBuffer buffer, Cleaner cleaner) {
  track(buffer, cleaner, false);
}

/** Register a CuFileHandle; it does not count as an RMM blocker. */
static void register(CuFileHandle handle, Cleaner cleaner) {
  track(handle, cleaner, false);
}

/** Register a CompiledExpression; it does not count as an RMM blocker. */
public static void register(CompiledExpression expr, Cleaner cleaner) {
  track(expr, cleaner, false);
}

/** Register a HashJoin; it counts as an RMM blocker. */
static void register(HashJoin hashJoin, Cleaner cleaner) {
  track(hashJoin, cleaner, true);
}
/**
 * Look for a tracked entry that is flagged as an RMM blocker and whose cleaner is not clean
 * yet. Other threads can register or clean entries while this runs, so the answer is only a
 * best effort: it can still be false while RMM buffers have not been collected.
 * @return true if there are rmm blockers else false.
 */
static boolean bestEffortHasRmmBlockers() {
  for (CleanerWeakReference tracked : all.values()) {
    if (tracked.isRmmBlocker && !tracked.cleaner.isClean()) {
      return true;
    }
  }
  return false;
}
/**
 * Turn every element of {@code it} into a String and glue the results together with
 * {@code delim} between them. This is not fast, so only use it for debug messages whose
 * code path can be disabled.
 * @param delim separator placed between consecutive elements.
 * @param it the elements to join, in iteration order.
 * @return the joined text, empty when there are no elements.
 */
private static <T> String stringJoin(String delim, Iterable<T> it) {
  return StreamSupport.stream(it.spliterator(), false)
      .map(element -> element.toString())
      .collect(Collectors.joining(delim));
}
/**
 * When debug is enabled holds information about inc and dec of ref count: which operation it
 * was, when it happened and the stack trace of the thread that did it.
 */
private static final class RefCountDebugItem {
  /** Stack of the calling thread captured when this item was created. */
  final StackTraceElement[] stackTrace;
  /** Creation time in milliseconds since the epoch. */
  final long timeMs;
  /** Label of the operation that was recorded. */
  final String op;

  /**
   * @param op label of the reference count operation being recorded.
   */
  public RefCountDebugItem(String op) {
    this.stackTrace = Thread.currentThread().getStackTrace();
    this.timeMs = System.currentTimeMillis();
    this.op = op;
  }

  /**
   * @return the timestamp and operation on the first line followed by one stack frame per line.
   */
  @Override
  public String toString() {
    Date date = new Date(timeMs);
    // Simple Date Format is horribly expensive only do this when debug is turned on!
    // Exactly three 'S' letters: the letter count is a minimum width, so a fourth one would
    // zero pad the milliseconds (978 ms would print as .0978) and misreport the time.
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
    return dateFormat.format(date) + ": " + op + "\n"
        + stringJoin("\n", Arrays.asList(stackTrace))
        + "\n";
  }
}
}