diff --git a/Cargo.lock b/Cargo.lock index a3a4c0ecf1c..0ab7d42aeb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -345,6 +345,7 @@ dependencies = [ "environment", "eth2_config", "eth2_network_config", + "execution_layer", "futures", "genesis", "hex", @@ -611,7 +612,7 @@ dependencies = [ "eth2_ssz_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "eth2_ssz_types", "ethereum-types 0.12.1", - "quickcheck", + "quickcheck 0.9.2", "quickcheck_macros", "smallvec", "tree_hash", @@ -1842,8 +1843,10 @@ dependencies = [ "exit-future", "futures", "hex", + "jsonwebtoken", "lru", "parking_lot 0.11.2", + "rand 0.7.3", "reqwest", "sensitive_url", "serde", @@ -1851,11 +1854,13 @@ dependencies = [ "slog", "slot_clock", "task_executor", + "tempfile", "tokio", "tree_hash", "tree_hash_derive 0.4.0", "types", "warp 0.3.0", + "zeroize", ] [[package]] @@ -2765,6 +2770,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonwebtoken" +version = "8.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "012bb02250fdd38faa5feee63235f7a459974440b9b57593822414c31f92839e" +dependencies = [ + "base64 0.13.0", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "k256" version = "0.8.1" @@ -3588,7 +3607,7 @@ dependencies = [ "eth2_hashing 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.12.1", "lazy_static", - "quickcheck", + "quickcheck 0.9.2", "quickcheck_macros", "safe_arith", ] @@ -4265,6 +4284,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -4672,6 +4700,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "quickcheck" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "quickcheck_macros" version = "0.9.1" @@ -5475,6 +5512,18 @@ dependencies = [ "rand_core 0.6.3", ] +[[package]] +name = "simple_asn1" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a762b1c38b9b990c694b9c2f8abe3372ce6a9ceaae6bca39cfc46e054f45745" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time 0.3.7", +] + [[package]] name = "simulator" version = "0.2.0" @@ -6077,6 +6126,7 @@ dependencies = [ "itoa 1.0.1", "libc", "num_threads", + "quickcheck 1.0.3", "time-macros", ] diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 3242336c55c..257b08c266a 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -29,6 +29,7 @@ environment = { path = "../lighthouse/environment" } task_executor = { path = "../common/task_executor" } genesis = { path = "genesis" } eth2_network_config = { path = "../common/eth2_network_config" } +execution_layer = { path = "execution_layer" } lighthouse_network = { path = "./lighthouse_network" } serde = "1.0.116" clap_utils = { path = "../common/clap_utils" } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 0142a0f0bb1..e474b9a521a 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -337,14 +337,20 @@ where let el_runtime = ExecutionLayerRuntime::default(); - let urls = urls + let urls: Vec = urls .iter() .map(|s| SensitiveUrl::parse(*s)) .collect::>() .unwrap(); - let execution_layer = ExecutionLayer::from_urls( - urls, - Some(Address::repeat_byte(42)), + + let config = execution_layer::Config { + execution_endpoints: urls, + secret_files: vec![], + suggested_fee_recipient: Some(Address::repeat_byte(42)), + ..Default::default() + }; + let execution_layer = ExecutionLayer::from_config( + config, el_runtime.task_executor.clone(), el_runtime.log.clone(), ) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f8c21b86230..a7e4356fe2e 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -149,11 +149,10 @@ where None }; - let execution_layer = if let Some(execution_endpoints) = config.execution_endpoints { + let execution_layer = if let Some(config) = config.execution_layer { let context = runtime_context.service_context("exec".into()); - let execution_layer = ExecutionLayer::from_urls( - execution_endpoints, - config.suggested_fee_recipient, + let execution_layer = ExecutionLayer::from_config( + config, context.executor.clone(), context.log().clone(), ) diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 97689622600..15259204a5f 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -4,7 +4,7 @@ use sensitive_url::SensitiveUrl; use serde_derive::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; -use types::{Address, Graffiti, PublicKeyBytes}; +use types::{Graffiti, PublicKeyBytes}; /// Default directory name for the freezer database under the top-level data dir. const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; @@ -72,8 +72,7 @@ pub struct Config { pub network: network::NetworkConfig, pub chain: beacon_chain::ChainConfig, pub eth1: eth1::Config, - pub execution_endpoints: Option>, - pub suggested_fee_recipient: Option
, + pub execution_layer: Option, pub http_api: http_api::Config, pub http_metrics: http_metrics::Config, pub monitoring_api: Option, @@ -94,8 +93,7 @@ impl Default for Config { dummy_eth1_backend: false, sync_eth1_chain: false, eth1: <_>::default(), - execution_endpoints: None, - suggested_fee_recipient: None, + execution_layer: None, graffiti: Graffiti::default(), http_api: <_>::default(), http_metrics: <_>::default(), diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index 7dbb326a670..a7d369ac046 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -18,6 +18,7 @@ serde_json = "1.0.58" serde = { version = "1.0.116", features = ["derive"] } eth1 = { path = "../eth1" } warp = { git = "https://github.com/macladson/warp", rev ="dfa259e", features = ["tls"] } +jsonwebtoken = "8" environment = { path = "../../lighthouse/environment" } bytes = "1.1.0" task_executor = { path = "../../common/task_executor" } @@ -29,3 +30,6 @@ tree_hash = "0.4.1" tree_hash_derive = { path = "../../consensus/tree_hash_derive"} parking_lot = "0.11.0" slot_clock = { path = "../../common/slot_clock" } +tempfile = "3.1.0" +rand = "0.7.3" +zeroize = { version = "1.4.2", features = ["zeroize_derive"] } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index c178c0d5c62..334be3bfead 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use eth1::http::RpcError; +use reqwest::StatusCode; use serde::{Deserialize, Serialize}; pub const LATEST_TAG: &str = "latest"; @@ -7,6 +8,7 @@ pub const LATEST_TAG: &str = "latest"; use crate::engines::ForkChoiceState; pub use types::{Address, EthSpec, ExecutionBlockHash, ExecutionPayload, Hash256, Uint256}; +pub mod auth; pub mod http; pub mod json_structures; @@ -15,6 +17,7 @@ pub type PayloadId = [u8; 8]; #[derive(Debug)] pub enum Error { Reqwest(reqwest::Error), + Auth(auth::Error), BadResponse(String), RequestFailed(String), InvalidExecutePayloadResponse(&'static str), @@ -31,7 +34,14 @@ pub enum Error { impl From for Error { fn from(e: reqwest::Error) -> Self { - Error::Reqwest(e) + if matches!( + e.status(), + Some(StatusCode::UNAUTHORIZED) | Some(StatusCode::FORBIDDEN) + ) { + Error::Auth(auth::Error::InvalidToken) + } else { + Error::Reqwest(e) + } } } @@ -41,6 +51,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: auth::Error) -> Self { + Error::Auth(e) + } +} + /// A generic interface for an execution engine API. #[async_trait] pub trait EngineApi { diff --git a/beacon_node/execution_layer/src/engine_api/auth.rs b/beacon_node/execution_layer/src/engine_api/auth.rs new file mode 100644 index 00000000000..a4050a25c0e --- /dev/null +++ b/beacon_node/execution_layer/src/engine_api/auth.rs @@ -0,0 +1,148 @@ +use jsonwebtoken::{encode, get_current_timestamp, Algorithm, EncodingKey, Header}; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use zeroize::Zeroize; + +/// Default algorithm used for JWT token signing. +const DEFAULT_ALGORITHM: Algorithm = Algorithm::HS256; + +/// JWT secret length in bytes. +pub const JWT_SECRET_LENGTH: usize = 32; + +#[derive(Debug)] +pub enum Error { + JWT(jsonwebtoken::errors::Error), + InvalidToken, +} + +impl From for Error { + fn from(e: jsonwebtoken::errors::Error) -> Self { + Error::JWT(e) + } +} + +/// Provides wrapper around `[u8; JWT_SECRET_LENGTH]` that implements `Zeroize`. +#[derive(Zeroize)] +#[zeroize(drop)] +pub struct JwtKey([u8; JWT_SECRET_LENGTH as usize]); + +impl JwtKey { + /// Wrap given slice in `Self`. Returns an error if slice.len() != `JWT_SECRET_LENGTH`. + pub fn from_slice(key: &[u8]) -> Result { + if key.len() != JWT_SECRET_LENGTH { + return Err(format!( + "Invalid key length. Expected {} got {}", + JWT_SECRET_LENGTH, + key.len() + )); + } + let mut res = [0; JWT_SECRET_LENGTH]; + res.copy_from_slice(key); + Ok(Self(res)) + } + + /// Generate a random secret. + pub fn random() -> Self { + Self(rand::thread_rng().gen::<[u8; JWT_SECRET_LENGTH]>()) + } + + /// Returns a reference to the underlying byte array. + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } + + /// Returns the hex encoded `String` for the secret. + pub fn hex_string(&self) -> String { + hex::encode(self.0) + } +} + +/// Contains the JWT secret and claims parameters. +pub struct Auth { + key: EncodingKey, + id: Option, + clv: Option, +} + +impl Auth { + pub fn new(secret: JwtKey, id: Option, clv: Option) -> Self { + Self { + key: EncodingKey::from_secret(secret.as_bytes()), + id, + clv, + } + } + + /// Generate a JWT token with `claims.iat` set to current time. + pub fn generate_token(&self) -> Result { + let claims = self.generate_claims_at_timestamp(); + self.generate_token_with_claims(&claims) + } + + /// Generate a JWT token with the given claims. + fn generate_token_with_claims(&self, claims: &Claims) -> Result { + let header = Header::new(DEFAULT_ALGORITHM); + Ok(encode(&header, claims, &self.key)?) + } + + /// Generate a `Claims` struct with `iat` set to current time + fn generate_claims_at_timestamp(&self) -> Claims { + Claims { + iat: get_current_timestamp(), + id: self.id.clone(), + clv: self.clv.clone(), + } + } + + /// Validate a JWT token given the secret key and return the originally signed `TokenData`. + pub fn validate_token( + token: &str, + secret: &JwtKey, + ) -> Result, Error> { + let mut validation = jsonwebtoken::Validation::new(DEFAULT_ALGORITHM); + validation.validate_exp = false; + validation.required_spec_claims.remove("exp"); + + jsonwebtoken::decode::( + token, + &jsonwebtoken::DecodingKey::from_secret(secret.as_bytes()), + &validation, + ) + .map_err(Into::into) + } +} + +/// Claims struct as defined in https://github.com/ethereum/execution-apis/blob/main/src/engine/authentication.md#jwt-claims +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct Claims { + /// issued-at claim. Represented as seconds passed since UNIX_EPOCH. + iat: u64, + /// Optional unique identifier for the CL node. + id: Option, + /// Optional client version for the CL node. + clv: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::JWT_SECRET; + + #[test] + fn test_roundtrip() { + let auth = Auth::new( + JwtKey::from_slice(&JWT_SECRET).unwrap(), + Some("42".into()), + Some("Lighthouse".into()), + ); + let claims = auth.generate_claims_at_timestamp(); + let token = auth.generate_token_with_claims(&claims).unwrap(); + + assert_eq!( + Auth::validate_token(&token, &JwtKey::from_slice(&JWT_SECRET).unwrap()) + .unwrap() + .claims, + claims + ); + } +} diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 4fa5e80a78b..d678b2e1d5d 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -1,6 +1,7 @@ //! Contains an implementation of `EngineAPI` using the JSON-RPC API via HTTP. use super::*; +use crate::auth::Auth; use crate::json_structures::*; use async_trait::async_trait; use eth1::http::EIP155_ERROR_STR; @@ -39,6 +40,7 @@ pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(50 pub struct HttpJsonRpc { pub client: Client, pub url: SensitiveUrl, + auth: Option, } impl HttpJsonRpc { @@ -46,6 +48,15 @@ impl HttpJsonRpc { Ok(Self { client: Client::builder().build()?, url, + auth: None, + }) + } + + pub fn new_with_auth(url: SensitiveUrl, auth: Auth) -> Result { + Ok(Self { + client: Client::builder().build()?, + url, + auth: Some(auth), }) } @@ -62,17 +73,19 @@ impl HttpJsonRpc { id: STATIC_ID, }; - let body: JsonResponseBody = self + let mut request = self .client .post(self.url.full.clone()) .timeout(timeout) .header(CONTENT_TYPE, "application/json") - .json(&body) - .send() - .await? - .error_for_status()? - .json() - .await?; + .json(&body); + + // Generate and add a jwt token to the header if auth is defined. + if let Some(auth) = &self.auth { + request = request.bearer_auth(auth.generate_token()?); + }; + + let body: JsonResponseBody = request.send().await?.error_for_status()?.json().await?; match (body.result, body.error) { (result, None) => serde_json::from_value(result).map_err(Into::into), @@ -183,8 +196,9 @@ impl EngineApi for HttpJsonRpc { #[cfg(test)] mod test { + use super::auth::JwtKey; use super::*; - use crate::test_utils::MockServer; + use crate::test_utils::{MockServer, JWT_SECRET}; use std::future::Future; use std::str::FromStr; use std::sync::Arc; @@ -197,14 +211,25 @@ mod test { } impl Tester { - pub fn new() -> Self { + pub fn new(with_auth: bool) -> Self { let server = MockServer::unit_testing(); let rpc_url = SensitiveUrl::parse(&server.url()).unwrap(); - let rpc_client = Arc::new(HttpJsonRpc::new(rpc_url).unwrap()); - let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap(); - let echo_client = Arc::new(HttpJsonRpc::new(echo_url).unwrap()); + // Create rpc clients that include JWT auth headers if `with_auth` is true. + let (rpc_client, echo_client) = if with_auth { + let rpc_auth = Auth::new(JwtKey::from_slice(&JWT_SECRET).unwrap(), None, None); + let echo_auth = Auth::new(JwtKey::from_slice(&JWT_SECRET).unwrap(), None, None); + ( + Arc::new(HttpJsonRpc::new_with_auth(rpc_url, rpc_auth).unwrap()), + Arc::new(HttpJsonRpc::new_with_auth(echo_url, echo_auth).unwrap()), + ) + } else { + ( + Arc::new(HttpJsonRpc::new(rpc_url).unwrap()), + Arc::new(HttpJsonRpc::new(echo_url).unwrap()), + ) + }; Self { server, @@ -235,6 +260,22 @@ mod test { self } + pub async fn assert_auth_failure(self, request_func: R) -> Self + where + R: Fn(Arc) -> F, + F: Future>, + T: std::fmt::Debug, + { + let res = request_func(self.echo_client.clone()).await; + if !matches!(res, Err(Error::Auth(_))) { + panic!( + "No authentication provided, rpc call should have failed.\nResult: {:?}", + res + ) + } + self + } + pub async fn with_preloaded_responses( self, preloaded_responses: Vec, @@ -391,7 +432,7 @@ mod test { #[tokio::test] async fn get_block_by_number_request() { - Tester::new() + Tester::new(true) .assert_request_equals( |client| async move { let _ = client @@ -406,11 +447,19 @@ mod test { }), ) .await; + + Tester::new(false) + .assert_auth_failure(|client| async move { + client + .get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG)) + .await + }) + .await; } #[tokio::test] async fn get_block_by_hash_request() { - Tester::new() + Tester::new(true) .assert_request_equals( |client| async move { let _ = client @@ -425,11 +474,19 @@ mod test { }), ) .await; + + Tester::new(false) + .assert_auth_failure(|client| async move { + client + .get_block_by_hash(ExecutionBlockHash::repeat_byte(1)) + .await + }) + .await; } #[tokio::test] async fn forkchoice_updated_v1_with_payload_attributes_request() { - Tester::new() + Tester::new(true) .assert_request_equals( |client| async move { let _ = client @@ -464,11 +521,30 @@ mod test { }), ) .await; + + Tester::new(false) + .assert_auth_failure(|client| async move { + client + .forkchoice_updated_v1( + ForkChoiceState { + head_block_hash: ExecutionBlockHash::repeat_byte(1), + safe_block_hash: ExecutionBlockHash::repeat_byte(1), + finalized_block_hash: ExecutionBlockHash::zero(), + }, + Some(PayloadAttributes { + timestamp: 5, + prev_randao: Hash256::zero(), + suggested_fee_recipient: Address::repeat_byte(0), + }), + ) + .await + }) + .await; } #[tokio::test] async fn get_payload_v1_request() { - Tester::new() + Tester::new(true) .assert_request_equals( |client| async move { let _ = client.get_payload_v1::([42; 8]).await; @@ -481,11 +557,17 @@ mod test { }), ) .await; + + Tester::new(false) + .assert_auth_failure(|client| async move { + client.get_payload_v1::([42; 8]).await + }) + .await; } #[tokio::test] async fn new_payload_v1_request() { - Tester::new() + Tester::new(true) .assert_request_equals( |client| async move { let _ = client @@ -530,11 +612,34 @@ mod test { }), ) .await; + + Tester::new(false) + .assert_auth_failure(|client| async move { + client + .new_payload_v1::(ExecutionPayload { + parent_hash: ExecutionBlockHash::repeat_byte(0), + fee_recipient: Address::repeat_byte(1), + state_root: Hash256::repeat_byte(1), + receipts_root: Hash256::repeat_byte(0), + logs_bloom: vec![1; 256].into(), + prev_randao: Hash256::repeat_byte(1), + block_number: 0, + gas_limit: 1, + gas_used: 2, + timestamp: 42, + extra_data: vec![].into(), + base_fee_per_gas: Uint256::from(1), + block_hash: ExecutionBlockHash::repeat_byte(1), + transactions: vec![].into(), + }) + .await + }) + .await; } #[tokio::test] async fn forkchoice_updated_v1_request() { - Tester::new() + Tester::new(true) .assert_request_equals( |client| async move { let _ = client @@ -560,6 +665,21 @@ mod test { }), ) .await; + + Tester::new(false) + .assert_auth_failure(|client| async move { + client + .forkchoice_updated_v1( + ForkChoiceState { + head_block_hash: ExecutionBlockHash::repeat_byte(0), + safe_block_hash: ExecutionBlockHash::repeat_byte(0), + finalized_block_hash: ExecutionBlockHash::repeat_byte(1), + }, + None, + ) + .await + }) + .await; } fn str_to_payload_id(s: &str) -> PayloadId { @@ -583,7 +703,7 @@ mod test { /// The `id` field has been modified on these vectors to match the one we use. #[tokio::test] async fn geth_test_vectors() { - Tester::new() + Tester::new(true) .assert_request_equals( // engine_forkchoiceUpdatedV1 (prepare payload) REQUEST validation |client| async move { diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 2773ac79f8c..6dec9983c4e 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -21,6 +21,7 @@ enum EngineState { Synced, Offline, Syncing, + AuthFailed, } #[derive(Copy, Clone, PartialEq, Debug)] @@ -135,6 +136,7 @@ pub struct Engines { pub enum EngineError { Offline { id: String }, Api { id: String, error: EngineApiError }, + Auth { id: String }, } impl Engines { @@ -226,6 +228,18 @@ impl Engines { *state_lock = EngineState::Syncing } + Err(EngineApiError::Auth(err)) => { + if logging.is_enabled() { + warn!( + self.log, + "Failed jwt authorization"; + "error" => ?err, + "id" => &engine.id + ); + } + + *state_lock = EngineState::AuthFailed + } Err(e) => { if logging.is_enabled() { warn!( @@ -295,7 +309,13 @@ impl Engines { let mut errors = vec![]; for engine in &self.engines { - let engine_synced = *engine.state.read().await == EngineState::Synced; + let (engine_synced, engine_auth_failed) = { + let state = engine.state.read().await; + ( + *state == EngineState::Synced, + *state == EngineState::AuthFailed, + ) + }; if engine_synced { match func(engine).await { Ok(result) => return Ok(result), @@ -313,6 +333,10 @@ impl Engines { }) } } + } else if engine_auth_failed { + errors.push(EngineError::Auth { + id: engine.id.clone(), + }) } else { errors.push(EngineError::Offline { id: engine.id.clone(), diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index ef2ce065cb3..dd85d2d3808 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -4,15 +4,19 @@ //! This crate only provides useful functionality for "The Merge", it does not provide any of the //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. +use auth::{Auth, JwtKey}; use engine_api::{Error as ApiError, *}; use engines::{Engine, EngineError, Engines, ForkChoiceState, Logging}; use lru::LruCache; use payload_status::process_multiple_payload_statuses; use sensitive_url::SensitiveUrl; +use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, trace, Logger}; use slot_clock::SlotClock; use std::collections::HashMap; use std::future::Future; +use std::io::Write; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; @@ -30,6 +34,9 @@ mod engines; mod payload_status; pub mod test_utils; +/// Name for the default file used for the jwt secret. +pub const DEFAULT_JWT_FILE: &str = "jwt.hex"; + /// Each time the `ExecutionLayer` retrieves a block from an execution node, it stores that block /// in an LRU cache to avoid redundant lookups. This is the size of that cache. const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128; @@ -54,6 +61,7 @@ pub enum Error { FeeRecipientUnspecified, ConsensusFailure, MissingLatestValidHash, + InvalidJWTSecret(String), } impl From for Error { @@ -77,6 +85,31 @@ struct Inner { log: Logger, } +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct Config { + /// Endpoint urls for EL nodes that are running the engine api. + pub execution_endpoints: Vec, + /// JWT secrets for the above endpoints running the engine api. + pub secret_files: Vec, + /// The default fee recipient to use on the beacon node if none if provided from + /// the validator client during block preparation. + pub suggested_fee_recipient: Option
, + /// An optional id for the beacon node that will be passed to the EL in the JWT token claim. + pub jwt_id: Option, + /// An optional client version for the beacon node that will be passed to the EL in the JWT token claim. + pub jwt_version: Option, + /// Default directory for the jwt secret if not provided through cli. + pub default_datadir: PathBuf, +} + +fn strip_prefix(s: &str) -> &str { + if let Some(stripped) = s.strip_prefix("0x") { + stripped + } else { + s + } +} + /// Provides access to one or more execution engines and provides a neat interface for consumption /// by the `BeaconChain`. /// @@ -92,22 +125,73 @@ pub struct ExecutionLayer { } impl ExecutionLayer { - /// Instantiate `Self` with `urls.len()` engines, all using the JSON-RPC via HTTP. - pub fn from_urls( - urls: Vec, - suggested_fee_recipient: Option
, - executor: TaskExecutor, - log: Logger, - ) -> Result { + /// Instantiate `Self` with Execution engines specified using `Config`, all using the JSON-RPC via HTTP. + pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result { + let Config { + execution_endpoints: urls, + mut secret_files, + suggested_fee_recipient, + jwt_id, + jwt_version, + default_datadir, + } = config; + if urls.is_empty() { return Err(Error::NoEngines); } - let engines = urls + // Extend the jwt secret files with the default jwt secret path if not provided via cli. + // This ensures that we have a jwt secret for every EL. + secret_files.extend(vec![ + default_datadir.join(DEFAULT_JWT_FILE); + urls.len().saturating_sub(secret_files.len()) + ]); + + let secrets: Vec<(JwtKey, PathBuf)> = secret_files + .iter() + .map(|p| { + // Read secret from file if it already exists + if p.exists() { + std::fs::read_to_string(p) + .map_err(|e| { + format!("Failed to read JWT secret file {:?}, error: {:?}", p, e) + }) + .and_then(|ref s| { + let secret = JwtKey::from_slice( + &hex::decode(strip_prefix(s)) + .map_err(|e| format!("Invalid hex string: {:?}", e))?, + )?; + Ok((secret, p.to_path_buf())) + }) + } else { + // Create a new file and write a randomly generated secret to it if file does not exist + std::fs::File::options() + .write(true) + .create_new(true) + .open(p) + .map_err(|e| { + format!("Failed to open JWT secret file {:?}, error: {:?}", p, e) + }) + .and_then(|mut f| { + let secret = auth::JwtKey::random(); + f.write_all(secret.hex_string().as_bytes()).map_err(|e| { + format!("Failed to write to JWT secret file: {:?}", e) + })?; + Ok((secret, p.to_path_buf())) + }) + } + }) + .collect::>() + .map_err(Error::InvalidJWTSecret)?; + + let engines: Vec> = urls .into_iter() - .map(|url| { + .zip(secrets.into_iter()) + .map(|(url, (secret, path))| { let id = url.to_string(); - let api = HttpJsonRpc::new(url)?; + let auth = Auth::new(secret, jwt_id.clone(), jwt_version.clone()); + debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?path); + let api = HttpJsonRpc::new_with_auth(url, auth)?; Ok(Engine::new(id, api)) }) .collect::>()?; @@ -793,6 +877,7 @@ mod test { MockExecutionLayer::default_params() .move_to_block_prior_to_terminal_block() .with_terminal_block(|spec, el, _| async move { + el.engines().upcheck_not_synced(Logging::Disabled).await; assert_eq!(el.get_terminal_pow_block_hash(&spec).await.unwrap(), None) }) .await @@ -811,6 +896,7 @@ mod test { MockExecutionLayer::default_params() .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { + el.engines().upcheck_not_synced(Logging::Disabled).await; assert_eq!( el.is_valid_terminal_pow_block_hash(terminal_block.unwrap().block_hash, &spec) .await @@ -826,6 +912,7 @@ mod test { MockExecutionLayer::default_params() .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { + el.engines().upcheck_not_synced(Logging::Disabled).await; let invalid_terminal_block = terminal_block.unwrap().parent_hash; assert_eq!( @@ -843,6 +930,7 @@ mod test { MockExecutionLayer::default_params() .move_to_terminal_block() .with_terminal_block(|spec, el, _| async move { + el.engines().upcheck_not_synced(Logging::Disabled).await; let missing_terminal_block = ExecutionBlockHash::repeat_byte(42); assert_eq!( diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index db024ba8b5d..a72f34b1abd 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -1,11 +1,12 @@ use crate::{ - test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY}, - *, + test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY, JWT_SECRET}, + Config, *, }; use environment::null_logger; use sensitive_url::SensitiveUrl; use std::sync::Arc; use task_executor::TaskExecutor; +use tempfile::NamedTempFile; use types::{Address, ChainSpec, Epoch, EthSpec, Hash256, Uint256}; pub struct ExecutionLayerRuntime { @@ -85,10 +86,19 @@ impl MockExecutionLayer { ); let url = SensitiveUrl::parse(&server.url()).unwrap(); + let file = NamedTempFile::new().unwrap(); - let el = ExecutionLayer::from_urls( - vec![url], - Some(Address::repeat_byte(42)), + let path = file.path().into(); + std::fs::write(&path, hex::encode(JWT_SECRET)).unwrap(); + + let config = Config { + execution_endpoints: vec![url], + secret_files: vec![path], + suggested_fee_recipient: Some(Address::repeat_byte(42)), + ..Default::default() + }; + let el = ExecutionLayer::from_config( + config, el_runtime.task_executor.clone(), el_runtime.log.clone(), ) diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 9d6eb5cf04b..0172d859524 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -1,6 +1,9 @@ //! Provides a mock execution engine HTTP JSON-RPC API for use in testing. -use crate::engine_api::{http::JSONRPC_VERSION, PayloadStatusV1, PayloadStatusV1Status}; +use crate::engine_api::auth::JwtKey; +use crate::engine_api::{ + auth::Auth, http::JSONRPC_VERSION, PayloadStatusV1, PayloadStatusV1Status, +}; use bytes::Bytes; use environment::null_logger; use execution_block_generator::{Block, PoWBlock}; @@ -9,19 +12,21 @@ use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use serde::{Deserialize, Serialize}; use serde_json::json; use slog::{info, Logger}; +use std::convert::Infallible; use std::future::Future; use std::marker::PhantomData; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; use tokio::{runtime, sync::oneshot}; use types::{EthSpec, ExecutionBlockHash, Uint256}; -use warp::Filter; +use warp::{http::StatusCode, Filter, Rejection}; pub use execution_block_generator::{generate_pow_block, ExecutionBlockGenerator}; pub use mock_execution_layer::{ExecutionLayerRuntime, MockExecutionLayer}; pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; +pub const JWT_SECRET: [u8; 32] = [42; 32]; mod execution_block_generator; mod handle_rpc; @@ -222,6 +227,10 @@ pub struct StaticNewPayloadResponse { status: PayloadStatusV1, should_import: bool, } +#[derive(Debug)] +struct AuthError(String); + +impl warp::reject::Reject for AuthError {} /// A wrapper around all the items required to spawn the HTTP server. /// @@ -252,6 +261,66 @@ impl Default for Config { } } +/// An API error serializable to JSON. +#[derive(Serialize)] +struct ErrorMessage { + code: u16, + message: String, +} + +/// Returns a `warp` header which filters out request that has a missing or incorrectly +/// signed JWT token. +fn auth_header_filter() -> warp::filters::BoxedFilter<()> { + warp::any() + .and(warp::filters::header::optional("Authorization")) + .and_then(move |authorization: Option| async move { + match authorization { + None => Err(warp::reject::custom(AuthError( + "auth absent from request".to_string(), + ))), + Some(auth) => { + if let Some(token) = auth.strip_prefix("Bearer ") { + let secret = JwtKey::from_slice(&JWT_SECRET).unwrap(); + match Auth::validate_token(token, &secret) { + Ok(_) => Ok(()), + Err(e) => Err(warp::reject::custom(AuthError(format!( + "Auth failure: {:?}", + e + )))), + } + } else { + Err(warp::reject::custom(AuthError( + "Bearer token not present in auth header".to_string(), + ))) + } + } + } + }) + .untuple_one() + .boxed() +} +/// This function receives a `Rejection` and tries to return a custom +/// value on invalid auth, otherwise simply passes the rejection along. +async fn handle_rejection(err: Rejection) -> Result { + let code; + let message; + + if let Some(e) = err.find::() { + message = format!("Authorization error: {:?}", e); + code = StatusCode::UNAUTHORIZED; + } else { + message = "BAD_REQUEST".to_string(); + code = StatusCode::BAD_REQUEST; + } + + let json = warp::reply::json(&ErrorMessage { + code: code.as_u16(), + message, + }); + + Ok(warp::reply::with_status(json, code)) +} + /// Creates a server that will serve requests using information from `ctx`. /// /// The server will shut down gracefully when the `shutdown` future resolves. @@ -288,7 +357,6 @@ pub fn serve( .get("id") .and_then(serde_json::Value::as_u64) .ok_or_else(|| warp::reject::custom(MissingIdField))?; - let preloaded_response = { let mut preloaded_responses = ctx.preloaded_responses.lock(); if !preloaded_responses.is_empty() { @@ -339,7 +407,9 @@ pub fn serve( }); let routes = warp::post() + .and(auth_header_filter()) .and(root.or(echo)) + .recover(handle_rejection) // Add a `Server` header. .map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-execution-client")); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9e300d88cac..e6374d8207e 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -414,6 +414,35 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { will be used. Defaults to http://127.0.0.1:8545.") .takes_value(true) ) + .arg( + Arg::with_name("jwt-secrets") + .long("jwt-secrets") + .value_name("JWT-SECRETS") + .help("One or more comma-delimited file paths which contain the corresponding hex-encoded \ + JWT secrets for each execution endpoint provided in the --execution-endpoints flag. \ + The number of paths should be in the same order and strictly equal to the number \ + of execution endpoints provided.") + .takes_value(true) + .requires("execution-endpoints") + ) + .arg( + Arg::with_name("jwt-id") + .long("jwt-id") + .value_name("JWT-ID") + .help("Used by the beacon node to communicate a unique identifier to execution nodes \ + during JWT authentication. It corresponds to the 'id' field in the JWT claims object.\ + Set to empty by deafult") + .takes_value(true) + ) + .arg( + Arg::with_name("jwt-version") + .long("jwt-version") + .value_name("JWT-VERSION") + .help("Used by the beacon node to communicate a client version to execution nodes \ + during JWT authentication. It corresponds to the 'clv' field in the JWT claims object.\ + Set to empty by deafult") + .takes_value(true) + ) .arg( Arg::with_name("suggested-fee-recipient") .long("suggested-fee-recipient") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index ed2e3308a92..7f92cad3620 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -236,20 +236,41 @@ pub fn get_config( client_config.eth1.purge_cache = true; } - if let Some(endpoints) = cli_args.value_of("execution-endpoints") { - client_config.sync_eth1_chain = true; - client_config.execution_endpoints = endpoints - .split(',') - .map(SensitiveUrl::parse) - .collect::>() - .map(Some) - .map_err(|e| format!("execution-endpoints contains an invalid URL {:?}", e))?; - } else if cli_args.is_present("merge") { - client_config.execution_endpoints = Some(client_config.eth1.endpoints.clone()); - } + if cli_args.is_present("merge") || cli_args.is_present("execution-endpoints") { + let mut el_config = execution_layer::Config::default(); + + if let Some(endpoints) = cli_args.value_of("execution-endpoints") { + client_config.sync_eth1_chain = true; + el_config.execution_endpoints = endpoints + .split(',') + .map(SensitiveUrl::parse) + .collect::>() + .map_err(|e| format!("execution-endpoints contains an invalid URL {:?}", e))?; + } else if cli_args.is_present("merge") { + el_config.execution_endpoints = client_config.eth1.endpoints.clone(); + } - client_config.suggested_fee_recipient = - clap_utils::parse_optional(cli_args, "suggested-fee-recipient")?; + if let Some(secrets) = cli_args.value_of("jwt-secrets") { + let secret_files: Vec<_> = secrets.split(',').map(PathBuf::from).collect(); + if !secret_files.is_empty() && secret_files.len() != el_config.execution_endpoints.len() + { + return Err(format!( + "{} execution-endpoints supplied with {} jwt-secrets. Lengths \ + must match or jwt-secrets must be empty.", + el_config.execution_endpoints.len(), + secret_files.len(), + )); + } + el_config.secret_files = secret_files; + } + + el_config.suggested_fee_recipient = + clap_utils::parse_optional(cli_args, "suggested-fee-recipient")?; + el_config.jwt_id = clap_utils::parse_optional(cli_args, "jwt-id")?; + el_config.jwt_version = clap_utils::parse_optional(cli_args, "jwt-version")?; + el_config.default_datadir = client_config.data_dir.clone(); + client_config.execution_layer = Some(el_config); + } if let Some(freezer_dir) = cli_args.value_of("freezer-dir") { client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 7de201bc3f1..e70c81ac102 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -212,7 +212,7 @@ fn merge_flag() { CommandLineTest::new() .flag("merge", None) .run_with_zero_port() - .with_config(|config| assert!(config.execution_endpoints.is_some())); + .with_config(|config| assert!(config.execution_layer.is_some())); } #[test] fn merge_execution_endpoints_flag() { @@ -233,7 +233,33 @@ fn merge_execution_endpoints_flag() { .flag("merge", None) .flag("execution-endpoints", Some(&endpoint_arg)) .run_with_zero_port() - .with_config(|config| assert_eq!(config.execution_endpoints.as_ref(), Some(&endpoints))); + .with_config(|config| { + let config = config.execution_layer.as_ref().unwrap(); + assert_eq!(config.execution_endpoints, endpoints) + }); +} +#[test] +fn merge_jwt_secrets_flag() { + let dir = TempDir::new().expect("Unable to create temporary directory"); + let mut file = File::create(dir.path().join("jwtsecrets")).expect("Unable to create file"); + file.write_all(b"0x3cbc11b0d8fa16f3344eacfd6ff6430b9d30734450e8adcf5400f88d327dcb33") + .expect("Unable to write to file"); + CommandLineTest::new() + .flag("merge", None) + .flag("execution-endpoints", Some("http://localhost:8551/")) + .flag( + "jwt-secrets", + dir.path().join("jwt-file").as_os_str().to_str(), + ) + .run_with_zero_port() + .with_config(|config| { + let config = config.execution_layer.as_ref().unwrap(); + assert_eq!( + config.execution_endpoints[0].full.to_string(), + "http://localhost:8551/" + ); + assert_eq!(config.secret_files[0], dir.path().join("jwt-file")); + }); } #[test] fn merge_fee_recipient_flag() { @@ -245,10 +271,24 @@ fn merge_fee_recipient_flag() { ) .run_with_zero_port() .with_config(|config| { + let config = config.execution_layer.as_ref().unwrap(); assert_eq!( config.suggested_fee_recipient, Some(Address::from_str("0x00000000219ab540356cbb839cbe05303d7705fa").unwrap()) - ) + ); + }); +} +#[test] +fn jwt_optional_flags() { + CommandLineTest::new() + .flag("merge", None) + .flag("jwt-id", Some("bn-1")) + .flag("jwt-version", Some("Lighthouse-v2.1.3")) + .run_with_zero_port() + .with_config(|config| { + let config = config.execution_layer.as_ref().unwrap(); + assert_eq!(config.jwt_id, Some("bn-1".to_string())); + assert_eq!(config.jwt_version, Some("Lighthouse-v2.1.3".to_string())); }); } #[test] diff --git a/testing/execution_engine_integration/src/execution_engine.rs b/testing/execution_engine_integration/src/execution_engine.rs index 84d72100848..a7928f0866a 100644 --- a/testing/execution_engine_integration/src/execution_engine.rs +++ b/testing/execution_engine_integration/src/execution_engine.rs @@ -1,4 +1,5 @@ use crate::{genesis_json::geth_genesis_json, SUPPRESS_LOGS}; +use execution_layer::DEFAULT_JWT_FILE; use sensitive_url::SensitiveUrl; use std::path::PathBuf; use std::process::{Child, Command, Output, Stdio}; @@ -9,7 +10,12 @@ use unused_port::unused_tcp_port; /// Defined for each EE type (e.g., Geth, Nethermind, etc). pub trait GenericExecutionEngine: Clone { fn init_datadir() -> TempDir; - fn start_client(datadir: &TempDir, http_port: u16, http_auth_port: u16) -> Child; + fn start_client( + datadir: &TempDir, + http_port: u16, + http_auth_port: u16, + jwt_secret_path: PathBuf, + ) -> Child; } /// Holds handle to a running EE process, plus some other metadata. @@ -35,9 +41,10 @@ impl Drop for ExecutionEngine { impl ExecutionEngine { pub fn new(engine: E) -> Self { let datadir = E::init_datadir(); + let jwt_secret_path = datadir.path().join(DEFAULT_JWT_FILE); let http_port = unused_tcp_port().unwrap(); let http_auth_port = unused_tcp_port().unwrap(); - let child = E::start_client(&datadir, http_port, http_auth_port); + let child = E::start_client(&datadir, http_port, http_auth_port, jwt_secret_path); Self { engine, datadir, @@ -51,10 +58,13 @@ impl ExecutionEngine { SensitiveUrl::parse(&format!("http://127.0.0.1:{}", self.http_port)).unwrap() } - #[allow(dead_code)] // Future use. - pub fn http_ath_url(&self) -> SensitiveUrl { + pub fn http_auth_url(&self) -> SensitiveUrl { SensitiveUrl::parse(&format!("http://127.0.0.1:{}", self.http_auth_port)).unwrap() } + + pub fn datadir(&self) -> PathBuf { + self.datadir.path().to_path_buf() + } } /* @@ -98,7 +108,12 @@ impl GenericExecutionEngine for Geth { datadir } - fn start_client(datadir: &TempDir, http_port: u16, http_auth_port: u16) -> Child { + fn start_client( + datadir: &TempDir, + http_port: u16, + http_auth_port: u16, + jwt_secret_path: PathBuf, + ) -> Child { let network_port = unused_tcp_port().unwrap(); Command::new(Self::binary_path()) @@ -109,10 +124,12 @@ impl GenericExecutionEngine for Geth { .arg("engine,eth") .arg("--http.port") .arg(http_port.to_string()) - .arg("--http.authport") + .arg("--authrpc.port") .arg(http_auth_port.to_string()) .arg("--port") .arg(network_port.to_string()) + .arg("--authrpc.jwtsecret") + .arg(jwt_secret_path.as_path().to_str().unwrap()) .stdout(build_stdio()) .stderr(build_stdio()) .spawn() diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 3f0e9544245..1bbcfe09a17 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -46,10 +46,17 @@ impl TestRig { let ee_a = { let execution_engine = ExecutionEngine::new(generic_engine.clone()); - let urls = vec![execution_engine.http_url()]; + let urls = vec![execution_engine.http_auth_url()]; + + let config = execution_layer::Config { + execution_endpoints: urls, + secret_files: vec![], + suggested_fee_recipient: Some(Address::repeat_byte(42)), + default_datadir: execution_engine.datadir(), + ..Default::default() + }; let execution_layer = - ExecutionLayer::from_urls(urls, fee_recipient, executor.clone(), log.clone()) - .unwrap(); + ExecutionLayer::from_config(config, executor.clone(), log.clone()).unwrap(); ExecutionPair { execution_engine, execution_layer, @@ -59,8 +66,16 @@ impl TestRig { let ee_b = { let execution_engine = ExecutionEngine::new(generic_engine); let urls = vec![execution_engine.http_url()]; + + let config = execution_layer::Config { + execution_endpoints: urls, + secret_files: vec![], + suggested_fee_recipient: fee_recipient, + default_datadir: execution_engine.datadir(), + ..Default::default() + }; let execution_layer = - ExecutionLayer::from_urls(urls, fee_recipient, executor, log).unwrap(); + ExecutionLayer::from_config(config, executor, log.clone()).unwrap(); ExecutionPair { execution_engine, execution_layer,