Skip to content

Commit

Permalink
Added a way to invalidate a particular WritableModel id in the ModelC…
Browse files Browse the repository at this point in the history
…ache. Fixed WritableModel Get bug that prevented returning.
  • Loading branch information
bjsvedin committed Mar 6, 2024
1 parent feecfd9 commit 5e668ea
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 45 deletions.
Expand Up @@ -31,6 +31,10 @@ class MockModelCollection<T : HasId<ID>, ID : Comparable<ID>>(val serializer: KS
actionPerformed()
}

override suspend fun invalidate() {
actionPerformed()
}

override fun addListener(listener: () -> Unit): () -> Unit = property.addListener(listener)
override suspend fun awaitRaw(): T? = property.awaitRaw()
override suspend fun set(value: T?) {
Expand Down
Expand Up @@ -21,22 +21,17 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(

inner class WritableModelImpl(val id: ID) : WritableModel<T> {
var live: Int = 0
var lastSet: Double = 0.0
var ready: Boolean = false

override val serializer: KSerializer<T>
get() = this@ModelCache.serializer
private val listeners = ArrayList<() -> Unit>()
private val awaiting = ArrayList<Continuation<T?>>()

val inUse: Boolean get() = listeners.isNotEmpty() || awaiting.isNotEmpty()
override fun addListener(listener: () -> Unit): () -> Unit {
listeners.add(listener)
return {
val pos = listeners.indexOfFirst { it === listener }
if (pos != -1) {
listeners.removeAt(pos)
}
}
}
val upToDate: Boolean get() = (ready && live > 0) || clockMillis() - lastSet < cacheMs

var ready: Boolean = false
var value: T? = null
set(value) {
ready = true
Expand All @@ -46,8 +41,16 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
awaiting.toList().forEach { it.resume(value) }
awaiting.clear()
}
var lastSet: Double = 0.0
val upToDate: Boolean get() = live > 0 || clockMillis() - lastSet < cacheMs

override fun addListener(listener: () -> Unit): () -> Unit {
listeners.add(listener)
return {
val pos = listeners.indexOfFirst { it === listener }
if (pos != -1) {
listeners.removeAt(pos)
}
}
}

override suspend fun awaitRaw(): T? {
if (!upToDate) {
Expand All @@ -58,11 +61,11 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}
}
}
return if(ready) value else suspendCoroutineCancellable { _ -> {} }
return value
}

override suspend infix fun set(value: T?) {
if(value == null) delete()
if (value == null) delete()
else {
val result = skipCache.replace(id, value)
this.value = result
Expand All @@ -77,7 +80,9 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
override suspend fun modify(modification: Modification<T>): T? {
val result = try {
skipCache.modify(id, modification)
} catch(e: Exception) { null } // TODO: we can do better than this
} catch (e: Exception) {
null
} // TODO: we can do better than this
val oldValue = value
value = result
for (query in queries) {
Expand All @@ -93,6 +98,10 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
virtualDelete()
}

override suspend fun invalidate() {
lastSet = 0.0
}

fun virtualDelete() {
val oldValue = value
value = null
Expand Down Expand Up @@ -141,7 +150,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}

fun onAdded(item: T) {
if(!query.condition(item)) return
if (!query.condition(item)) return
if (comparator == null) return
if (!complete) {
val lastItem = cache[ids.lastOrNull() ?: return]?.value ?: return
Expand All @@ -161,6 +170,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(

fun onRemoved(id: ID) {
ids.remove(id)
unreportedChanges = true
}

fun reset(ids: Collection<ID>) {
Expand All @@ -178,11 +188,13 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(

val cache = HashMap<ID, WritableModelImpl>()
val queries = HashMap<Query<T>, ListImpl>()

private interface LivingSocket<T> {
fun start(): ()->Unit
fun start(): () -> Unit
val connected: Readable<Boolean>
fun send(condition: Condition<T>)
}

private val currentSocket: Async<LivingSocket<T>?> = asyncGlobal {
(skipCache as? ModelRestEndpointsPlusUpdatesWebsocket<T, ID>)?.updates()?.apply {
onMessage {
Expand All @@ -200,7 +212,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
send(socketCondition)
}
}?.let {
object: LivingSocket<T> {
object : LivingSocket<T> {
override fun start(): () -> Unit = it.start()
override val connected: Readable<Boolean> get() = it.connected
override fun send(condition: Condition<T>) = it.send(condition)
Expand All @@ -216,6 +228,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}
queries.forEach { it.value.onAdded(new); it.value.refreshIfNeeded() }
}

old != null -> {
cache[old._id]?.virtualDelete()
}
Expand All @@ -225,47 +238,48 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
send(Query(socketCondition, limit = 0))
}
}?.let {
object: LivingSocket<T> {
object : LivingSocket<T> {
override fun start(): () -> Unit = it.start()
override val connected: Readable<Boolean> get() = it.connected
override fun send(condition: Condition<T>) = it.send(Query(condition, limit = 0))
}
}
}
private var socketCondition: Condition<T> = Condition.Never()
private var socketEnder: (()->Unit)? = null
private var socketEnder: (() -> Unit)? = null
suspend fun updateSocket(condition: Condition<T>) {
socketCondition = condition
val socket = currentSocket.await() ?: return
if(socket.connected.await()) {
if (socket.connected.await()) {
socket.send(condition)
}
if(condition is Condition.Never) {
if(socketEnder != null) {
if (condition is Condition.Never) {
if (socketEnder != null) {
socketEnder?.invoke()
socketEnder = null
}
} else {
if(socketEnder == null) {
if (socketEnder == null) {
socketEnder = socket.start()
}
}
}

var listeningDirty = false

override fun get(id: ID): WritableModel<T> = cache.getOrPut(id) { WritableModelImpl(id) }

override suspend fun watch(id: ID): WritableModel<T> {
val original = cache.getOrPut(id) { WritableModelImpl(id) }
return object: WritableModel<T> by original {
return object : WritableModel<T> by original {
override fun addListener(listener: () -> Unit): () -> Unit {
val o = original.addListener(listener)
var once = true
if(original.live++ == 0) listeningDirty = true
if (original.live++ == 0) listeningDirty = true
return {
if(once) {
if (once) {
once = false
if(--original.live == 0) listeningDirty = true
if (--original.live == 0) listeningDirty = true
}
o()
}
Expand All @@ -281,15 +295,15 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
val original = queries.getOrPut(query) {
ListImpl(query)
}
return object: Readable<List<T>> by original {
return object : Readable<List<T>> by original {
override fun addListener(listener: () -> Unit): () -> Unit {
val o = original.addListener(listener)
var once = true
if(original.live++ == 0) listeningDirty = true
if (original.live++ == 0) listeningDirty = true
return {
if(once) {
if (once) {
once = false
if(--original.live == 0) listeningDirty = true
if (--original.live == 0) listeningDirty = true
}
o()
}
Expand Down Expand Up @@ -343,14 +357,14 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}

suspend fun regularly() {
if(listeningDirty) {
if (listeningDirty) {
listeningDirty = false
val subConditions = queries.values.mapNotNull { if(it.live == 0) null else it.query.condition } +
cache.values.mapNotNull { if(it.live == 0) null else it.id }
val subConditions = queries.values.mapNotNull { if (it.live == 0) null else it.query.condition } +
cache.values.mapNotNull { if (it.live == 0) null else it.id }
.takeUnless { it.isEmpty() }
?.let { Condition.OnField(idProp, Condition.Inside(it)) }
.let(::listOfNotNull)
updateSocket(if(subConditions.isEmpty()) Condition.Never() else Condition.Or(subConditions))
updateSocket(if (subConditions.isEmpty()) Condition.Never() else Condition.Or(subConditions))
}
for (query in queries.values.toList()) {
if (query.inUse && !query.upToDate) {
Expand All @@ -362,22 +376,27 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}
}
val needsUpdates = cache.values.toList().asSequence().filter { it.inUse && !it.upToDate }.map { it.id }.toSet()
if(needsUpdates.isNotEmpty()) {
val results = skipCache.query(
Query(
DataClassPathSelf(serializer).get(idProp).inside(needsUpdates),
limit = 1000
)
)
needsUpdates.forEach { id ->
if (needsUpdates.isNotEmpty()) {
val limit = 1000
val results = needsUpdates
.chunked(limit)
.flatMap {
skipCache.query(
Query(
DataClassPathSelf(serializer).get(idProp).inside(needsUpdates),
limit = limit
)
)
}
needsUpdates.forEach { id ->
cache.getOrPut(id) { WritableModelImpl(id) }.value = results.find { it._id == id }
}
}
}

init {
launchGlobal {
while(true) {
while (true) {
delay(200)
regularly()
}
Expand Down
Expand Up @@ -38,6 +38,7 @@ interface WritableModel<T> : Writable<T?> {
val serializer: KSerializer<T>
suspend fun modify(modification: Modification<T>): T?
suspend fun delete(): Unit
suspend fun invalidate(): Unit
}

interface ModelCollection<T : HasId<ID>, ID : Comparable<ID>> {
Expand Down

0 comments on commit 5e668ea

Please sign in to comment.