Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion storm-cli/storm.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ function renderFrame(now) {
const mi = matrixIntensityAt(mx, y);
if (hasBolt(x, y)) {
if (mi >= 3) out += matrixColor(mi) + matrixGlyph(mx, y, now) + RESET;
else if (mi > 0) out += matrixColor(mi) + '#' + RESET;
else out += dbColor(y, false) + '#' + RESET;
} else {
out += dbColor(y, false) + '@' + RESET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
require(context is JdbcTransactionContext) { "Transaction context must be of type JdbcTransactionContext." }
validateState()
val connection = context.getConnection(dataSource)
ConcurrencyDetector.beforeAccess(connection)
ConcurrencyDetector.beforeAccess(connection, context)
return connection
}
// If no programmatic transaction is active, obtain a new connection from the data source.
Expand All @@ -50,7 +50,7 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
if (context.currentConnection() == connection) {
// If this connection is the current transaction connection, do not close it. It will be closed when the
// outermost transaction ends.
ConcurrencyDetector.afterAccess(connection)
ConcurrencyDetector.afterAccess(connection, context)
return
}
}
Expand Down Expand Up @@ -111,8 +111,13 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
}

/**
* Detects concurrent access to transaction-scoped connections. Note that the same thread can access the same
* connection multiple times.
* Detects concurrent access to transaction-scoped connections.
*
* Ownership is tracked by transaction context identity rather than thread identity, because coroutines may resume
* on a different virtual thread after suspension (especially with OpenTelemetry or other javaagent instrumentation
* that wraps dispatched tasks).
*
* The same context can access the same connection multiple times (re-entrancy).
*/
object ConcurrencyDetector {
private class ConnectionIdentity(connection: Connection, queue: ReferenceQueue<Connection>) : WeakReference<Connection>(connection, queue) {
Expand All @@ -121,7 +126,7 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
override fun equals(other: Any?) = other is ConnectionIdentity && this.get() === other.get() && this.get() != null
}

private data class Owner(var thread: Thread? = null, var depth: Int = 0)
private data class Owner(var context: TransactionContext? = null, var depth: Int = 0)
private val queue = ReferenceQueue<Connection>()
private val owners = ConcurrentHashMap<ConnectionIdentity, Owner>()

Expand All @@ -132,33 +137,31 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
}
}

fun beforeAccess(connection: Connection) {
fun beforeAccess(connection: Connection, context: TransactionContext) {
reap()
val key = ConnectionIdentity(connection, queue)
val owner = owners.computeIfAbsent(key) { Owner() }
val t = Thread.currentThread()
synchronized(owner) {
when (owner.thread) {
when (owner.context) {
null -> {
owner.thread = t
owner.context = context
owner.depth = 1
}
t -> owner.depth++
context -> owner.depth++
else -> throw PersistenceException("Concurrent access on $connection.")
}
}
}

fun afterAccess(connection: Connection) {
fun afterAccess(connection: Connection, context: TransactionContext) {
reap()
val key = ConnectionIdentity(connection, queue)
val owner = owners[key] ?: return
val t = Thread.currentThread()
var clear = false
synchronized(owner) {
if (owner.thread !== t) return
if (owner.context !== context) return
if (--owner.depth == 0) {
owner.thread = null
owner.context = null
clear = true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.jdbc.Sql
import org.springframework.test.context.junit.jupiter.SpringExtension
import st.orm.PersistenceException
import st.orm.core.spi.TransactionContext
import st.orm.repository.countAll
import st.orm.template.impl.CoroutineAwareConnectionProviderImpl
import st.orm.template.model.City
Expand All @@ -31,8 +32,6 @@ open class ConnectionProviderTest(
@Autowired val dataSource: DataSource,
) {

// Connection acquisition without transaction

@Test
fun `getConnection without transaction should return new connection`() {
val provider = CoroutineAwareConnectionProviderImpl()
Expand All @@ -52,8 +51,6 @@ open class ConnectionProviderTest(
connection.isClosed.shouldBeTrue()
}

// Connection within transaction

@Test
fun `getConnection within transaction should reuse transaction connection`(): Unit = runBlocking {
transactionBlocking {
Expand All @@ -72,48 +69,77 @@ open class ConnectionProviderTest(
}
}

// ConcurrencyDetector
private fun stubContext(): TransactionContext = object : TransactionContext {
override fun entityCache(entityType: Class<out st.orm.Entity<*>>, retention: st.orm.core.spi.CacheRetention) = throw UnsupportedOperationException()
override fun getEntityCache(entityType: Class<out st.orm.Entity<*>>) = throw UnsupportedOperationException()
override fun findEntityCache(entityType: Class<out st.orm.Entity<*>>) = null
override fun clearAllEntityCaches() {}
override fun <T : Any?> getDecorator(resourceType: Class<T>): TransactionContext.Decorator<T> = TransactionContext.Decorator { it }
}

@Test
fun `ConcurrencyDetector beforeAccess and afterAccess on same thread should succeed`() {
fun `ConcurrencyDetector beforeAccess and afterAccess with same context should succeed`() {
val connection = dataSource.connection
val context = stubContext()
try {
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
} finally {
connection.close()
}
}

@Test
fun `ConcurrencyDetector should allow nested access on same thread`() {
fun `ConcurrencyDetector should allow nested access with same context`() {
val connection = dataSource.connection
val context = stubContext()
try {
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
} finally {
connection.close()
}
}

@Test
fun `ConcurrencyDetector should allow same context from different thread`() {
val connection = dataSource.connection
val context = stubContext()
try {
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
// Same context, different thread — simulates virtual thread migration.
val thread = Thread {
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
}
thread.start()
thread.join()
} finally {
connection.close()
}
}

@Test
fun `ConcurrencyDetector should detect concurrent access from different threads`() {
fun `ConcurrencyDetector should detect concurrent access from different contexts`() {
val connection = dataSource.connection
val context1 = stubContext()
val context2 = stubContext()
try {
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context1)
var caughtException: Throwable? = null
val thread = Thread {
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context2)
}
thread.setUncaughtExceptionHandler { _, throwable -> caughtException = throwable }
thread.start()
thread.join()
assertThrows<PersistenceException> {
caughtException?.let { throw it }
}
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context1)
} finally {
connection.close()
}
Expand All @@ -122,9 +148,10 @@ open class ConnectionProviderTest(
@Test
fun `ConcurrencyDetector afterAccess on unknown connection should be no-op`() {
val connection = dataSource.connection
val context = stubContext()
try {
// afterAccess on a connection never registered should not throw
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
} finally {
connection.close()
}
Expand Down
Loading