diff --git a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/auth/CredentialsStore.kt b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/auth/CredentialsStore.kt index df3b119f..4b7bee7a 100644 --- a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/auth/CredentialsStore.kt +++ b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/auth/CredentialsStore.kt @@ -46,7 +46,7 @@ class CredentialsStore(private val context: Context) { fun getJwtInfoFlow() = context.dataStore.data.map { preferences -> val accountId = preferences[ACCOUNT_ID_KEY] ?: return@map null val jwt = preferences[JWT_KEY] ?: return@map null - val userName = preferences[USER_NAME_KEY] ?: return@map null + val userName = preferences[USER_NAME_KEY] ?: "" SavedCredentials(accountId, jwt, userName) }.distinctUntilChanged() } diff --git a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/sendpayment/SendPaymentViewModel.kt b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/sendpayment/SendPaymentViewModel.kt index e49e6510..9ea2b734 100644 --- a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/sendpayment/SendPaymentViewModel.kt +++ b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/sendpayment/SendPaymentViewModel.kt @@ -4,10 +4,12 @@ import android.util.Log import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope import com.lightspark.androidwalletdemo.util.CurrencyAmountArg +import com.lightspark.androidwalletdemo.util.currencyAmountSats import com.lightspark.androidwalletdemo.util.zeroCurrencyAmount import com.lightspark.androidwalletdemo.util.zeroCurrencyAmountArg import com.lightspark.androidwalletdemo.wallet.PaymentRepository import com.lightspark.sdk.core.Lce +import com.lightspark.sdk.wallet.model.TransactionStatus import dagger.hilt.android.lifecycle.HiltViewModel import javax.inject.Inject import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -42,7 +44,16 @@ class SendPaymentViewModel @Inject constructor( } }.filterNotNull().map { when (it) { - is Lce.Content -> PaymentStatus.SUCCESS + is Lce.Content -> { + when (it.data.status) { + TransactionStatus.PENDING -> PaymentStatus.PENDING + TransactionStatus.SUCCESS -> PaymentStatus.SUCCESS + else -> { + Log.e("SendPaymentViewModel", "Error sending payment") + PaymentStatus.FAILURE + } + } + } is Lce.Error -> { Log.e("SendPaymentViewModel", "Error sending payment", it.exception) PaymentStatus.FAILURE @@ -62,7 +73,9 @@ class SendPaymentViewModel @Inject constructor( .onEach { (it as? Lce.Content)?.let { invoiceData -> invoiceData.data?.amount?.let { decodedInvoiceAmount -> - paymentAmountFlow.tryEmit(decodedInvoiceAmount) + // Default to paying 10 sats for 0 amount invoices. + val amount = if (decodedInvoiceAmount.originalValue > 0) decodedInvoiceAmount else currencyAmountSats(10) + paymentAmountFlow.tryEmit(amount) } } } diff --git a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/PaymentRepository.kt b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/PaymentRepository.kt index 17f0d0c9..b403af16 100644 --- a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/PaymentRepository.kt +++ b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/PaymentRepository.kt @@ -1,6 +1,7 @@ package com.lightspark.androidwalletdemo.wallet import com.lightspark.androidwalletdemo.util.CurrencyAmountArg +import com.lightspark.sdk.core.asLce import com.lightspark.sdk.core.wrapWithLceFlow import com.lightspark.sdk.wallet.LightsparkCoroutinesWalletClient import com.lightspark.sdk.wallet.model.InvoiceType @@ -16,8 +17,8 @@ class PaymentRepository @Inject constructor(private val lightsparkClient: Lights lightsparkClient.createInvoice(amountMillis, memo, type) }.flowOn(Dispatchers.IO) - fun payInvoice(invoice: String) = - wrapWithLceFlow { lightsparkClient.payInvoice(invoice, 1000000) }.flowOn(Dispatchers.IO) + suspend fun payInvoice(invoice: String) = + lightsparkClient.payInvoiceAndAwaitCompletion(invoice, 1000000).asLce().flowOn(Dispatchers.IO) fun decodeInvoice(encodedInvoice: String) = wrapWithLceFlow { lightsparkClient.decodeInvoice(encodedInvoice) }.flowOn(Dispatchers.IO) diff --git a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/WalletRepository.kt b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/WalletRepository.kt index 20e965fd..1cd037a6 100644 --- a/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/WalletRepository.kt +++ b/androidwalletdemo/src/main/java/com/lightspark/androidwalletdemo/wallet/WalletRepository.kt @@ -6,6 +6,7 @@ import com.lightspark.sdk.core.Lce import com.lightspark.sdk.core.asLce import com.lightspark.sdk.core.auth.AuthProvider import com.lightspark.sdk.core.crypto.androidKeystoreContainsPrivateKeyForAlias +import com.lightspark.sdk.core.crypto.generateSigningKeyPair import com.lightspark.sdk.core.crypto.generateSigningKeyPairInAndroidKeyStore import com.lightspark.sdk.core.requester.ServerEnvironment import com.lightspark.sdk.core.wrapWithLceFlow @@ -51,8 +52,8 @@ class WalletRepository @Inject constructor( // able to export that key for the user, but it does come with increased security. If you'd like to manage your // own keys or store them in some other way in your own app code, you can still generate a valid key pair using // the [generateSigningKeyPair] function in the SDK. - val keyPair = generateSigningKeyPairInAndroidKeyStore(LIGHTSPARK_SIGNING_KEY_ALIAS) - walletClient.loadWalletSigningKeyAlias(LIGHTSPARK_SIGNING_KEY_ALIAS) + val keyPair = generateSigningKeyPair() + walletClient.loadWalletSigningKey(keyPair.private.encoded) return walletClient.initializeWalletAndWaitForInitialized( keyType = KeyType.RSA_OAEP, signingPublicKey = Base64.encodeToString(keyPair.public.encoded, Base64.NO_WRAP), diff --git a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/GraphQLWebsocketProtocol.kt b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/GraphQLWebsocketProtocol.kt new file mode 100644 index 00000000..2125f7ff --- /dev/null +++ b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/GraphQLWebsocketProtocol.kt @@ -0,0 +1,185 @@ +package com.lightspark.sdk.core.requester + +import com.lightspark.sdk.core.LightsparkErrorCode +import com.lightspark.sdk.core.LightsparkException +import io.ktor.client.plugins.websocket.ClientWebSocketSession +import io.ktor.websocket.close +import io.ktor.websocket.send +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.withTimeout +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive + +const val CONNECTION_INIT_TIMEOUT = 10_000L + +/** + * An implementation of https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md for use with a Ktor client. + * It can carry queries in addition to subscriptions over the websocket + */ +internal class GraphQLWebsocketProtocol( + private val webSocketSession: ClientWebSocketSession, + private val listener: GraphQLWebsocketListener, + private val jsonSerialFormat: Json, + private val connectionPayload: suspend () -> JsonObject? = { null }, +) { + suspend fun connectionInit() { + val payload = connectionPayload() + + sendMessage { + add("type", "connection_init") + payload?.let { add("payload", it) } + } + waitForConnectionAck() + } + + suspend fun run() { + // TODO(Jeremy): Consider adding client-side ping. + webSocketSession.incoming + .receiveAsFlow() + .catch { cause -> + listener.onError(cause) + } + .map { + jsonSerialFormat.decodeFromString(JsonObject.serializer(), it.data.decodeToString()) + }.collect { + handleServerMessage(it) + } + } + + suspend fun sendQuery(id: String, query: Query) { + val operationNameRegex = + Regex("^\\s*(query|mutation|subscription)\\s+(\\w+)", RegexOption.IGNORE_CASE) + val operationMatch = operationNameRegex.find(query.queryPayload) + if (operationMatch == null || operationMatch.groupValues.size < 3) { + throw LightsparkException("Invalid query payload", LightsparkErrorCode.INVALID_QUERY) + } + val operation = operationMatch.groupValues[2] + // TODO(Jeremy): Handle the signing node ID. + sendMessage { + add("id", id) + add("type", "subscribe") + val payload = buildJsonObject(jsonSerialFormat) { + add("query", query.queryPayload) + add("variables", buildJsonObject(jsonSerialFormat, query.variableBuilder)) + add("operationName", operation) + } + add("payload", payload) + } + } + + suspend fun stopQuery(id: String) { + sendMessage { + add("id", id) + add("type", "complete") + } + } + + suspend fun close() { + webSocketSession.close() + } + + private suspend fun waitForConnectionAck() { + withTimeout(CONNECTION_INIT_TIMEOUT) { + while (true) { + val received = webSocketSession.incoming.receive() + try { + val receivedText = received.data.decodeToString() + val receivedJson = jsonSerialFormat.decodeFromString( + receivedText, + ) + when (val type = receivedJson["type"]?.jsonPrimitive?.content) { + "connection_ack" -> return@withTimeout + "ping" -> sendPong() + else -> { + listener.onError(Exception("Unexpected message type: $type")) + return@withTimeout + } + } + } catch (e: Exception) { + listener.onError(e) + return@withTimeout + } + } + } + } + + private suspend fun handleServerMessage(messageJson: JsonObject) { + when (messageJson["type"]?.jsonPrimitive?.content) { + "next" -> { + val payload = messageJson["payload"]?.jsonObject + val id = messageJson["id"]?.jsonPrimitive?.content + if (id != null && payload != null) { + listener.onOperationMessage(id, payload) + } + } + + "error" -> { + val payload = messageJson["payload"]?.jsonObject + val id = messageJson["id"]?.jsonPrimitive?.content + val errors = payload?.get("errors")?.jsonObject + if (id != null && errors != null) { + listener.onOperationError(id, errors) + } + } + + "complete" -> { + messageJson["id"]?.jsonPrimitive?.content?.let { + listener.operationComplete(it) + } + } + + "ping" -> sendPong() + "pong" -> Unit // Nothing to do, the server acknowledged one of our pings + else -> Unit // Unknown message + } + } + + private suspend fun sendPong() { + sendMessage { + add("type", "pong") + } + } + + private suspend fun sendMessage(payloadBuilder: JsonObjectBuilder.() -> Unit) { + val jsonObjectBuilder = buildJsonObject(jsonSerialFormat, payloadBuilder) + webSocketSession.send(jsonObjectBuilder.toString()) + } +} + +internal interface GraphQLWebsocketListener { + fun onOperationMessage(id: String, payload: JsonObject) + fun onOperationError(id: String, payload: JsonObject) + fun operationComplete(id: String) + fun onError(error: Throwable) + fun onClose(code: Int, reason: String) +} + +@Suppress("unused") +enum class CloseCode(val code: Int) { + NormalClosure(1000), + GoingAway(1001), + AbnormalClosure(1006), + NoStatusReceived(1005), + ServiceRestart(1012), + TryAgainLater(1013), + BadGateway(1013), + + InternalServerError(4500), + InternalClientError(4005), + BadRequest(4400), + BadResponse(4004), + Unauthorized(4401), + Forbidden(4403), + SubprotocolNotAcceptable(4406), + ConnectionInitialisationTimeout(4408), + ConnectionAcknowledgementTimeout(4504), + SubscriberAlreadyExists(4409), + TooManyInitialisationRequests(4429), + + Terminated(4499), +} diff --git a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/VariableBuilder.kt b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/JsonObjectBuilder.kt similarity index 65% rename from core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/VariableBuilder.kt rename to core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/JsonObjectBuilder.kt index d151c51f..74a9836f 100644 --- a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/VariableBuilder.kt +++ b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/JsonObjectBuilder.kt @@ -6,7 +6,7 @@ import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.JsonPrimitive import kotlinx.serialization.json.encodeToJsonElement -class VariableBuilder(val jsonSerialFormat: Json) { +class JsonObjectBuilder(val jsonSerialFormat: Json) { val variables = mutableMapOf() fun add(name: String, value: JsonElement) { @@ -38,8 +38,16 @@ class VariableBuilder(val jsonSerialFormat: Json) { } } -fun variables(jsonSerialFormat: Json, builder: VariableBuilder.() -> Unit): JsonObject { - val variableBuilder = VariableBuilder(jsonSerialFormat) - variableBuilder.builder() - return variableBuilder.build() +fun buildJsonObject(jsonSerialFormat: Json, builder: JsonObjectBuilder.() -> Unit): JsonObject { + val jsonObjectBuilder = JsonObjectBuilder(jsonSerialFormat) + jsonObjectBuilder.builder() + return jsonObjectBuilder.build() +} + +fun Map.toJsonObject(jsonSerialFormat: Json): JsonObject { + val jsonObjectBuilder = JsonObjectBuilder(jsonSerialFormat) + forEach { (key, value) -> + jsonObjectBuilder.add(key, value) + } + return jsonObjectBuilder.build() } diff --git a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Query.kt b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Query.kt index 13a941e8..0922bda3 100644 --- a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Query.kt +++ b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Query.kt @@ -1,5 +1,6 @@ package com.lightspark.sdk.core.requester +import java.util.UUID import kotlin.jvm.JvmOverloads import kotlinx.serialization.json.JsonObject @@ -9,10 +10,12 @@ interface StringDeserializer { data class Query( val queryPayload: String, - val variableBuilder: VariableBuilder.() -> Unit, + val variableBuilder: JsonObjectBuilder.() -> Unit, val signingNodeId: String? = null, val deserializer: (JsonObject) -> T, ) { + val id = UUID.randomUUID().toString() + /** * This constructor is for convenience when calling from Java rather than Kotlin. The primary constructor is * simpler to use from Kotlin if possible. diff --git a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Requester.kt b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Requester.kt index cfd04bba..29c04acb 100644 --- a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Requester.kt +++ b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/Requester.kt @@ -1,6 +1,7 @@ package com.lightspark.sdk.core.requester import com.chrynan.krypt.csprng.SecureRandom +import com.lightspark.sdk.core.Lce import com.lightspark.sdk.core.LightsparkCoreConfig import com.lightspark.sdk.core.LightsparkErrorCode import com.lightspark.sdk.core.LightsparkException @@ -12,6 +13,7 @@ import com.lightspark.sdk.core.crypto.signPayload import com.lightspark.sdk.core.crypto.signUsingAlias import com.lightspark.sdk.core.util.getPlatform import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.WebSockets import io.ktor.client.request.headers import io.ktor.client.request.post import io.ktor.client.request.setBody @@ -22,6 +24,8 @@ import kotlin.collections.component2 import kotlin.collections.set import kotlin.coroutines.cancellation.CancellationException import kotlin.time.Duration.Companion.hours +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import kotlinx.datetime.Clock import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json @@ -43,7 +47,9 @@ class Requester constructor( private val schemaEndpoint: String, private val baseUrl: String = DEFAULT_BASE_URL, ) { - private val httpClient = HttpClient() + private val httpClient = HttpClient { + install(WebSockets) + } private val userAgent = "lightspark-kotlin-sdk/${LightsparkCoreConfig.VERSION} ${getPlatform().platformName}/${getPlatform().version}" private val defaultHeaders = mapOf( @@ -53,13 +59,14 @@ class Requester constructor( "X-Lightspark-SDK" to userAgent, ) private val secureRandom = SecureRandom() + private var websocketConnectionHandler: WebsocketConnectionHandler? = null @Throws(LightsparkException::class, CancellationException::class) suspend fun executeQuery(query: Query): T { val response = makeRawRequest( query.queryPayload, - variables(jsonSerialFormat, query.variableBuilder), + buildJsonObject(jsonSerialFormat, query.variableBuilder), query.signingNodeId, ) return query.deserializer(response) @@ -143,6 +150,51 @@ class Requester constructor( return responseData.jsonObject } + @Throws(LightsparkException::class, CancellationException::class) + inline fun executeAsSubscription(query: Query): Flow> { + return makeRawSubscription(query).map { response -> + try { + when (response) { + is Lce.Error -> throw response.exception ?: LightsparkException( + "Unknown error", + LightsparkErrorCode.UNKNOWN, + ) + + is Lce.Loading -> Lce.Loading + is Lce.Content -> { + val dataJson = response.data["data"] ?: throw LightsparkException( + "Invalid response", + LightsparkErrorCode.REQUEST_FAILED, + ) + Lce.Content(query.deserializer(dataJson.jsonObject)) + } + } + } catch (e: Exception) { + Lce.Error(e) + } + } + } + + @Throws(LightsparkException::class, CancellationException::class) + fun makeRawSubscription( + query: Query<*>, + ): Flow> { + if (websocketConnectionHandler == null) { + websocketConnectionHandler = WebsocketConnectionHandler( + httpClient, + "wss://$baseUrl/$schemaEndpoint", + jsonSerialFormat, + { + buildJsonObject(jsonSerialFormat) { + authProvider.withValidAuthToken { add("access_token", it) } + } + }, + defaultHeaders, + ) + } + return websocketConnectionHandler!!.execute(query) + } + private fun addSigningDataIfNeeded( bodyData: JsonObject, headers: Map, diff --git a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/WebsocketConnectionHandler.kt b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/WebsocketConnectionHandler.kt new file mode 100644 index 00000000..1ed703a5 --- /dev/null +++ b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/WebsocketConnectionHandler.kt @@ -0,0 +1,300 @@ +package com.lightspark.sdk.core.requester + +import com.lightspark.sdk.core.Lce +import com.lightspark.sdk.core.LightsparkErrorCode +import com.lightspark.sdk.core.LightsparkException +import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.wss +import java.util.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.flow.transformWhile +import kotlinx.coroutines.launch +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject + +/** + * Handles the connection to the GraphQL WebSocket endpoint. + * + * This class is responsible for: + * - Establishing the connection and managing its lifecycle + * - Dealing with the protocol handshake + * - Sending and receiving messages via the correct protocol + * - Reconnecting when the connection is lost + * - Dispatching events to the correct subscription flows + * + * It is based partially on Apollo's wonderful [WebSocketNetworkTransport](https://github.com/apollographql/apollo-kotlin/blob/main/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo3/network/ws/WebSocketNetworkTransport.kt), + * but has been modified to fit with the Lightspark SDKs. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class WebsocketConnectionHandler( + private val httpClient: HttpClient, + private val url: String, + private val jsonSerialFormat: Json, + private val connectionPayload: suspend () -> JsonObject? = { null }, + private val extraHeaders: Map = emptyMap(), + private val idleTimeoutMillis: Long = 60_000L, + private val reopenWhen: (suspend (Throwable, attempt: Long) -> Boolean)? = null, +) { + private val coroutineScope = CoroutineScope(Dispatchers.IO.limitedParallelism(1)) + + /** + * The message queue read by the supervisor. + * + * SubscriptionFlows write [Command]s + * The WebSocket coroutine writes [Event]s + * + * Use unlimited buffers so that we never have to suspend when writing a command or an event, + * and we avoid deadlocks. This might be overkill but is most likely never going to be a problem in practice. + */ + private val messages = Channel(Channel.UNLIMITED) + + /** + * The SharedFlow read by SubscriptionFlows + * + * The Supervisor coroutine writes [Event]s + */ + private val mutableEvents = MutableSharedFlow(0, Int.MAX_VALUE, BufferOverflow.SUSPEND) + private val events = mutableEvents.asSharedFlow() + + private val listener = object : GraphQLWebsocketListener { + override fun onOperationMessage(id: String, payload: JsonObject) { + messages.trySend(OperationResponse(id, payload)) + } + + override fun onOperationError(id: String, payload: JsonObject) { + messages.trySend(OperationError(id, payload)) + } + + override fun operationComplete(id: String) { + messages.trySend(OperationComplete(id)) + } + + override fun onError(error: Throwable) { + messages.trySend(NetworkError(error)) + } + + override fun onClose(code: Int, reason: String) { + messages.trySend(Dispose) + } + } + + init { + coroutineScope.launch { + supervise(this) + } + } + + private suspend fun supervise(scope: CoroutineScope) { + var idleJob: Job? = null + var connectionJob: Job? = null + var protocol: GraphQLWebsocketProtocol? = null + var reopenAttemptCount = 0L + val activeMessages = mutableMapOf>() + + /** + * This happens: + * - when this coroutine receives a [Dispose] message + * - when the idleJob completes + * - when there is an error reading the WebSocket and this coroutine receives a [NetworkError] message + */ + suspend fun closeProtocol() { + protocol?.close() + protocol = null + connectionJob?.cancel() + connectionJob = null + idleJob?.cancel() + idleJob = null + } + + supervisorLoop@ while (true) { + when (val message = messages.receive()) { + is Event -> { + if (message is NetworkError) { + closeProtocol() + + if (reopenWhen?.invoke(message.cause, reopenAttemptCount) == true) { + reopenAttemptCount++ + messages.send(RestartConnection) + } else { + reopenAttemptCount = 0L + // forward the NetworkError downstream. Active flows will throw + mutableEvents.tryEmit(message) + } + } else if (message is ConnectionReEstablished) { + reopenAttemptCount = 0L + activeMessages.values.forEach { + // Re-queue all start messages + // This will restart the websocket + messages.trySend(it) + } + } else { + reopenAttemptCount = 0L + mutableEvents.tryEmit(message) + } + } + + is Command -> { + if (message is Dispose) { + closeProtocol() + // Exit the loop and the coroutine scope + return + } + + if (protocol == null) { + if (message is StopOperation) { + // A stop was received, but we don't have a connection. Ignore it + activeMessages.remove(message.id) + continue + } + + try { + connectionJob = coroutineScope.launch { + httpClient.wss( + urlString = url, + request = { + extraHeaders.forEach { (key, value) -> + headers.append(key, value) + } + headers.append("Sec-WebSocket-Protocol", "graphql-transport-ws") + }, + ) { + try { + protocol = GraphQLWebsocketProtocol( + webSocketSession = this, + connectionPayload = connectionPayload, + listener = listener, + jsonSerialFormat = jsonSerialFormat, + ).apply { connectionInit() } + } catch (e: Exception) { + protocol = null + messages.send(NetworkError(e)) + return@wss + } + protocol?.run() + closeProtocol() + } + } + } catch (e: Exception) { + // Error opening the websocket + messages.send(NetworkError(e)) + continue + } + } + + // Note: This is a bit of a hack. We need to wait for the protocol to be initialized by the websocket + // coroutine before we can send messages to it. + val protocolWaitInterval = 50L + val protocolWaitTimeLimit = 10_000L + var protocolWaitCount = 0L + while (protocol == null) { + if (protocolWaitCount >= protocolWaitTimeLimit) { + messages.send(NetworkError(Exception("Timed out waiting for protocol to be initialized"))) + continue@supervisorLoop + } + delay(protocolWaitInterval) + protocolWaitCount += protocolWaitInterval + } + + when (message) { + is StartOperation<*> -> { + activeMessages[message.id] = message + protocol!!.sendQuery(message.id, message.query) + } + + is StopOperation -> { + activeMessages.remove(message.id) + protocol!!.stopQuery(message.id) + } + + is RestartConnection -> { + messages.send(ConnectionReEstablished()) + } + + else -> { + // Other cases have been handled above + } + } + + idleJob = if (activeMessages.isEmpty()) { + scope.launch { + delay(idleTimeoutMillis) + closeProtocol() + } + } else { + idleJob?.cancel() + null + } + } + } + } + } + + /** + * NOTE: Intentionally avoiding doing the deserialization in this function to avoid issues with reified types and + * inline functions. See https://kotlinlang.org/docs/inline-functions.html#restrictions-for-public-api-inline-functions. + */ + fun execute(query: Query<*>): Flow> { + return events.onSubscription { + messages.send(StartOperation(query.id, query)) + }.filter { + it.id == query.id || it.id == null + }.transformWhile { + when (it) { + is OperationComplete -> { + false + } + + is ConnectionReEstablished -> { + // means we are in the process of restarting the connection + false + } + + is NetworkError -> { + emit(it) + false + } + + else -> { + emit(it) + true + } + } + }.map { response -> + when (response) { + is OperationResponse -> { + val responsePayload = response.payload + Lce.Content(responsePayload) + } + + is OperationError -> Lce.Error( + LightsparkException("Request ${response.id} failed", LightsparkErrorCode.REQUEST_FAILED), + ) + is NetworkError -> Lce.Error( + LightsparkException( + "Network error while executing ${response.id}", + LightsparkErrorCode.REQUEST_FAILED, + response.cause, + ), + ) + + // Cannot happen as these events are filtered out upstream + is ConnectionReEstablished, is OperationComplete -> error("Unexpected event $response") + } + }.onCompletion { + messages.send(StopOperation(query.id)) + } + } +} diff --git a/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/WebsocketMessages.kt b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/WebsocketMessages.kt new file mode 100644 index 00000000..27e26d4d --- /dev/null +++ b/core/src/commonMain/kotlin/com/lightspark/sdk/core/requester/WebsocketMessages.kt @@ -0,0 +1,30 @@ +package com.lightspark.sdk.core.requester + +import kotlinx.serialization.json.JsonObject + +internal sealed interface Message + +internal sealed interface Command : Message +internal class StartOperation(val id: String, val query: Query) : Command +internal class StopOperation(val id: String) : Command +internal object RestartConnection : Command +internal object Dispose : Command + +internal sealed interface Event : Message { + /** + * the id of the operation + * Might be null for general errors or network errors that are broadcast to all listeners + */ + val id: String? +} + +internal class OperationResponse(override val id: String?, val payload: JsonObject) : Event +internal class OperationError(override val id: String?, val payload: Map?) : Event +internal class OperationComplete(override val id: String?) : Event +internal class ConnectionReEstablished : Event { + override val id: String? = null +} + +internal class NetworkError(val cause: Throwable) : Event { + override val id: String? = null +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ff8ce3c4..f1163f54 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -84,6 +84,7 @@ ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" } ktor-client-serialization = { module = "io.ktor:ktor-client-serialization", version.ref = "ktor" } ktor-client-okhttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktor" } ktor-client-darwin = { module = "io.ktor:ktor-client-darwin", version.ref = "ktor" } +ktor-client-websockets = { module = "io.ktor:ktor-client-websockets", version.ref = "ktor" } kotest-assertions = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest" } diff --git a/javatest/build.gradle.kts b/javatest/build.gradle.kts index fb87bcf8..d439ea9e 100644 --- a/javatest/build.gradle.kts +++ b/javatest/build.gradle.kts @@ -3,10 +3,6 @@ plugins { kotlin("jvm") } -repositories { - mavenCentral() -} - java { sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 @@ -15,6 +11,7 @@ java { testImplementation("com.google.code.gson:gson:2.10.1") testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") implementation(project(":lightspark-sdk")) + implementation(project(":wallet-sdk")) implementation(project(":core")) } } diff --git a/javatest/src/test/java/com/lightspark/javatest/LightsparkFuturesWalletClientTest.java b/javatest/src/test/java/com/lightspark/javatest/LightsparkFuturesWalletClientTest.java new file mode 100644 index 00000000..06f6cc67 --- /dev/null +++ b/javatest/src/test/java/com/lightspark/javatest/LightsparkFuturesWalletClientTest.java @@ -0,0 +1,86 @@ +package com.lightspark.javatest; + +import static com.lightspark.sdk.core.crypto.SigningKt.generateSigningKeyPair; +import static com.lightspark.sdk.core.util.PlatformKt.getPlatform; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import com.lightspark.sdk.wallet.ClientConfig; +import com.lightspark.sdk.wallet.LightsparkFuturesWalletClient; +import com.lightspark.sdk.wallet.LightsparkSyncWalletClient; +import com.lightspark.sdk.wallet.auth.jwt.CustomJwtAuthProvider; +import com.lightspark.sdk.wallet.auth.jwt.InMemoryJwtStorage; +import com.lightspark.sdk.wallet.auth.jwt.JwtStorage; +import com.lightspark.sdk.wallet.model.KeyType; +import com.lightspark.sdk.wallet.model.LoginWithJWTOutput; +import com.lightspark.sdk.wallet.model.Wallet; +import com.lightspark.sdk.wallet.model.WalletStatus; + +import org.junit.jupiter.api.Test; + +import java.security.KeyPair; +import java.util.Base64; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import kotlin.Unit; + +public class LightsparkFuturesWalletClientTest { + private String apiAccountId = getPlatform().getEnv("LIGHTSPARK_ACCOUNT_ID"); + private String apiJwt = getPlatform().getEnv("LIGHTSPARK_JWT"); + private String signingPubKey = getPlatform().getEnv("LIGHTSPARK_WALLET_PUB_KEY"); + private String signingPrivKey = getPlatform().getEnv("LIGHTSPARK_WALLET_PRIV_KEY"); + private JwtStorage jwtStorage = new InMemoryJwtStorage(); + private final ClientConfig config = new ClientConfig() + .setAuthProvider(new CustomJwtAuthProvider(jwtStorage)); + private final LightsparkFuturesWalletClient client; + + LightsparkFuturesWalletClientTest() { + String baseUrl = getPlatform().getEnv("LIGHTSPARK_EXAMPLE_BASE_URL"); + if (baseUrl != null) { + config.setServerUrl(baseUrl); + } + client = new LightsparkFuturesWalletClient(config); + } + + + @Test + void deployAndInitializeWallet() throws Exception { + LoginWithJWTOutput output = client.loginWithJWT(apiAccountId, apiJwt, jwtStorage).get(5, TimeUnit.SECONDS); + AtomicReference currentWallet = new AtomicReference<>(output.getWallet()); + if (currentWallet.get().getStatus() == WalletStatus.NOT_SETUP || currentWallet.get().getStatus() == WalletStatus.TERMINATED) { + client.deployWalletAndAwaitDeployed(wallet -> { + System.out.println("Wallet update: " + wallet.getStatus()); + assertNotEquals(wallet.getStatus(), WalletStatus.FAILED); + if (wallet.getStatus() == WalletStatus.DEPLOYED) { + System.out.println("Wallet deployed!"); + } + currentWallet.set(wallet); + return Unit.INSTANCE; + } + ).get(30, TimeUnit.SECONDS); + } + + if (currentWallet.get().getStatus() == WalletStatus.DEPLOYED) { + KeyPair keypair = generateSigningKeyPair(); + signingPubKey = Base64.getEncoder().encodeToString(keypair.getPublic().getEncoded()); + signingPrivKey = Base64.getEncoder().encodeToString(keypair.getPrivate().getEncoded()); + System.out.println("Save these keys:"); + System.out.println(signingPubKey); + System.out.println(signingPrivKey); + + client.loadWalletSigningKey(keypair.getPrivate().getEncoded()); + client.initializeWalletAndWaitForInitialized(KeyType.RSA_OAEP, signingPubKey, signingPrivKey, wallet -> { + System.out.println("Wallet update: " + wallet.getStatus()); + assertNotEquals(wallet.getStatus(), WalletStatus.FAILED); + if (wallet.getStatus() == WalletStatus.READY) { + System.out.println("Wallet initialized!"); + } + currentWallet.set(wallet); + return Unit.INSTANCE; + } + ).get(5, TimeUnit.MINUTES); + } + + System.out.println("Post-initialized wallet:" + currentWallet.get()); + } +} diff --git a/javatest/src/test/java/com/lightspark/javatest/LightsparkSyncWalletClientTest.java b/javatest/src/test/java/com/lightspark/javatest/LightsparkSyncWalletClientTest.java index dc1e5ea5..902b5c8a 100644 --- a/javatest/src/test/java/com/lightspark/javatest/LightsparkSyncWalletClientTest.java +++ b/javatest/src/test/java/com/lightspark/javatest/LightsparkSyncWalletClientTest.java @@ -1,146 +1,84 @@ package com.lightspark.javatest; +import static com.lightspark.sdk.core.crypto.SigningKt.generateSigningKeyPair; import static com.lightspark.sdk.core.util.PlatformKt.getPlatform; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.google.gson.Gson; -import com.lightspark.sdk.ClientConfig; -import com.lightspark.sdk.LightsparkSyncClient; -import com.lightspark.sdk.auth.AccountApiTokenAuthProvider; -import com.lightspark.sdk.core.requester.Query; -import com.lightspark.sdk.core.requester.ServerEnvironment; -import com.lightspark.sdk.graphql.AccountDashboard; -import com.lightspark.sdk.model.Account; -import com.lightspark.sdk.model.AccountToNodesConnection; -import com.lightspark.sdk.model.AccountToTransactionsConnection; -import com.lightspark.sdk.model.BitcoinNetwork; -import com.lightspark.sdk.model.LightsparkNode; -import com.lightspark.sdk.model.Transaction; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import com.lightspark.sdk.wallet.ClientConfig; +import com.lightspark.sdk.wallet.LightsparkSyncWalletClient; +import com.lightspark.sdk.wallet.auth.jwt.CustomJwtAuthProvider; +import com.lightspark.sdk.wallet.auth.jwt.InMemoryJwtStorage; +import com.lightspark.sdk.wallet.auth.jwt.JwtStorage; +import com.lightspark.sdk.wallet.model.KeyType; +import com.lightspark.sdk.wallet.model.LoginWithJWTOutput; +import com.lightspark.sdk.wallet.model.Wallet; +import com.lightspark.sdk.wallet.model.WalletStatus; import org.junit.jupiter.api.Test; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.security.KeyPair; +import java.util.Base64; +import java.util.concurrent.atomic.AtomicReference; -public class LightsparkSyncClientTest { - private static final String API_TOKEN_CLIENT_ID = getPlatform().getEnv("LIGHTSPARK_API_TOKEN_CLIENT_ID"); - private static final String API_TOKEN_CLIENT_SECRET = getPlatform().getEnv("LIGHTSPARK_API_TOKEN_CLIENT_SECRET"); - private final ClientConfig config = new ClientConfig() - .setAuthProvider(new AccountApiTokenAuthProvider(API_TOKEN_CLIENT_ID, API_TOKEN_CLIENT_SECRET)) - .setDefaultBitcoinNetwork(BitcoinNetwork.REGTEST) - .setServerUrl(ServerEnvironment.DEV.getGraphQLUrl()); +import kotlin.Unit; - private final LightsparkSyncClient client = new LightsparkSyncClient(config); +public class LightsparkSyncWalletClientTest { + private String apiAccountId = getPlatform().getEnv("LIGHTSPARK_ACCOUNT_ID"); + private String apiJwt = getPlatform().getEnv("LIGHTSPARK_JWT"); + private String signingPubKey = getPlatform().getEnv("LIGHTSPARK_WALLET_PUB_KEY"); + private String signingPrivKey = getPlatform().getEnv("LIGHTSPARK_WALLET_PRIV_KEY"); + private JwtStorage jwtStorage = new InMemoryJwtStorage(); + private final ClientConfig config = new ClientConfig() + .setAuthProvider(new CustomJwtAuthProvider(jwtStorage)); + private final LightsparkSyncWalletClient client; - @Test - public void testAccountDashboard() throws Exception { - AccountDashboard dashboard = client.getFullAccountDashboard(BitcoinNetwork.REGTEST, null); - assertNotNull(dashboard); - System.out.println(dashboard); + LightsparkSyncWalletClientTest() { + String baseUrl = getPlatform().getEnv("LIGHTSPARK_EXAMPLE_BASE_URL"); + if (baseUrl != null) { + config.setServerUrl(baseUrl); + } + client = new LightsparkSyncWalletClient(config); } - @Test - public void testNodeQueries() throws Exception { - Account account = client.getCurrentAccount(); - assertNotNull(account); - List nodes = Objects.requireNonNull(client.executeQuery(account.getNodesQuery())).getEntities(); - assertNotNull(nodes); - assertTrue(nodes.size() > 0); - System.out.println(nodes.get(0)); - } @Test - public void testRawQuery() throws Exception { - Map variables = new HashMap<>(); - variables.put("network", BitcoinNetwork.REGTEST.name()); - Integer conductivity = client.executeQuery(new Query<>( - "query MyCustomQuery($network: BitcoinNetwork!) {" + - " current_account {" + - " conductivity(bitcoin_networks: [$network])" + - " }" + - " }", - variables, - jsonString -> { - Gson gson = new Gson(); - MyCustomQuery result = gson.fromJson(jsonString, MyCustomQuery.class); - return result.current_account.conductivity; - + void deployAndInitializeWallet() throws Exception { + LoginWithJWTOutput output = client.loginWithJWT(apiAccountId, apiJwt, jwtStorage); + AtomicReference currentWallet = new AtomicReference<>(output.getWallet()); + if (currentWallet.get().getStatus() == WalletStatus.NOT_SETUP || currentWallet.get().getStatus() == WalletStatus.TERMINATED) { + client.deployWalletAndAwaitDeployed(wallet -> { + System.out.println("Wallet update: " + wallet.getStatus()); + assertNotEquals(wallet.getStatus(), WalletStatus.FAILED); + if (wallet.getStatus() == WalletStatus.DEPLOYED) { + System.out.println("Wallet deployed!"); } - ) - ); - System.out.println("My conductivity is " + conductivity); - } - - @Test - public void testFirstPageOfPagination() throws Exception { - Account account = client.getCurrentAccount(); - assertNotNull(account); - AccountToTransactionsConnection connection = client.executeQuery( - account.getTransactionsQuery(20) - ); - - assertNotNull(connection); - assertTrue(connection.getEntities().size() > 0); - System.out.println("The total number of transactions is " + connection.getCount()); - - // Let's print the IDs for the current page (the first 20 transactions) - for (Transaction transaction : connection.getEntities()) { - System.out.println("Transaction ID = " + transaction.getId()); + currentWallet.set(wallet); + return Unit.INSTANCE; + } + ); } - } - - @Test - public void testAllPagesOfPagination() throws Exception { - Account account = client.getCurrentAccount(); - assertNotNull(account); - AccountToTransactionsConnection connection = client.executeQuery( - account.getTransactionsQuery(20) - ); - assertNotNull(connection); - int MAX_ITERATIONS = 10; - int numIterations = 1; - while (Boolean.TRUE.equals(connection.getPageInfo().getHasNextPage()) && numIterations < MAX_ITERATIONS) { - System.out.println("Fetching the following page."); - numIterations = numIterations + 1; - connection = client.executeQuery(account.getTransactionsQuery( - 20, - connection.getPageInfo().getEndCursor() - )); - assertNotNull(connection); - for (Transaction transaction : connection.getEntities()) { - System.out.println("Transaction ID = " + transaction.getId()); - } + if (currentWallet.get().getStatus() == WalletStatus.DEPLOYED) { + KeyPair keypair = generateSigningKeyPair(); + signingPubKey = Base64.getEncoder().encodeToString(keypair.getPublic().getEncoded()); + signingPrivKey = Base64.getEncoder().encodeToString(keypair.getPrivate().getEncoded()); + System.out.println("Save these keys:"); + System.out.println(signingPubKey); + System.out.println(signingPrivKey); + + client.loadWalletSigningKey(keypair.getPrivate().getEncoded()); + client.initializeWalletAndWaitForInitialized(KeyType.RSA_OAEP, signingPubKey, signingPrivKey, wallet -> { + System.out.println("Wallet update: " + wallet.getStatus()); + assertNotEquals(wallet.getStatus(), WalletStatus.FAILED); + if (wallet.getStatus() == WalletStatus.READY) { + System.out.println("Wallet initialized!"); + } + currentWallet.set(wallet); + return Unit.INSTANCE; + } + ); } - System.out.println("Everything was loaded!"); - } - - @Test - public void testEntityRequest() throws Exception { - String nodeId = getNodeId(); - LightsparkNode node = client.executeQuery(LightsparkNode.getLightsparkNodeQuery(nodeId)); - assertNotNull(node); - System.out.println("Node ID = " + node.getId()); - } - - private String getNodeId() throws Exception { - Account account = client.getCurrentAccount(); - assertNotNull(account); - AccountToNodesConnection connection = client.executeQuery( - account.getNodesQuery(1) - ); - assertNotNull(connection); - return connection.getEntities().get(0).getId(); - } -} -class MyCustomQuery { - static class CurrentAccount { - int conductivity; + System.out.println("Post-initialized wallet:" + currentWallet.get()); } - - CurrentAccount current_account; } diff --git a/wallet-sdk/build.gradle.kts b/wallet-sdk/build.gradle.kts index 3afd1988..041b4607 100644 --- a/wallet-sdk/build.gradle.kts +++ b/wallet-sdk/build.gradle.kts @@ -39,6 +39,7 @@ kotlin { api(libs.kotlinx.datetime) implementation(libs.kotlinx.coroutines.core) implementation(libs.ktor.client.core) + implementation(libs.ktor.client.websockets) // Can use this while locally developing, but should use the published version when publishing: // implementation(project(":core")) implementation(libs.lightspark.core) @@ -129,8 +130,8 @@ android { tasks.matching { name == "bumpAndTagVersion" || name == "bumpVersion" }.configureEach { doFirst { if (project.configurations["commonMainImplementationDependenciesMetadata"].resolvedConfiguration - .lenientConfiguration.artifacts - .any { it.moduleVersion.id.group == "Lightspark" && it.moduleVersion.id.name == "core" } + .lenientConfiguration.artifacts + .any { it.moduleVersion.id.group == "Lightspark" && it.moduleVersion.id.name == "core" } ) { throw GradleException("Cannot depend directly on core. Depend on the published module instead.") } diff --git a/wallet-sdk/src/androidUnitTest/kotlin/com/lightspark/sdk/wallet/ClientIntegrationTests.kt b/wallet-sdk/src/androidUnitTest/kotlin/com/lightspark/sdk/wallet/ClientIntegrationTests.kt index 5f76d5ab..85615591 100644 --- a/wallet-sdk/src/androidUnitTest/kotlin/com/lightspark/sdk/wallet/ClientIntegrationTests.kt +++ b/wallet-sdk/src/androidUnitTest/kotlin/com/lightspark/sdk/wallet/ClientIntegrationTests.kt @@ -1,7 +1,6 @@ package com.lightspark.sdk.wallet import com.lightspark.sdk.core.crypto.generateSigningKeyPair -import com.lightspark.sdk.core.requester.ServerEnvironment import com.lightspark.sdk.core.util.getPlatform import com.lightspark.sdk.wallet.auth.jwt.CustomJwtAuthProvider import com.lightspark.sdk.wallet.auth.jwt.InMemoryJwtStorage @@ -18,7 +17,6 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.ktor.util.decodeBase64Bytes import io.ktor.util.encodeBase64 -import saschpe.kase64.base64Encoded import kotlin.test.Test import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.TestScope @@ -27,6 +25,7 @@ import kotlinx.datetime.Clock import kotlinx.datetime.DateTimePeriod import kotlinx.datetime.TimeZone import kotlinx.datetime.minus +import saschpe.kase64.base64Encoded @OptIn(ExperimentalCoroutinesApi::class) class ClientIntegrationTests { diff --git a/wallet-sdk/src/commonJvmAndroidMain/kotlin/com/lightspark/sdk/wallet/LightsparkFuturesWalletClient.kt b/wallet-sdk/src/commonJvmAndroidMain/kotlin/com/lightspark/sdk/wallet/LightsparkFuturesWalletClient.kt index 0eae1d75..43439b40 100644 --- a/wallet-sdk/src/commonJvmAndroidMain/kotlin/com/lightspark/sdk/wallet/LightsparkFuturesWalletClient.kt +++ b/wallet-sdk/src/commonJvmAndroidMain/kotlin/com/lightspark/sdk/wallet/LightsparkFuturesWalletClient.kt @@ -13,6 +13,8 @@ import kotlin.coroutines.cancellation.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.future.future +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.* /** @@ -95,6 +97,51 @@ class LightsparkFuturesWalletClient constructor(config: ClientConfig) { coroutinesClient.initializeWallet(keyType, signingPublicKey, signingPrivateKey) } + /** + * Deploys a wallet in the Lightspark infrastructure and triggers updates as state changes. + * This is an asynchronous operation, which will continue sending the wallet state updates until + * the Wallet status changes to `DEPLOYED` (or `FAILED`). + * + * @param callback A callback that will be called periodically until the wallet is deployed or failed. + * @throws LightsparkAuthenticationException if there is no valid authentication. + */ + @Throws(LightsparkAuthenticationException::class, CancellationException::class) + fun deployWalletAndAwaitDeployed(callback: (Wallet) -> Unit): CompletableFuture = + coroutineScope.future { + var wallet: Wallet? = null + coroutinesClient.deployWalletAndAwaitDeployed().collect { + wallet = it + coroutineScope.launch { callback(it) } + } + requireNotNull(wallet) { "Failed to deploy wallet." } + } + + /** + * Initializes a wallet in the Lightspark infrastructure and syncs it to the Bitcoin network and triggers updates + * as state changes. This is an asynchronous operation, which will continue sending the wallet state updates until + * the Wallet status changes to `READY` (or `FAILED`). + * + * @param keyType The type of key to use for the wallet. + * @param signingPublicKey The base64-encoded public key to use for signing transactions. + * @param callback A callback that will be called periodically until the wallet is ready or failed. + * @throws LightsparkAuthenticationException if there is no valid authentication. + */ + @Throws(LightsparkAuthenticationException::class, CancellationException::class) + fun initializeWalletAndWaitForInitialized( + keyType: KeyType, + signingPublicKey: String, + signingPrivateKey: String, + callback: (Wallet) -> Unit + ): CompletableFuture = + coroutineScope.future { + var wallet: Wallet? = null + coroutinesClient.initializeWalletAndWaitForInitialized(keyType, signingPublicKey, signingPrivateKey).collect { + wallet = it + coroutineScope.launch { callback(it) } + } + requireNotNull(wallet) { "Failed to initialize wallet." } + } + /** * Removes the wallet from Lightspark infrastructure. It won't be connected to the Lightning network anymore and * its funds won't be accessible outside of the Funds Recovery Kit process. diff --git a/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkCoroutinesWalletClient.kt b/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkCoroutinesWalletClient.kt index 733adf37..5040f95d 100644 --- a/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkCoroutinesWalletClient.kt +++ b/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkCoroutinesWalletClient.kt @@ -2,6 +2,7 @@ package com.lightspark.sdk.wallet +import com.lightspark.sdk.core.Lce import com.lightspark.sdk.core.LightsparkErrorCode import com.lightspark.sdk.core.LightsparkException import com.lightspark.sdk.core.auth.* @@ -15,14 +16,14 @@ import com.lightspark.sdk.wallet.auth.jwt.JwtTokenInfo import com.lightspark.sdk.wallet.graphql.* import com.lightspark.sdk.wallet.model.* import com.lightspark.sdk.wallet.util.serializerFormat -import saschpe.kase64.base64DecodedBytes import kotlin.coroutines.cancellation.CancellationException -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.transformWhile import kotlinx.serialization.json.* +import saschpe.kase64.base64DecodedBytes private const val WALLET_NODE_ID_KEY = "wallet_node_id" private const val SCHEMA_ENDPOINT = "graphql/wallet/2023-05-05" @@ -139,15 +140,22 @@ class LightsparkCoroutinesWalletClient private constructor( } private fun awaitWalletStatus(statuses: Set): Flow { - // TODO: Switch from polling to a subscription when that's possible. - return flow { - var wallet = getCurrentWallet() ?: return@flow - while (wallet.status !in statuses) { - emit(wallet) - delay(3000) - wallet = getCurrentWallet() ?: return@flow + return requester.executeAsSubscription( + Query( + CurrentWalletSubscription, + {}, + ) { + val walletJson = it["current_wallet"] ?: return@Query null + serializerFormat.decodeFromJsonElement(walletJson) + }, + ).mapNotNull { + when (it) { + is Lce.Content -> it.data + else -> null } - emit(wallet) + }.transformWhile { + emit(it) + it.status !in statuses } } @@ -171,7 +179,7 @@ class LightsparkCoroutinesWalletClient private constructor( Query( InitializeWallet, { - add("key_type", keyType) + add("key_type", keyType.rawValue) add("signing_public_key", signingPublicKey) }, signingNodeId = WALLET_NODE_ID_KEY, @@ -301,7 +309,7 @@ class LightsparkCoroutinesWalletClient private constructor( { add("amountMsats", amountMsats) memo?.let { add("memo", memo) } - add("type", type) + add("type", type.rawValue) }, ) { val invoiceJson = @@ -394,19 +402,14 @@ class LightsparkCoroutinesWalletClient private constructor( transactionId: String, statuses: Set, ): Flow { - // TODO: Switch from polling to a subscription when that's possible. - return flow { - var payment = - OutgoingPayment.getOutgoingPaymentQuery(transactionId).execute(this@LightsparkCoroutinesWalletClient) - ?: return@flow - while (payment.status !in statuses) { - emit(payment) - delay(1000) - payment = OutgoingPayment.getOutgoingPaymentQuery(transactionId) - .execute(this@LightsparkCoroutinesWalletClient) - ?: return@flow + return requester.executeAsSubscription(OutgoingPayment.getOutgoingPaymentQuery(transactionId)).mapNotNull { + when (it) { + is Lce.Content -> it.data + else -> null } - emit(payment) + }.transformWhile { + emit(it) + it.status !in statuses } } @@ -714,7 +717,7 @@ class LightsparkCoroutinesWalletClient private constructor( { add("amount_msats", amountMsats) memo?.let { add("memo", memo) } - add("invoice_type", invoiceType) + invoiceType?.let { add("invoice_type", it.rawValue) } }, ) { val outputJson = diff --git a/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkSyncWalletClient.kt b/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkSyncWalletClient.kt index 044b54f6..867d9539 100644 --- a/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkSyncWalletClient.kt +++ b/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/LightsparkSyncWalletClient.kt @@ -9,6 +9,9 @@ import com.lightspark.sdk.wallet.auth.jwt.JwtStorage import com.lightspark.sdk.wallet.graphql.* import com.lightspark.sdk.wallet.model.* import kotlin.coroutines.cancellation.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.* @@ -47,6 +50,7 @@ import kotlinx.serialization.json.* */ class LightsparkSyncWalletClient constructor(config: ClientConfig) { private val asyncClient: LightsparkCoroutinesWalletClient = LightsparkCoroutinesWalletClient(config) + private val parallelCoroutineScope = CoroutineScope(Dispatchers.Default) /** * Override the auth token provider for this client to provide custom headers on all API calls. @@ -100,6 +104,47 @@ class LightsparkSyncWalletClient constructor(config: ClientConfig) { asyncClient.initializeWallet(keyType, signingPublicKey, signingPrivateKey) } + /** + * Deploys a wallet in the Lightspark infrastructure and triggers updates as state changes. + * This is an asynchronous operation, which will continue sending the wallet state updates until + * the Wallet status changes to `DEPLOYED` (or `FAILED`). + * + * @param callback A callback that will be called periodically until the wallet is deployed or failed. + * @throws LightsparkAuthenticationException if there is no valid authentication. + */ + @Throws(LightsparkAuthenticationException::class, CancellationException::class) + fun deployWalletAndAwaitDeployed(callback: (Wallet) -> Unit) { + runBlocking { + asyncClient.deployWalletAndAwaitDeployed().collect { + parallelCoroutineScope.launch { callback(it) } + } + } + } + + /** + * Initializes a wallet in the Lightspark infrastructure and syncs it to the Bitcoin network and triggers updates + * as state changes. This is an asynchronous operation, which will continue sending the wallet state updates until + * the Wallet status changes to `READY` (or `FAILED`). + * + * @param keyType The type of key to use for the wallet. + * @param signingPublicKey The base64-encoded public key to use for signing transactions. + * @param callback A callback that will be called periodically until the wallet is ready or failed. + * @throws LightsparkAuthenticationException if there is no valid authentication. + */ + @Throws(LightsparkAuthenticationException::class, CancellationException::class) + fun initializeWalletAndWaitForInitialized( + keyType: KeyType, + signingPublicKey: String, + signingPrivateKey: String, + callback: (Wallet) -> Unit + ) { + runBlocking { + asyncClient.initializeWalletAndWaitForInitialized(keyType, signingPublicKey, signingPrivateKey).collect { + parallelCoroutineScope.launch { callback(it) } + } + } + } + /** * Removes the wallet from Lightspark infrastructure. It won't be connected to the Lightning network anymore and * its funds won't be accessible outside of the Funds Recovery Kit process. diff --git a/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/graphql/CurrentWallet.kt b/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/graphql/CurrentWallet.kt index 5deebbcf..641421ae 100644 --- a/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/graphql/CurrentWallet.kt +++ b/wallet-sdk/src/commonMain/kotlin/com/lightspark/sdk/wallet/graphql/CurrentWallet.kt @@ -11,3 +11,13 @@ query CurrentWallet { ${Wallet.FRAGMENT} """ + +const val CurrentWalletSubscription = """ +subscription CurrentWallet { + current_wallet { + ...WalletFragment + } +} + +${Wallet.FRAGMENT} +"""