Skip to content

Commit

Permalink
fix: Execute WeakReference post-cleanup callbacks only in `java.lang.Thread` (#3815)

Browse files Browse the repository at this point in the history

* Execute `WeakReference` post-cleanup callbacks only in `java.lang.Thread`
* Restrict access to GC ffi
* Adjust MinimalRequiredSymbolsTest - the increase in found symbols is fine: previously there were no threads created, though a large part of the multithreading-related code base was not linked
* Fix clangfmt to ignore removed files
  • Loading branch information
WojciechMazur committed Mar 7, 2024
1 parent e94358d commit 12f886c
Show file tree
Hide file tree
Showing 33 changed files with 196 additions and 388 deletions.
4 changes: 2 additions & 2 deletions javalib/src/main/scala/java/lang/System.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.{Collections, HashMap, Map, Properties, WindowsHelperMethods}
import scala.scalanative.posix.pwdOps._
import scala.scalanative.posix.{pwd, unistd}
import scala.scalanative.meta.LinktimeInfo.isWindows
import scala.scalanative.runtime.{GC, Intrinsics, Platform}
import scala.scalanative.runtime.{Proxy, Intrinsics, Platform}
import scala.scalanative.ffi.time
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._
Expand Down Expand Up @@ -84,7 +84,7 @@ object System {
def setErr(err: PrintStream): Unit =
this.err = err

def gc(): Unit = GC.collect()
def gc(): Unit = Proxy.GC_collect()
}

// Extract mutable fields to custom object allowing to skip allocations of unused features
Expand Down
23 changes: 11 additions & 12 deletions javalib/src/main/scala/java/lang/ref/ReferenceQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class ReferenceQueue[T] {
private[ref] def enqueue(reference: Reference[T]): Unit =
synchronized {
underlying += reference
notify()
notifyAll()
}

def poll(): Reference[T] = {
Expand All @@ -20,25 +20,24 @@ class ReferenceQueue[T] {
}
}

def remove(): Reference[_ <: T] =
remove(0)

def remove(): Reference[_ <: T] = remove(None)
def remove(timeout: Long): Reference[_ <: T] = {
if (timeout < 0) throw new IllegalArgumentException()
remove(Some(timeout))
}

private def remove(timeout: Option[Long]): Reference[_ <: T] =
synchronized[Reference[_ <: T]] {
def now() = System.currentTimeMillis()
val deadline = now() + timeout
def timeoutExceeded(current: Long): Boolean = {
if (timeout == 0) false
else current > deadline
}
val hasTimeout = timeout.isDefined
val deadline = now() + timeout.getOrElse(0L)
def timeoutExceeded(current: Long): Boolean =
hasTimeout && current > deadline

while (underlying.isEmpty && !timeoutExceeded(now())) {
val timeoutMillis = (deadline - now()).min(0L)
wait(timeoutMillis)
if (hasTimeout) wait((deadline - now()).min(0L))
else wait()
}
poll()
}
}
}
105 changes: 62 additions & 43 deletions javalib/src/main/scala/java/lang/ref/WeakReferenceRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package java.lang.ref

import scala.scalanative.unsafe._
import scala.scalanative.meta.LinktimeInfo.isWeakReferenceSupported
import scala.scalanative.runtime.GC
import scala.scalanative.meta.LinktimeInfo.isMultithreadingEnabled
import scala.scalanative.runtime.Proxy
import scala.scalanative.libc.stdatomic._
import scala.scalanative.runtime.fromRawPtr
import scala.scalanative.runtime.Intrinsics.classFieldRawPtr
import scala.scalanative.annotation.alwaysinline
import scala.util.control.NonFatal
import java.util.concurrent.locks.LockSupport

