Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions balius-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use logging::LoggerHost;
use router::Router;
use sign::SignerHost;
use std::{collections::HashMap, io::Read, path::Path, sync::Arc, time::Instant};
use submit::SubmitHost;
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -64,6 +65,9 @@ pub enum Error {
#[error("ledger error: {0}")]
Ledger(String),

#[error("submit error: {0}")]
Submit(String),

#[error("config error: {0}")]
Config(String),

Expand Down Expand Up @@ -312,7 +316,7 @@ struct WorkerState {
pub logging: Option<logging::LoggerHost>,
pub kv: Option<kv::KvHost>,
pub sign: Option<sign::SignerHost>,
pub submit: Option<submit::Submit>,
pub submit: Option<submit::SubmitHost>,
pub http: Option<http::Http>,
}

Expand Down Expand Up @@ -560,7 +564,10 @@ impl Runtime {
.sign
.as_ref()
.map(|s| SignerHost::new(id, s, &self.metrics)),
submit: self.submit.clone(),
submit: self
.submit
.as_ref()
.map(|s| SubmitHost::new(id, s, &self.metrics)),
http: self.http.clone(),
},
);
Expand Down
11 changes: 11 additions & 0 deletions balius-runtime/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct Metrics {
tx_handled: Counter<u64>,
undo_utxo_handled: Counter<u64>,
undo_tx_handled: Counter<u64>,
submit_tx: Counter<u64>,
signer_sign_payload: Counter<u64>,
ledger_read_utxos: Counter<u64>,
ledger_search_utxos: Counter<u64>,
Expand Down Expand Up @@ -78,6 +79,11 @@ impl Metrics {
.with_description("Amount of undo Tx event handled per worker.")
.build();

let submit_tx = meter
.u64_counter("submit_tx")
.with_description("Amount of submit_tx calls per worker.")
.build();

let signer_sign_payload = meter
.u64_counter("signer_sign_payload")
.with_description("Amount of sign payload handled per worker.")
Expand Down Expand Up @@ -150,6 +156,7 @@ impl Metrics {
tx_handled,
undo_utxo_handled,
undo_tx_handled,
submit_tx,
signer_sign_payload,
ledger_read_utxos,
ledger_search_utxos,
Expand Down Expand Up @@ -219,6 +226,10 @@ impl Metrics {
.add(1, &[KeyValue::new("worker", worker_id.to_owned())]);
}

pub fn submit_tx(&self, worker_id: &str) {
self.submit_tx
.add(1, &[KeyValue::new("worker", worker_id.to_owned())]);
}
pub fn signer_sign_payload(&self, worker_id: &str) {
self.signer_sign_payload
.add(1, &[KeyValue::new("worker", worker_id.to_owned())]);
Expand Down
48 changes: 43 additions & 5 deletions balius-runtime/src/submit/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,52 @@
use crate::wit::balius::app::submit as wit;
use std::sync::Arc;

use tokio::sync::Mutex;

use crate::{metrics::Metrics, wit::balius::app::submit as wit};

pub mod u5c;

#[async_trait::async_trait]
pub trait Submitter: Send + Sync {
async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError>;
}

#[derive(Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Submit {
Mock,
U5C(u5c::Submit),
Custom(Arc<Mutex<dyn Submitter + Send + Sync>>),
}

impl wit::Host for Submit {
async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> {
println!("{}", hex::encode(tx));
pub struct SubmitHost {
worker_id: String,
submit: Submit,
metrics: Arc<Metrics>,
}
impl SubmitHost {
pub fn new(worker_id: &str, submit: &Submit, metrics: &Arc<Metrics>) -> Self {
Self {
worker_id: worker_id.to_string(),
submit: submit.clone(),
metrics: metrics.clone(),
}
}
}

Ok(())
impl wit::Host for SubmitHost {
async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> {
self.metrics.submit_tx(&self.worker_id);
match &mut self.submit {
Submit::Mock => {
println!("{}", hex::encode(tx));
Ok(())
}
Submit::U5C(x) => x.submit_tx(tx).await,
Submit::Custom(x) => {
let mut lock = x.lock().await;
lock.submit_tx(tx).await
}
}
}
}
50 changes: 50 additions & 0 deletions balius-runtime/src/submit/u5c.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::wit::balius::app::submit as wit;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Config {
pub endpoint_url: String,
pub headers: Option<HashMap<String, String>>,
}

#[derive(Clone)]
pub struct Submit {
client: utxorpc::CardanoSubmitClient,
}

impl Submit {
pub async fn new(config: &Config) -> Result<Self, crate::Error> {
let mut builder = utxorpc::ClientBuilder::new().uri(&config.endpoint_url)?;
if let Some(headers) = &config.headers {
for (k, v) in headers.iter() {
builder = builder.metadata(k, v)?;
}
}

Ok(Self {
client: builder.build().await,
})
}

pub async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> {
self.client
.submit_tx(vec![tx])
.await
.map_err(|err| match err {
utxorpc::Error::GrpcError(status) => {
let code: i32 = status.code().into();
if code == 3 {
wit::SubmitError::Invalid(status.to_string())
} else {
wit::SubmitError::Internal(status.to_string())
}
}
utxorpc::Error::TransportError(err) => wit::SubmitError::Internal(err.to_string()),
utxorpc::Error::ParseError(err) => wit::SubmitError::Internal(err.to_string()),
})?;
Ok(())
}
}
12 changes: 12 additions & 0 deletions balius-sdk/src/qol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
Ledger(wit::balius::app::ledger::LedgerError),
#[error("sign error: {0}")]
Sign(wit::balius::app::sign::SignError),
#[error("submit error: {0}")]
Submit(wit::balius::app::submit::SubmitError),
#[error("http error: {0}")]
Http(wit::balius::app::http::ErrorCode),
}
Expand Down Expand Up @@ -74,6 +76,10 @@ impl From<Error> for wit::HandleError {
code: 9,
message: format!("event mismatch, expected {x}"),
},
Error::Submit(err) => wit::HandleError {
code: 10,
message: err.to_string(),
},
}
}
}
Expand Down Expand Up @@ -102,6 +108,12 @@ impl From<wit::balius::app::sign::SignError> for Error {
}
}

