diff --git a/.env.example b/.env.example index 56ab087..56e6b85 100644 --- a/.env.example +++ b/.env.example @@ -1,2 +1,6 @@ archiver_pk=... -network="./networks/your_network.json" \ No newline at end of file +network="./networks/your_network.json" + +DATABASE_HOST="" +DATABASE_USERNAME="" +DATABASE_PASSWORD="" \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 97d5c6a..6229386 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0.86" borsh = "1.5.1" borsh-derive = "1.5.1" brotli = "6.0.0" @@ -11,6 +12,7 @@ dotenv = "0.15.0" ethers = "2.0.14" ethers-core = "2.0.14" ethers-providers = "2.0.14" +planetscale-driver = "0.5.1" serde = "1.0.205" serde_json = "1.0.122" tokio = { version = "1.39.2", features = ["full"] } diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6c72ede --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM rust:1.73 + +COPY ./ ./ + +RUN cargo build --release + +CMD ["./target/release/wvm-archiver"] \ No newline at end of file diff --git a/db_schema.sql b/db_schema.sql new file mode 100644 index 0000000..cd29309 --- /dev/null +++ b/db_schema.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS WeaveVMArchiver; + +CREATE TABLE IF NOT EXISTS WeaveVMArchiver ( + Id INT AUTO_INCREMENT PRIMARY KEY, + NetworkBlockId INT UNIQUE, + WeaveVMArchiveTxid VARCHAR(68) UNIQUE +); diff --git a/src/main.rs b/src/main.rs index ef966a4..dfdb127 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,6 @@ -use ethers_core::k256::pkcs8::der::asn1::Null; - use crate::utils::archive_block::archive; use crate::utils::schema::Network; - +use crate::utils::planetscale::{ps_archive_block, ps_get_latest_block_id}; use std::thread; use std::time::Duration; @@ -11,15 +9,21 @@ mod utils; async fn main() { let network = Network::config(); let block_time = network.block_time; - let mut start_block = network.start_block; + let ps_latest_archived_block = ps_get_latest_block_id().await; + // it defaults to network.start_block if planestcale fails + let mut start_block = ps_latest_archived_block; println!("\n{:#?}\n\n", network); - // poll block & archive + // poll blocks & archive loop { println!("\n{}", "#".repeat(100)); - println!("\nARCHIVING BLOCK #{} of Network {} -- ChainId: {}\n", start_block, network.name, network.network_chain_id); - archive(Some(start_block)).await; + println!( + "\nARCHIVING BLOCK #{} of Network {} -- ChainId: {}\n", + start_block, network.name, network.network_chain_id + ); + let archive_txid = archive(Some(start_block)).await.unwrap(); + let _ = ps_archive_block(&start_block, &archive_txid).await; start_block += 1; println!("\n{}", "#".repeat(100)); thread::sleep(Duration::from_secs(block_time.into())); diff --git a/src/utils/archive_block.rs b/src/utils/archive_block.rs index 0389fab..4c31fc5 100644 --- a/src/utils/archive_block.rs +++ b/src/utils/archive_block.rs @@ -1,13 +1,13 @@ +use crate::utils::get_block::by_number; use crate::utils::schema::{Block, Network}; use crate::utils::transaction::send_wvm_calldata; -use crate::utils::get_block::by_number; - +use anyhow::Error; -pub async fn archive(block_number: Option) { +pub async fn archive(block_number: Option) -> Result { let network = Network::config(); let start_block = network.start_block; let block_to_archive = block_number.unwrap_or(start_block); - + // fetch block let block_data = by_number(block_to_archive).await.unwrap(); // serialize response into Block struct @@ -17,10 +17,10 @@ pub async fn archive(block_number: Option) { let borsh_res = Block::borsh_ser(&block_data_struct); // brotli compress the borsh serialized block let brotli_res = Block::brotli_compress(&borsh_res); - + // println!("borsh vec length: {:?}", borsh_res.len()); // println!("brotli vec length: {:?}", brotli_res.len()); - let _ = send_wvm_calldata(brotli_res).await; - -} \ No newline at end of file + let txid = send_wvm_calldata(brotli_res).await.unwrap(); + Ok(txid) +} diff --git a/src/utils/env_var.rs b/src/utils/env_var.rs index b38353c..e7c7de6 100644 --- a/src/utils/env_var.rs +++ b/src/utils/env_var.rs @@ -1,7 +1,7 @@ -use std::env; use dotenv::dotenv; +use std::env; -pub fn get_env_var(key: &str) -> Result{ +pub fn get_env_var(key: &str) -> Result { dotenv().ok(); match env::var(key) { Ok(val) => Ok(val), diff --git a/src/utils/get_block.rs b/src/utils/get_block.rs index 8127145..186819a 100644 --- a/src/utils/get_block.rs +++ b/src/utils/get_block.rs @@ -1,10 +1,10 @@ -use ethers_providers::{Middleware, ProviderError}; -use ethers_core::types::{Block, H256}; use crate::utils::schema::Network; +use ethers_core::types::{Block, H256}; +use ethers_providers::{Middleware, ProviderError}; pub async fn by_number(number: u64) -> Result>, ProviderError> { let network: Network = Network::config(); - let provider = Network::provider(&network, false).await; + let provider = Network::provider(&network, false).await; let block = provider.get_block(number).await; match block { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 06080d8..db4f6ad 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,6 @@ -pub mod schema; +pub mod archive_block; +pub mod env_var; pub mod get_block; +pub mod schema; pub mod transaction; -pub mod env_var; -pub mod archive_block; \ No newline at end of file +pub mod planetscale; diff --git a/src/utils/planetscale.rs b/src/utils/planetscale.rs new file mode 100644 index 0000000..3c13a61 --- /dev/null +++ b/src/utils/planetscale.rs @@ -0,0 +1,54 @@ +use std::net; + +use crate::utils::env_var::get_env_var; +use crate::utils::schema::Network; +use anyhow::Error; +use planetscale_driver::{PSConnection, query}; + +async fn ps_init() -> PSConnection { + + let host = get_env_var("DATABASE_HOST").unwrap(); + let username = get_env_var("DATABASE_USERNAME").unwrap(); + let password = get_env_var("DATABASE_PASSWORD").unwrap(); + + let conn: PSConnection = PSConnection::new( + &host, + &username, + &password, + ); + + conn +} + +pub async fn ps_archive_block(network_block_id: &u64, wvm_calldata_txid: &str) -> Result<(), Error> { + // format to the table VAR(66) limitation + let wvm_calldata_txid = wvm_calldata_txid.trim_matches('"'); + let conn = ps_init().await; + + let res = query("INSERT INTO WeaveVMArchiver(NetworkBlockId, WeaveVMArchiveTxid) VALUES($0, \"$1\")") + .bind(network_block_id) + .bind(wvm_calldata_txid) + .execute(&conn) + .await; + + match res { + Ok(result) => { + println!("Insert operation was successful: {:?}", result); + Ok(result) + } + Err(e) => { + println!("Error occurred during insert operation: {:?}", e); + Err(e) + } + } +} + +pub async fn ps_get_latest_block_id() -> u64 { + let network = Network::config(); + let conn = ps_init().await; + + let latest_archived: u64 = query("SELECT MAX(NetworkBlockId) AS LatestNetworkBlockId FROM WeaveVMArchiver;").fetch_scalar(&conn).await.unwrap_or(network.start_block); + // return latest archived block in planetscale + 1 + // so the process can start archiving from latest_archived + 1 + latest_archived + 1 +} \ No newline at end of file diff --git a/src/utils/schema.rs b/src/utils/schema.rs index 9805239..4cb3e69 100644 --- a/src/utils/schema.rs +++ b/src/utils/schema.rs @@ -1,12 +1,12 @@ +use crate::utils::env_var::get_env_var; +use borsh::to_vec; +use borsh_derive::{BorshDeserialize, BorshSerialize}; +use ethers_providers::{Http, Provider}; use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::convert::TryFrom; use std::fs::File; use std::io::{Read, Write}; -use ethers_providers::{Http, Provider}; -use std::convert::TryFrom; -use serde_json::Value; -use borsh_derive::{BorshDeserialize, BorshSerialize}; -use borsh::to_vec; -use crate::utils::env_var::get_env_var; #[derive(Serialize, Deserialize, Debug)] pub struct Network { @@ -18,7 +18,7 @@ pub struct Network { pub block_time: u32, pub start_block: u64, // as per ethers_provider pub archiver_address: String, - pub archive_pool_address: String + pub archive_pool_address: String, } impl Network { @@ -26,13 +26,13 @@ impl Network { let network_config = get_env_var("network").unwrap(); let mut file = File::open(network_config).unwrap(); let mut data = String::new(); - + file.read_to_string(&mut data).unwrap(); - - let network: Network = serde_json::from_str(&data).unwrap(); + + let network: Network = serde_json::from_str(&data).unwrap(); // cannot self send data assert_ne!(network.archiver_address, network.archive_pool_address); - network + network } pub async fn provider(&self, rpc: bool) -> Provider { @@ -44,41 +44,39 @@ impl Network { } else { target_rpc = &network.network_rpc } - let provider: Provider = Provider::::try_from( - target_rpc - ).expect("could not instantiate HTTP Provider"); - + let provider: Provider = + Provider::::try_from(target_rpc).expect("could not instantiate HTTP Provider"); + provider } } - #[derive(Debug, Deserialize, Serialize, BorshSerialize, BorshDeserialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Block { - pub base_fee_per_gas: Option, // "baseFeePerGas" - pub blob_gas_used: Option, // "blobGasUsed" - pub difficulty: String, // "difficulty" - pub excess_blob_gas: Option, // "excessBlobGas" - pub extra_data: String, // "extraData" - pub gas_limit: String, // "gasLimit" - pub gas_used: String, // "gasUsed" - pub hash: String, // "hash" - pub logs_bloom: String, // "logsBloom" - pub miner: String, // "miner" - pub mix_hash: String, // "mixHash" - pub nonce: String, // "nonce" - pub number: String, // "number" - pub parent_beacon_block_root: Option, // "parentBeaconBlockRoot" - pub parent_hash: String, // "parentHash" - pub receipts_root: String, // "receiptsRoot" - pub seal_fields: Vec, // "sealFields" as an array of strings - pub sha3_uncles: String, // "sha3Uncles" - pub size: String, // "size" - pub state_root: String, // "stateRoot" - pub timestamp: String, // "timestamp" - pub total_difficulty: String, // "totalDifficulty" - pub transactions: Vec, // "transactions" as an array of strings + pub base_fee_per_gas: Option, // "baseFeePerGas" + pub blob_gas_used: Option, // "blobGasUsed" + pub difficulty: String, // "difficulty" + pub excess_blob_gas: Option, // "excessBlobGas" + pub extra_data: String, // "extraData" + pub gas_limit: String, // "gasLimit" + pub gas_used: String, // "gasUsed" + pub hash: String, // "hash" + pub logs_bloom: String, // "logsBloom" + pub miner: String, // "miner" + pub mix_hash: String, // "mixHash" + pub nonce: String, // "nonce" + pub number: String, // "number" + pub parent_beacon_block_root: Option, // "parentBeaconBlockRoot" + pub parent_hash: String, // "parentHash" + pub receipts_root: String, // "receiptsRoot" + pub seal_fields: Vec, // "sealFields" as an array of strings + pub sha3_uncles: String, // "sha3Uncles" + pub size: String, // "size" + pub state_root: String, // "stateRoot" + pub timestamp: String, // "timestamp" + pub total_difficulty: String, // "totalDifficulty" + pub transactions: Vec, // "transactions" as an array of strings } impl Block { @@ -86,16 +84,11 @@ impl Block { serde_json::from_value::(value) } pub fn brotli_compress(input: &[u8]) -> Vec { - let mut writer = brotli::CompressorWriter::new( - Vec::new(), - 4096, - 11, - 22); + let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 11, 22); writer.write_all(input).unwrap(); writer.into_inner() } - pub fn borsh_ser(input: &Block) -> Vec { to_vec(input).unwrap() - } -} \ No newline at end of file + } +} diff --git a/src/utils/transaction.rs b/src/utils/transaction.rs index 9332597..1412018 100644 --- a/src/utils/transaction.rs +++ b/src/utils/transaction.rs @@ -1,38 +1,40 @@ -use crate::utils::schema::Network; use crate::utils::env_var::get_env_var; -use ethers_providers::{Provider, Http}; -use ethers::{utils, prelude::*}; +use crate::utils::schema::Network; +use ethers::{prelude::*, utils}; +use ethers_providers::{Http, Provider}; type Client = SignerMiddleware, Wallet>; -pub async fn send_wvm_calldata(block_data: Vec) -> Result<(), Box> { +pub async fn send_wvm_calldata(block_data: Vec) -> Result> { let network = Network::config(); let provider = Network::provider(&network, true).await; let private_key = get_env_var("archiver_pk").unwrap(); let wallet: LocalWallet = private_key .parse::()? - .with_chain_id(network.wvm_chain_id); + .with_chain_id(network.wvm_chain_id); let client = SignerMiddleware::new(provider.clone(), wallet.clone()); let address_from = network.archiver_address.parse::
()?; let address_to = network.archive_pool_address.parse::
()?; - - // print_balances(&provider, &address_from, &address_to).await?; - send_transaction(&client, &address_from, &address_to, block_data).await?; + // check archiver tWVM balance (non-zero) + assert_non_zero_balance(&provider, &address_from).await; + // send calldata tx to WeaveVM + let txid = send_transaction(&client, &address_from, &address_to, block_data).await?; - Ok(()) + Ok(txid) } -async fn print_balances(provider: &Provider, address_from: &Address, address_to: &Address) -> Result<(), Box> { - let balance_from = provider.get_balance(address_from.clone(), None).await?; - let balance_to = provider.get_balance(address_to.clone(), None).await?; - - println!("{} balance: {} tWVM", address_from, balance_from); - println!("{} balance: {} tWVM", address_to, balance_to); - Ok(()) +async fn assert_non_zero_balance(provider: &Provider, address: &Address) { + let balance = provider.get_balance(address.clone(), None).await.unwrap(); + assert!(balance > 0.into()); } -async fn send_transaction(client: &Client, address_from: &Address, address_to: &Address, block_data: Vec) -> Result<(), Box> { +async fn send_transaction( + client: &Client, + address_from: &Address, + address_to: &Address, + block_data: Vec, +) -> Result> { println!( "\nArchiving block data from archiver: {} to archive pool: {}", address_from, address_to @@ -45,8 +47,8 @@ async fn send_transaction(client: &Client, address_from: &Address, address_to: & let tx = client.send_transaction(tx, None).await?.await?; let json_tx = serde_json::json!(tx); - println!("\nWeaveVM Archiving TXID: {}", json_tx["transactionHash"]); - - Ok(()) -} + let txid = json_tx["transactionHash"].to_string(); + println!("\nWeaveVM Archiving TXID: {}", txid); + Ok(txid) +} \ No newline at end of file