Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Gas Oracle): Remove Fetcher singleton #1360

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions core/lib/zksync_core/src/l1_gas_price/gas_adjuster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ pub struct GasAdjuster<E> {
pub(super) statistics: GasStatistics,
pub(super) config: GasAdjusterConfig,
eth_client: E,
native_token_fetcher_dyn: Arc<dyn ConversionRateFetcher>,
native_token_fetcher: Arc<dyn ConversionRateFetcher>,
}

impl<E: EthInterface> GasAdjuster<E> {
pub async fn new(
eth_client: E,
config: GasAdjusterConfig,
native_token_fetcher_dyn: Arc<dyn ConversionRateFetcher>,
native_token_fetcher: Arc<dyn ConversionRateFetcher>,
) -> Result<Self, Error> {
// Subtracting 1 from the "latest" block number to prevent errors in case
// the info about the latest block is not yet present on the node.
Expand All @@ -49,7 +49,7 @@ impl<E: EthInterface> GasAdjuster<E> {
statistics: GasStatistics::new(config.max_base_fee_samples, current_block, &history),
eth_client,
config,
native_token_fetcher_dyn,
native_token_fetcher,
})
}

Expand Down Expand Up @@ -111,6 +111,13 @@ impl<E: EthInterface> GasAdjuster<E> {
tracing::warn!("Cannot add the base fee to gas statistics: {}", err);
}

if let Err(err) = self.native_token_fetcher.update().await {
tracing::warn!(
"Error when trying to fetch the native erc20 conversion rate: {}",
err
);
}

tokio::time::sleep(self.config.poll_period()).await;
}
Ok(())
Expand All @@ -130,7 +137,7 @@ impl<E: EthInterface> L1GasPriceProvider for GasAdjuster<E> {
let calculated_price =
(self.config.internal_l1_pricing_multiplier * effective_gas_price as f64) as u64;

let conversion_rate = self.native_token_fetcher_dyn.conversion_rate().unwrap_or(1);
let conversion_rate = self.native_token_fetcher.conversion_rate().unwrap_or(1);

self.bound_gas_price(calculated_price) * conversion_rate
}
Expand Down
8 changes: 4 additions & 4 deletions core/lib/zksync_core/src/l1_gas_price/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ pub struct GasAdjusterSingleton {
web3_url: String,
gas_adjuster_config: GasAdjusterConfig,
singleton: OnceCell<anyhow::Result<Arc<GasAdjuster<QueryClient>>>>,
erc20_fetcher_dyn: Arc<dyn ConversionRateFetcher>,
native_token_fetcher: Arc<dyn ConversionRateFetcher>,
}

