diff --git a/Cargo.toml b/Cargo.toml
index 7346814..bdfb08e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "wvm-archiver"
-version = "0.2.4"
+version = "0.2.5"
 edition = "2021"
 description = "EL data pipeline for WVM testnet v0"
 authors = ["charmful0x "]
diff --git a/README.md b/README.md
index ed6e5d9..f564448 100644
--- a/README.md
+++ b/README.md
@@ -105,14 +105,16 @@ As mentioned, PlanetScale is used for cloud indexing, which allows a WeaveVM Arc
 ### WeaveVM Archiver node instance info
 
 ```bash
-curl -X GET https://the_network.wvm.network/info
+curl -X GET https://the_network.wvm.network/v1/info
 ```
 
 **returns:**
 ```rs
 pub struct InfoServerResponse {
-    first_archived_block: Option<u64>,
-    last_archived_block: Option<u64>,
+    first_livesync_archived_block: Option<u64>,
+    last_livesync_archived_block: Option<u64>,
+    first_backfill_archived_block: Option<u64>,
+    last_backfill_archived_block: Option<u64>,
     livesync_start_block: u64,
     total_archived_blocks: u64,
     blocks_behind_live_blockheight: u64,
@@ -129,7 +131,7 @@ pub struct InfoServerResponse {
 ### WeaveVM Archiver all networks info:
 
 ```bash
-curl -X GET https://the_network.wvm.network/all-networks-info
+curl -X GET https://the_network.wvm.network/v1/all-networks-info
 ```
 
 **returns:**
@@ -141,13 +143,13 @@ Vec
 ### Retrieve the WVM archive TXID for a given EVM block ID
 
 ```bash
-curl -X GET https://the_network.wvm.network/block/$BLOCK_ID
+curl -X GET https://the_network.wvm.network/v1/block/$BLOCK_ID
 ```
 
 ### Decode the WVM archived block data for a given EVM block ID (return original block data in JSON format)
 
 ```bash
