diff --git a/.gitignore b/.gitignore
index 519c388a03..dcefb9bed8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,4 +33,6 @@ cairo_venv
 # idea
 .idea
-.env
\ No newline at end of file
+.env
+
+/epool
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 0b85ab6d0b..9c4d662b07 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4832,7 +4832,9 @@ dependencies = [
  "glob",
  "libc",
  "libz-sys",
+ "lz4-sys",
  "tikv-jemalloc-sys",
+ "zstd-sys",
 ]
 
 [[package]]
@@ -5084,6 +5086,7 @@ dependencies = [
  "substrate-build-script-utils",
  "substrate-frame-rpc-system",
  "substrate-prometheus-endpoint",
+ "sync-block",
  "tokio",
  "try-runtime-cli",
 ]
@@ -5333,6 +5336,7 @@ name = "mc-transaction-pool"
 version = "4.0.0-dev"
 dependencies = [
  "async-trait",
+ "bincode 1.3.3",
  "encryptor",
  "futures",
  "futures-timer",
@@ -5356,6 +5360,7 @@ dependencies = [
  "sp-transaction-pool",
  "starknet-crypto 0.6.0",
  "substrate-prometheus-endpoint",
+ "sync-block",
  "thiserror",
  "tokio",
  "vdf",
@@ -10307,6 +10312,21 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "sync-block"
+version = "0.1.0"
+dependencies = [
+ "base64 0.21.2",
+ "bincode 1.3.3",
+ "dotenv",
+ "hyper",
+ "rocksdb",
+ "serde",
+ "serde_json",
+ "serde_with",
+ "tokio",
+]
+
 [[package]]
 name = "synstructure"
 version = "0.12.6"
diff --git a/Cargo.toml b/Cargo.toml
index 04f2409083..f124f47447 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ members = [
   "crates/client/mapping-sync",
   "crates/client/storage",
   "crates/client/transaction-pool",
+  "crates/client/sync-block",
 ]
 
 [profile.release]
diff --git a/crates/client/block-proposer/src/lib.rs b/crates/client/block-proposer/src/lib.rs
index 220db71a4c..f0adf96b2c 100644
--- a/crates/client/block-proposer/src/lib.rs
+++ b/crates/client/block-proposer/src/lib.rs
@@ -612,7 +612,7 @@ where
         let mut lock = epool.lock().await;
         if lock.is_enabled() {
             let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;
-            let encrypted_tx_pool_size: usize = lock.len(block_height);
+/*          let encrypted_tx_pool_size: usize = lock.len(block_height);
             if encrypted_tx_pool_size > 0 {
                 let encrypted_invoke_transactions = lock.get_encrypted_tx_pool(block_height);
@@ -624,7 +624,7 @@ where
                 submit_to_da(&encoded_data_for_da);
                 // let da_block_height = submit_to_da(&encoded_data_for_da).await;
                 // println!("this is the block_height: {}", da_block_height);
-            }
+            } */
             lock.init_tx_pool(block_height);
         }
     }
@@ -1215,7 +1215,7 @@ mod tests {
     }
 }
 
-async fn submit_to_da(data: &str) -> String {
+/* async fn submit_to_da(data: &str) -> String {
     dotenv().ok();
     let da_host = env::var("DA_HOST").expect("DA_HOST must be set");
     let da_namespace = env::var("DA_NAMESPACE").expect("DA_NAMESPACE must be set");
@@ -1267,4 +1267,4 @@ fn encode_data_to_base64(original: &str) -> String {
     // Convert bytes to base64
     let base64_str: String = general_purpose::STANDARD.encode(&bytes);
     base64_str
-}
+} */
diff --git a/crates/client/sync-block/Cargo.toml b/crates/client/sync-block/Cargo.toml
new file mode 100644
index 0000000000..fac59f0153
--- /dev/null
+++ b/crates/client/sync-block/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "sync-block"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+repository.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+rocksdb = "0.21.0"
+base64 = "0.21.2"
+serde = { version = "1.0.175", default-features = false }
+serde_json = { version = "1.0.100", features = ["std"] }
+serde_with = { version = "2.3.3", default-features = false }
+bincode = "1.3.3"
+hyper = { version = "0.14.27", features = ["full"] }
+tokio = { version = "1.30.0", features = ["full"] }
+dotenv = "0.15"
\ No newline at end of file
diff --git a/crates/client/sync-block/src/lib.rs b/crates/client/sync-block/src/lib.rs
new file mode 100644
index 0000000000..16d3a60548
--- /dev/null
+++ b/crates/client/sync-block/src/lib.rs
@@ -0,0 +1,209 @@
+// let a: Vec<_> = txs.encrypted_pool.values().cloned().collect();
+// println!("self.txs.get_mut: {:?}", a);
+
+use std::collections::HashMap;
+use std::path::Path;
+use std::str::Bytes;
+use std::time::Duration;
+use std::{env, str, thread, time};
+
+use base64::engine::general_purpose;
+use base64::Engine as _;
+use bincode::{deserialize, serialize};
+use dotenv::dotenv;
+use hyper::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
+use hyper::{Body, Client, Request};
+use rocksdb::{Error, IteratorMode, DB};
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use tokio;
+use tokio::runtime::Runtime;
+
+fn get_next_entry(start_key: &[u8]) -> (Box<[u8]>, Box<[u8]>) {
+    // Create an iterator starting from the key after the specified start_key.
+    let path = Path::new("epool");
+    let db = DB::open_default(&path).expect("Failed to open database");
+
+    let mut iter = db.iterator(IteratorMode::From(start_key, rocksdb::Direction::Forward));
+
+    // Iterate to get the next entry.
+    iter.next().expect("Something wrong with iter.next function").unwrap()
+}
+
+// fn get_next_entry(db: &DB, start_key: &[u8]) -> Result<Option<(Box<[u8]>, Box<[u8]>)>, Error> {
+// Create an iterator starting from the key after the specified start_key.
+// let path = Path::new("epool");
+// let db = DB::open_default(&path).expect("Failed to open database");
+//
+// let mut iter = db.iterator(IteratorMode::From(start_key, rocksdb::Direction::Forward));
+//
+// Iterate to get the next entry.
+// if let Some(Ok((key, value))) = iter.next() {
+// return Ok(Some((key.to_vec().into_boxed_slice(), value.to_vec().into_boxed_slice())));
+// }
+//
+// Ok(None) // Return Ok(None) if no next entry is found.
+// }
+
+fn encode_data_to_base64(original: &str) -> String {
+    // Convert string to bytes
+    let bytes = original.as_bytes();
+    // Convert bytes to base64
+    let base64_str: String = general_purpose::STANDARD.encode(&bytes);
+    base64_str
+}
+
+pub fn submit_block_to_db(block_height: u64, txs: Vec<u8>) {
+    println!("submit_block_to_db: key: {:?}", block_height);
+
+    // Open or create a RocksDB database.
+    let path = Path::new("epool");
+    let db = DB::open_default(&path).expect("Failed to open database");
+    db.put(block_height.to_be_bytes(), txs).expect("Failed to put tx into RocksDB");
+    db.put("sync_target".as_bytes(), serialize(&block_height).expect("Failed to serialize"))
+        .expect("Failed to put tx into RocksDB");
+}
+
+pub fn submit_to_db(key: &[u8], value: Vec<u8>) {
+    println!("submit_to_db: key: {:?} value: {:?}", key, value);
+    // Open or create a RocksDB database.
+    let path = Path::new("epool");
+    let db = DB::open_default(&path).expect("Failed to open database");
+    db.put(key, value).expect("Failed to put tx into RocksDB");
+}
+
+pub fn retrieve_from_db(key: &[u8]) -> Vec<u8> {
+    let path = Path::new("epool");
+    let db = DB::open_default(&path).expect("Failed to open database");
+    let value = db.get(key).expect("Failed to read value from RocksDB");
+    let result = match value {
+        Some(val) => val,
+        None => Vec::new(), // Provide a default value (empty vector) for the None arm
+    };
+    result
+}
+
+async fn submit_to_da(data: &str) -> String {
+    dotenv().ok();
+    let da_host = env::var("DA_HOST").expect("DA_HOST must be set");
+    let da_namespace = env::var("DA_NAMESPACE").expect("DA_NAMESPACE must be set");
+    let da_auth_token = env::var("DA_AUTH_TOKEN").expect("DA_AUTH_TOKEN must be set");
+    let da_auth = format!("Bearer {}", da_auth_token);
+
+    let client = Client::new();
+    let rpc_request = json!({
+        "jsonrpc": "2.0",
+        "method": "blob.Submit",
+        "params": [
+            [
+                {
+                    "namespace": da_namespace,
+                    "data": data,
+                }
+            ]
+        ],
+        "id": 1,
+    });
+
+    let uri = std::env::var("da_uri").unwrap_or(da_host.into());
+
+    let req = Request::post(uri.as_str())
+        .header(AUTHORIZATION, HeaderValue::from_str(da_auth.as_str()).unwrap())
+        .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
+        .body(Body::from(rpc_request.to_string()))
+        .unwrap();
+    let response_future = client.request(req);
+
+    let resp = tokio::time::timeout(Duration::from_secs(100), response_future)
+        .await
+        .map_err(|_| "Request timed out")
+        .unwrap()
+        .unwrap();
+
+    let response_body = hyper::body::to_bytes(resp.into_body()).await.unwrap();
+    let parsed: Value = serde_json::from_slice(&response_body).unwrap();
+
+    // println!("stompesi - {:?}", parsed);
+
+    if let Some(result_value) = parsed.get("result") { result_value.to_string() } else { "".to_string() }
+}
+
+// In Rust, the standard library provides functions for converting between bytes and integers:
+// from_le_bytes, from_ne_bytes and from_be_bytes build an integer from a byte array, and the
+// matching to_le_bytes / to_ne_bytes / to_be_bytes calls convert an integer back into bytes.
+// The same endianness must be used on both sides (see the illustrative example at the top of
+// sync_with_da below).
+
+pub fn sync_with_da() {
+    // Open or create a RocksDB database.
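+    // Example (illustrative, not part of the original logic): block-height keys are written by
+    // `submit_block_to_db` with `u64::to_be_bytes`, so the matching `from_be_bytes` call turns a
+    // stored key back into the integer it came from:
+    //     let height: u64 = 42;
+    //     let key = height.to_be_bytes();              // [u8; 8] big-endian key
+    //     assert_eq!(u64::from_be_bytes(key), height); // round-trips back to the original value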
+ println!("sync_with_da started"); + // let path = Path::new("epool"); + // println!("open db"); + // let db = DB::open_default(&path).expect("Failed to open database"); + + let three_seconds = time::Duration::from_millis(3000); + + // let mut i: u64 = 0; + // println!("submit sync into db"); + // submit_to_db("sync".as_bytes(), serialize(&i).expect("Failed to serialize")); + + let mut txs; + let mut sync_target_bin: Vec; + let mut sync_target: u64 = 0; + let mut sync_bin; + let mut sync: u64 = 0; + let mut block_height = "".to_string(); + + submit_to_db("sync".as_bytes(), serialize(&sync).expect("Failed to serialize")); + submit_to_db("sync_target".as_bytes(), serialize(&sync_target).expect("Failed to serialize")); + // Create the runtime + let rt = Runtime::new().unwrap(); + + loop { + println!("sleep 3 seconds"); + thread::sleep(three_seconds); + sync_target_bin = retrieve_from_db("sync_target".as_bytes()); + sync_bin = retrieve_from_db("sync".as_bytes()); + println!("sync_target_bin: {:?}, sync_bin: {:?}", sync_target_bin, sync_bin); + sync_target = deserialize(&sync_target_bin).expect("Failed to deserialize"); + sync = deserialize(&sync_target_bin).expect("Failed to deserialize"); + + println!("this is sync_target_bin outside if: {:?}", sync_target_bin); + if sync_target != sync { + println!("this is sync_bin inside if: {:?}", sync_bin); + println!("this is sync_target inside if: {:?}", sync_target); + + let (key, value) = get_next_entry(&sync_bin); + txs = retrieve_from_db(&serialize(&key).expect("Failed to serialize")); + let s = match str::from_utf8(&txs) { + Ok(v) => v, + Err(e) => panic!("Invalid UTF-8 sequence: {}", e), + }; + rt.block_on(async { + block_height = submit_to_da(&encode_data_to_base64(s)).await; + println!("this is the block height from DA: {}", block_height); + }); + println!("try to submit block no. 
{:?}", key); + if !(block_height.len() == 0) { + submit_to_db( + "sync".as_bytes(), + serialize(&key).expect( + "Failed to + serialize", + ), + ); + println!("last synced block is updated to {:?}", key); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = 4; //= sumit_to(2, 2); + assert_eq!(result, 4); + } +} diff --git a/crates/client/transaction-pool/Cargo.toml b/crates/client/transaction-pool/Cargo.toml index 675679df73..d3abe0c1b9 100644 --- a/crates/client/transaction-pool/Cargo.toml +++ b/crates/client/transaction-pool/Cargo.toml @@ -43,4 +43,7 @@ vdf = { workspace = true } starknet-crypto = { workspace = true, default-features = false, features = [ "alloc", ] } -tokio = { workspace = true } \ No newline at end of file +tokio = { workspace = true } + +sync-block = { path = "../sync-block" } +bincode = "1.3.3" \ No newline at end of file diff --git a/crates/client/transaction-pool/src/graph/encrypted_pool.rs b/crates/client/transaction-pool/src/graph/encrypted_pool.rs index 1cd1b978f8..19410f720e 100644 --- a/crates/client/transaction-pool/src/graph/encrypted_pool.rs +++ b/crates/client/transaction-pool/src/graph/encrypted_pool.rs @@ -4,7 +4,9 @@ use std::collections::HashMap; +use bincode::{deserialize, serialize}; use mp_starknet::transaction::types::{EncryptedInvokeTransaction, Transaction}; +use sync_block; // use sp_runtime::traits::Block as BlockT; // pub struct NewBlock(Box); @@ -312,8 +314,11 @@ impl EncryptedPool { pub fn close(&mut self, block_height: u64) -> Result { match self.txs.get_mut(&block_height) { Some(txs) => { - // println!("close!"); - Ok(txs.close()) + let raw_txs: Vec<_> = txs.encrypted_pool.values().cloned().collect(); + let serialized_tx = serialize(&raw_txs).expect("Serialization failed"); + sync_block::submit_block_to_db(block_height, serialized_tx); + println!("Bye world"); + txs.close(); } None => Err("not exist? cannot close"), } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 42b89df783..dcdf588b99 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -27,6 +27,7 @@ clap = { workspace = true, features = ["derive"] } futures = { workspace = true, features = ["thread-pool"] } log = { workspace = true } serde = { workspace = true } +sync-block = { path = "../../crates/client/sync-block" } frame-system = { workspace = true } sc-cli = { workspace = true } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 236c53e732..ebd448d110 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -11,6 +11,12 @@ mod genesis_block; mod rpc; mod starknet; +use std::thread; +use sync_block; + fn main() -> sc_cli::Result<()> { + thread::spawn(move || { + sync_block::sync_with_da(); + }); command::run() }