Skip to content

Commit

Permalink
Hybrid Java buffer release: Support both manual and auto release at t…
Browse files Browse the repository at this point in the history
…he same time (apache#34)

* Revert "Add AutoBufferLedger (apache#31)"

This reverts commit e48da37.

* Commit 1

* Commit 2

* Fix config builder visibility in Scala

* Commit 2 Fixup

* Commit 3

* Commit 3 Fixup
  • Loading branch information
zhztheplayer committed Apr 27, 2022
1 parent f75cc6d commit d891503
Show file tree
Hide file tree
Showing 40 changed files with 2,204 additions and 1,564 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public void retain(int increment) {
bufRefCnt.addAndGet(increment);
}

@Override
public boolean isOpen() {
return getRefCount() > 0;
}

@Override
public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public void retain(int increment) {
Preconditions.checkState(originalReferenceCount > 0, "retain called but memory was already released");
}

@Override
public boolean isOpen() {
return getRefCount() > 0;
}

@Override
public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.VisibleForTesting;

/**
Expand All @@ -30,6 +29,20 @@
* "-XX:MaxDirectMemorySize".
*/
public class DirectReservationListener implements ReservationListener {
private final Method methodReserve;
private final Method methodUnreserve;

private DirectReservationListener() {
try {
final Class<?> classBits = Class.forName("java.nio.Bits");
methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
methodReserve.setAccessible(true);
methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
methodUnreserve.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static final DirectReservationListener INSTANCE = new DirectReservationListener();

Expand All @@ -42,22 +55,43 @@ public static DirectReservationListener instance() {
*/
@Override
public void reserve(long size) {
MemoryUtil.reserveDirectMemory(size);
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodReserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Unreserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#unreserveMemory.
*/
@Override
public void unreserve(long size) {
MemoryUtil.unreserveDirectMemory(size);
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodUnreserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Get current reservation of jVM direct memory. Visible for testing.
*/
@VisibleForTesting
public long getCurrentDirectMemReservation() {
return MemoryUtil.getCurrentDirectMemReservation();
try {
final Class<?> classBits = Class.forName("java.nio.Bits");
final Field f = classBits.getDeclaredField("reservedMemory");
f.setAccessible(true);
return ((AtomicLong) f.get(null)).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.jni;

import org.apache.arrow.memory.*;

/**
* MemoryChunkManager implementation for native allocated memory.
*/
public class NativeUnderlyingMemory implements MemoryChunk {

private final int size;
private final long nativeInstanceId;
private final long address;

/**
* Constructor.
*
* @param size Size of underlying memory (in bytes)
* @param nativeInstanceId ID of the native instance
* @param address Address of underlying memory
*/
NativeUnderlyingMemory(int size, long nativeInstanceId, long address) {
this.size = size;
this.nativeInstanceId = nativeInstanceId;
this.address = address;
}

@Override
public long size() {
return size;
}

@Override
public long memoryAddress() {
return address;
}

@Override
public void destroy() {
JniWrapper.get().releaseBuffer(nativeInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ AllocationOutcome allocateBytes(long size) {
} else {
// Try again, but with details this time.
// Populating details only on failures avoids performance overhead in the common case (success case).
AllocationOutcomeDetails details = new AllocationOutcomeDetails();
AllocationOutcomeDetails details = new AllocationOutcomeDetails(this);
status = allocateBytesInternal(size, details);
return new AllocationOutcome(status, details);
}
Expand Down

This file was deleted.

Loading

0 comments on commit d891503

Please sign in to comment.