Skip to content

Commit

Permalink
apacheGH-37286: [Java] Start adding nullability/nullness annotations (a…
Browse files Browse the repository at this point in the history
…pache#37723)

### Rationale for this change

Closes: apache#37286

### What changes are included in this PR?

Initial support for: 

- Use the Checker Framework to enhances Java’s type system to make it more powerful and useful. Planning to start with [Nullness Checker](https://checkerframework.org/manual/#nullness-checker)

### Are these changes tested?

These are the activities involved on this PR:

- [x] Configure the Checker Framework
- [x] Treat checker errors as warnings initially 
- [x] Applying Nullness Checker annotation as needed: @ NonNull / @ Nullable
- [x] Check if building timer increases after this checker is added
- [x] Fixes for code review

### Are there any user-facing changes?

Yes
* Closes: apache#37286

Lead-authored-by: david dali susanibar arce <davi.sarces@gmail.com>
Co-authored-by: David Susanibar Arce <dsusanibar@Voltron.local>
Co-authored-by: David Susanibar Arce <davi.sarces@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
davisusanibar and David Susanibar Arce committed Jan 9, 2024
1 parent 74a29a5 commit ccc79e9
Show file tree
Hide file tree
Showing 19 changed files with 357 additions and 192 deletions.
45 changes: 45 additions & 0 deletions java/memory/memory-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -90,5 +94,46 @@
</plugins>
</build>
</profile>

<profile>
<id>checkerframework-jdk11+</id>
<activation>
<jdk>[11,]</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
<compilerArgs combine.children="append">
<arg>-Xmaxerrs</arg> <!-- javac only reports the first 100 errors or warnings -->
<arg>10000</arg>
<arg>-Xmaxwarns</arg>
<arg>10000</arg>
<arg>-AskipDefs=.*Test</arg> <!-- Skip analysis for Testing classes -->
<arg>-AatfDoNotCache</arg> <!-- not cache results -->
</compilerArgs>
<annotationProcessorPaths combine.children="append">
<path>
<groupId>org.checkerframework</groupId>
<artifactId>checker</artifactId>
<version>${checker.framework.version}</version>
</path>
</annotationProcessorPaths>
<annotationProcessors>
<!-- To support @Value.Immutable processors -->
<annotationProcessor>org.immutables.value.internal.$processor$.$Processor</annotationProcessor>
<!-- Add all the checkers you want to enable here -->
<annotationProcessor>org.checkerframework.checker.nullness.NullnessChecker</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.concurrent.ThreadSafe;

import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Provides a concurrent way to manage account for memory usage without locking. Used as basis
Expand All @@ -34,7 +35,7 @@ class Accountant implements AutoCloseable {
/**
* The parent allocator.
*/
protected final Accountant parent;
protected final @Nullable Accountant parent;

private final String name;

Expand All @@ -59,7 +60,7 @@ class Accountant implements AutoCloseable {
*/
private final AtomicLong locallyHeldMemory = new AtomicLong();

public Accountant(Accountant parent, String name, long reservation, long maxAllocation) {
public Accountant(@Nullable Accountant parent, String name, long reservation, long maxAllocation) {
Preconditions.checkNotNull(name, "name must not be null");
Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
Expand All @@ -73,12 +74,13 @@ public Accountant(Accountant parent, String name, long reservation, long maxAllo
this.allocationLimit.set(maxAllocation);

if (reservation != 0) {
Preconditions.checkArgument(parent != null, "parent must not be null");
// we will allocate a reservation from our parent.
final AllocationOutcome outcome = parent.allocateBytes(reservation);
if (!outcome.isOk()) {
throw new OutOfMemoryException(String.format(
"Failure trying to allocate initial reservation for Allocator. " +
"Attempted to allocate %d bytes.", reservation), outcome.getDetails());
"Failure trying to allocate initial reservation for Allocator. " +
"Attempted to allocate %d bytes.", reservation), outcome.getDetails());
}
}
}
Expand All @@ -103,7 +105,7 @@ AllocationOutcome allocateBytes(long size) {
}
}

private AllocationOutcome.Status allocateBytesInternal(long size, AllocationOutcomeDetails details) {
private AllocationOutcome.Status allocateBytesInternal(long size, @Nullable AllocationOutcomeDetails details) {
final AllocationOutcome.Status status = allocate(size,
true /*incomingUpdatePeek*/, false /*forceAllocation*/, details);
if (!status.isOk()) {
Expand Down Expand Up @@ -168,7 +170,7 @@ public boolean forceAllocate(long size) {
* @return The outcome of the allocation.
*/
private AllocationOutcome.Status allocate(final long size, final boolean incomingUpdatePeak,
final boolean forceAllocation, AllocationOutcomeDetails details) {
final boolean forceAllocation, @Nullable AllocationOutcomeDetails details) {
final long oldLocal = locallyHeldMemory.getAndAdd(size);
final long newLocal = oldLocal + size;
// Borrowed from Math.addExact (but avoid exception here)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.memory;

import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* An AllocationManager is the implementation of a physical memory allocation.
Expand Down Expand Up @@ -48,8 +49,9 @@ public abstract class AllocationManager {
// This is mostly a semantic constraint on the API user: if the reference count reaches 0 in the owningLedger, then
// there are not supposed to be any references through other allocators. In practice, this doesn't do anything
// as the implementation just forces ownership to be transferred to one of the other extant references.
private volatile BufferLedger owningLedger;
private volatile @Nullable BufferLedger owningLedger;

@SuppressWarnings("nullness:method.invocation") //call to associate(a, b) not allowed on the given receiver
protected AllocationManager(BufferAllocator accountingAllocator) {
Preconditions.checkNotNull(accountingAllocator);
accountingAllocator.assertOpen();
Expand All @@ -61,7 +63,7 @@ protected AllocationManager(BufferAllocator accountingAllocator) {
this.owningLedger = associate(accountingAllocator, false);
}

BufferLedger getOwningLedger() {
@Nullable BufferLedger getOwningLedger() {
return owningLedger;
}

Expand Down Expand Up @@ -133,9 +135,9 @@ void release(final BufferLedger ledger) {
// remove the <BaseAllocator, BufferLedger> mapping for the allocator
// of calling BufferLedger
Preconditions.checkState(map.containsKey(allocator),
"Expecting a mapping for allocator and reference manager");
"Expecting a mapping for allocator and reference manager");
final BufferLedger oldLedger = map.remove(allocator);

Preconditions.checkState(oldLedger != null, "Expecting a mapping for allocator and reference manager");
BufferAllocator oldAllocator = oldLedger.getAllocator();
if (oldAllocator instanceof BaseAllocator) {
// needed for debug only: tell the allocator that AllocationManager is removing a
Expand Down Expand Up @@ -168,7 +170,7 @@ void release(final BufferLedger ledger) {
// the release call was made by a non-owning reference manager, so after remove there have
// to be 1 or more <allocator, reference manager> mappings
Preconditions.checkState(map.size() > 0,
"The final removal of reference manager should be connected to owning reference manager");
"The final removal of reference manager should be connected to owning reference manager");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

import java.util.Optional;

import org.checkerframework.checker.nullness.qual.Nullable;


/**
* Describes the type of outcome that occurred when trying to account for allocation of memory.
*/
public class AllocationOutcome {
private final Status status;
private final AllocationOutcomeDetails details;
private final @Nullable AllocationOutcomeDetails details;
static final AllocationOutcome SUCCESS_INSTANCE = new AllocationOutcome(Status.SUCCESS);

AllocationOutcome(Status status, AllocationOutcomeDetails details) {
AllocationOutcome(Status status, @Nullable AllocationOutcomeDetails details) {
this.status = status;
this.details = details;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.ArrayDeque;
import java.util.Deque;

import org.checkerframework.checker.nullness.qual.Nullable;


/**
* Captures details of allocation for each accountant in the hierarchical chain.
*/
Expand Down Expand Up @@ -47,7 +50,7 @@ void pushEntry(Accountant accountant, long totalUsedBeforeAllocation, long reque
* Get the allocator that caused the failure.
* @return the allocator that caused failure, null if there was no failure.
*/
public BufferAllocator getFailedAllocator() {
public @Nullable BufferAllocator getFailedAllocator() {
Entry top = allocEntries.peekLast();
if (top != null && top.allocationFailed && (top.accountant instanceof BufferAllocator)) {
return (BufferAllocator) top.accountant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.util.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* ArrowBuf serves as a facade over underlying memory by providing
Expand Down Expand Up @@ -68,11 +69,11 @@ public final class ArrowBuf implements AutoCloseable {
private static final int LOG_BYTES_PER_ROW = 10;
private final long id = idGenerator.incrementAndGet();
private final ReferenceManager referenceManager;
private final BufferManager bufferManager;
private final @Nullable BufferManager bufferManager;
private final long addr;
private long readerIndex;
private long writerIndex;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
private final @Nullable HistoricalLog historicalLog = BaseAllocator.DEBUG ?
new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null;
private volatile long capacity;

Expand All @@ -84,7 +85,7 @@ public final class ArrowBuf implements AutoCloseable {
*/
public ArrowBuf(
final ReferenceManager referenceManager,
final BufferManager bufferManager,
final @Nullable BufferManager bufferManager,
final long capacity,
final long memoryAddress) {
this.referenceManager = referenceManager;
Expand All @@ -93,7 +94,7 @@ public ArrowBuf(
this.capacity = capacity;
this.readerIndex = 0;
this.writerIndex = 0;
if (BaseAllocator.DEBUG) {
if (historicalLog != null) {
historicalLog.recordEvent("create()");
}
}
Expand Down Expand Up @@ -244,7 +245,7 @@ public int hashCode() {
}

@Override
public boolean equals(Object obj) {
public boolean equals(@Nullable Object obj) {
// identity equals only.
return this == obj;
}
Expand Down Expand Up @@ -313,7 +314,7 @@ private void checkIndexD(long index, long fieldLength) {
// check bounds
Preconditions.checkArgument(fieldLength >= 0, "expecting non-negative data length");
if (index < 0 || index > capacity() - fieldLength) {
if (BaseAllocator.DEBUG) {
if (historicalLog != null) {
historicalLog.logHistory(logger);
}
throw new IndexOutOfBoundsException(String.format(
Expand Down Expand Up @@ -736,7 +737,7 @@ public void getBytes(long index, byte[] dst, int dstIndex, int length) {
if (length != 0) {
// copy "length" bytes from this ArrowBuf starting at addr(index) address
// into dst byte array at dstIndex onwards
MemoryUtil.UNSAFE.copyMemory(null, addr(index), dst, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, length);
MemoryUtil.copyMemory(null, addr(index), dst, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, length);
}
}

Expand Down Expand Up @@ -773,7 +774,7 @@ public void setBytes(long index, byte[] src, int srcIndex, long length) {
if (length > 0) {
// copy "length" bytes from src byte array at the starting index (srcIndex)
// into this ArrowBuf starting at address "addr(index)"
MemoryUtil.UNSAFE.copyMemory(src, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, addr(index), length);
MemoryUtil.copyMemory(src, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, addr(index), length);
}
}

Expand All @@ -799,15 +800,15 @@ public void getBytes(long index, ByteBuffer dst) {
// at address srcAddress into the dst ByteBuffer starting at
// address dstAddress
final long dstAddress = MemoryUtil.getByteBufferAddress(dst) + dst.position();
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, dst.remaining());
MemoryUtil.copyMemory(null, srcAddress, null, dstAddress, dst.remaining());
// after copy, bump the next write position for the dst ByteBuffer
dst.position(dst.position() + dst.remaining());
} else if (dst.hasArray()) {
// copy dst.remaining() bytes of data from this ArrowBuf starting
// at address srcAddress into the dst ByteBuffer starting at
// index dstIndex
final int dstIndex = dst.arrayOffset() + dst.position();
MemoryUtil.UNSAFE.copyMemory(
MemoryUtil.copyMemory(
null, srcAddress, dst.array(), MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, dst.remaining());
// after copy, bump the next write position for the dst ByteBuffer
dst.position(dst.position() + dst.remaining());
Expand Down Expand Up @@ -836,14 +837,14 @@ public void setBytes(long index, ByteBuffer src) {
// copy src.remaining() bytes of data from src ByteBuffer starting at
// address srcAddress into this ArrowBuf starting at address dstAddress
final long srcAddress = MemoryUtil.getByteBufferAddress(src) + src.position();
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
MemoryUtil.copyMemory(null, srcAddress, null, dstAddress, length);
// after copy, bump the next read position for the src ByteBuffer
src.position(src.position() + length);
} else if (src.hasArray()) {
// copy src.remaining() bytes of data from src ByteBuffer starting at
// index srcIndex into this ArrowBuf starting at address dstAddress
final int srcIndex = src.arrayOffset() + src.position();
MemoryUtil.UNSAFE.copyMemory(
MemoryUtil.copyMemory(
src.array(), MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddress, length);
// after copy, bump the next read position for the src ByteBuffer
src.position(src.position() + length);
Expand Down Expand Up @@ -896,7 +897,7 @@ public void setBytes(long index, ByteBuffer src, int srcIndex, int length) {
// srcAddress into this ArrowBuf at address dstAddress
final long srcAddress = MemoryUtil.getByteBufferAddress(src) + srcIndex;
final long dstAddress = addr(index);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
MemoryUtil.copyMemory(null, srcAddress, null, dstAddress, length);
} else {
if (srcIndex == 0 && src.capacity() == length) {
// copy the entire ByteBuffer from start to end of length
Expand Down Expand Up @@ -936,7 +937,7 @@ public void getBytes(long index, ArrowBuf dst, long dstIndex, int length) {
// dstAddress
final long srcAddress = addr(index);
final long dstAddress = dst.memoryAddress() + (long) dstIndex;
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
MemoryUtil.copyMemory(null, srcAddress, null, dstAddress, length);
}
}

Expand Down Expand Up @@ -966,7 +967,7 @@ public void setBytes(long index, ArrowBuf src, long srcIndex, long length) {
// dstAddress
final long srcAddress = src.memoryAddress() + srcIndex;
final long dstAddress = addr(index);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
MemoryUtil.copyMemory(null, srcAddress, null, dstAddress, length);
}
}

Expand All @@ -986,7 +987,7 @@ public void setBytes(long index, ArrowBuf src) {
checkIndex(index, length);
final long srcAddress = src.memoryAddress() + src.readerIndex;
final long dstAddress = addr(index);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
MemoryUtil.copyMemory(null, srcAddress, null, dstAddress, length);
src.readerIndex(src.readerIndex + length);
}

Expand All @@ -1011,7 +1012,7 @@ public int setBytes(long index, InputStream in, int length) throws IOException {
if (readBytes > 0) {
// copy readBytes length of data from the tmp byte array starting
// at srcIndex 0 into this ArrowBuf starting at address addr(index)
MemoryUtil.UNSAFE.copyMemory(tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, null, addr(index), readBytes);
MemoryUtil.copyMemory(tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, null, addr(index), readBytes);
}
}
return readBytes;
Expand All @@ -1033,7 +1034,7 @@ public void getBytes(long index, OutputStream out, int length) throws IOExceptio
// copy length bytes of data from this ArrowBuf starting at
// address addr(index) into the tmp byte array starting at index 0
byte[] tmp = new byte[length];
MemoryUtil.UNSAFE.copyMemory(null, addr(index), tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, length);
MemoryUtil.copyMemory(null, addr(index), tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, length);
// write the copied data to output stream
out.write(tmp);
}
Expand Down Expand Up @@ -1109,7 +1110,7 @@ public long getId() {
public void print(StringBuilder sb, int indent, Verbosity verbosity) {
CommonUtil.indent(sb, indent).append(toString());

if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
if (historicalLog != null && verbosity.includeHistoricalLog) {
sb.append("\n");
historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
}
Expand Down
Loading

0 comments on commit ccc79e9

Please sign in to comment.