Commit
Merge pull request #7 from radiusxyz/feature/sync-block
there is no point in conflict.
raynear committed Sep 19, 2023
2 parents c0bd2ee + 73dad98 commit 3abb8aa
Showing 10 changed files with 274 additions and 8 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -33,4 +33,6 @@ cairo_venv
# idea
.idea

.env
.env

/epool
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -12,6 +12,7 @@ members = [
"crates/client/mapping-sync",
"crates/client/storage",
"crates/client/transaction-pool",
"crates/client/sync-block",
]

[profile.release]
8 changes: 4 additions & 4 deletions crates/client/block-proposer/src/lib.rs
@@ -612,7 +612,7 @@ where
let mut lock = epool.lock().await;
if lock.is_enabled() {
let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;
let encrypted_tx_pool_size: usize = lock.len(block_height);
/* let encrypted_tx_pool_size: usize = lock.len(block_height);
if encrypted_tx_pool_size > 0 {
let encrypted_invoke_transactions = lock.get_encrypted_tx_pool(block_height);
@@ -624,7 +624,7 @@ where
submit_to_da(&encoded_data_for_da);
// let da_block_height = submit_to_da(&encoded_data_for_da).await;
// println!("this is the block_height: {}", da_block_height);
}
} */
lock.init_tx_pool(block_height);
}
}
@@ -1215,7 +1215,7 @@ mod tests {
}
}

async fn submit_to_da(data: &str) -> String {
/* async fn submit_to_da(data: &str) -> String {
dotenv().ok();
let da_host = env::var("DA_HOST").expect("DA_HOST must be set");
let da_namespace = env::var("DA_NAMESPACE").expect("DA_NAMESPACE must be set");
@@ -1267,4 +1267,4 @@ fn encode_data_to_base64(original: &str) -> String {
// Convert bytes to base64
let base64_str: String = general_purpose::STANDARD.encode(&bytes);
base64_str
}
} */
19 changes: 19 additions & 0 deletions crates/client/sync-block/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "sync-block"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rocksdb = "0.21.0"
base64 = "0.21.2"
serde = { version = "1.0.175", default-features = false }
serde_json = { version = "1.0.100", features = ["std"] }
serde_with = { version = "2.3.3", default-features = false }
bincode = "1.3.3"
hyper = {version = "0.14.27", features = ["full"]}
tokio = { version = "1.30.0", features = ["full"] }
dotenv = "0.15"
209 changes: 209 additions & 0 deletions crates/client/sync-block/src/lib.rs
@@ -0,0 +1,209 @@
// let a: Vec<_> = txs.encrypted_pool.values().cloned().collect();
// println!("self.txs.get_mut: {:?}", a);

use std::collections::HashMap;
use std::path::Path;
use std::str::Bytes;
use std::time::Duration;
use std::{env, str, thread, time};

use base64::engine::general_purpose;
use base64::Engine as _;
use bincode::{deserialize, serialize};
use dotenv::dotenv;
use hyper::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use hyper::{Body, Client, Request};
use rocksdb::{Error, IteratorMode, DB};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio;
use tokio::runtime::Runtime;

fn get_next_entry(start_key: &[u8]) -> (Box<[u8]>, Box<[u8]>) {
// Create an iterator starting from the key after the specified start_key.
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");

let mut iter = db.iterator(IteratorMode::From(start_key, rocksdb::Direction::Forward));

// Iterate to get the next entry.
iter.next().expect("Something wrong with iter.next function").unwrap()
}

// fn get_next_entry(db: &DB, start_key: &[u8]) -> Result<Option<(Box<[u8]>, Box<[u8]>)>, Error> {
// Create an iterator starting from the key after the specified start_key.
// let path = Path::new("epool");
// let db = DB::open_default(&path).expect("Failed to open database");
//
// let mut iter = db.iterator(IteratorMode::From(start_key, rocksdb::Direction::Forward));
//
// Iterate to get the next entry.
// if let Some(Ok((key, value))) = iter.next() {
// return Ok(Some((key.to_vec().into_boxed_slice(), value.to_vec().into_boxed_slice())));
// }
//
// Ok(None) // Return Ok(None) if no next entry is found.
// }

fn encode_data_to_base64(original: &str) -> String {
// Convert string to bytes
let bytes = original.as_bytes();
// Convert bytes to base64
let base64_str: String = general_purpose::STANDARD.encode(&bytes);
base64_str
}

pub fn submit_block_to_db(block_height: u64, txs: Vec<u8>) {
println!("submit_block_to_db: key: {:?}", block_height);

// Open or create a RocksDB database.
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");
db.put(block_height.to_be_bytes(), txs).expect("Failed to put tx into RocksDB");
db.put("sync_target".as_bytes(), serialize(&block_height).expect("Failed to serialize"))
.expect("Failed to put tx into RocksDB");
}

pub fn submit_to_db(key: &[u8], value: Vec<u8>) {
println!("submit_to_db: key: {:?} value: {:?}", key, value);
// Open or create a RocksDB database.
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");
db.put(key, value).expect("Failed to put tx into RocksDB");
}

pub fn retrieve_from_db(key: &[u8]) -> Vec<u8> {
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");
let value = db.get(key).expect("msg");
let result = match value {
Some(val) => val,
None => Vec::new(), // Provide a default value (empty vector) for the None arm
};
result
}

async fn submit_to_da(data: &str) -> String {
dotenv().ok();
let da_host = env::var("DA_HOST").expect("DA_HOST must be set");
let da_namespace = env::var("DA_NAMESPACE").expect("DA_NAMESPACE must be set");
let da_auth_token = env::var("DA_AUTH_TOKEN").expect("DA_AUTH_TOKEN must be set");
let da_auth = format!("Bearer {}", da_auth_token);

let client = Client::new();
let rpc_request = json!({
"jsonrpc": "2.0",
"method": "blob.Submit",
"params": [
[
{
"namespace": da_namespace,
"data": data,
}
]
],
"id": 1,
});

let uri = std::env::var("da_uri").unwrap_or(da_host.into());

let req = Request::post(uri.as_str())
.header(AUTHORIZATION, HeaderValue::from_str(da_auth.as_str()).unwrap())
.header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
.body(Body::from(rpc_request.to_string()))
.unwrap();
let response_future = client.request(req);

let resp = tokio::time::timeout(Duration::from_secs(100), response_future)
.await
.map_err(|_| "Request timed out")
.unwrap()
.unwrap();

let response_body = hyper::body::to_bytes(resp.into_body()).await.unwrap();
let parsed: Value = serde_json::from_slice(&response_body).unwrap();

// println!("stompesi - {:?}", parsed);

if let Some(result_value) = parsed.get("result") { result_value.to_string() } else { "".to_string() }
}

// From_ne_bytes. In Rust we have standard library functions to convert from bytes to integers and
// back again. The from_le_bytes, from_ne_bytes and from_be_bytes functions can be used. To convert
// from an integer back into an array of bytes, we can use functions like to_ne_bytes. The correct
// Endianness must be selected.
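// Illustrative sketch (not in the original file): round-trip a block height through
// its big-endian byte form. Fixed-width big-endian keys compare lexicographically in
// the same order as the integers themselves, which is why `submit_block_to_db` keys
// blocks by `block_height.to_be_bytes()` and a forward RocksDB iterator visits them
// in height order.
#[allow(dead_code)]
fn be_bytes_roundtrip_example() {
    let block_height: u64 = 42;
    let key = block_height.to_be_bytes();
    assert_eq!(u64::from_be_bytes(key), block_height);
    // 41 < 42 numerically, and their big-endian encodings compare the same way.
    assert!(41u64.to_be_bytes() < key);
}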

pub fn sync_with_da() {
// Open or create a RocksDB database.
println!("sync_with_da started");
// let path = Path::new("epool");
// println!("open db");
// let db = DB::open_default(&path).expect("Failed to open database");

let three_seconds = time::Duration::from_millis(3000);

// let mut i: u64 = 0;
// println!("submit sync into db");
// submit_to_db("sync".as_bytes(), serialize(&i).expect("Failed to serialize"));

let mut txs;
let mut sync_target_bin: Vec<u8>;
let mut sync_target: u64 = 0;
let mut sync_bin;
let mut sync: u64 = 0;
let mut block_height = "".to_string();

submit_to_db("sync".as_bytes(), serialize(&sync).expect("Failed to serialize"));
submit_to_db("sync_target".as_bytes(), serialize(&sync_target).expect("Failed to serialize"));
// Create the runtime
let rt = Runtime::new().unwrap();

loop {
println!("sleep 3 seconds");
thread::sleep(three_seconds);
sync_target_bin = retrieve_from_db("sync_target".as_bytes());
sync_bin = retrieve_from_db("sync".as_bytes());
println!("sync_target_bin: {:?}, sync_bin: {:?}", sync_target_bin, sync_bin);
sync_target = deserialize(&sync_target_bin).expect("Failed to deserialize");
sync = deserialize(&sync_bin).expect("Failed to deserialize");

println!("this is sync_target_bin outside if: {:?}", sync_target_bin);
if sync_target != sync {
println!("this is sync_bin inside if: {:?}", sync_bin);
println!("this is sync_target inside if: {:?}", sync_target);

let (key, value) = get_next_entry(&sync_bin);
txs = retrieve_from_db(&serialize(&key).expect("Failed to serialize"));
let s = match str::from_utf8(&txs) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
rt.block_on(async {
block_height = submit_to_da(&encode_data_to_base64(s)).await;
println!("this is the block height from DA: {}", block_height);
});
println!("try to submit block no. {:?}", key);
if !block_height.is_empty() {
submit_to_db("sync".as_bytes(), serialize(&key).expect("Failed to serialize"));
println!("last synced block is updated to {:?}", key);
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = 4; //= sumit_to(2, 2);
assert_eq!(result, 4);
}
}
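As a reading aid, here is a minimal sketch of how a consumer could read back the two progress markers this crate maintains, assuming (as in the initialization above) that the "sync" and "sync_target" keys hold bincode-encoded u64 values in the epool database. The helper name read_sync_progress is hypothetical and not part of this commit.

use std::path::Path;

use bincode::deserialize;
use rocksdb::DB;

// Illustrative only: read back the markers written by sync_with_da / submit_block_to_db.
fn read_sync_progress() -> (u64, u64) {
    let db = DB::open_default(Path::new("epool")).expect("Failed to open database");
    let read_u64 = |key: &[u8]| -> u64 {
        db.get(key)
            .expect("Failed to read from RocksDB")
            .map(|raw| deserialize(&raw).expect("Failed to deserialize"))
            .unwrap_or(0)
    };
    // (last synced block, sync target), stored under "sync" and "sync_target".
    (read_u64(b"sync"), read_u64(b"sync_target"))
}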
5 changes: 4 additions & 1 deletion crates/client/transaction-pool/Cargo.toml
@@ -43,4 +43,7 @@ vdf = { workspace = true }
starknet-crypto = { workspace = true, default-features = false, features = [
"alloc",
] }
tokio = { workspace = true }
tokio = { workspace = true }

sync-block = { path = "../sync-block" }
bincode = "1.3.3"
9 changes: 7 additions & 2 deletions crates/client/transaction-pool/src/graph/encrypted_pool.rs
@@ -4,7 +4,9 @@

use std::collections::HashMap;

use bincode::{deserialize, serialize};
use mp_starknet::transaction::types::{EncryptedInvokeTransaction, Transaction};
use sync_block;
// use sp_runtime::traits::Block as BlockT;

// pub struct NewBlock(Box<dyn BlockT>);
@@ -312,8 +314,11 @@ impl EncryptedPool {
pub fn close(&mut self, block_height: u64) -> Result<bool, &str> {
match self.txs.get_mut(&block_height) {
Some(txs) => {
// println!("close!");
Ok(txs.close())
let raw_txs: Vec<_> = txs.encrypted_pool.values().cloned().collect();
let serialized_tx = serialize(&raw_txs).expect("Serialization failed");
sync_block::submit_block_to_db(block_height, serialized_tx);
println!("Bye world");
txs.close();
}
None => Err("not exist? cannot close"),
}
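On the consumer side, the payload that close stores for a block can be turned back into the original transactions with bincode. A minimal sketch, assuming the pool values are EncryptedInvokeTransaction and that the type derives serde's Deserialize (the serialize call above already implies Serialize); decode_block_payload is a hypothetical helper, not part of this commit.

use bincode::deserialize;
use mp_starknet::transaction::types::EncryptedInvokeTransaction;

// Illustrative only: the reverse of the `serialize(&raw_txs)` call in `close`.
fn decode_block_payload(payload: &[u8]) -> Vec<EncryptedInvokeTransaction> {
    deserialize(payload).expect("Failed to deserialize block payload")
}

Paired with the key scheme above, this would be used as decode_block_payload(&sync_block::retrieve_from_db(&block_height.to_be_bytes())).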
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
@@ -27,6 +27,7 @@ clap = { workspace = true, features = ["derive"] }
futures = { workspace = true, features = ["thread-pool"] }
log = { workspace = true }
serde = { workspace = true }
sync-block = { path = "../../crates/client/sync-block" }

frame-system = { workspace = true }
sc-cli = { workspace = true }
6 changes: 6 additions & 0 deletions crates/node/src/main.rs
@@ -11,6 +11,12 @@ mod genesis_block;
mod rpc;
mod starknet;

use std::thread;
use sync_block;

fn main() -> sc_cli::Result<()> {
thread::spawn(move || {
sync_block::sync_with_da();
});
command::run()
}
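For comparison, the same wiring with a named thread and the join handle kept around, so the background loop is easier to identify in logs and debuggers. This is an illustrative variant, not what the commit does; the commit spawns an anonymous, detached thread.

use std::thread;

fn main() -> sc_cli::Result<()> {
    let _sync_handle = thread::Builder::new()
        .name("sync-with-da".into())
        .spawn(|| sync_block::sync_with_da())
        .expect("failed to spawn sync-with-da thread");
    command::run()
}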
