Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions kmp/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,45 @@ configurations.all {
)
}

// Display resolution helpers — work on X11, Wayland, and Wayland+XWayland.
// Needed when $DISPLAY / $WAYLAND_DISPLAY are not forwarded into the Gradle daemon
// (common on Wayland desktops and in SSH sessions without X forwarding).

// Reads the effective UID from /proc (Linux only); null elsewhere.
fun linuxUid(): String? = try {
IoFile("/proc/self/status").readLines()
.firstOrNull { it.startsWith("Uid:") }
?.split("\\s+".toRegex())?.getOrNull(1)
} catch (_: Exception) { null }

// XDG_RUNTIME_DIR: env var → /proc UID probe → null.
fun resolvedXdgRuntimeDir(): String? =
System.getenv("XDG_RUNTIME_DIR")?.takeIf { IoFile(it).isDirectory }
?: linuxUid()?.let { uid -> "/run/user/$uid".takeIf { IoFile(it).isDirectory } }

// DISPLAY: env var → XWayland lock-file probe → null.
fun resolvedDisplay(): String? {
System.getenv("DISPLAY")?.takeIf { it.isNotBlank() }?.let { return it }
val lockPattern = Regex("\\.X\\d+-lock")
val lockName = IoFile("/tmp").list()?.filter { lockPattern.matches(it) }?.minOrNull()
?: return null
return ":${lockName.removePrefix(".X").removeSuffix("-lock")}"
}

// WAYLAND_DISPLAY: env var → probe wayland-0 socket in XDG_RUNTIME_DIR → null.
fun resolvedWaylandDisplay(): String? {
System.getenv("WAYLAND_DISPLAY")?.takeIf { it.isNotBlank() }?.let { return it }
val runtimeDir = resolvedXdgRuntimeDir() ?: return null
return if (IoFile("$runtimeDir/wayland-0").exists()) "wayland-0" else null
}

// Apply all three to a Test task (call once per task, not per property).
fun Test.configureDisplayEnv() {
resolvedDisplay()?.let { environment("DISPLAY", it) }
resolvedWaylandDisplay()?.let { environment("WAYLAND_DISPLAY", it) }
resolvedXdgRuntimeDir()?.let { environment("XDG_RUNTIME_DIR", it) }
}

