Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wvm-archiver"
version = "0.2.4"
version = "0.2.5"
edition = "2021"
description = "EL data pipeline for WVM testnet v0"
authors = ["charmful0x <rani@decent.land>"]
Expand Down
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@ As mentioned, PlanetScale is used for cloud indexing, which allows a WeaveVM Arc
### WeaveVM Archiver node instance info

```bash
curl -X GET https://the_network.wvm.network/info
curl -X GET https://the_network.wvm.network/v1/info
```
**returns:**

```rs
pub struct InfoServerResponse {
first_archived_block: Option<u64>,
last_archived_block: Option<u64>,
first_livesync_archived_block: Option<u64>,
last_livesync_archived_block: Option<u64>,
first_backfill_archived_block: Option<u64>,
last_backfill_archived_block: Option<u64>,
livesync_start_block: u64,
total_archived_blocks: u64,
blocks_behind_live_blockheight: u64,
Expand All @@ -129,7 +131,7 @@ pub struct InfoServerResponse {
### WeaveVM Archiver all networks info:

```bash
curl -X GET https://the_network.wvm.network/all-networks-info
curl -X GET https://the_network.wvm.network/v1/all-networks-info
```

**returns:**
Expand All @@ -141,13 +143,13 @@ Vec<Network>
### Retrieve the WVM archive TXID for a given EVM block ID

```bash
curl -X GET https://the_network.wvm.network/block/$BLOCK_ID
curl -X GET https://the_network.wvm.network/v1/block/$BLOCK_ID
```

### Decode the WVM archived block data for a given EVM block ID (return original block data in JSON format)

```bash
curl -X GET https://the_network.wvm.network/block/raw/$BLOCK_ID
curl -X GET https://the_network.wvm.network/v1/block/raw/$BLOCK_ID
```

## License
Expand Down
7 changes: 7 additions & 0 deletions db_schema.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
-- Drop both archiver tables first so the schema below is applied from scratch.
DROP TABLE IF EXISTS WeaveVMArchiver;
DROP TABLE IF EXISTS WeaveVMArchiverBackfill;

-- Livesync table: maps a network block id to the WeaveVM archive txid.
-- VARCHAR(66) fits a 0x-prefixed 32-byte transaction hash.
CREATE TABLE IF NOT EXISTS WeaveVMArchiver (
Id INT AUTO_INCREMENT PRIMARY KEY,
NetworkBlockId INT UNIQUE,
WeaveVMArchiveTxid VARCHAR(66) UNIQUE
);

-- Backfill table: same shape as the livesync table, used for blocks
-- archived from genesis up to the network start block.
CREATE TABLE IF NOT EXISTS WeaveVMArchiverBackfill (
Id INT AUTO_INCREMENT PRIMARY KEY,
NetworkBlockId INT UNIQUE,
WeaveVMArchiveTxid VARCHAR(66) UNIQUE
);
13 changes: 6 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::utils::archive_block::sprint_blocks_archiving;
use crate::utils::backfill_genesis::backfill_from_genesis;
use crate::utils::schema::Network;
use crate::utils::server_handlers::{
handle_all_networks_info, handle_block, handle_block_raw, handle_info, handle_weave_gm,
Expand All @@ -16,18 +15,18 @@ async fn main() -> shuttle_axum::ShuttleAxum {
// server routes
let router = Router::new()
.route("/", get(handle_weave_gm))
.route("/info", get(handle_info))
.route("/block/:id", get(handle_block))
.route("/block/raw/:id", get(handle_block_raw))
.route("/all-networks-info", get(handle_all_networks_info));
.route("/v1/info", get(handle_info))
.route("/v1/block/:id", get(handle_block))
.route("/v1/block/raw/:id", get(handle_block_raw))
.route("/v1/all-networks-info", get(handle_all_networks_info));

// poll blocks & sprint archiving in parallel
task::spawn(async move {
sprint_blocks_archiving().await;
sprint_blocks_archiving(false).await;
});
// backfill blocks from genesis till network.start_block
task::spawn(async move {
backfill_from_genesis().await.unwrap();
sprint_blocks_archiving(true).await;
});
Ok(router.into())
}
38 changes: 21 additions & 17 deletions src/utils/archive_block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::utils::env_var::get_env_var;
use crate::utils::get_block::{by_number, get_current_block_number};
use crate::utils::planetscale::{ps_archive_block, ps_get_latest_block_id};
use crate::utils::schema::{Block, Network};
Expand All @@ -7,9 +8,19 @@ use std::{thread, time::Duration};

pub async fn archive(block_number: Option<u64>, is_backfill: bool) -> Result<String, Error> {
let network = Network::config();
let start_block = network.start_block;
let block_to_archive = block_number.unwrap_or(start_block);
let block_to_archive = block_number.unwrap_or(if is_backfill {
get_env_var("backfill_start_block")
.unwrap()
.parse::<u64>()
.unwrap()
} else {
network.start_block
});

// assert to ensure backfill and livesync don't collide at the continuity checkpoint
if is_backfill {
assert!(block_number.unwrap() < network.start_block)
}
// fetch block
let block_data = by_number(block_to_archive).await.unwrap();
// serialize response into Block struct
Expand All @@ -20,9 +31,6 @@ pub async fn archive(block_number: Option<u64>, is_backfill: bool) -> Result<Str
// brotli compress the borsh serialized block
let brotli_res = Block::brotli_compress(&borsh_res);

// println!("borsh vec length: {:?}", borsh_res.len());
// println!("brotli vec length: {:?}", brotli_res.len());

let txid = if is_backfill {
send_wvm_calldata_backfill(brotli_res).await.unwrap()
} else {
Expand All @@ -32,27 +40,23 @@ pub async fn archive(block_number: Option<u64>, is_backfill: bool) -> Result<Str
Ok(txid)
}

pub async fn sprint_blocks_archiving() {
pub async fn sprint_blocks_archiving(is_backfill: bool) {
let network = Network::config();
let block_time = network.block_time;
let mut current_block_number = get_current_block_number().await.as_u64();
let ps_latest_archived_block = ps_get_latest_block_id().await;
// it defaults to network.start_block if planestcale fails
let mut start_block = if ps_latest_archived_block < network.start_block {
network.start_block
} else {
ps_latest_archived_block
};
// it defaults to network.start_block or env.backfill_start_block
// based on is_backfill if planetscale fails
let mut start_block = ps_get_latest_block_id(is_backfill).await;

loop {
if start_block < current_block_number - 1 {
println!("\n{}", "#".repeat(100));
println!(
"\nARCHIVING BLOCK #{} of Network {} -- ChainId: {}\n",
start_block, network.name, network.network_chain_id
"\nARCHIVING BLOCK #{} of Network {} -- ChainId: {} -- IS_BACKFILL: {}\n",
start_block, network.name, network.network_chain_id, is_backfill
);
let archive_txid = archive(Some(start_block), false).await.unwrap();
let _ = ps_archive_block(&start_block, &archive_txid).await;
let archive_txid = archive(Some(start_block), is_backfill).await.unwrap();
let _ = ps_archive_block(&start_block, &archive_txid, is_backfill).await;
start_block += 1;
println!("\n{}", "#".repeat(100));
} else {
Expand Down
52 changes: 26 additions & 26 deletions src/utils/backfill_genesis.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
use crate::utils::archive_block::archive;
use crate::utils::env_var::get_env_var;
use crate::utils::planetscale::ps_archive_block;
use crate::utils::schema::Network;
use anyhow::{Error, Ok};
// use crate::utils::archive_block::archive;
// use crate::utils::env_var::get_env_var;
// use crate::utils::planetscale::ps_archive_block;
// use crate::utils::schema::Network;
// use anyhow::{Error, Ok};

pub async fn backfill_from_genesis() -> Result<(), Error> {
let network = Network::config();
let config_start_block = network.start_block;
let backfill_block_start: String = get_env_var("backfill_start_block").unwrap_or(0.to_string());
let backfill_blocks: Vec<u64> =
(backfill_block_start.parse::<u64>().unwrap()..=config_start_block).collect();
// pub async fn backfill_from_genesis() -> Result<(), Error> {
// let network = Network::config();
// let config_start_block = network.start_block;
// let backfill_block_start: String = get_env_var("backfill_start_block").unwrap_or(0.to_string());
// let backfill_blocks: Vec<u64> =
// (backfill_block_start.parse::<u64>().unwrap()..=config_start_block).collect();

if config_start_block == 0 {
return Ok(());
}
// if config_start_block == 0 {
// return Ok(());
// }

for &block_number in backfill_blocks.iter() {
println!("\n{}", "#".repeat(100));
println!(
"\nARCHIVING **BACKFILL** BLOCK #{} of Network {} -- ChainId: {}\n",
&block_number, network.name, network.network_chain_id
);
let archive_txid = archive(Some(block_number), true).await.unwrap();
let _ = ps_archive_block(&block_number, &archive_txid).await;
println!("\n{}", "#".repeat(100));
}
// for &block_number in backfill_blocks.iter() {
// println!("\n{}", "#".repeat(100));
// println!(
// "\nARCHIVING **BACKFILL** BLOCK #{} of Network {} -- ChainId: {}\n",
// &block_number, network.name, network.network_chain_id
// );
// let archive_txid = archive(Some(block_number), true).await.unwrap();
// let _ = ps_archive_block(&block_number, &archive_txid, true).await;
// println!("\n{}", "#".repeat(100));
// }

Ok(())
}
// Ok(())
// }
107 changes: 83 additions & 24 deletions src/utils/planetscale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@ async fn ps_init() -> PSConnection {
pub async fn ps_archive_block(
network_block_id: &u64,
wvm_calldata_txid: &str,
is_backfill: bool,
) -> Result<(), Error> {
// format to the table VAR(66) limitation
let wvm_calldata_txid = wvm_calldata_txid.trim_matches('"');
let conn = ps_init().await;
let mut ps_table_name = get_env_var("ps_table_name").unwrap();

let res = query(
"INSERT INTO WeaveVMArchiverMetis(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")",
)
.bind(network_block_id)
.bind(wvm_calldata_txid)
.execute(&conn)
.await;
if is_backfill {
ps_table_name = format!("{}{}", ps_table_name, "Backfill")
}

let query_str = format!(
"INSERT INTO {}(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")",
ps_table_name
);

let res = query(&query_str)
.bind(network_block_id)
.bind(wvm_calldata_txid)
.execute(&conn)
.await;

match res {
Ok(result) => {
Expand All @@ -42,45 +51,86 @@ pub async fn ps_archive_block(
}
}

pub async fn ps_get_latest_block_id() -> u64 {
pub async fn ps_get_latest_block_id(is_backfill: bool) -> u64 {
let network = Network::config();
let conn = ps_init().await;

let latest_archived: u64 =
query("SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM WeaveVMArchiverMetis;")
.fetch_scalar(&conn)
.await
.unwrap_or(network.start_block);
let mut ps_table_name = get_env_var("ps_table_name").unwrap();
if is_backfill {
ps_table_name = format!("{}{}", ps_table_name, "Backfill")
}

let query_str = format!(
"SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM {};",
ps_table_name
);

let default_start_block = if is_backfill {
get_env_var("backfill_start_block")
.unwrap()
.parse::<u64>()
.unwrap()
} else {
network.start_block
};

let latest_archived: u64 = query(&query_str)
.fetch_scalar(&conn)
.await
.unwrap_or(default_start_block);
// return latest archived block in planetscale + 1
// so the process can start archiving from latest_archived + 1
latest_archived + 1
}

pub async fn ps_get_archived_block_txid(id: u64) -> Value {
let conn = ps_init().await;
let network = Network::config();
let ps_table_name = get_env_var("ps_table_name").unwrap();

let query_formatted = format!(
"SELECT WeaveVMArchiveTxid FROM WeaveVMArchiverMetis WHERE NetworkBlockId = {}",
id
let query_formatted_livesync = format!(
"SELECT WeaveVMArchiveTxid FROM {} WHERE NetworkBlockId = {}",
ps_table_name, id
);

let query_formatted_backfill = format!(
"SELECT WeaveVMArchiveTxid FROM {}Backfill WHERE NetworkBlockId = {}",
ps_table_name, id
);

// query from tables based on block id existence in the livesync or backfill
let query_formatted = if id >= network.start_block {
query_formatted_livesync
} else {
query_formatted_backfill
};

let txid: PsGetBlockTxid = query(&query_formatted).fetch_one(&conn).await.unwrap();

let res = serde_json::json!(txid);
res
}

pub async fn ps_get_blocks_extremes(extreme: &str) -> Value {
pub async fn ps_get_blocks_extremes(extreme: &str, is_backfill: bool) -> Value {
let conn = ps_init().await;

let mut ps_table_name = get_env_var("ps_table_name").unwrap();

ps_table_name = if is_backfill {
format!("{ps_table_name}Backfill")
} else {
ps_table_name
};

let query_type = match extreme {
"first" => "ASC",
"last" => "DESC",
_ => panic!("invalid extreme value. Use 'first' or 'last'."),
};

let query_formatted = format!(
"SELECT NetworkBlockId FROM WeaveVMArchiverMetis ORDER BY NetworkBlockId {} LIMIT 1;",
query_type
"SELECT NetworkBlockId FROM {} ORDER BY NetworkBlockId {} LIMIT 1;",
ps_table_name, query_type
);

let query: PsGetExtremeBlock = query(&query_formatted).fetch_one(&conn).await.unwrap();
Expand All @@ -89,10 +139,19 @@ pub async fn ps_get_blocks_extremes(extreme: &str) -> Value {
res
}

pub async fn ps_get_archived_blocks_count() -> PsGetTotalBlocksCount {
pub async fn ps_get_archived_blocks_count() -> u64 {
let conn = ps_init().await;

let query_formatted = "SELECT MAX(Id) FROM WeaveVMArchiverMetis;";
let count: PsGetTotalBlocksCount = query(&query_formatted).fetch_one(&conn).await.unwrap();
count
let ps_table_name = get_env_var("ps_table_name").unwrap();

let query_formatted_livesync = format!("SELECT MAX(Id) FROM {};", ps_table_name);
let query_formatted_backfill = format!("SELECT MAX(Id) FROM {}Backfill;", ps_table_name);
let count_livesync: PsGetTotalBlocksCount = query(&query_formatted_livesync)
.fetch_one(&conn)
.await
.unwrap();
let count_backfill: PsGetTotalBlocksCount = query(&query_formatted_backfill)
.fetch_one(&conn)
.await
.unwrap();
count_livesync.count + count_backfill.count
}
Loading