impl From<wit::balius::app::submit::SubmitError> for Error {
fn from(error: wit::balius::app::submit::SubmitError) -> Self {
Error::Submit(error)
}
}

impl From<wit::balius::app::http::ErrorCode> for Error {
fn from(error: wit::balius::app::http::ErrorCode) -> Self {
Error::Http(error)
Expand Down
23 changes: 22 additions & 1 deletion baliusd/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use balius_runtime::{
sign::in_memory::{Ed25519Key, SignerKey},
};
use pallas::crypto::key::ed25519;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::sync::{Mutex, RwLock};

Expand Down Expand Up @@ -145,6 +145,13 @@ pub enum HttpConfig {
Reqwest(ReqwestHttpConfig),
}

#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(tag = "type")]
#[serde(rename_all = "lowercase")]
pub enum SubmitConfig {
U5c(balius_runtime::submit::u5c::Config),
}

#[derive(Deserialize, Clone, Debug)]
pub struct Config {
pub rpc: drivers::jsonrpc::Config,
Expand All @@ -158,6 +165,7 @@ pub struct Config {
pub signing: Option<SignerConfig>,
pub store: Option<StoreConfig>,
pub http: Option<HttpConfig>,
pub submit: Option<SubmitConfig>,
}

impl From<&Config> for balius_runtime::kv::Kv {
Expand Down Expand Up @@ -232,3 +240,16 @@ impl From<&Config> for balius_runtime::http::Http {
balius_runtime::http::Http::Reqwest(client)
}
}

impl Config {
pub async fn into_submit(self) -> balius_runtime::submit::Submit {
match &self.submit {
Some(SubmitConfig::U5c(cfg)) => balius_runtime::submit::Submit::U5C(
balius_runtime::submit::u5c::Submit::new(cfg)
.await
.expect("Failed to convert config into submit interface"),
),
None => balius_runtime::submit::Submit::Mock,
}
}
}
1 change: 1 addition & 0 deletions baliusd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async fn daemon(debug: bool) -> miette::Result<()> {
.with_kv(kv)
.with_logger((&config).into())
.with_signer((&config).into())
.with_submit(config.clone().into_submit().await)
.with_http((&config).into())
.build()
.into_diagnostic()
Expand Down
2 changes: 1 addition & 1 deletion examples/asteria-tracker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn handle_utxo(config: sdk::Config<Config>, utxo: sdk::Utxo<Datum>) -> sdk::Work
worker::logging::log(
worker::logging::Level::Debug,
&format!("UTxO {}:", &out_ref),
&format!("{:#?} - {pos_x_str} - {pos_y_str} - {fuel_str}", operation),
&format!("{operation:#?} - {pos_x_str} - {pos_y_str} - {fuel_str}"),
);

let _ = HttpRequest::post(url).json(&payload)?.send()?;
Expand Down
14 changes: 14 additions & 0 deletions examples/comprehensive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ fn signer_sign_payload(

#[serde_as]
#[derive(Serialize, Deserialize)]
struct SubmitTxParams {
cbor: String,
}

fn submit_tx(_: Config<MyConfig>, request: Params<SubmitTxParams>) -> WorkerResult<()> {
let cbor = hex::decode(&request.cbor).map_err(|_| Error::BadParams)?;
balius_sdk::wit::balius::app::submit::submit_tx(&cbor)?;

Ok(())
}

#[derive(Debug, Deserialize)]

struct LedgerSearchUtxosParams {
address: String,
max_items: u32,
Expand Down Expand Up @@ -236,6 +249,7 @@ fn main() -> balius_sdk::Worker {
FnHandler::from(signer_get_public_key),
)
.with_request_handler("signer-sign-payload", FnHandler::from(signer_sign_payload))
.with_request_handler("submit-tx", FnHandler::from(submit_tx))
.with_request_handler("ledger-search-utxos", FnHandler::from(ledger_search_utxos))
.with_signer("alice", "ed25519")
.with_signer("bob", "ed25519")
Expand Down
5 changes: 4 additions & 1 deletion wit/balius.wit
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ interface sign {

interface submit {
type cbor = list<u8>;
type submit-error = u32;
variant submit-error {
internal(string),
invalid(string)
}

submit-tx: func(tx: cbor) -> result<_, submit-error>;
}
Expand Down
Loading