-curl -X GET https://the_network.wvm.network/block/raw/$BLOCK_ID
+curl -X GET https://the_network.wvm.network/v1/block/raw/$BLOCK_ID
 ```
 
 ## License
diff --git a/db_schema.sql b/db_schema.sql
index db3cc73..7b1d4f9 100644
--- a/db_schema.sql
+++ b/db_schema.sql
@@ -1,7 +1,14 @@
 DROP TABLE IF EXISTS WeaveVMArchiver;
+DROP TABLE IF EXISTS WeaveVMArchiverBackfill;
 
 CREATE TABLE IF NOT EXISTS WeaveVMArchiver (
     Id INT AUTO_INCREMENT PRIMARY KEY,
     NetworkBlockId INT UNIQUE,
     WeaveVMArchiveTxid VARCHAR(66) UNIQUE
 );
+
+CREATE TABLE IF NOT EXISTS WeaveVMArchiverBackfill (
+    Id INT AUTO_INCREMENT PRIMARY KEY,
+    NetworkBlockId INT UNIQUE,
+    WeaveVMArchiveTxid VARCHAR(66) UNIQUE
+);
diff --git a/src/main.rs b/src/main.rs
index e367459..23a22b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,4 @@
 use crate::utils::archive_block::sprint_blocks_archiving;
-use crate::utils::backfill_genesis::backfill_from_genesis;
 use crate::utils::schema::Network;
 use crate::utils::server_handlers::{
     handle_all_networks_info, handle_block, handle_block_raw, handle_info, handle_weave_gm,
@@ -16,18 +15,18 @@ async fn main() -> shuttle_axum::ShuttleAxum {
     // server routes
     let router = Router::new()
         .route("/", get(handle_weave_gm))
-        .route("/info", get(handle_info))
-        .route("/block/:id", get(handle_block))
-        .route("/block/raw/:id", get(handle_block_raw))
-        .route("/all-networks-info", get(handle_all_networks_info));
+        .route("/v1/info", get(handle_info))
+        .route("/v1/block/:id", get(handle_block))
+        .route("/v1/block/raw/:id", get(handle_block_raw))
+        .route("/v1/all-networks-info", get(handle_all_networks_info));
 
     // poll blocks & sprint archiving in parallel
     task::spawn(async move {
-        sprint_blocks_archiving().await;
+        sprint_blocks_archiving(false).await;
     });
     // backfill blocks from genesis till network.start_block
     task::spawn(async move {
-        backfill_from_genesis().await.unwrap();
+        sprint_blocks_archiving(true).await;
     });
     Ok(router.into())
 }
diff --git a/src/utils/archive_block.rs b/src/utils/archive_block.rs
index 35e23c9..967c470 100644
--- a/src/utils/archive_block.rs
+++ b/src/utils/archive_block.rs
@@ -1,3 +1,4 @@
+use crate::utils::env_var::get_env_var;
 use crate::utils::get_block::{by_number, get_current_block_number};
 use crate::utils::planetscale::{ps_archive_block, ps_get_latest_block_id};
 use crate::utils::schema::{Block, Network};
@@ -7,9 +8,19 @@ use std::{thread, time::Duration};
 
 pub async fn archive(block_number: Option<u64>, is_backfill: bool) -> Result<String, Error> {
     let network = Network::config();
-    let start_block = network.start_block;
-    let block_to_archive = block_number.unwrap_or(start_block);
+    let block_to_archive = block_number.unwrap_or(if is_backfill {
+        get_env_var("backfill_start_block")
+            .unwrap()
+            .parse::<u64>()
+            .unwrap()
+    } else {
+        network.start_block
+    });
+    // assert to ensure backfill and livesync dont collide at the continuity checkpoint
+    if is_backfill {
+        assert!(block_number.unwrap() < network.start_block)
+    }
 
     // fetch block
     let block_data = by_number(block_to_archive).await.unwrap();
     // serialize response into Block struct
@@ -20,9 +31,6 @@ pub async fn archive(block_number: Option<u64>, is_backfill: bool) -> Result<String, Error> {
diff --git a/src/utils/backfill_genesis.rs b/src/utils/backfill_genesis.rs
--- a/src/utils/backfill_genesis.rs
+++ b/src/utils/backfill_genesis.rs
-pub async fn backfill_from_genesis() -> Result<(), Error> {
-    let network = Network::config();
-    let config_start_block = network.start_block;
-    let backfill_block_start: String = get_env_var("backfill_start_block").unwrap_or(0.to_string());
-    let backfill_blocks: Vec<u64> =
-        (backfill_block_start.parse::<u64>().unwrap()..=config_start_block).collect();
+// pub async fn backfill_from_genesis() -> Result<(), Error> {
+//     let network = Network::config();
+//     let config_start_block = network.start_block;
+//     let backfill_block_start: String = get_env_var("backfill_start_block").unwrap_or(0.to_string());
+//     let backfill_blocks: Vec<u64> =
+//         (backfill_block_start.parse::<u64>().unwrap()..=config_start_block).collect();
-    if config_start_block == 0 {
-        return Ok(());
-    }
+//     if config_start_block == 0 {
+//         return Ok(());
+//     }
-    for &block_number in backfill_blocks.iter() {
-        println!("\n{}", "#".repeat(100));
-        println!(
-            "\nARCHIVING **BACKFILL** BLOCK #{} of Network {} -- ChainId: {}\n",
-            &block_number, network.name, network.network_chain_id
-        );
-        let archive_txid = archive(Some(block_number), true).await.unwrap();
-        let _ = ps_archive_block(&block_number, &archive_txid).await;
-        println!("\n{}", "#".repeat(100));
-    }
+//     for &block_number in backfill_blocks.iter() {
+//         println!("\n{}", "#".repeat(100));
+//         println!(
+//             "\nARCHIVING **BACKFILL** BLOCK #{} of Network {} -- ChainId: {}\n",
+//             &block_number, network.name, network.network_chain_id
+//         );
+//         let archive_txid = archive(Some(block_number), true).await.unwrap();
+//         let _ = ps_archive_block(&block_number, &archive_txid, true).await;
+//         println!("\n{}", "#".repeat(100));
+//     }
-    Ok(())
-}
+//     Ok(())
+// }
diff --git a/src/utils/planetscale.rs b/src/utils/planetscale.rs
index 0b21fd6..23ad101 100644
--- a/src/utils/planetscale.rs
+++ b/src/utils/planetscale.rs
@@ -17,18 +17,27 @@ async fn ps_init() -> PSConnection {
 
 pub async fn ps_archive_block(
     network_block_id: &u64,
     wvm_calldata_txid: &str,
+    is_backfill: bool,
 ) -> Result<(), Error> {
     // format to the table VAR(66) limitation
     let wvm_calldata_txid = wvm_calldata_txid.trim_matches('"');
     let conn = ps_init().await;
+    let mut ps_table_name = get_env_var("ps_table_name").unwrap();
 
-    let res = query(
-        "INSERT INTO WeaveVMArchiverMetis(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")",
-    )
-    .bind(network_block_id)
-    .bind(wvm_calldata_txid)
-    .execute(&conn)
-    .await;
+    if is_backfill {
+        ps_table_name = format!("{}{}", ps_table_name, "Backfill")
+    }
+
+    let query_str = format!(
+        "INSERT INTO {}(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")",
+        ps_table_name
+    );
+
+    let res = query(&query_str)
+        .bind(network_block_id)
+        .bind(wvm_calldata_txid)
+        .execute(&conn)
+        .await;
 
     match res {
         Ok(result) => {
@@ -42,15 +51,33 @@ pub async fn ps_archive_block(
     }
 }
 
-pub async fn ps_get_latest_block_id() -> u64 {
+pub async fn ps_get_latest_block_id(is_backfill: bool) -> u64 {
     let network = Network::config();
     let conn = ps_init().await;
 
-    let latest_archived: u64 =
-        query("SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM WeaveVMArchiverMetis;")
-            .fetch_scalar(&conn)
-            .await
-            .unwrap_or(network.start_block);
+    let mut ps_table_name = get_env_var("ps_table_name").unwrap();
+    if is_backfill {
+        ps_table_name = format!("{}{}", ps_table_name, "Backfill")
+    }
+
+    let query_str = format!(
+        "SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM {};",
+        ps_table_name
+    );
+
+    let default_start_block = if is_backfill {
+        get_env_var("backfill_start_block")
+            .unwrap()
+            .parse::<u64>()
+            .unwrap()
+    } else {
+        network.start_block
+    };
+
+    let latest_archived: u64 = query(&query_str)
+        .fetch_scalar(&conn)
+        .await
+        .unwrap_or(default_start_block);
 
     // return latest archived block in planetscale + 1
     // so the process can start archiving from latest_archived + 1
     latest_archived + 1
@@ -58,20 +85,43 @@ pub async fn ps_get_archived_block_txid(id: u64) -> Value {
     let conn = ps_init().await;
+    let network = Network::config();
+    let ps_table_name = get_env_var("ps_table_name").unwrap();
 
-    let query_formatted = format!(
-        "SELECT WeaveVMArchiveTxid FROM WeaveVMArchiverMetis WHERE NetworkBlockId = {}",
-        id
+    let query_formatted_livesync = format!(
+        "SELECT WeaveVMArchiveTxid FROM {} WHERE NetworkBlockId = {}",
+        ps_table_name, id
+    );
+
+    let query_formatted_backfill = format!(
+        "SELECT WeaveVMArchiveTxid FROM {}Backfill WHERE NetworkBlockId = {}",
+        ps_table_name, id
     );
+
+    // query from tables based on block id existence in the livesync of backfill
+    let query_formatted = if id >= network.start_block {
+        query_formatted_livesync
+    } else {
+        query_formatted_backfill
+    };
+
     let txid: PsGetBlockTxid = query(&query_formatted).fetch_one(&conn).await.unwrap();
 
     let res = serde_json::json!(txid);
     res
 }
 
-pub async fn ps_get_blocks_extremes(extreme: &str) -> Value {
+pub async fn ps_get_blocks_extremes(extreme: &str, is_backfill: bool) -> Value {
     let conn = ps_init().await;
+    let mut ps_table_name = get_env_var("ps_table_name").unwrap();
+
+    ps_table_name = if is_backfill {
+        format!("{ps_table_name}Backfill")
+    } else {
+        ps_table_name
+    };
+
     let query_type = match extreme {
         "first" => "ASC",
         "last" => "DESC",
@@ -79,8 +129,8 @@
     };
 
     let query_formatted = format!(
-        "SELECT NetworkBlockId FROM WeaveVMArchiverMetis ORDER BY NetworkBlockId {} LIMIT 1;",
-        query_type
+        "SELECT NetworkBlockId FROM {} ORDER BY NetworkBlockId {} LIMIT 1;",
+        ps_table_name, query_type
     );
 
     let query: PsGetExtremeBlock = query(&query_formatted).fetch_one(&conn).await.unwrap();
@@ -89,10 +139,19 @@
-pub async fn ps_get_archived_blocks_count() -> PsGetTotalBlocksCount {
+pub async fn ps_get_archived_blocks_count() -> u64 {
     let conn = ps_init().await;
-
-    let query_formatted = "SELECT MAX(Id) FROM WeaveVMArchiverMetis;";
-    let count: PsGetTotalBlocksCount = query(&query_formatted).fetch_one(&conn).await.unwrap();
-    count
+    let ps_table_name = get_env_var("ps_table_name").unwrap();
+
+    let query_formatted_livesync = format!("SELECT MAX(Id) FROM {};", ps_table_name);
+    let query_formatted_backfill = format!("SELECT MAX(Id) FROM {}Backfill;", ps_table_name);
+    let count_livesync: PsGetTotalBlocksCount = query(&query_formatted_livesync)
+        .fetch_one(&conn)
+        .await
+        .unwrap();
+    let count_backfill: PsGetTotalBlocksCount = query(&query_formatted_backfill)
+        .fetch_one(&conn)
+        .await
+        .unwrap();
+    count_livesync.count + count_backfill.count
 }
diff --git a/src/utils/schema.rs b/src/utils/schema.rs
index fef68ae..563c535 100644
--- a/src/utils/schema.rs
+++ b/src/utils/schema.rs
@@ -129,8 +129,10 @@ pub struct PsGetTotalBlocksCount {
 
 #[derive(Debug, Serialize)]
 pub struct InfoServerResponse {
-    first_archived_block: Option<u64>,
-    last_archived_block: Option<u64>,
+    first_livesync_archived_block: Option<u64>,
+    last_livesync_archived_block: Option<u64>,
+    first_backfill_archived_block: Option<u64>,
+    last_backfill_archived_block: Option<u64>,
     livesync_start_block: u64,
     total_archived_blocks: u64,
     blocks_behind_live_blockheight: u64,
@@ -144,7 +146,12 @@ pub struct InfoServerResponse {
 }
 
 impl InfoServerResponse {
-    pub async fn new(first_block: Option<u64>, last_block: Option<u64>) -> InfoServerResponse {
+    pub async fn new(
+        first_livesync_block: Option<u64>,
+        last_livesync_block: Option<u64>,
+        first_backfill_block: Option<u64>,
+        last_backfill_block: Option<u64>,
+    ) -> InfoServerResponse {
         let network = Network::config();
         // balances
         let archiver_balance = get_balance_of(network.archiver_address.clone()).await;
@@ -152,17 +159,19 @@ impl InfoServerResponse {
         let backfill_balance = get_balance_of(network.backfill_address.clone()).await;
         let backfill_balance = Some(backfill_balance).unwrap_or("0".into());
         // blocks stats
-        let total_archived_blocks = (ps_get_archived_blocks_count().await).count;
+        let total_archived_blocks = ps_get_archived_blocks_count().await;
         let current_live_block = get_current_block_number().await.as_u64();
-        let blocks_behind_live_blockheight = current_live_block - last_block.unwrap_or(0);
+        let blocks_behind_live_blockheight = current_live_block - last_livesync_block.unwrap_or(0);
 
         let instance: InfoServerResponse = InfoServerResponse {
             archiver_balance,
             backfill_balance,
             blocks_behind_live_blockheight,
             livesync_start_block: network.start_block,
-            first_archived_block: first_block,
-            last_archived_block: last_block,
+            first_livesync_archived_block: first_livesync_block,
+            first_backfill_archived_block: first_backfill_block,
+            last_livesync_archived_block: last_livesync_block,
+            last_backfill_archived_block: last_backfill_block,
             total_archived_blocks,
             archiver_address: network.archiver_address,
             backfill_address: network.backfill_address,
diff --git a/src/utils/server_handlers.rs b/src/utils/server_handlers.rs
index 1542648..f8c5b7a 100644
--- a/src/utils/server_handlers.rs
+++ b/src/utils/server_handlers.rs
@@ -5,8 +5,6 @@ use crate::utils::transaction::decode_wvm_tx_data;
 use axum::{extract::Path, response::Json};
 use serde_json::Value;
 
-use super::all_networks;
-
 pub async fn handle_weave_gm() -> &'static str {
     "WeaveGM!"
 }
@@ -17,12 +15,17 @@ pub async fn handle_block(Path(id): Path<u64>) -> Json<Value> {
 }
 
 pub async fn handle_info() -> Json<Value> {
-    let first = ps_get_blocks_extremes("first").await;
-    let last = ps_get_blocks_extremes("last").await;
-
-    let first_block = first.get("block_id").unwrap().as_u64();
-    let last_block = last.get("block_id").unwrap().as_u64();
-    let stats_res: InfoServerResponse = InfoServerResponse::new(first_block, last_block).await;
+    let first_livesync = ps_get_blocks_extremes("first", false).await;
+    let last_livesync = ps_get_blocks_extremes("last", false).await;
+    let first_backfill = ps_get_blocks_extremes("first", true).await;
+    let last_backfill = ps_get_blocks_extremes("last", true).await;
+
+    let first_livesync = first_livesync.get("block_id").unwrap().as_u64();
+    let last_livesync = last_livesync.get("block_id").unwrap().as_u64();
+    let first_backfill = first_backfill.get("block_id").unwrap().as_u64();
+    let last_backfill = last_backfill.get("block_id").unwrap().as_u64();
+    let stats_res: InfoServerResponse =
+        InfoServerResponse::new(first_livesync, last_livesync, first_backfill, last_backfill).await;
 
     let res = serde_json::to_value(&stats_res).unwrap();
     Json(res)
diff --git a/src/utils/transaction.rs b/src/utils/transaction.rs
index 7b5686d..e27fe8e 100644
--- a/src/utils/transaction.rs
+++ b/src/utils/transaction.rs
@@ -58,7 +58,10 @@ pub async fn get_balance_of(addr: String) -> U256 {
     let network = Network::config();
     let provider = Network::provider(&network, true).await;
     let address = addr.parse::<Address>().unwrap_or(Address::zero());
-    let balance = provider.get_balance(address, None).await.unwrap_or(U256::zero());
+    let balance = provider
+        .get_balance(address, None)
+        .await
+        .unwrap_or(U256::zero());
     balance
 }
@@ -73,7 +76,7 @@ pub async fn send_transaction(
         address_from, address_to
     );
     // 2.14 Gwei
-    let gas_price = U256::from(2_140_000_000);
+    let gas_price = U256::from(1_000_000_000);
     let tx = TransactionRequest::new()
         .to(address_to.clone())
         .value(U256::from(utils::parse_ether(0)?))