Skip to content

Commit 8682730

Browse files
authored
Add RSocket support, again (#297)
1 parent 627062c commit 8682730

File tree

12 files changed

+342
-76
lines changed

12 files changed

+342
-76
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
- Updated user agent string formats to allow viewing version distributions in the new PowerSync dashboard.
66
- Sync options: `newClientImplementation` is now the default.
77
- Make `androidx.sqlite:sqlite-bundled` an API dependency of `:core` to avoid toolchain warnings.
8+
- On Apple platforms, use a websocket protocol as a workaround to clients not supporting backpressure in HTTP response
9+
streams.
810

911
## 1.8.1
1012

common/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ kotlin {
165165
implementation(libs.kotlinx.datetime)
166166
implementation(libs.stately.concurrency)
167167
implementation(libs.configuration.annotations)
168+
implementation(libs.rsocket.core)
169+
implementation(libs.rsocket.transport.websocket)
168170
api(libs.ktor.client.core)
169171
api(libs.kermit)
170172
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.powersync.internal
2+
3+
@RequiresOptIn(message = "This API is internal to PowerSync and should never be used outside of the SDK.")
4+
@Retention(AnnotationRetention.BINARY)
5+
@Target(
6+
AnnotationTarget.CLASS,
7+
AnnotationTarget.FUNCTION,
8+
AnnotationTarget.CONSTRUCTOR,
9+
AnnotationTarget.PROPERTY,
10+
)
11+
public annotation class InternalPowerSyncAPI
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.powersync.internal
2+
3+
import io.ktor.client.engine.HttpClientEngine
4+
import io.ktor.client.engine.HttpClientEngineConfig
5+
import kotlin.concurrent.atomics.AtomicReference
6+
import kotlin.concurrent.atomics.ExperimentalAtomicApi
7+
8+
/**
9+
* A hook installed by the `:core` and `:internal:PowerSyncKotlin` projects.
10+
*
11+
* The hook is responsible for determining whether a given [HttpClientEngine] (expressed through
12+
* [HttpClientEngineConfig] because the former is not always public) is known not to support backpressure.
13+
* In particular, this is the case for the `Darwin` HTTP engine.
14+
*
15+
* When an engine is marked to not support backpressure handling, we will use a custom protocol with explicit
16+
* flow control instead of relying on HTTP response streams.
17+
*/
18+
@OptIn(ExperimentalAtomicApi::class)
19+
@InternalPowerSyncAPI
20+
public val httpClientIsKnownToNotSupportBackpressure: AtomicReference<((HttpClientEngineConfig) -> Boolean)?> = AtomicReference(null)
21+
22+
@OptIn(ExperimentalAtomicApi::class, InternalPowerSyncAPI::class)
23+
internal val HttpClientEngineConfig.isKnownToNotSupportBackpressure: Boolean
24+
get() {
25+
val check = httpClientIsKnownToNotSupportBackpressure.load() ?: return false
26+
return check(this)
27+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.powersync.sync
2+
3+
import com.powersync.ExperimentalPowerSyncAPI
4+
import com.powersync.internal.isKnownToNotSupportBackpressure
5+
import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT
6+
import io.ktor.client.HttpClient
7+
import io.ktor.client.HttpClientConfig
8+
import io.ktor.client.plugins.DefaultRequest
9+
import io.ktor.client.plugins.HttpClientPlugin
10+
import io.ktor.client.plugins.HttpTimeout
11+
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
12+
import io.ktor.client.plugins.websocket.WebSockets
13+
import io.ktor.client.request.headers
14+
import io.ktor.util.AttributeKey
15+
import kotlin.experimental.ExperimentalObjCRefinement
16+
import kotlin.native.HiddenFromObjC
17+
18+
/**
19+
* This API is experimental and may change in future releases.
20+
*
21+
* Configures a [HttpClient] for PowerSync sync operations.
22+
* Configures required plugins and default request headers.
23+
*
24+
* This is currently only necessary when using a [SyncClientConfiguration.ExistingClient] for PowerSync
25+
* network requests.
26+
*
27+
* Example usage:
28+
*
29+
* ```kotlin
30+
* val client = HttpClient() {
31+
* configureSyncHttpClient()
32+
* // Your own config here
33+
* }
34+
* ```
35+
*/
36+
@OptIn(ExperimentalObjCRefinement::class)
37+
@HiddenFromObjC
38+
@ExperimentalPowerSyncAPI
39+
public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userAgent()) {
40+
install(HttpTimeout) {
41+
socketTimeoutMillis = SOCKET_TIMEOUT
42+
}
43+
install(ContentNegotiation)
44+
install(WebSocketIfNecessaryPlugin)
45+
46+
install(DefaultRequest) {
47+
headers {
48+
append("User-Agent", userAgent)
49+
}
50+
}
51+
}
52+
53+
/**
54+
* A client plugin that installs WebSocket support and configures it only if the HTTP client implementation is known not
55+
* to support backpressure properly (since that is the only case in which we need RSocket over WebSockets).
56+
*/
57+
internal object WebSocketIfNecessaryPlugin : HttpClientPlugin<Unit, WebSockets> {
58+
override val key: AttributeKey<WebSockets>
59+
get() = WebSockets.key
60+
61+
val needsRSocketKey = AttributeKey<Boolean>("NeedsRSocketSupport")
62+
63+
override fun prepare(block: Unit.() -> Unit): WebSockets =
64+
WebSockets.prepare {
65+
// RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized
66+
// transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size
67+
maxFrameSize = 16_777_215 + 1024 // + some extra, you never know
68+
}
69+
70+
override fun install(
71+
plugin: WebSockets,
72+
scope: HttpClient,
73+
) {
74+
if (scope.engineConfig.isKnownToNotSupportBackpressure) {
75+
WebSockets.install(plugin, scope)
76+
scope.attributes.put(needsRSocketKey, true)
77+
} else {
78+
scope.attributes.put(needsRSocketKey, false)
79+
}
80+
}
81+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package com.powersync.sync
2+
3+
import com.powersync.ExperimentalPowerSyncAPI
4+
import com.powersync.bucket.PowerSyncControlArguments
5+
import com.powersync.connectors.PowerSyncCredentials
6+
import com.powersync.utils.JsonUtil
7+
import io.ktor.client.HttpClient
8+
import io.ktor.client.plugins.websocket.webSocketSession
9+
import io.ktor.http.URLBuilder
10+
import io.ktor.http.URLProtocol
11+
import io.ktor.http.takeFrom
12+
import io.rsocket.kotlin.core.RSocketConnector
13+
import io.rsocket.kotlin.keepalive.KeepAlive
14+
import io.rsocket.kotlin.payload.PayloadMimeType
15+
import io.rsocket.kotlin.payload.buildPayload
16+
import io.rsocket.kotlin.payload.data
17+
import io.rsocket.kotlin.payload.metadata
18+
import io.rsocket.kotlin.transport.RSocketClientTarget
19+
import io.rsocket.kotlin.transport.RSocketConnection
20+
import io.rsocket.kotlin.transport.RSocketTransportApi
21+
import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection
22+
import kotlinx.coroutines.Dispatchers
23+
import kotlinx.coroutines.IO
24+
import kotlinx.coroutines.currentCoroutineContext
25+
import kotlinx.coroutines.flow.Flow
26+
import kotlinx.coroutines.flow.emitAll
27+
import kotlinx.coroutines.flow.flow
28+
import kotlinx.coroutines.flow.flowOn
29+
import kotlinx.coroutines.flow.map
30+
import kotlinx.io.readByteArray
31+
import kotlinx.serialization.SerialName
32+
import kotlinx.serialization.Serializable
33+
import kotlinx.serialization.json.JsonElement
34+
import kotlin.coroutines.CoroutineContext
35+
import kotlin.time.Duration.Companion.seconds
36+
37+
/**
38+
* Connects to the RSocket endpoint for receiving sync lines.
39+
*
40+
* Note that we reconstruct the transport layer for RSocket by opening a WebSocket connection
41+
* manually instead of using the high-level RSocket Ktor integration.
42+
* The reason is that every request to the sync service needs its own metadata and data payload
43+
* (e.g. to transmit the token), but the Ktor integration only supports setting a single payload for
44+
* the entire client.
45+
*/
46+
@OptIn(RSocketTransportApi::class, ExperimentalPowerSyncAPI::class)
47+
internal fun HttpClient.rSocketSyncStream(
48+
userAgent: String,
49+
req: JsonElement,
50+
credentials: PowerSyncCredentials,
51+
): Flow<PowerSyncControlArguments> =
52+
flow {
53+
val flowContext = currentCoroutineContext()
54+
55+
val websocketUri =
56+
URLBuilder(credentials.endpointUri("sync/stream")).apply {
57+
protocol =
58+
when (protocolOrNull) {
59+
URLProtocol.HTTP -> URLProtocol.WS
60+
else -> URLProtocol.WSS
61+
}
62+
}
63+
64+
// Note: We're using a custom connector here because we need to set options for each request
65+
// without creating a new HTTP client each time. The recommended approach would be to add an
66+
// RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for
67+
// all connections (bad because we need a short-lived token in there).
68+
// https://github.com/rsocket/rsocket-kotlin/issues/311
69+
val target =
70+
object : RSocketClientTarget {
71+
@RSocketTransportApi
72+
override suspend fun connectClient(): RSocketConnection {
73+
val ws =
74+
webSocketSession {
75+
url.takeFrom(websocketUri)
76+
}
77+
return KtorWebSocketConnection(ws)
78+
}
79+
80+
override val coroutineContext: CoroutineContext
81+
get() = flowContext
82+
}
83+
84+
val connector =
85+
RSocketConnector {
86+
connectionConfig {
87+
payloadMimeType =
88+
PayloadMimeType(
89+
metadata = "application/json",
90+
data = "application/json",
91+
)
92+
93+
setupPayload {
94+
buildPayload {
95+
data("{}")
96+
metadata(
97+
JsonUtil.json.encodeToString(
98+
ConnectionSetupMetadata(
99+
token = "Bearer ${credentials.token}",
100+
userAgent = userAgent,
101+
),
102+
),
103+
)
104+
}
105+
}
106+
107+
keepAlive = KeepAlive(interval = 20.0.seconds, maxLifetime = 30.0.seconds)
108+
}
109+
}
110+
111+
val rSocket = connector.connect(target)
112+
emit(PowerSyncControlArguments.ConnectionEstablished)
113+
val syncStream =
114+
rSocket.requestStream(
115+
buildPayload {
116+
data(JsonUtil.json.encodeToString(req))
117+
metadata(JsonUtil.json.encodeToString(RequestStreamMetadata("/sync/stream")))
118+
},
119+
)
120+
121+
emitAll(
122+
syncStream
123+
.map {
124+
PowerSyncControlArguments.BinaryLine(it.data.readByteArray())
125+
}.flowOn(Dispatchers.IO),
126+
)
127+
emit(PowerSyncControlArguments.ResponseStreamEnd)
128+
}
129+
130+
/**
131+
* The metadata payload we need to use when connecting with RSocket.
132+
*
133+
* This corresponds to `RSocketContextMeta` on the sync service.
134+
*/
135+
@Serializable
136+
private class ConnectionSetupMetadata(
137+
val token: String,
138+
@SerialName("user_agent")
139+
val userAgent: String,
140+
)
141+
142+
/**
143+
* The metadata payload we send for the `REQUEST_STREAM` frame.
144+
*/
145+
@Serializable
146+
private class RequestStreamMetadata(
147+
val path: String,
148+
)

0 commit comments

Comments
 (0)