Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlin.jvm.JvmName
import kotlin.reflect.KProperty1

/**
Expand Down Expand Up @@ -51,7 +52,11 @@ inline fun <reified Data : Any, Value> PostgrestQueryBuilder.selectSingleValueAs
primaryKey: KProperty1<Data, Value>,
channelName: String? = null,
crossinline filter: PostgrestFilterBuilder.() -> Unit
): Flow<Data> = selectSingleValueAsFlow(PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }, channelName, filter)
): Flow<Data> = selectSingleValueAsFlow(
PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() },
channelName,
filter
)

/**
* Executes vertical filtering with select on [PostgrestQueryBuilder.table] and [PostgrestQueryBuilder.schema] and returns a [Flow] of a list of values matching the [filter].
Expand All @@ -65,14 +70,28 @@ inline fun <reified Data : Any> PostgrestQueryBuilder.selectAsFlow(
primaryKey: PrimaryKey<Data>,
channelName: String? = null,
filter: FilterOperation? = null,
): Flow<List<Data>> = selectAsFlow(listOf(primaryKey), channelName, filter)

/**
* Executes vertical filtering with select on [PostgrestQueryBuilder.table] and [PostgrestQueryBuilder.schema] and returns a [Flow] of a list of values matching the [filter].
* This function listens for changes in the table and emits the new list whenever a change occurs.
* @param primaryKeys the list of primary key of the [Data] type
* @param filter the filter to apply to the select query
* @param channelName the name of the channel to use for the realtime updates. If null, a channel name following the format "schema:table:id" will be used
*/
@SupabaseExperimental
inline fun <reified Data : Any> PostgrestQueryBuilder.selectAsFlow(
primaryKeys: List<PrimaryKey<Data>>,
channelName: String? = null,
filter: FilterOperation? = null,
): Flow<List<Data>> {
val realtime = postgrest.supabaseClient.realtime as RealtimeImpl
val channel = realtime.channel(channelName ?: defaultChannelName(schema, table, realtime))
return flow {
val dataFlow = channel.postgresListDataFlow(
schema = this@selectAsFlow.schema,
table = this@selectAsFlow.table,
primaryKey = primaryKey,
primaryKeys = primaryKeys,
filter = filter
)
channel.subscribe()
Expand All @@ -94,7 +113,28 @@ inline fun <reified Data : Any, Value> PostgrestQueryBuilder.selectAsFlow(
primaryKey: KProperty1<Data, Value>,
channelName: String? = null,
filter: FilterOperation? = null,
): Flow<List<Data>> = selectAsFlow(PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }, channelName, filter)
): Flow<List<Data>> =
selectAsFlow(listOf(primaryKey), channelName, filter)

/**
* Executes vertical filtering with select on [PostgrestQueryBuilder.table] and [PostgrestQueryBuilder.schema] and returns a [Flow] of a list of values matching the [filter].
* This function listens for changes in the table and emits the new list whenever a change occurs.
* @param primaryKeys the list of primary key of the [Data] type
* @param filter the filter to apply to the select query
* @param channelName the name of the channel to use for the realtime updates. If null, a channel name following the format "schema:table:id" will be used
*/
@SupabaseExperimental
@JvmName("selectAsFlowMultiplePks")
inline fun <reified Data : Any, Value> PostgrestQueryBuilder.selectAsFlow(
primaryKeys: List<KProperty1<Data, Value>>,
channelName: String? = null,
filter: FilterOperation? = null,
): Flow<List<Data>> =
selectAsFlow(primaryKeys.map { primaryKey ->
PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }
}, channelName, filter)


@PublishedApi
internal fun defaultChannelName(schema: String, table: String, realtimeImpl: RealtimeImpl) = "$schema:$table:${realtimeImpl.nextIncrementId()}"
internal fun defaultChannelName(schema: String, table: String, realtimeImpl: RealtimeImpl) =
"$schema:$table:${realtimeImpl.nextIncrementId()}"
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
@file:Suppress("MatchingDeclarationName")

package io.github.jan.supabase.realtime

import io.github.jan.supabase.collections.AtomicMutableMap
Expand All @@ -12,6 +13,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.map
import kotlinx.serialization.json.jsonPrimitive
import kotlin.jvm.JvmName
import kotlin.reflect.KProperty1

/**
Expand All @@ -21,6 +23,14 @@ import kotlin.reflect.KProperty1
*/
data class PrimaryKey<Data>(val columnName: String, val producer: (Data) -> String)

