Bug fix: Network Explorer: Add freegeoip API key and split out tasks for country distributions #806

Merged
merged 13 commits into from
Oct 19, 2021
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions explorer-api/Cargo.toml
@@ -20,6 +20,7 @@ okapi = { version = "0.6.0-alpha-1", features = ["derive_json_schema"] }
rocket_okapi = "0.7.0-alpha-1"
log = "0.4.0"
pretty_env_logger = "0.4.0"
thiserror = "1.0.29"

mixnet-contract = { path = "../common/mixnet-contract" }
network-defaults = { path = "../common/network-defaults" }
23 changes: 15 additions & 8 deletions explorer-api/README.md
@@ -1,17 +1,24 @@
Network Explorer API
====================

An API that can:
An API that provides data for the [Network Explorer](../explorer).

* calculate how many nodes are in which country, by checking the IPs of all nodes against an external service
* serve "hello world" via HTTP
Features:

- geolocates mixnodes using https://freegeoip.app/
- calculates how many nodes are in each country
- proxies mixnode API requests to add HTTPS

## Running

TODO:
Supply the environment variable `GEO_IP_SERVICE_API_KEY` with a key from https://freegeoip.app/.

Run as a service behind an `nginx` reverse proxy to add `https` with Let's Encrypt.
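
As a hedged illustration of the `GEO_IP_SERVICE_API_KEY` requirement above, the sketch below validates the variable once at startup and returns an error instead of panicking on the first lookup. `validate_api_key` and `api_key_from_env` are hypothetical helper names introduced here for illustration; they are not part of this PR.

```rust
use std::env;

/// Hypothetical helper: accepts the raw value of the variable (if any)
/// and rejects missing or blank keys with a readable message.
pub fn validate_api_key(raw: Option<String>) -> Result<String, String> {
    match raw {
        Some(key) if !key.trim().is_empty() => Ok(key.trim().to_string()),
        Some(_) => Err("GEO_IP_SERVICE_API_KEY is set but empty".to_string()),
        None => Err("GEO_IP_SERVICE_API_KEY is not set".to_string()),
    }
}

/// Hypothetical wrapper that reads the process environment.
pub fn api_key_from_env() -> Result<String, String> {
    validate_api_key(env::var("GEO_IP_SERVICE_API_KEY").ok())
}
```

Calling `api_key_from_env()` before spawning the tasks would surface a missing key immediately, rather than inside the geolocation loop.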

# TODO / Known Issues

## TODO

* record the number of mixnodes on a given date and write to a file for later retrieval
* store the nodes per country state in a variable
* grab mixnode description info via reqwest and serve it (avoid mixed-content errors)
* serve it all over http
* dependency injection
* tests
* tests
63 changes: 63 additions & 0 deletions explorer-api/src/country_statistics/distribution.rs
@@ -0,0 +1,63 @@
use log::info;

use crate::country_statistics::country_nodes_distribution::CountryNodesDistribution;

use crate::state::ExplorerApiStateContext;

pub(crate) struct CountryStatisticsDistributionTask {
state: ExplorerApiStateContext,
}

impl CountryStatisticsDistributionTask {
pub(crate) fn new(state: ExplorerApiStateContext) -> Self {
CountryStatisticsDistributionTask { state }
}

pub(crate) fn start(mut self) {
info!("Spawning mix node country distribution task runner...");
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(std::time::Duration::from_secs(60 * 15)); // every 15 mins
loop {
// wait for the next interval tick
interval_timer.tick().await;
self.calculate_nodes_per_country().await;
}
});
}

/// Reads the cached mixnode locations and calculates how many nodes are in each country
async fn calculate_nodes_per_country(&mut self) {
let cache = self.state.inner.mix_nodes.get_location_cache().await;

let three_letter_iso_country_codes: Vec<String> = cache
.values()
.flat_map(|i| {
i.location
.as_ref()
.map(|j| j.three_letter_iso_country_code.clone())
})
.collect();

let mut distribution = CountryNodesDistribution::new();

info!("Calculating country distribution from located mixnodes...");
for three_letter_iso_country_code in three_letter_iso_country_codes {
*(distribution.entry(three_letter_iso_country_code)).or_insert(0) += 1;
}

// replace the shared distribution to be the new distribution
self.state
.inner
.country_node_distribution
.set_all(distribution)
.await;

info!(
"Mixnode country distribution done: {:?}",
self.state.inner.country_node_distribution.get_all().await
);

// keep state on disk, so that when this process dies it can start up again and users get some data
self.state.write_to_file().await;
}
}
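
The counting step in `calculate_nodes_per_country` can be sketched in isolation. This is a minimal illustration that assumes the distribution behaves like a `HashMap<String, u32>` (the actual `CountryNodesDistribution` type is not shown in this diff) and that unlocated nodes are represented as `None`. The function name `count_nodes_per_country` is hypothetical.

```rust
use std::collections::HashMap;

/// Counts how many located nodes fall into each country code.
/// `None` entries stand for nodes without a known location and are skipped,
/// mirroring the `flat_map` over `Option` in the task above.
pub fn count_nodes_per_country(codes: &[Option<String>]) -> HashMap<String, u32> {
    let mut distribution = HashMap::new();
    for code in codes.iter().flatten() {
        // Insert 0 on first sight of a country, then increment.
        *distribution.entry(code.clone()).or_insert(0) += 1;
    }
    distribution
}
```

Because the function only reads from a snapshot of the location cache, it can run on its own timer without issuing any geolocation requests, which appears to be the motivation for splitting the two tasks.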
139 changes: 139 additions & 0 deletions explorer-api/src/country_statistics/geolocate.rs
@@ -0,0 +1,139 @@
use log::{info, trace, warn};
use reqwest::Error as ReqwestError;
use thiserror::Error;

use crate::mix_nodes::{GeoLocation, Location};
use crate::state::ExplorerApiStateContext;

pub(crate) struct GeoLocateTask {
state: ExplorerApiStateContext,
}

impl GeoLocateTask {
pub(crate) fn new(state: ExplorerApiStateContext) -> Self {
GeoLocateTask { state }
}

pub(crate) fn start(mut self) {
info!("Spawning mix node locator task runner...");
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(std::time::Duration::from_millis(50));
loop {
// wait for the next interval tick
interval_timer.tick().await;
self.locate_mix_nodes().await;
}
});
}

async fn locate_mix_nodes(&mut self) {
let mixnode_bonds = self.state.inner.mix_nodes.get().await.value;

for (i, cache_item) in mixnode_bonds.values().enumerate() {
if self
.state
.inner
.mix_nodes
.is_location_valid(&cache_item.mix_node.identity_key)
.await
{
// when the cached location is valid, don't locate and continue to next mix node
continue;
}

// the mix node has not been located or the cache time has expired
match locate(&cache_item.mix_node.host).await {
Ok(geo_location) => {
let location = Location::new(geo_location);

trace!(
"{} mix nodes already located. Ip {} is located in {:#?}",
i,
cache_item.mix_node.host,
location.three_letter_iso_country_code,
);

if i > 0 && (i % 100) == 0 {
info!(
"Located {} mixnodes...",
i + 1,
);
}

self.state
.inner
.mix_nodes
.set_location(&cache_item.mix_node.identity_key, Some(location))
.await;

// one node has been located, so return out of the loop
return;
}
Err(e) => match e {
LocateError::ReqwestError(e) => warn!(
"❌ Oh no! Location for {} failed {}",
cache_item.mix_node.host, e
),
LocateError::NotFound(e) => {
warn!(
"❌ Location for {} not found. Response body: {}",
cache_item.mix_node.host, e
);
self.state
.inner
.mix_nodes
.set_location(&cache_item.mix_node.identity_key, None)
.await;
},
LocateError::RateLimited(e) => warn!(
"❌ Oh no, we've been rate limited! Location for {} failed. Response body: {}",
cache_item.mix_node.host, e
),
},
}
}

trace!("All mix nodes located");
}
}

#[derive(Debug, Error)]
enum LocateError {
#[error("Oops, we have made too many requests and are being rate limited. Response body: {0}")]
RateLimited(String),

#[error("Geolocation not found. Response body: {0}")]
NotFound(String),

#[error(transparent)]
ReqwestError(#[from] ReqwestError),
}

async fn locate(ip: &str) -> Result<GeoLocation, LocateError> {
let api_key = ::std::env::var("GEO_IP_SERVICE_API_KEY")
.expect("Env var GEO_IP_SERVICE_API_KEY is not set");
let uri = format!("{}/{}?apikey={}", crate::GEO_IP_SERVICE, ip, api_key);
match reqwest::get(uri.clone()).await {
Ok(response) => {
if response.status() == 429 {
return Err(LocateError::RateLimited(
response
.text()
.await
.unwrap_or_else(|_| "(the response body could not be read)".to_string()),
));
}
if response.status() == 404 {
return Err(LocateError::NotFound(
response
.text()
.await
.unwrap_or_else(|_| "(the response body could not be read)".to_string()),
));
}
let location = response.json::<GeoLocation>().await?;
Ok(location)
}
Err(e) => Err(LocateError::ReqwestError(e)),
}
}
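
The request-building and status-handling logic in `locate` can be sketched without any HTTP dependency. This is an illustrative, std-only approximation: `StatusClass`, `classify_status`, and `build_lookup_uri` are hypothetical names, and the status code is passed as a plain `u16` rather than a `reqwest` type. Trimming a trailing slash from the base is an added assumption, so that both the old and new values of `GEO_IP_SERVICE` would produce the same URI.

```rust
/// Hypothetical classification of a geolocation response by status code.
#[derive(Debug, PartialEq)]
pub enum StatusClass {
    /// 429: back off, leave the cached location untouched.
    RateLimited,
    /// 404: record the node as having no known location.
    NotFound,
    /// Anything else: attempt to parse the body as a location.
    ParseBody,
}

pub fn classify_status(status: u16) -> StatusClass {
    match status {
        429 => StatusClass::RateLimited,
        404 => StatusClass::NotFound,
        _ => StatusClass::ParseBody,
    }
}

/// Builds the lookup URI in the `{base}/{ip}?apikey={key}` shape used by `locate`.
pub fn build_lookup_uri(base: &str, ip: &str, api_key: &str) -> String {
    format!("{}/{}?apikey={}", base.trim_end_matches('/'), ip, api_key)
}
```

Keeping these two steps as pure functions would make the rate-limit and not-found branches testable without calling the external service.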
96 changes: 2 additions & 94 deletions explorer-api/src/country_statistics/mod.rs
@@ -1,96 +1,4 @@
use log::{info, trace, warn};
use reqwest::Error as ReqwestError;

use crate::country_statistics::country_nodes_distribution::CountryNodesDistribution;
use crate::mix_nodes::{GeoLocation, Location};
use crate::state::ExplorerApiStateContext;

pub mod country_nodes_distribution;
pub mod distribution;
pub mod geolocate;
pub mod http;

pub(crate) struct CountryStatistics {
state: ExplorerApiStateContext,
}

impl CountryStatistics {
pub(crate) fn new(state: ExplorerApiStateContext) -> Self {
CountryStatistics { state }
}

pub(crate) fn start(mut self) {
info!("Spawning task runner...");
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(std::time::Duration::from_secs(60 * 60));
loop {
// wait for the next interval tick
interval_timer.tick().await;

info!("Running task...");
self.calculate_nodes_per_country().await;
info!("Done");
}
});
}

/// Retrieves the current list of mixnodes from the validators and calculates how many nodes are in each country
async fn calculate_nodes_per_country(&mut self) {
// force the mixnode cache to invalidate
let mixnode_bonds = self.state.inner.mix_nodes.refresh_and_get().await.value;

let mut distribution = CountryNodesDistribution::new();

info!("Locating mixnodes...");
for (i, cache_item) in mixnode_bonds.values().enumerate() {
match locate(&cache_item.bond.mix_node.host).await {
Ok(geo_location) => {
let location = Location::new(geo_location);

*(distribution.entry(location.three_letter_iso_country_code.to_string()))
.or_insert(0) += 1;

trace!(
"Ip {} is located in {:#?}",
cache_item.bond.mix_node.host,
location.three_letter_iso_country_code,
);

self.state
.inner
.mix_nodes
.set_location(&cache_item.bond.mix_node.identity_key, location)
.await;

if (i % 100) == 0 {
info!(
"Located {} mixnodes in {} countries",
i + 1,
distribution.len()
);
}
}
Err(e) => warn!("❌ Oh no! Location failed {}", e),
}
}

// replace the shared distribution to be the new distribution
self.state
.inner
.country_node_distribution
.set_all(distribution)
.await;

info!(
"Locating mixnodes done: {:?}",
self.state.inner.country_node_distribution.get_all().await
);

// keep state on disk, so that when this process dies it can start up again and users get some data
self.state.write_to_file().await;
}
}

async fn locate(ip: &str) -> Result<GeoLocation, ReqwestError> {
let response = reqwest::get(format!("{}{}", crate::GEO_IP_SERVICE, ip)).await?;
let location = response.json::<GeoLocation>().await?;
Ok(location)
}
9 changes: 7 additions & 2 deletions explorer-api/src/main.rs
@@ -12,7 +12,7 @@ mod mix_nodes;
mod ping;
mod state;

const GEO_IP_SERVICE: &str = "https://freegeoip.app/json/";
const GEO_IP_SERVICE: &str = "https://api.freegeoip.app/json";

#[tokio::main]
async fn main() {
@@ -36,7 +36,12 @@ impl ExplorerApi {
info!("Explorer API starting up...");

// spawn concurrent tasks
country_statistics::CountryStatistics::new(self.state.clone()).start();
mix_nodes::tasks::MixNodesTasks::new(self.state.clone()).start();
country_statistics::distribution::CountryStatisticsDistributionTask::new(
self.state.clone(),
)
.start();
country_statistics::geolocate::GeoLocateTask::new(self.state.clone()).start();
http::start(self.state.clone());

// wait for user to press ctrl+C