diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PostgrestExtensions.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PostgrestExtensions.kt index d58622f12..b1e422470 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PostgrestExtensions.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PostgrestExtensions.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onCompletion +import kotlin.jvm.JvmName import kotlin.reflect.KProperty1 /** @@ -51,7 +52,11 @@ inline fun PostgrestQueryBuilder.selectSingleValueAs primaryKey: KProperty1, channelName: String? = null, crossinline filter: PostgrestFilterBuilder.() -> Unit -): Flow = selectSingleValueAsFlow(PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }, channelName, filter) +): Flow = selectSingleValueAsFlow( + PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }, + channelName, + filter +) /** * Executes vertical filtering with select on [PostgrestQueryBuilder.table] and [PostgrestQueryBuilder.schema] and returns a [Flow] of a list of values matching the [filter]. @@ -65,6 +70,20 @@ inline fun PostgrestQueryBuilder.selectAsFlow( primaryKey: PrimaryKey, channelName: String? = null, filter: FilterOperation? = null, +): Flow> = selectAsFlow(listOf(primaryKey), channelName, filter) + +/** + * Executes vertical filtering with select on [PostgrestQueryBuilder.table] and [PostgrestQueryBuilder.schema] and returns a [Flow] of a list of values matching the [filter]. + * This function listens for changes in the table and emits the new list whenever a change occurs. + * @param primaryKeys the list of primary key of the [Data] type + * @param filter the filter to apply to the select query + * @param channelName the name of the channel to use for the realtime updates. If null, a channel name following the format "schema:table:id" will be used + */ +@SupabaseExperimental +inline fun PostgrestQueryBuilder.selectAsFlow( + primaryKeys: List>, + channelName: String? = null, + filter: FilterOperation? = null, ): Flow> { val realtime = postgrest.supabaseClient.realtime as RealtimeImpl val channel = realtime.channel(channelName ?: defaultChannelName(schema, table, realtime)) @@ -72,7 +91,7 @@ inline fun PostgrestQueryBuilder.selectAsFlow( val dataFlow = channel.postgresListDataFlow( schema = this@selectAsFlow.schema, table = this@selectAsFlow.table, - primaryKey = primaryKey, + primaryKeys = primaryKeys, filter = filter ) channel.subscribe() @@ -94,7 +113,28 @@ inline fun PostgrestQueryBuilder.selectAsFlow( primaryKey: KProperty1, channelName: String? = null, filter: FilterOperation? = null, -): Flow> = selectAsFlow(PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }, channelName, filter) +): Flow> = + selectAsFlow(listOf(primaryKey), channelName, filter) + +/** + * Executes vertical filtering with select on [PostgrestQueryBuilder.table] and [PostgrestQueryBuilder.schema] and returns a [Flow] of a list of values matching the [filter]. + * This function listens for changes in the table and emits the new list whenever a change occurs. + * @param primaryKeys the list of primary key of the [Data] type + * @param filter the filter to apply to the select query + * @param channelName the name of the channel to use for the realtime updates. If null, a channel name following the format "schema:table:id" will be used + */ +@SupabaseExperimental +@JvmName("selectAsFlowMultiplePks") +inline fun PostgrestQueryBuilder.selectAsFlow( + primaryKeys: List>, + channelName: String? = null, + filter: FilterOperation? = null, +): Flow> = + selectAsFlow(primaryKeys.map { primaryKey -> + PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() } + }, channelName, filter) + @PublishedApi -internal fun defaultChannelName(schema: String, table: String, realtimeImpl: RealtimeImpl) = "$schema:$table:${realtimeImpl.nextIncrementId()}" \ No newline at end of file +internal fun defaultChannelName(schema: String, table: String, realtimeImpl: RealtimeImpl) = + "$schema:$table:${realtimeImpl.nextIncrementId()}" \ No newline at end of file diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeExt.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeExt.kt index fef95f834..fdb7ce7a5 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeExt.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeExt.kt @@ -1,4 +1,5 @@ @file:Suppress("MatchingDeclarationName") + package io.github.jan.supabase.realtime import io.github.jan.supabase.collections.AtomicMutableMap @@ -12,6 +13,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.map import kotlinx.serialization.json.jsonPrimitive +import kotlin.jvm.JvmName import kotlin.reflect.KProperty1 /** @@ -21,6 +23,14 @@ import kotlin.reflect.KProperty1 */ data class PrimaryKey(val columnName: String, val producer: (Data) -> String) +@PublishedApi +internal fun List>.producer(data: Data): String = + fold("") { value, pk -> value + pk.producer(data) } + +@PublishedApi +internal val List>.columnName: String + get() = fold("") { value, pk -> value + pk.columnName } + /** * Listens for presence changes and caches the presences based on their keys. This function automatically handles joins and leaves. * @@ -56,6 +66,23 @@ inline fun RealtimeChannel.postgresListDataFlow( table: String, filter: FilterOperation? = null, primaryKey: PrimaryKey +): Flow> = postgresListDataFlow(schema, table, filter, listOf(primaryKey)) + +/** + * This function retrieves the initial data from the table and then listens for changes. It automatically handles inserts, updates and deletes. + * + * If you want more control, use the [postgresChangeFlow] function. + * @param schema the schema of the table + * @param table the table name + * @param filter an optional filter to filter the data + * @param primaryKeys the list of primary key of the [Data] type + * @return a [Flow] of the current data in a list. This list is updated and emitted whenever a change occurs. + */ +inline fun RealtimeChannel.postgresListDataFlow( + schema: String = "public", + table: String, + filter: FilterOperation? = null, + primaryKeys: List> ): Flow> { val cache = AtomicMutableMap() val changeFlow = postgresChangeFlow(schema) { @@ -75,7 +102,7 @@ inline fun RealtimeChannel.postgresListDataFlow( } val data = result.decodeList() data.forEach { - val key = primaryKey.producer(it) + val key = primaryKeys.producer(it) cache[key] = it } data @@ -87,20 +114,24 @@ inline fun RealtimeChannel.postgresListDataFlow( when (it) { is PostgresAction.Insert -> { val data = it.decodeRecord() - val key = primaryKey.producer(data) + val key = primaryKeys.producer(data) cache[key] = data } + is PostgresAction.Update -> { val data = it.decodeRecord() - val key = primaryKey.producer(data) + val key = primaryKeys.producer(data) cache[key] = data } + is PostgresAction.Delete -> { cache.remove( - it.oldRecord[primaryKey.columnName]?.jsonPrimitive?.content - ?: error("No primary key found") + primaryKeys.map { key -> + it.oldRecord[key.columnName]?.jsonPrimitive?.content + }.joinToString { "" } ) } + else -> {} } trySend(cache.values.toList()) @@ -123,14 +154,30 @@ inline fun RealtimeChannel.postgresListDataFlow( table: String, filter: FilterOperation? = null, primaryKey: KProperty1, +): Flow> = postgresListDataFlow(schema, table, filter, listOf(primaryKey)) + +/** + * This function retrieves the initial data from the table and then listens for changes. It automatically handles inserts, updates and deletes. + * + * If you want more control, use the [postgresChangeFlow] function. + * @param schema the schema of the table + * @param table the table name + * @param filter an optional filter to filter the data + * @param primaryKeys the list of primary keys of the [Data] type + * @return a [Flow] of the current data in a list. This list is updated and emitted whenever a change occurs. + */ +@JvmName("postgresListDataFlowMultiplePks") +inline fun RealtimeChannel.postgresListDataFlow( + schema: String = "public", + table: String, + filter: FilterOperation? = null, + primaryKeys: List>, ): Flow> = postgresListDataFlow( filter = filter, table = table, schema = schema, - primaryKey = PrimaryKey( - primaryKey.name - ){ - primaryKey.get(it).toString() + primaryKeys = primaryKeys.map { primaryKey -> + PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() } } ) @@ -173,13 +220,16 @@ suspend inline fun RealtimeChannel.postgresSingleDataFlow( val data = it.decodeRecord() trySend(data) } + is PostgresAction.Update -> { val data = it.decodeRecord() trySend(data) } + is PostgresAction.Delete -> { close() } + else -> {} } }