@PublishedApi
internal fun <Data> List<PrimaryKey<Data>>.producer(data: Data): String =
fold("") { value, pk -> value + pk.producer(data) }

@PublishedApi
internal val <Data> List<PrimaryKey<Data>>.columnName: String
get() = fold("") { value, pk -> value + pk.columnName }

/**
* Listens for presence changes and caches the presences based on their keys. This function automatically handles joins and leaves.
*
Expand Down Expand Up @@ -56,6 +66,23 @@ inline fun <reified Data : Any> RealtimeChannel.postgresListDataFlow(
table: String,
filter: FilterOperation? = null,
primaryKey: PrimaryKey<Data>
): Flow<List<Data>> = postgresListDataFlow(schema, table, filter, listOf(primaryKey))

/**
* This function retrieves the initial data from the table and then listens for changes. It automatically handles inserts, updates and deletes.
*
* If you want more control, use the [postgresChangeFlow] function.
* @param schema the schema of the table
* @param table the table name
* @param filter an optional filter to filter the data
* @param primaryKeys the list of primary key of the [Data] type
* @return a [Flow] of the current data in a list. This list is updated and emitted whenever a change occurs.
*/
inline fun <reified Data : Any> RealtimeChannel.postgresListDataFlow(
schema: String = "public",
table: String,
filter: FilterOperation? = null,
primaryKeys: List<PrimaryKey<Data>>
): Flow<List<Data>> {
val cache = AtomicMutableMap<String, Data>()
val changeFlow = postgresChangeFlow<PostgresAction>(schema) {
Expand All @@ -75,7 +102,7 @@ inline fun <reified Data : Any> RealtimeChannel.postgresListDataFlow(
}
val data = result.decodeList<Data>()
data.forEach {
val key = primaryKey.producer(it)
val key = primaryKeys.producer(it)
cache[key] = it
}
data
Expand All @@ -87,20 +114,24 @@ inline fun <reified Data : Any> RealtimeChannel.postgresListDataFlow(
when (it) {
is PostgresAction.Insert -> {
val data = it.decodeRecord<Data>()
val key = primaryKey.producer(data)
val key = primaryKeys.producer(data)
cache[key] = data
}

is PostgresAction.Update -> {
val data = it.decodeRecord<Data>()
val key = primaryKey.producer(data)
val key = primaryKeys.producer(data)
cache[key] = data
}

is PostgresAction.Delete -> {
cache.remove(
it.oldRecord[primaryKey.columnName]?.jsonPrimitive?.content
?: error("No primary key found")
primaryKeys.map { key ->
it.oldRecord[key.columnName]?.jsonPrimitive?.content
}.joinToString { "" }
)
}

else -> {}
}
trySend(cache.values.toList())
Expand All @@ -123,14 +154,30 @@ inline fun <reified Data : Any, Value> RealtimeChannel.postgresListDataFlow(
table: String,
filter: FilterOperation? = null,
primaryKey: KProperty1<Data, Value>,
): Flow<List<Data>> = postgresListDataFlow(schema, table, filter, listOf(primaryKey))

/**
* This function retrieves the initial data from the table and then listens for changes. It automatically handles inserts, updates and deletes.
*
* If you want more control, use the [postgresChangeFlow] function.
* @param schema the schema of the table
* @param table the table name
* @param filter an optional filter to filter the data
* @param primaryKeys the list of primary keys of the [Data] type
* @return a [Flow] of the current data in a list. This list is updated and emitted whenever a change occurs.
*/
@JvmName("postgresListDataFlowMultiplePks")
inline fun <reified Data : Any, Value> RealtimeChannel.postgresListDataFlow(
schema: String = "public",
table: String,
filter: FilterOperation? = null,
primaryKeys: List<KProperty1<Data, Value>>,
): Flow<List<Data>> = postgresListDataFlow<Data>(
filter = filter,
table = table,
schema = schema,
primaryKey = PrimaryKey(
primaryKey.name
){
primaryKey.get(it).toString()
primaryKeys = primaryKeys.map { primaryKey ->
PrimaryKey(primaryKey.name) { primaryKey.get(it).toString() }
}
)

Expand Down Expand Up @@ -173,13 +220,16 @@ suspend inline fun <reified Data : Any> RealtimeChannel.postgresSingleDataFlow(
val data = it.decodeRecord<Data>()
trySend(data)
}

is PostgresAction.Update -> {
val data = it.decodeRecord<Data>()
trySend(data)
}

is PostgresAction.Delete -> {
close()
}

else -> {}
}
}
Expand Down