diff --git a/packages/app/lib/io/p2panda/logs.dart b/packages/app/lib/io/p2panda/logs.dart new file mode 100644 index 00000000..a9f32c45 --- /dev/null +++ b/packages/app/lib/io/p2panda/logs.dart @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +import 'package:app/io/p2panda/p2panda.dart'; + +class Logger { + Logger() { + p2panda.subscribeLogStream().listen((logEntry) { + print(logEntry.msg); + }); + } +} diff --git a/packages/app/lib/io/p2panda/node.dart b/packages/app/lib/io/p2panda/node.dart index d2cbcd1a..44c307c3 100644 --- a/packages/app/lib/io/p2panda/node.dart +++ b/packages/app/lib/io/p2panda/node.dart @@ -12,6 +12,9 @@ const List relayAddresses = (bool.hasEnvironment("RELAY_ADDRESS") && ? [String.fromEnvironment("RELAY_ADDRESS")] : []; +const String logLevel = + String.fromEnvironment("LOG_LEVEL", defaultValue: "ERROR"); + /// Start a p2panda node in the background. Future startNode() async { // Determine folder where we can persist data @@ -26,6 +29,7 @@ Future startNode() async { // Start node in background thread p2panda.startNode( + logLevel: logLevel, keyPair: key, databaseUrl: databaseUrl, blobsBasePath: basePath, diff --git a/packages/p2panda/lib/src/bridge_generated.dart b/packages/p2panda/lib/src/bridge_generated.dart index b47b1360..e236ef53 100644 --- a/packages/p2panda/lib/src/bridge_generated.dart +++ b/packages/p2panda/lib/src/bridge_generated.dart @@ -14,6 +14,10 @@ import 'dart:ffi' as ffi; part 'bridge_generated.freezed.dart'; abstract class P2Panda { + Stream subscribeLogStream({dynamic hint}); + + FlutterRustBridgeTaskConstMeta get kSubscribeLogStreamConstMeta; + /// Create, sign and encode a p2panda entry. /// /// Takes large u64 integers for log id and seq num as strings. If we would declare them as u64 @@ -60,7 +64,8 @@ abstract class P2Panda { /// /// Supports Android logging for logs coming from the node. Future startNode( - {required KeyPair keyPair, + {required String logLevel, + required KeyPair keyPair, required String databaseUrl, required String blobsBasePath, required List relayAddresses, @@ -149,6 +154,28 @@ class KeyPair { ); } +class LogEntry { + final int timestamp; + final LogLevel level; + final String tag; + final String msg; + + const LogEntry({ + required this.timestamp, + required this.level, + required this.tag, + required this.msg, + }); +} + +enum LogLevel { + Trace, + Debug, + Info, + Warn, + Error, +} + /// Operations are categorised by their action type. /// /// An action defines the operation format and if this operation creates, updates or deletes a data @@ -225,6 +252,23 @@ class P2PandaImpl implements P2Panda { factory P2PandaImpl.wasm(FutureOr module) => P2PandaImpl(module as ExternalLibrary); P2PandaImpl.raw(this._platform); + Stream subscribeLogStream({dynamic hint}) { + return _platform.executeStream(FlutterRustBridgeTask( + callFfi: (port_) => _platform.inner.wire_subscribe_log_stream(port_), + parseSuccessData: _wire2api_log_entry, + parseErrorData: null, + constMeta: kSubscribeLogStreamConstMeta, + argValues: [], + hint: hint, + )); + } + + FlutterRustBridgeTaskConstMeta get kSubscribeLogStreamConstMeta => + const FlutterRustBridgeTaskConstMeta( + debugName: "subscribe_log_stream", + argNames: [], + ); + Future signAndEncodeEntry( {required String logId, required String seqNum, @@ -331,24 +375,27 @@ class P2PandaImpl implements P2Panda { ); Future startNode( - {required KeyPair keyPair, + {required String logLevel, + required KeyPair keyPair, required String databaseUrl, required String blobsBasePath, required List relayAddresses, required List allowSchemaIds, dynamic hint}) { - var arg0 = _platform.api2wire_box_autoadd_key_pair(keyPair); - var arg1 = _platform.api2wire_String(databaseUrl); - var arg2 = _platform.api2wire_String(blobsBasePath); - var arg3 = _platform.api2wire_StringList(relayAddresses); - var arg4 = _platform.api2wire_StringList(allowSchemaIds); + var arg0 = _platform.api2wire_String(logLevel); + var arg1 = _platform.api2wire_box_autoadd_key_pair(keyPair); + var arg2 = _platform.api2wire_String(databaseUrl); + var arg3 = _platform.api2wire_String(blobsBasePath); + var arg4 = _platform.api2wire_StringList(relayAddresses); + var arg5 = _platform.api2wire_StringList(allowSchemaIds); return _platform.executeNormal(FlutterRustBridgeTask( - callFfi: (port_) => - _platform.inner.wire_start_node(port_, arg0, arg1, arg2, arg3, arg4), + callFfi: (port_) => _platform.inner + .wire_start_node(port_, arg0, arg1, arg2, arg3, arg4, arg5), parseSuccessData: _wire2api_unit, parseErrorData: _wire2api_FrbAnyhowException, constMeta: kStartNodeConstMeta, argValues: [ + logLevel, keyPair, databaseUrl, blobsBasePath, @@ -363,6 +410,7 @@ class P2PandaImpl implements P2Panda { const FlutterRustBridgeTaskConstMeta( debugName: "start_node", argNames: [ + "logLevel", "keyPair", "databaseUrl", "blobsBasePath", @@ -533,6 +581,22 @@ class P2PandaImpl implements P2Panda { ); } + LogEntry _wire2api_log_entry(dynamic raw) { + final arr = raw as List; + if (arr.length != 4) + throw Exception('unexpected arr length: expect 4 but see ${arr.length}'); + return LogEntry( + timestamp: _wire2api_u64(arr[0]), + level: _wire2api_log_level(arr[1]), + tag: _wire2api_String(arr[2]), + msg: _wire2api_String(arr[3]), + ); + } + + LogLevel _wire2api_log_level(dynamic raw) { + return LogLevel.values[raw as int]; + } + OperationAction _wire2api_operation_action(dynamic raw) { return OperationAction.values[raw as int]; } @@ -541,6 +605,10 @@ class P2PandaImpl implements P2Panda { return raw == null ? null : _wire2api_String(raw); } + int _wire2api_u64(dynamic raw) { + return castInt(raw); + } + int _wire2api_u8(dynamic raw) { return raw as int; } @@ -845,6 +913,20 @@ class P2PandaWire implements FlutterRustBridgeWireBase { late final _init_frb_dart_api_dl = _init_frb_dart_api_dlPtr .asFunction)>(); + void wire_subscribe_log_stream( + int port_, + ) { + return _wire_subscribe_log_stream( + port_, + ); + } + + late final _wire_subscribe_log_streamPtr = + _lookup>( + 'wire_subscribe_log_stream'); + late final _wire_subscribe_log_stream = + _wire_subscribe_log_streamPtr.asFunction(); + void wire_sign_and_encode_entry( int port_, ffi.Pointer log_id, @@ -955,6 +1037,7 @@ class P2PandaWire implements FlutterRustBridgeWireBase { void wire_start_node( int port_, + ffi.Pointer log_level, ffi.Pointer key_pair, ffi.Pointer database_url, ffi.Pointer blobs_base_path, @@ -963,6 +1046,7 @@ class P2PandaWire implements FlutterRustBridgeWireBase { ) { return _wire_start_node( port_, + log_level, key_pair, database_url, blobs_base_path, @@ -975,6 +1059,7 @@ class P2PandaWire implements FlutterRustBridgeWireBase { ffi.NativeFunction< ffi.Void Function( ffi.Int64, + ffi.Pointer, ffi.Pointer, ffi.Pointer, ffi.Pointer, @@ -983,6 +1068,7 @@ class P2PandaWire implements FlutterRustBridgeWireBase { late final _wire_start_node = _wire_start_nodePtr.asFunction< void Function( int, + ffi.Pointer, ffi.Pointer, ffi.Pointer, ffi.Pointer, diff --git a/packages/p2panda/native/Cargo.toml b/packages/p2panda/native/Cargo.toml index 0b2f434b..8f1d89b8 100644 --- a/packages/p2panda/native/Cargo.toml +++ b/packages/p2panda/native/Cargo.toml @@ -17,4 +17,4 @@ ed25519-dalek = "1.0.1" flutter_rust_bridge = "1.82.6" log = "0.4.19" p2panda-rs = "0.8.1" -tokio = { version = "1.28.2", features = ["rt", "rt-multi-thread", "sync", "parking_lot"] } +tokio = { version = "1.28.2", features = ["rt"] } diff --git a/packages/p2panda/native/src/api.rs b/packages/p2panda/native/src/api.rs index 15532331..4f49b212 100644 --- a/packages/p2panda/native/src/api.rs +++ b/packages/p2panda/native/src/api.rs @@ -1,11 +1,15 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use android_logger::{Config, FilterBuilder}; +use std::str::FromStr; +use std::sync::OnceLock; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use android_logger::{AndroidLogger, Config, Filter, FilterBuilder}; use anyhow::{anyhow, Result}; use aquadoggo::{AllowList, Configuration}; use ed25519_dalek::SecretKey; -use flutter_rust_bridge::RustOpaque; -use log::LevelFilter; +use flutter_rust_bridge::{RustOpaque, StreamSink}; +use log::{warn, Level, LevelFilter, Log, Record}; use p2panda_rs::document::DocumentViewId; use p2panda_rs::entry; use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry}; @@ -21,6 +25,104 @@ use crate::node::Manager; static NODE_INSTANCE: OnceCell = OnceCell::const_new(); +static STREAM_SINK: OnceCell> = OnceCell::const_new(); + +static LOGGER: OnceLock = OnceLock::new(); + +pub enum LogLevel { + Trace, + Debug, + Info, + Warn, + Error, +} + +pub struct LogEntry { + pub timestamp: u64, + pub level: LogLevel, + pub tag: String, + pub msg: String, +} + +struct Logger { + android_logger: AndroidLogger, + max_level: LevelFilter, + filter: Filter, +} + +impl Logger { + fn new(max_level: LevelFilter) -> Logger { + let filter = FilterBuilder::new() + .filter(Some("aquadoggo"), max_level) + .build(); + let android_logger = + AndroidLogger::new(Config::default().with_max_level(LevelFilter::Trace)); + + Logger { + android_logger, + max_level, + filter, + } + } + + fn record_to_entry(record: &Record) -> LogEntry { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_millis() as u64; + + let level = match record.level() { + Level::Trace => LogLevel::Trace, + Level::Debug => LogLevel::Debug, + Level::Info => LogLevel::Info, + Level::Warn => LogLevel::Warn, + Level::Error => LogLevel::Error, + }; + + let tag = record.file().unwrap_or_else(|| record.target()).to_owned(); + + let msg = format!("{}", record.args()); + + LogEntry { + timestamp, + level, + tag, + msg, + } + } +} + +impl Log for Logger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + metadata.level() <= self.max_level + } + + fn log(&self, record: &log::Record) { + if !self.enabled(record.metadata()) { + return; + } + + if !self.filter.matches(record) { + return; + } + + match STREAM_SINK.get() { + Some(sink) => { + sink.add(Logger::record_to_entry(record)); + } + None => (), + } + + self.android_logger.log(record); + } + + fn flush(&self) {} +} + +pub fn subscribe_log_stream(sink: StreamSink) { + let _ = STREAM_SINK.set(sink); +} + pub type HexString = String; /// Ed25519 key pair for authors to sign p2panda entries with. @@ -207,26 +309,27 @@ pub fn decode_operation(operation: Vec) -> Result<(OperationAction, String)> )) } +fn init_logging(level: LevelFilter) { + let logger = LOGGER.get_or_init(|| Logger::new(level)); + if let Err(err) = log::set_logger(logger) { + warn!("logger setup failed: {err}"); + } else { + log::set_max_level(level); + } +} + /// Runs a p2panda node in a separate thread in the background. /// /// Supports Android logging for logs coming from the node. pub fn start_node( + log_level: String, key_pair: KeyPair, database_url: String, blobs_base_path: String, relay_addresses: Vec, allow_schema_ids: Vec, ) -> Result<()> { - // Initialise logging for Android developer console - android_logger::init_once( - Config::default() - .with_max_level(LevelFilter::Trace) - .with_filter( - FilterBuilder::new() - .filter(Some("aquadoggo"), LevelFilter::Debug) - .build(), - ), - ); + init_logging(LevelFilter::from_str(&log_level).expect("unknown log level")); // Set node configuration let mut config = Configuration::default(); diff --git a/packages/p2panda/native/src/bridge_generated.rs b/packages/p2panda/native/src/bridge_generated.rs index 611a700e..516bc409 100644 --- a/packages/p2panda/native/src/bridge_generated.rs +++ b/packages/p2panda/native/src/bridge_generated.rs @@ -22,6 +22,22 @@ use std::sync::Arc; // Section: wire functions +fn wire_subscribe_log_stream_impl(port_: MessagePort) { + FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, (), _>( + WrapInfo { + debug_name: "subscribe_log_stream", + port: Some(port_), + mode: FfiCallMode::Stream, + }, + move || { + move |task_callback| { + Result::<_, ()>::Ok(subscribe_log_stream( + task_callback.stream_sink::<_, LogEntry>(), + )) + } + }, + ) +} fn wire_sign_and_encode_entry_impl( port_: MessagePort, log_id: impl Wire2Api + UnwindSafe, @@ -110,6 +126,7 @@ fn wire_decode_operation_impl(port_: MessagePort, operation: impl Wire2Api + UnwindSafe, key_pair: impl Wire2Api + UnwindSafe, database_url: impl Wire2Api + UnwindSafe, blobs_base_path: impl Wire2Api + UnwindSafe, @@ -123,6 +140,7 @@ fn wire_start_node_impl( mode: FfiCallMode::Normal, }, move || { + let api_log_level = log_level.wire2api(); let api_key_pair = key_pair.wire2api(); let api_database_url = database_url.wire2api(); let api_blobs_base_path = blobs_base_path.wire2api(); @@ -130,6 +148,7 @@ fn wire_start_node_impl( let api_allow_schema_ids = allow_schema_ids.wire2api(); move |task_callback| { start_node( + api_log_level, api_key_pair, api_database_url, api_blobs_base_path, @@ -284,6 +303,43 @@ impl rust2dart::IntoIntoDart for KeyPair { } } +impl support::IntoDart for LogEntry { + fn into_dart(self) -> support::DartAbi { + vec![ + self.timestamp.into_into_dart().into_dart(), + self.level.into_into_dart().into_dart(), + self.tag.into_into_dart().into_dart(), + self.msg.into_into_dart().into_dart(), + ] + .into_dart() + } +} +impl support::IntoDartExceptPrimitive for LogEntry {} +impl rust2dart::IntoIntoDart for LogEntry { + fn into_into_dart(self) -> Self { + self + } +} + +impl support::IntoDart for LogLevel { + fn into_dart(self) -> support::DartAbi { + match self { + Self::Trace => 0, + Self::Debug => 1, + Self::Info => 2, + Self::Warn => 3, + Self::Error => 4, + } + .into_dart() + } +} +impl support::IntoDartExceptPrimitive for LogLevel {} +impl rust2dart::IntoIntoDart for LogLevel { + fn into_into_dart(self) -> Self { + self + } +} + impl support::IntoDart for OperationAction { fn into_dart(self) -> support::DartAbi { match self { @@ -312,6 +368,11 @@ mod io { use super::*; // Section: wire functions + #[no_mangle] + pub extern "C" fn wire_subscribe_log_stream(port_: i64) { + wire_subscribe_log_stream_impl(port_) + } + #[no_mangle] pub extern "C" fn wire_sign_and_encode_entry( port_: i64, @@ -357,6 +418,7 @@ mod io { #[no_mangle] pub extern "C" fn wire_start_node( port_: i64, + log_level: *mut wire_uint_8_list, key_pair: *mut wire_KeyPair, database_url: *mut wire_uint_8_list, blobs_base_path: *mut wire_uint_8_list, @@ -365,6 +427,7 @@ mod io { ) { wire_start_node_impl( port_, + log_level, key_pair, database_url, blobs_base_path, diff --git a/packages/p2panda/native/src/node.rs b/packages/p2panda/native/src/node.rs index ec2ccaa8..aedda023 100644 --- a/packages/p2panda/native/src/node.rs +++ b/packages/p2panda/native/src/node.rs @@ -8,6 +8,16 @@ use p2panda_rs::identity::KeyPair; use tokio::runtime; use tokio::sync::mpsc::{channel, Sender}; +struct PanicDetector {} + +impl Drop for PanicDetector { + fn drop(&mut self) { + if std::thread::panicking() { + panic!("OUUUCH"); + } + } +} + pub struct Manager { shutdown_signal: Sender, } @@ -17,6 +27,8 @@ impl Manager { let (shutdown_signal, mut on_shutdown) = channel(4); thread::spawn(move || { + let _ = PanicDetector {}; + let rt = runtime::Builder::new_current_thread() .enable_all() .build() @@ -32,6 +44,7 @@ impl Manager { node.shutdown().await; }); + }); Ok(Manager { shutdown_signal })