// Configure JVM test task for Compose Desktop UI tests
tasks.named<Test>("jvmTest") {
// BlockHound is installed programmatically via BlockHoundTestBase.installBlockHound().
Expand All @@ -390,6 +429,7 @@ tasks.named<Test>("jvmTest") {
"--add-opens=java.base/java.lang=ALL-UNNAMED",
)
jvmArgs("-Djava.awt.headless=false")
configureDisplayEnv()
// Enable software rendering for CI environments
environment("LIBGL_ALWAYS_SOFTWARE", System.getenv("LIBGL_ALWAYS_SOFTWARE") ?: "")
environment("GALLIUM_DRIVER", System.getenv("GALLIUM_DRIVER") ?: "")
Expand Down Expand Up @@ -430,6 +470,7 @@ tasks.register<Test>("jvmTestFast") {
"--add-opens=java.base/java.lang=ALL-UNNAMED",
)
jvmArgs("-Djava.awt.headless=false")
configureDisplayEnv()
environment("LIBGL_ALWAYS_SOFTWARE", System.getenv("LIBGL_ALWAYS_SOFTWARE") ?: "")
environment("GALLIUM_DRIVER", System.getenv("GALLIUM_DRIVER") ?: "")

Expand Down
194 changes: 127 additions & 67 deletions kmp/src/commonMain/kotlin/dev/stapler/stelekit/db/DatabaseWriteActor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import dev.stapler.stelekit.repository.DirectRepositoryWrite
import dev.stapler.stelekit.repository.PageRepository
import dev.stapler.stelekit.coroutines.PlatformDispatcher
import dev.stapler.stelekit.util.UuidGenerator
import arrow.atomic.AtomicInt
import arrow.atomic.update
import arrow.atomic.value
import kotlin.concurrent.Volatile
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -122,19 +125,33 @@ class DatabaseWriteActor(
// @Volatile gives the required single-writer/multi-reader visibility without atomics.
@Volatile private var _isActorProcessing: Boolean = false

/**
* Counts callers that have successfully sent a request but whose [CompletableDeferred.await]
* has not yet returned. Incremented just before [Channel.send] and decremented in the
* finally-block that wraps [CompletableDeferred.await], so the counter stays balanced even
* when [send] throws (e.g. channel closed) or when the calling coroutine is cancelled while
* suspended in [await].
*
* Uses [AtomicInt] (Arrow KMP-safe typealias for [java.util.concurrent.atomic.AtomicInteger]
* on JVM/Android, native atomics on iOS/WASM) to avoid the lost-update race that would occur
* with a plain @Volatile Int incremented by concurrent callers.
*/
private val _activeOps = AtomicInt(0)

/**
* True while any write is queued in the channels or currently being processed.
*
* Thread-safe: [Channel.isEmpty] is safe to call from any thread/coroutine;
* [_isActorProcessing] is written only by the actor coroutine (@Volatile suffices
* for single-writer/multi-reader visibility). A request dequeued but not yet flagged
* is a negligible window — this is intentionally best-effort conflict protection.
* for single-writer/multi-reader visibility); [_activeOps] is an atomic counter
* incremented by every caller before [send] and decremented after [await] returns.
* Together they cover the full lifecycle of a write request from caller to result.
*
* Used by conflict detection: an external file change arriving while a split/merge is
* in-flight must trigger the conflict dialog rather than silently overwriting local data.
*/
val hasPendingWrites: Boolean
get() = !highPriority.isEmpty || !lowPriority.isEmpty || _isActorProcessing
get() = !highPriority.isEmpty || !lowPriority.isEmpty || _isActorProcessing || _activeOps.value != 0

// Own scope so the actor loop survives Compose scope cancellation (e.g. key(activeGraphId)
// graph switch). Callers must call close() to stop the actor.
Expand Down Expand Up @@ -195,65 +212,89 @@ class DatabaseWriteActor(
if (result.isRight()) onWriteSuccess?.invoke(request)
request.deferred.complete(result)
}
is WriteRequest.DeleteBlocksForPage -> {
if (opLogger != null) {
try {
val pageBlocks = blockRepository.getBlocksForPage(request.pageUuid).first().getOrNull()
pageBlocks?.forEach { opLogger.logDelete(it) }
} catch (e: CancellationException) {
throw e
} catch (_: Exception) {
// Non-fatal: op log failure must not block the delete
}
}
val result = blockRepository.deleteBlocksForPage(request.pageUuid)
if (result.isRight()) onWriteSuccess?.invoke(request)
request.deferred.complete(result)
is WriteRequest.DeleteBlocksForPage -> processDeleteBlocksForPage(request)
is WriteRequest.DeleteBlocksForPages -> processDeleteBlocksForPages(request)
is WriteRequest.SaveBlocks -> processSaveBlocks(request)
is WriteRequest.Execute -> processExecute(request)
}
}

private suspend fun processDeleteBlocksForPage(request: WriteRequest.DeleteBlocksForPage) {
if (opLogger != null) {
try {
val pageBlocks = blockRepository.getBlocksForPage(request.pageUuid).first().getOrNull()
pageBlocks?.forEach { opLogger.logDelete(it) }
} catch (e: CancellationException) {
throw e
} catch (_: Exception) {
// Non-fatal: op log failure must not block the delete
}
is WriteRequest.DeleteBlocksForPages -> {
if (opLogger != null) {
// Chunk page UUIDs so we fetch and delete together per chunk rather than
// materializing all blocks across every page before the first delete runs.
// Bounds peak memory to PAGE_DELETE_CHUNK × (blocks per page).
for (chunk in request.pageUuids.chunked(PAGE_DELETE_CHUNK)) {
logDeletesForChunk(chunk)
val chunkResult = blockRepository.deleteBlocksForPages(chunk)
if (chunkResult.isLeft()) {
request.deferred.complete(chunkResult)
return
}
}
onWriteSuccess?.invoke(request)
request.deferred.complete(Unit.right())
} else {
val result = blockRepository.deleteBlocksForPages(request.pageUuids)
if (result.isRight()) onWriteSuccess?.invoke(request)
request.deferred.complete(result)
}
val result = blockRepository.deleteBlocksForPage(request.pageUuid)
if (result.isRight()) onWriteSuccess?.invoke(request)
request.deferred.complete(result)
}

private suspend fun processDeleteBlocksForPages(request: WriteRequest.DeleteBlocksForPages) {
if (opLogger != null) {
// Chunk page UUIDs so we fetch and delete together per chunk rather than
// materializing all blocks across every page before the first delete runs.
// Bounds peak memory to PAGE_DELETE_CHUNK × (blocks per page).
for (chunk in request.pageUuids.chunked(PAGE_DELETE_CHUNK)) {
logDeletedBlocksForChunk(chunk)
val chunkResult = blockRepository.deleteBlocksForPages(chunk)
if (chunkResult.isLeft()) {
request.deferred.complete(chunkResult)
return
}
}
is WriteRequest.SaveBlocks -> processSaveBlocks(request)
is WriteRequest.Execute -> {
val waitMs = HistogramWriter.epochMs() - request.enqueueMs
if (waitMs > 10L) {
ringBuffer?.record(SerializedSpan(
name = "db.queue_wait",
startEpochMs = request.enqueueMs,
endEpochMs = request.enqueueMs + waitMs,
durationMs = waitMs,
attributes = mapOf(
"priority" to request.priority.name,
"session.id" to AppSession.id,
),
statusCode = if (waitMs > 500L) "ERROR" else "OK",
traceId = UuidGenerator.generateV7(),
spanId = UuidGenerator.generateV7(),
))
}
request.deferred.complete(request.op())
onWriteSuccess?.invoke(request)
request.deferred.complete(Unit.right())
} else {
val result = blockRepository.deleteBlocksForPages(request.pageUuids)
if (result.isRight()) onWriteSuccess?.invoke(request)
request.deferred.complete(result)
}
}

private suspend fun logDeletedBlocksForChunk(chunk: List<String>) {
val logger = opLogger ?: return
try {
for (uuid in chunk) {
val pageBlocks = blockRepository.getBlocksForPage(uuid).first().getOrNull()
pageBlocks?.forEach { logger.logDelete(it) }
}
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
this.logger.warn("Op log pre-delete read failed (non-fatal)", e)
}
}

private suspend fun processExecute(request: WriteRequest.Execute) {
val waitMs = HistogramWriter.epochMs() - request.enqueueMs
if (waitMs > 10L) {
recordQueueWaitSpan(request, waitMs)
}
request.deferred.complete(request.op())
}

private fun recordQueueWaitSpan(request: WriteRequest.Execute, waitMs: Long) {
ringBuffer?.record(SerializedSpan(
name = "db.queue_wait",
startEpochMs = request.enqueueMs,
endEpochMs = request.enqueueMs + waitMs,
durationMs = waitMs,
attributes = mapOf(
"priority" to request.priority.name,
"session.id" to AppSession.id,
),
statusCode = if (waitMs > 500L) "ERROR" else "OK",
traceId = UuidGenerator.generateV7(),
spanId = UuidGenerator.generateV7(),
))
}

/** Log deletes for each UUID in [chunk] via the op-logger (non-fatal). */
private suspend fun logDeletesForChunk(chunk: List<String>) {
try {
Expand Down Expand Up @@ -373,34 +414,29 @@ class DatabaseWriteActor(

suspend fun savePage(page: Page, priority: Priority = Priority.HIGH): Either<DomainError, Unit> {
val req = WriteRequest.SavePage(page, priority)
channelFor(priority).send(req)
return req.deferred.await()
return sendAndAwait(req)
}

suspend fun savePages(pages: List<Page>, priority: Priority = Priority.LOW): Either<DomainError, Unit> {
if (pages.isEmpty()) return Unit.right()
val req = WriteRequest.SavePages(pages, priority)
channelFor(priority).send(req)
return req.deferred.await()
return sendAndAwait(req)
}

suspend fun saveBlocks(blocks: List<Block>, priority: Priority = Priority.HIGH): Either<DomainError, Unit> {
val req = WriteRequest.SaveBlocks(blocks, priority)
channelFor(priority).send(req)
return req.deferred.await()
return sendAndAwait(req)
}

suspend fun deleteBlocksForPage(pageUuid: String, priority: Priority = Priority.HIGH): Either<DomainError, Unit> {
val req = WriteRequest.DeleteBlocksForPage(pageUuid, priority)
channelFor(priority).send(req)
return req.deferred.await()
return sendAndAwait(req)
}

suspend fun deleteBlocksForPages(pageUuids: List<String>, priority: Priority = Priority.LOW): Either<DomainError, Unit> {
if (pageUuids.isEmpty()) return Unit.right()
val req = WriteRequest.DeleteBlocksForPages(pageUuids, priority)
channelFor(priority).send(req)
return req.deferred.await()
return sendAndAwait(req)
}

/**
Expand All @@ -409,8 +445,32 @@ class DatabaseWriteActor(
*/
suspend fun execute(priority: Priority = Priority.HIGH, op: suspend () -> Either<DomainError, Unit>): Either<DomainError, Unit> {
val req = WriteRequest.Execute(op, priority)
channelFor(priority).send(req)
return req.deferred.await()
return sendAndAwait(req)
}

/**
* Increments [_activeOps], sends [req] to its priority channel, then awaits the result.
*
* The counter is decremented in all exit paths:
* - If [Channel.send] throws (channel closed / coroutine cancelled in send), the
* increment is undone immediately and the exception propagates.
* - Once [send] returns, a finally-block around [CompletableDeferred.await] guarantees
* the decrement regardless of whether [await] completes normally, throws, or is
* cancelled — so [_activeOps] can never leak above zero.
*/
private suspend fun sendAndAwait(req: WriteRequest): Either<DomainError, Unit> {
_activeOps.update { it + 1 }
try {
channelFor(req.priority).send(req)
} catch (e: Exception) {
_activeOps.update { it - 1 }
throw e
}
try {
return req.deferred.await()
} finally {
_activeOps.update { it - 1 }
}
}

suspend fun saveBlock(block: Block): Either<DomainError, Unit> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import dev.stapler.stelekit.model.MeasurementUnit
import dev.stapler.stelekit.model.NormalizedPoint
import dev.stapler.stelekit.platform.FileSystem
import dev.stapler.stelekit.platform.sensor.PlatformImageFile
import dev.stapler.stelekit.repository.BlockRepository
import dev.stapler.stelekit.repository.BlockWriteRepository
import dev.stapler.stelekit.repository.DirectRepositoryWrite
import dev.stapler.stelekit.repository.ImageAnnotationRepository
import dev.stapler.stelekit.repository.JournalService
Expand Down Expand Up @@ -43,7 +43,7 @@ import kotlin.time.Clock
class ImageImportService(
private val fileSystem: FileSystem,
private val imageAnnotationRepository: ImageAnnotationRepository? = null,
private val blockRepository: BlockRepository? = null,
private val blockRepository: BlockWriteRepository? = null,
private val sidecarManager: ImageSidecarManager? = null,
private val journalService: JournalService? = null,
private val writeActor: DatabaseWriteActor? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import dev.stapler.stelekit.model.Page
import dev.stapler.stelekit.model.CursorState
import dev.stapler.stelekit.editor.commands.*
import dev.stapler.stelekit.model.Block
import dev.stapler.stelekit.repository.BlockRepository
import dev.stapler.stelekit.repository.BlockReadRepository
import dev.stapler.stelekit.db.GraphWriter
import dev.stapler.stelekit.editor.text.ITextOperations
import dev.stapler.stelekit.editor.state.EditorState
Expand All @@ -36,7 +36,7 @@ import kotlinx.coroutines.CancellationException
* Updated to use UUID-native storage.
*/
class Editor(
private val blockRepository: BlockRepository,
private val blockRepository: BlockReadRepository,
private val textOperations: ITextOperations,
private val blockOperations: IBlockOperations,
private val commandSystem: ICommandSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import dev.stapler.stelekit.error.DomainError

import dev.stapler.stelekit.model.Block
import dev.stapler.stelekit.repository.BlockWithDepth
import dev.stapler.stelekit.repository.BlockRepository
import dev.stapler.stelekit.repository.DirectRepositoryWrite
import dev.stapler.stelekit.util.UuidGenerator
import kotlinx.coroutines.flow.Flow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import dev.stapler.stelekit.model.Block
import dev.stapler.stelekit.repository.BlockRepository
import dev.stapler.stelekit.repository.BlockWriteRepository
import dev.stapler.stelekit.editor.TextFormat
import dev.stapler.stelekit.performance.PerformanceMonitor
import kotlinx.coroutines.CoroutineScope
Expand All @@ -25,7 +25,7 @@ import kotlinx.coroutines.CancellationException
* Implementation of text operations for Logseq KMP editor
*/
class TextOperations(
private val blockRepository: BlockRepository
private val blockRepository: BlockWriteRepository
) : ITextOperations {

private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
Expand Down
Loading
Loading