impl GasAdjusterSingleton {
pub fn new(
web3_url: String,
gas_adjuster_config: GasAdjusterConfig,
erc20_fetcher_dyn: Arc<dyn ConversionRateFetcher>,
native_token_fetcher: Arc<dyn ConversionRateFetcher>,
) -> Self {
Self {
web3_url,
gas_adjuster_config,
singleton: OnceCell::new(),
erc20_fetcher_dyn,
native_token_fetcher,
}
}

Expand All @@ -43,7 +43,7 @@ impl GasAdjusterSingleton {
let adjuster = GasAdjuster::new(
query_client.clone(),
self.gas_adjuster_config,
self.erc20_fetcher_dyn.clone(),
self.native_token_fetcher.clone(),
)
.await
.context("GasAdjuster::new()")?;
Expand Down
50 changes: 14 additions & 36 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ use crate::{
l1_gas_price::{GasAdjusterSingleton, L1GasPriceProvider},
metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig},
metrics::{InitStage, APP_METRICS},
native_token_fetcher::{
ConversionRateFetcher, NativeTokenFetcherSingleton, NoOpConversionRateFetcher,
},
native_token_fetcher::{ConversionRateFetcher, NativeTokenFetcher, NoOpConversionRateFetcher},
state_keeper::{
create_state_keeper, MempoolFetcher, MempoolGuard, MiniblockSealer, SequencerSealer,
},
Expand Down Expand Up @@ -347,40 +345,28 @@ pub async fn initialize_components(
task_futures.push(conversion_rate_task);
};

// spawn the native ERC20 fetcher if it is enabled
let mut fetcher_component = if components.contains(&Component::NativeTokenFetcher) {
let fetcher = NativeTokenFetcherSingleton::new(
configs
.native_token_fetcher_config
.clone()
.context("native_token_fetcher_config")?,
);
let (cb_sender, cb_receiver) = oneshot::channel();

Some(fetcher)
let native_token_fetcher = if components.contains(&Component::NativeTokenFetcher) {
Arc::new(
NativeTokenFetcher::new(
configs
.native_token_fetcher_config
.clone()
.context("native_token_fetcher_config")?,
)
.await?,
) as Arc<dyn ConversionRateFetcher>
} else {
None
Arc::new(NoOpConversionRateFetcher::new())
};

let (cb_sender, cb_receiver) = oneshot::channel();

let conversion_rate_fetcher: Arc<dyn ConversionRateFetcher> =
if let Some(fetcher_singleton) = &mut fetcher_component {
let fetcher = fetcher_singleton
.get_or_init()
.await
.context("fetcher.get_or_init()")?;
fetcher
} else {
// create no-op fetcher if the native token fetcher is not enabled
Arc::new(NoOpConversionRateFetcher::new())
};

let query_client = QueryClient::new(&eth_client_config.web3_url).unwrap();
let gas_adjuster_config = configs.gas_adjuster_config.context("gas_adjuster_config")?;
let mut gas_adjuster = GasAdjusterSingleton::new(
eth_client_config.web3_url.clone(),
gas_adjuster_config,
conversion_rate_fetcher,
native_token_fetcher,
);

// Prometheus exporter and circuit breaker checker should run for every component configuration.
Expand Down Expand Up @@ -728,14 +714,6 @@ pub async fn initialize_components(
task_futures.push(task);
}

// check if the native ERC20 fetcher is enabled and run it if it is
fetcher_component
.and_then(|c| c.run_if_initialized(stop_receiver.clone()))
.into_iter()
.for_each(|handle| {
task_futures.push(handle);
});

Ok((task_futures, stop_sender, cb_receiver, health_check_handle))
}

Expand Down
122 changes: 41 additions & 81 deletions core/lib/zksync_core/src/native_token_fetcher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use std::{cmp::min, sync::Arc, time::Duration};
use std::{cmp::min, sync::Arc};

use anyhow::Context;
use async_trait::async_trait;
use hex::ToHex;
use metrics::atomics::AtomicU64;
use tokio::{
sync::{watch, OnceCell},
task::JoinHandle,
};
use tokio::sync::Mutex;
use zksync_config::configs::native_token_fetcher::NativeTokenFetcherConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side comment: mutexes from std are perfectly fine to use in an async context, and generally they are more performant than tokio mutex. Usually, you only need tokio mutex if you need to hold a guard across an await point (and even in such situations, it's often a code smell). See the tokio docs for context.

This is a minor thing though and can be kept as is.


/// Trait used to query the stack's native token conversion rate. Used to properly
/// determine gas prices, as they partially depend on L1 gas prices, denominated in `eth`.
#[async_trait]
pub trait ConversionRateFetcher: 'static + std::fmt::Debug + Send + Sync {
fn conversion_rate(&self) -> anyhow::Result<u64>;
async fn update(&self) -> anyhow::Result<()>;
}

#[derive(Debug)]
Expand All @@ -30,47 +29,9 @@ impl ConversionRateFetcher for NoOpConversionRateFetcher {
fn conversion_rate(&self) -> anyhow::Result<u64> {
Ok(1)
}
}

pub(crate) struct NativeTokenFetcherSingleton {
native_token_fetcher_config: NativeTokenFetcherConfig,
singleton: OnceCell<anyhow::Result<Arc<NativeTokenFetcher>>>,
}

impl NativeTokenFetcherSingleton {
pub fn new(native_token_fetcher_config: NativeTokenFetcherConfig) -> Self {
Self {
native_token_fetcher_config,
singleton: OnceCell::new(),
}
}

pub async fn get_or_init(&mut self) -> anyhow::Result<Arc<NativeTokenFetcher>> {
match self
.singleton
.get_or_init(|| async {
Ok(Arc::new(
NativeTokenFetcher::new(self.native_token_fetcher_config.clone()).await?,
))
})
.await
{
Ok(fetcher) => Ok(fetcher.clone()),
Err(_e) => Err(anyhow::anyhow!(
"Failed to get or initialize NativeTokenFetcher"
)),
}
}

pub fn run_if_initialized(
self,
stop_signal: watch::Receiver<bool>,
) -> Option<JoinHandle<anyhow::Result<()>>> {
let fetcher = match self.singleton.get()? {
Ok(fetcher) => fetcher.clone(),
Err(_e) => return None,
};
Some(tokio::spawn(async move { fetcher.run(stop_signal).await }))
async fn update(&self) -> anyhow::Result<()> {
Ok(())
}
}

Expand All @@ -81,6 +42,7 @@ pub(crate) struct NativeTokenFetcher {
pub config: NativeTokenFetcherConfig,
pub latest_to_eth_conversion_rate: AtomicU64,
http_client: reqwest::Client,
error_reporter: Arc<Mutex<ErrorReporter>>,
}

impl NativeTokenFetcher {
Expand All @@ -99,48 +61,15 @@ impl NativeTokenFetcher {
.await
.context("Unable to parse the response of the native token conversion rate server")?;

let error_reporter = Arc::new(Mutex::new(ErrorReporter::new()));

Ok(Self {
config,
latest_to_eth_conversion_rate: AtomicU64::new(conversion_rate),
http_client,
error_reporter,
})
}

pub(crate) async fn run(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let mut error_reporter = ErrorReporter::new();
loop {
if *stop_receiver.borrow() {
tracing::info!("Stop signal received, native_token_fetcher is shutting down");
break;
}

match self
.http_client
.get(format!(
"{}/conversion_rate/0x{}",
&self.config.host,
&self.config.token_address.encode_hex::<String>()
))
.send()
.await
{
Ok(response) => {
let conversion_rate = response.json::<u64>().await.context(
"Unable to parse the response of the native token conversion rate server",
)?;
tracing::info!("Fetched native token conversion rate: {}", conversion_rate);
self.latest_to_eth_conversion_rate
.store(conversion_rate, std::sync::atomic::Ordering::Relaxed);
error_reporter.reset();
}
Err(err) => error_reporter.process(anyhow::anyhow!(err)),
}

tokio::time::sleep(Duration::from_secs(self.config.poll_interval)).await;
}

Ok(())
}
}

#[async_trait]
Expand All @@ -151,6 +80,35 @@ impl ConversionRateFetcher for NativeTokenFetcher {
.load(std::sync::atomic::Ordering::Relaxed),
)
}

async fn update(&self) -> anyhow::Result<()> {
match self
.http_client
.get(format!(
"{}/conversion_rate/0x{}",
&self.config.host,
&self.config.token_address.encode_hex::<String>()
))
.send()
.await
{
Ok(response) => {
let conversion_rate = response.json::<u64>().await.context(
"Unable to parse the response of the native token conversion rate server",
)?;
self.latest_to_eth_conversion_rate
.store(conversion_rate, std::sync::atomic::Ordering::Relaxed);
self.error_reporter.lock().await.reset();
}
Err(err) => self
.error_reporter
.lock()
.await
.process(anyhow::anyhow!(err)),
}

Ok(())
}
}

#[derive(Debug)]
Expand All @@ -176,7 +134,9 @@ impl ErrorReporter {

fn process(&mut self, err: anyhow::Error) {
self.current_try = min(self.current_try + 1, Self::MAX_CONSECUTIVE_NETWORK_ERRORS);

tracing::error!("Failed to fetch native token conversion rate from the server: {err}");

if self.current_try >= Self::MAX_CONSECUTIVE_NETWORK_ERRORS && !self.alert_spawned {
vlog::capture_message(&err.to_string(), vlog::AlertLevel::Warning);
self.alert_spawned = true;
Expand Down
Loading
Loading