/* Should always be treated as a module by the compiler.
 * _gc_modified_postGCControlField is explicitly accessed
Expand All @@ -20,53 +22,70 @@ private[java] object WeakReferenceRegistry {
classFieldRawPtr(this, "weakRefsHead")
)

if (isWeakReferenceSupported) {
GC.registerWeakReferenceHandler(() => {
// This method is designed for calls from C and therefore should not include
// non statically reachable fields or methods.
private def handleCollectedReferences() = {
// This method is designed for calls from C and therefore should not include
// non statically reachable fields or methods.

// Detach current weak refs linked-list to allow for unsynchronized updates
val expected = stackalloc[WeakReference[_]]()
var detached = null.asInstanceOf[WeakReference[_]]
while ({
detached = weakRefsHead
!expected = detached
!atomic_compare_exchange_strong(weakRefsHeadPtr, expected, null)
}) ()
// Detach current weak refs linked-list to allow for unsynchronized updates
val expected = stackalloc[WeakReference[_]]()
var detached = null.asInstanceOf[WeakReference[_]]
while ({
detached = weakRefsHead
!expected = detached
!atomic_compare_exchange_strong(weakRefsHeadPtr, expected, null)
}) ()

var current = detached
var prev = null.asInstanceOf[WeakReference[_]]
while (current != null) {
// Actual post GC logic
val wasCollected = current.get() == null
if (wasCollected) {
current.enqueue()
val handler = current.postGCHandler
if (handler != null) {
try handler()
catch {
case NonFatal(err) =>
val thread = Thread.currentThread()
thread
.getUncaughtExceptionHandler()
.uncaughtException(thread, err)
}
var current = detached
var prev = null.asInstanceOf[WeakReference[_]]
while (current != null) {
// Actual post GC logic
val wasCollected = current.get() == null
if (wasCollected) {
current.enqueue()
val handler = current.postGCHandler
if (handler != null) {
try handler()
catch {
case NonFatal(err) =>
val thread = Thread.currentThread()
thread
.getUncaughtExceptionHandler()
.uncaughtException(thread, err)
}
// Update the detached linked list
if (prev == null) detached = current.nextReference
else prev.nextReference = current.nextReference
} else prev = current
current = current.nextReference
}
// Update the detached linked list
if (prev == null) detached = current.nextReference
else prev.nextReference = current.nextReference
} else prev = current
current = current.nextReference
}

// Reattach the weak refs list to the possibly updated head
if (detached != null) while ({
val currentHead = weakRefsHead
!expected = currentHead
prev.nextReference = currentHead
!atomic_compare_exchange_strong(weakRefsHeadPtr, expected, detached)
}) ()
}

private lazy val referenceHandlerThread = Thread
.ofPlatform()
.daemon()
.group(ThreadGroup.System)
.name("GC-WeakReferenceHandler")
.start(() =>
while (true) {
handleCollectedReferences()
LockSupport.park()
}
)

// Reattach the weak refs list to the possibly updated head
if (detached != null) while ({
val currentHead = weakRefsHead
!expected = currentHead
prev.nextReference = currentHead
!atomic_compare_exchange_strong(weakRefsHeadPtr, expected, detached)
}) ()
})
if (isWeakReferenceSupported) {
Proxy.GC_setWeakReferencesCollectedCallback { () =>
if (isMultithreadingEnabled) LockSupport.unpark(referenceHandlerThread)
else handleCollectedReferences()
}
}

private[ref] def add(weakRef: WeakReference[_]): Unit =
Expand Down
6 changes: 6 additions & 0 deletions javalib/src/main/scala/scala/scalanative/runtime/Proxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ object Proxy {
thread = thread,
throwable = ex
)

def GC_collect(): Unit = GC.collect()
type GCWeakReferencesCollectedCallback = GC.WeakReferencesCollectedCallback
def GC_setWeakReferencesCollectedCallback(
callback: GCWeakReferencesCollectedCallback
): Unit = GC.setWeakReferencesCollectedCallback(callback)
}
3 changes: 2 additions & 1 deletion nativelib/src/main/resources/scala-native/gc/boehm/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ size_t scalanative_GC_get_max_heapsize() {

void scalanative_GC_collect() { GC_gcollect(); }

void scalanative_GC_register_weak_reference_handler(void *handler) {}
void scalanative_GC_set_weak_references_collected_callback(
WeakReferencesCollectedCallback callback) {}

#ifdef SCALANATIVE_MULTITHREADING_ENABLED
#ifdef _WIN32
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "Constants.h"
#include "Settings.h"
#include "GCThread.h"
#include "WeakRefGreyList.h"
#include "WeakReferences.h"
#include "Sweeper.h"
#include "immix_commix/Synchronizer.h"

Expand Down Expand Up @@ -43,7 +43,6 @@ NOINLINE void scalanative_GC_init() {
Heap_Init(&heap, Settings_MinHeapSize(), Settings_MaxHeapSize());
#ifdef SCALANATIVE_MULTITHREADING_ENABLED
Synchronizer_init();
weakRefsHandlerThread = GCThread_WeakThreadsHandler_Start();
#endif
MutatorThreads_init();
MutatorThread_init((word_t **)dummy); // approximate stack bottom
Expand Down Expand Up @@ -94,8 +93,9 @@ INLINE void *scalanative_GC_alloc_array(Rtti *info, size_t length,

INLINE void scalanative_GC_collect() { Heap_Collect(&heap); }

INLINE void scalanative_GC_register_weak_reference_handler(void *handler) {
WeakRefGreyList_SetHandler(handler);
INLINE void scalanative_GC_set_weak_references_collected_callback(
WeakReferencesCollectedCallback callback) {
WeakReferences_SetGCFinishedCallback(callback);
}

/* Get the minimum heap size */
Expand Down
84 changes: 3 additions & 81 deletions nativelib/src/main/resources/scala-native/gc/commix/GCThread.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "Sweeper.h"
#include "Marker.h"
#include "Phase.h"
#include "WeakRefGreyList.h"
#include "WeakReferences.h"
#include <errno.h>
#include <stdlib.h>
#include "State.h"
Expand Down Expand Up @@ -54,7 +54,7 @@ static inline void GCThread_nullify(Heap *heap, Stats *stats) {
Stats_RecordTime(stats, start_ns);
Stats_PhaseStarted(stats);

WeakRefGreyList_Nullify(heap, stats);
WeakReferences_Nullify(heap, stats);

Stats_RecordTime(stats, end_ns);
Stats_RecordEvent(stats, event_concurrent_nullify, start_ns, end_ns);
Expand All @@ -64,7 +64,7 @@ static inline void GCThread_nullifyMaster(Heap *heap, Stats *stats) {
Stats_RecordTime(stats, start_ns);
Stats_PhaseStarted(stats);

WeakRefGreyList_NullifyAndScale(heap, stats);
WeakReferences_NullifyAndScale(heap, stats);

Stats_RecordTime(stats, end_ns);
Stats_RecordEvent(stats, event_concurrent_nullify, start_ns, end_ns);
Expand Down Expand Up @@ -275,82 +275,4 @@ void GCThread_ScaleMarkerThreads(Heap *heap, uint32_t remainingFullPackets) {
}
}

// (Removed by this commit.) One-time initialization for the dedicated GC
// weak-references handler thread: registers it as a mutator thread and
// switches it to the Unmanaged state so it does not hold up collections
// while idle, then creates the platform-specific resume event.
static void
GCThread_WeakThreadsHandler_init(struct GCWeakRefsHandlerThread *self) {
    // Address of the local `self` serves as an approximate stack bottom
    // for the mutator-thread registration.
    MutatorThread_init((word_t **)&self);
    MutatorThread_switchState(currentMutatorThread,
                              GC_MutatorThreadState_Unmanaged);
#ifdef _WIN32
    // Win32 event object used to wake the handler thread; created here,
    // reset after each wake-up in the handler loop.
    self->resumeEvent = CreateEvent(NULL, true, false, NULL);
    if (self->resumeEvent == NULL) {
        fprintf(stderr,
                "Failed to setup GC weak refs threads event: error=%" PRIdErr
                "\n",
                LastError);
        exit(ExitValue);
    }
#else
    // POSIX: the resume event is a mutex + condition variable pair.
    // Failure to initialize either is fatal.
    if (pthread_mutex_init(&self->resumeEvent.lock, NULL) != 0 ||
        pthread_cond_init(&self->resumeEvent.cond, NULL) != 0) {
        perror("Failed to setup GC weak refs thread");
        exit(1);
    }
#endif
}

// ----------------
// Weak Refs handler
// -----------------
// (Removed by this commit.) Entry point of the weak-refs handler thread.
// Blocks until the GC sets `isActive`, then switches to the Managed state,
// runs the registered weak-reference handlers, returns to Unmanaged, and
// clears `isActive` so the next collection can dispatch again.
static void *GCThread_WeakThreadsHandlerLoop(void *arg) {
    struct GCWeakRefsHandlerThread *self =
        (struct GCWeakRefsHandlerThread *)arg;
    GCThread_WeakThreadsHandler_init(self);
    // main loop
    while (true) {
        // Wait for dispatch
#ifdef _WIN32
        // Re-check `isActive` after each wake-up to tolerate spurious or
        // stale event signals; ResetEvent re-arms the manual-reset event.
        while (!atomic_load(&self->isActive)) {
            WaitForSingleObject(self->resumeEvent, INFINITE);
            ResetEvent(self->resumeEvent);
        }
#else
        // Standard condvar wait loop: predicate re-checked under the lock
        // to guard against spurious wake-ups.
        pthread_mutex_lock(&self->resumeEvent.lock);
        while (!atomic_load(&self->isActive)) {
            pthread_cond_wait(&self->resumeEvent.cond, &self->resumeEvent.lock);
        }
        pthread_mutex_unlock(&self->resumeEvent.lock);
#endif
        // Handlers may touch the managed heap, so enter the Managed state
        // for their duration only.
        MutatorThread_switchState(currentMutatorThread,
                                  GC_MutatorThreadState_Managed);
        WeakRefGreyList_CallHandlers();
        MutatorThread_switchState(currentMutatorThread,
                                  GC_MutatorThreadState_Unmanaged);
        atomic_store(&self->isActive, false);
    }
    // NOTE(review): unreachable — the `while (true)` above never exits, so
    // this free() never runs, and the function never returns a value despite
    // its `void *` signature. Moot now that the commit deletes this code.
    free(self);
}

// (Removed by this commit.) Allocates the handler-thread descriptor and
// spawns the weak-refs handler thread running
// GCThread_WeakThreadsHandlerLoop; returns the descriptor to the caller.
// NOTE(review): the malloc result is not checked, and `isActive` / the
// resume event are initialized inside the spawned thread, not before
// thread_create — so an early Resume() could race the initialization.
struct GCWeakRefsHandlerThread *GCThread_WeakThreadsHandler_Start() {
    struct GCWeakRefsHandlerThread *thread =
        (struct GCWeakRefsHandlerThread *)malloc(
            sizeof(struct GCWeakRefsHandlerThread));
    thread_create(&thread->handle, GCThread_WeakThreadsHandlerLoop,
                  (void *)thread);
    return thread;
}

// (Removed by this commit.) Wakes the weak-refs handler thread after a
// collection. The CAS on `isActive` (false -> true) ensures at most one
// wake-up is issued while the thread is still processing the previous one.
// NOTE(review): atomic_compare_exchange_weak may fail spuriously even when
// `isActive` is false, in which case this wake-up is silently skipped.
void GCThread_WeakThreadsHandler_Resume(
    struct GCWeakRefsHandlerThread *thread) {
    bool expected = false;
    if (atomic_compare_exchange_weak(&thread->isActive, &expected, true)) {
#ifdef _WIN32
        SetEvent(thread->resumeEvent);
#else
        // Signal under the lock so the wake-up cannot be lost between the
        // waiter's predicate check and its pthread_cond_wait.
        pthread_mutex_lock(&thread->resumeEvent.lock);
        pthread_cond_signal(&thread->resumeEvent.cond);
        pthread_mutex_unlock(&thread->resumeEvent.lock);
#endif
    }
}

#endif
16 changes: 0 additions & 16 deletions nativelib/src/main/resources/scala-native/gc/commix/GCThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,4 @@ void GCThread_WakeMaster(Heap *heap);
void GCThread_WakeWorkers(Heap *heap, int toWake);
void GCThread_ScaleMarkerThreads(Heap *heap, uint32_t remainingFullPackets);

// (Removed by this commit.) Descriptor of the dedicated GC weak-refs
// handler thread: the OS thread handle, the dispatch flag set by the GC
// and cleared by the thread, and a platform-specific resume event.
struct GCWeakRefsHandlerThread {
    thread_t handle;      // OS thread handle filled by thread_create
    atomic_bool isActive; // true while a dispatch is pending/being handled
#ifdef _WIN32
    HANDLE resumeEvent;   // Win32 event used to wake the thread
#else
    // POSIX resume event: condition variable with its guarding mutex.
    struct {
        pthread_mutex_t lock;
        pthread_cond_t cond;
    } resumeEvent;
#endif
};

struct GCWeakRefsHandlerThread *GCThread_WeakThreadsHandler_Start();
void GCThread_WeakThreadsHandler_Resume(struct GCWeakRefsHandlerThread *);

#endif // IMMIX_GCTHREAD_H
5 changes: 2 additions & 3 deletions nativelib/src/main/resources/scala-native/gc/commix/Heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <memory.h>
#include <time.h>
#include <inttypes.h>
#include "WeakRefGreyList.h"
#include "WeakReferences.h"
#include "immix_commix/Synchronizer.h"

void Heap_exitWithOutOfMemory(const char *details) {
Expand Down Expand Up @@ -257,12 +257,11 @@ void Heap_Collect(Heap *heap) {
Phase_StartSweep(heap);
#ifdef SCALANATIVE_MULTITHREADING_ENABLED
Synchronizer_release();
GCThread_WeakThreadsHandler_Resume(weakRefsHandlerThread);
#else
MutatorThread_switchState(currentMutatorThread,
GC_MutatorThreadState_Managed);
WeakRefGreyList_CallHandlers();
#endif
WeakReferences_InvokeGCFinishedCallback();
}

bool Heap_shouldGrow(Heap *heap) {
Expand Down

0 comments on commit 12f886c

Please sign in to comment.