Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sync block #7

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ cairo_venv
# idea
.idea

.env
.env

/epool
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"crates/client/mapping-sync",
"crates/client/storage",
"crates/client/transaction-pool",
"crates/client/sync-block",
]

[profile.release]
Expand Down
8 changes: 4 additions & 4 deletions crates/client/block-proposer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ where
let mut lock = epool.lock().await;
if lock.is_enabled() {
let block_height = self.parent_number.to_string().parse::<u64>().unwrap() + 1;
let encrypted_tx_pool_size: usize = lock.len(block_height);
/* let encrypted_tx_pool_size: usize = lock.len(block_height);

if encrypted_tx_pool_size > 0 {
let encrypted_invoke_transactions = lock.get_encrypted_tx_pool(block_height);
Expand All @@ -624,7 +624,7 @@ where
submit_to_da(&encoded_data_for_da);
// let da_block_height = submit_to_da(&encoded_data_for_da).await;
// println!("this is the block_height: {}", da_block_height);
}
} */
lock.init_tx_pool(block_height);
}
}
Expand Down Expand Up @@ -1215,7 +1215,7 @@ mod tests {
}
}

async fn submit_to_da(data: &str) -> String {
/* async fn submit_to_da(data: &str) -> String {
dotenv().ok();
let da_host = env::var("DA_HOST").expect("DA_HOST must be set");
let da_namespace = env::var("DA_NAMESPACE").expect("DA_NAMESPACE must be set");
Expand Down Expand Up @@ -1267,4 +1267,4 @@ fn encode_data_to_base64(original: &str) -> String {
// Convert bytes to base64
let base64_str: String = general_purpose::STANDARD.encode(&bytes);
base64_str
}
} */
19 changes: 19 additions & 0 deletions crates/client/sync-block/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "sync-block"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rocksdb = "0.21.0"
base64 = "0.21.2"
serde = { version = "1.0.175", default-features = false }
serde_json = { version = "1.0.100", features = ["std"] }
serde_with = { version = "2.3.3", default-features = false }
bincode = "1.3.3"
hyper = {version = "0.14.27", features = ["full"]}
tokio = { version = "1.30.0", features = ["full"] }
dotenv = "0.15"
209 changes: 209 additions & 0 deletions crates/client/sync-block/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// let a: Vec<_> = txs.encrypted_pool.values().cloned().collect();
// println!("self.txs.get_mut: {:?}", a);

use std::collections::HashMap;
use std::path::Path;
use std::str::Bytes;
use std::time::Duration;
use std::{env, str, thread, time};

use base64::engine::general_purpose;
use base64::Engine as _;
use bincode::{deserialize, serialize};
use dotenv::dotenv;
use hyper::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use hyper::{Body, Client, Request};
use rocksdb::{Error, IteratorMode, DB};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio;
use tokio::runtime::Runtime;

fn get_next_entry(start_key: &[u8]) -> (Box<[u8]>, Box<[u8]>) {
// Create an iterator starting from the key after the specified start_key.
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");

let mut iter = db.iterator(IteratorMode::From(start_key, rocksdb::Direction::Forward));

// Iterate to get the next entry.
iter.next().expect("Something wrong with iter.next function").unwrap()
}

// fn get_next_entry(db: &DB, start_key: &[u8]) -> Result<Option<(Box<[u8]>, Box<[u8]>)>, Error> {
// Create an iterator starting from the key after the specified start_key.
// let path = Path::new("epool");
// let db = DB::open_default(&path).expect("Failed to open database");
//
// let mut iter = db.iterator(IteratorMode::From(start_key, rocksdb::Direction::Forward));
//
// Iterate to get the next entry.
// if let Some(Ok((key, value))) = iter.next() {
// return Ok(Some((key.to_vec().into_boxed_slice(), value.to_vec().into_boxed_slice())));
// }
//
// Ok(None) // Return Ok(None) if no next entry is found.
// }

fn encode_data_to_base64(original: &str) -> String {
// Convert string to bytes
let bytes = original.as_bytes();
// Convert bytes to base64
let base64_str: String = general_purpose::STANDARD.encode(&bytes);
base64_str
}

pub fn submit_block_to_db(block_height: u64, txs: Vec<u8>) {
println!("submit_block_to_db: key: {:?}", block_height);

// Open or create a RocksDB database.
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");
db.put(block_height.to_be_bytes(), txs).expect("Failed to put tx into RocksDB");
db.put("sync_target".as_bytes(), serialize(&block_height).expect("Failed to serialize"))
.expect("Failed to put tx into RocksDB");
}

pub fn submit_to_db(key: &[u8], value: Vec<u8>) {
println!("submit_to_db: key: {:?} value: {:?}", key, value);
// Open or create a RocksDB database.
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");
db.put(key, value).expect("Failed to put tx into RocksDB");
}

pub fn retrieve_from_db(key: &[u8]) -> Vec<u8> {
let path = Path::new("epool");
let db = DB::open_default(&path).expect("Failed to open database");
let value = db.get(key).expect("msg");
let result = match value {
Some(val) => val,
None => Vec::new(), // Provide a default value (empty vector) for the None arm
};
result
}

async fn submit_to_da(data: &str) -> String {
dotenv().ok();
let da_host = env::var("DA_HOST").expect("DA_HOST must be set");
let da_namespace = env::var("DA_NAMESPACE").expect("DA_NAMESPACE must be set");
let da_auth_token = env::var("DA_AUTH_TOKEN").expect("DA_AUTH_TOKEN must be set");
let da_auth = format!("Bearer {}", da_auth_token);

let client = Client::new();
let rpc_request = json!({
"jsonrpc": "2.0",
"method": "blob.Submit",
"params": [
[
{
"namespace": da_namespace,
"data": data,
}
]
],
"id": 1,
});

let uri = std::env::var("da_uri").unwrap_or(da_host.into());

let req = Request::post(uri.as_str())
.header(AUTHORIZATION, HeaderValue::from_str(da_auth.as_str()).unwrap())
.header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
.body(Body::from(rpc_request.to_string()))
.unwrap();
let response_future = client.request(req);

let resp = tokio::time::timeout(Duration::from_secs(100), response_future)
.await
.map_err(|_| "Request timed out")
.unwrap()
.unwrap();

let response_body = hyper::body::to_bytes(resp.into_body()).await.unwrap();
let parsed: Value = serde_json::from_slice(&response_body).unwrap();

// println!("stompesi - {:?}", parsed);

if let Some(result_value) = parsed.get("result") { result_value.to_string() } else { "".to_string() }
}

// From_ne_bytes. In Rust we have standard library functions to convert from bytes to integers and
// back again. The from_le_bytes, from_ne_bytes and from_be_bytes functions can be used. To convert
// from an integer back into an array of bytes, we can use functions like to_ne_bytes. The correct
// Endianness must be selected.

pub fn sync_with_da() {
// Open or create a RocksDB database.
println!("sync_with_da started");
// let path = Path::new("epool");
// println!("open db");
// let db = DB::open_default(&path).expect("Failed to open database");

let three_seconds = time::Duration::from_millis(3000);

// let mut i: u64 = 0;
// println!("submit sync into db");
// submit_to_db("sync".as_bytes(), serialize(&i).expect("Failed to serialize"));

let mut txs;
let mut sync_target_bin: Vec<u8>;
let mut sync_target: u64 = 0;
let mut sync_bin;
let mut sync: u64 = 0;
let mut block_height = "".to_string();

submit_to_db("sync".as_bytes(), serialize(&sync).expect("Failed to serialize"));
submit_to_db("sync_target".as_bytes(), serialize(&sync_target).expect("Failed to serialize"));
// Create the runtime
let rt = Runtime::new().unwrap();

loop {
println!("sleep 3 seconds");
thread::sleep(three_seconds);
sync_target_bin = retrieve_from_db("sync_target".as_bytes());
sync_bin = retrieve_from_db("sync".as_bytes());
println!("sync_target_bin: {:?}, sync_bin: {:?}", sync_target_bin, sync_bin);
sync_target = deserialize(&sync_target_bin).expect("Failed to deserialize");
sync = deserialize(&sync_target_bin).expect("Failed to deserialize");

println!("this is sync_target_bin outside if: {:?}", sync_target_bin);
if sync_target != sync {
println!("this is sync_bin inside if: {:?}", sync_bin);
println!("this is sync_target inside if: {:?}", sync_target);

let (key, value) = get_next_entry(&sync_bin);
txs = retrieve_from_db(&serialize(&key).expect("Failed to serialize"));
let s = match str::from_utf8(&txs) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
rt.block_on(async {
block_height = submit_to_da(&encode_data_to_base64(s)).await;
println!("this is the block height from DA: {}", block_height);
});
println!("try to submit block no. {:?}", key);
if !(block_height.len() == 0) {
submit_to_db(
"sync".as_bytes(),
serialize(&key).expect(
"Failed to
serialize",
),
);
println!("last synced block is updated to {:?}", key);
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = 4; //= sumit_to(2, 2);
assert_eq!(result, 4);
}
}
5 changes: 4 additions & 1 deletion crates/client/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ vdf = { workspace = true }
starknet-crypto = { workspace = true, default-features = false, features = [
"alloc",
] }
tokio = { workspace = true }
tokio = { workspace = true }

sync-block = { path = "../sync-block" }
bincode = "1.3.3"
9 changes: 7 additions & 2 deletions crates/client/transaction-pool/src/graph/encrypted_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

use std::collections::HashMap;

use bincode::{deserialize, serialize};
use mp_starknet::transaction::types::{EncryptedInvokeTransaction, Transaction};
use sync_block;
// use sp_runtime::traits::Block as BlockT;

// pub struct NewBlock(Box<dyn BlockT>);
Expand Down Expand Up @@ -312,8 +314,11 @@ impl EncryptedPool {
pub fn close(&mut self, block_height: u64) -> Result<bool, &str> {
match self.txs.get_mut(&block_height) {
Some(txs) => {
// println!("close!");
Ok(txs.close())
let raw_txs: Vec<_> = txs.encrypted_pool.values().cloned().collect();
let serialized_tx = serialize(&raw_txs).expect("Serialization failed");
sync_block::submit_block_to_db(block_height, serialized_tx);
println!("Bye world");
txs.close();
}
None => Err("not exist? cannot close"),
}
Expand Down
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ clap = { workspace = true, features = ["derive"] }
futures = { workspace = true, features = ["thread-pool"] }
log = { workspace = true }
serde = { workspace = true }
sync-block = { path = "../../crates/client/sync-block" }

frame-system = { workspace = true }
sc-cli = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ mod genesis_block;
mod rpc;
mod starknet;

use std::thread;
use sync_block;

fn main() -> sc_cli::Result<()> {
thread::spawn(move || {
sync_block::sync_with_da();
});
command::run()
}
Loading