Listen to chain events and update twin on requests (#194)
* Listen to chain events and update twin on requests

* Add retry connections to chain

* Make listener not fail on decode errors and db errors

* Update twin relays

* Use if let instead of match to check Err

* run clippy

---------

Co-authored-by: Muhamad Azamy <muhamad@incubaid.com>
AbdelrahmanElawady and muhamadazmy authored Apr 5, 2024
1 parent f35d685 commit 610d871
Showing 18 changed files with 227 additions and 35 deletions.
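Read together, the hunks below add a chain-events listener that keeps the relay's twin cache in sync with finalized tfchain blocks. The following condensed sketch is assembled from the src/bins/rmb-relay.rs and src/events/mod.rs hunks in this commit; argument parsing, the Redis pool, and the relay setup around it are omitted, and the listener variable is renamed for readability, so treat it as an excerpt rather than a standalone program.

// Condensed from src/bins/rmb-relay.rs in this commit: the relay and the new
// events::Listener share one RedisCache, and the listener keeps it fresh.
let redis_cache = RedisCache::new(pool.clone(), "twin");

let twins = SubstrateTwinDB::<RedisCache>::new(args.substrate.clone(), redis_cache.clone())
    .await
    .context("cannot create substrate twin db object")?;

// New in this commit: subscribe to finalized blocks and update cached twins
// whenever a TwinStored or TwinUpdated event is seen.
let mut listener = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
    listener
        .listen()
        .await
        .context("failed to listen to chain events")
        .unwrap();
});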
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -38,7 +38,8 @@ url = "2.3.1"
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3.25"
jwt = "0.16"
subxt = "0.28.0"
subxt = { version = "0.28.0", features = ["substrate-compat"]}
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
itertools = "0.11"

# for static build
3 changes: 3 additions & 0 deletions _tests/e2e_tests.rs
@@ -44,6 +44,9 @@ impl TwinDB for InMemoryDB {
) -> anyhow::Result<Option<u32>> {
unimplemented!()
}
async fn set_twin(&self, twin: Twin) -> anyhow::Result<()> {
unimplemented!()
}
}

fn new_message(
Binary file added artifacts/network.scale
Binary file not shown.
2 changes: 2 additions & 0 deletions proto/types.proto
@@ -71,4 +71,6 @@ message Envelope {
bytes plain = 13;
bytes cipher = 14;
}

repeated string relays = 17;
}
11 changes: 4 additions & 7 deletions src/bins/rmb-peer.rs
@@ -1,6 +1,5 @@
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;

use anyhow::{Context, Result};
use clap::{builder::ArgAction, Args, Parser};
@@ -151,12 +150,10 @@ async fn app(args: Params) -> Result<()> {

// cache is a little bit tricky because while it improves performance it
// makes changes to twin data take at least 5 min before they are detected
let db = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(60)),
)
.await
.context("cannot create substrate twin db object")?;
let db =
SubstrateTwinDB::<RedisCache>::new(args.substrate, RedisCache::new(pool.clone(), "twin"))
.await
.context("cannot create substrate twin db object")?;

let id = db
.get_twin_with_account(signer.account())
23 changes: 15 additions & 8 deletions src/bins/rmb-relay.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use clap::{builder::ArgAction, Parser};
use rmb::cache::RedisCache;
use rmb::events;
use rmb::redis;
use rmb::relay::{
self,
@@ -142,14 +143,11 @@ async fn app(args: Args) -> Result<()> {
.await
.context("failed to initialize redis pool")?;

// we use 6 hours cache for twin information because twin id will not change anyway
// and we only need twin public key for validation only.
let twins = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(args.cache * 60)),
)
.await
.context("cannot create substrate twin db object")?;
let redis_cache = RedisCache::new(pool.clone(), "twin");

let twins = SubstrateTwinDB::<RedisCache>::new(args.substrate.clone(), redis_cache.clone())
.await
.context("cannot create substrate twin db object")?;

let max_users = args.workers as usize * args.user_per_worker as usize;
let opt = relay::SwitchOptions::new(pool.clone())
@@ -175,6 +173,15 @@ async fn app(args: Args) -> Result<()> {
let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker)
.await
.unwrap();

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
l.listen()
.await
.context("failed to listen to chain events")
.unwrap();
});

r.start(&args.listen).await.unwrap();
Ok(())
}
6 changes: 6 additions & 0 deletions src/cache/memory.rs
@@ -45,6 +45,12 @@ where
Some(v) => Ok(Some(v.clone())),
}
}
async fn flush(&self) -> Result<()> {
let mut mem = self.mem.write().await;
mem.clear();

Ok(())
}
}

#[cfg(test)]
10 changes: 10 additions & 0 deletions src/cache/mod.rs
@@ -11,6 +11,7 @@ use std::marker::{Send, Sync};
pub trait Cache<T>: Send + Sync + 'static {
async fn set<S: ToString + Send + Sync>(&self, id: S, obj: T) -> Result<()>;
async fn get<S: ToString + Send + Sync>(&self, id: S) -> Result<Option<T>>;
async fn flush(&self) -> Result<()>;
}

#[async_trait]
@@ -31,6 +32,12 @@ where
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
match self {
Some(cache) => cache.flush().await,
None => Ok(()),
}
}
}

#[derive(Clone, Copy)]
@@ -47,4 +54,7 @@ where
async fn get<S: ToString + Send + Sync>(&self, _id: S) -> Result<Option<T>> {
Ok(None)
}
async fn flush(&self) -> Result<()> {
Ok(())
}
}
34 changes: 16 additions & 18 deletions src/cache/redis.rs
@@ -1,5 +1,3 @@
use std::time::Duration;

use super::Cache;

use anyhow::{Context, Result};
@@ -22,19 +20,13 @@ use serde::{de::DeserializeOwned, Serialize};
pub struct RedisCache {
pool: Pool<RedisConnectionManager>,
prefix: String,
ttl: Duration,
}

impl RedisCache {
pub fn new<P: Into<String>>(
pool: Pool<RedisConnectionManager>,
prefix: P,
ttl: Duration,
) -> Self {
pub fn new<P: Into<String>>(pool: Pool<RedisConnectionManager>, prefix: P) -> Self {
Self {
pool,
prefix: prefix.into(),
ttl,
}
}

@@ -57,22 +49,23 @@ where
async fn set<S: ToString + Send + Sync>(&self, key: S, obj: T) -> Result<()> {
let mut conn = self.get_connection().await?;
let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?;
let key = format!("{}.{}", self.prefix, key.to_string());
cmd("SET")
.arg(key)
cmd("HSET")
.arg(&self.prefix)
.arg(key.to_string())
.arg(obj)
.arg("EX")
.arg(self.ttl.as_secs())
.query_async(&mut *conn)
.await?;

Ok(())
}
async fn get<S: ToString + Send + Sync>(&self, key: S) -> Result<Option<T>> {
let mut conn = self.get_connection().await?;
let key = format!("{}.{}", self.prefix, key.to_string());

let ret: Option<Vec<u8>> = cmd("GET").arg(key).query_async(&mut *conn).await?;
let ret: Option<Vec<u8>> = cmd("HGET")
.arg(&self.prefix)
.arg(key.to_string())
.query_async(&mut *conn)
.await?;

match ret {
Some(val) => {
@@ -84,6 +77,12 @@ where
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;

Ok(())
}
}

#[cfg(test)]
@@ -93,7 +92,6 @@ mod tests {
use super::*;

const PREFIX: &str = "twin";
const TTL: u64 = 20;

async fn create_redis_cache() -> RedisCache {
let manager = RedisConnectionManager::new("redis://127.0.0.1/")
@@ -105,7 +103,7 @@ .unwrap();
.context("unable to build pool or redis connection manager")
.unwrap();

RedisCache::new(pool, PREFIX, Duration::from_secs(TTL))
RedisCache::new(pool, PREFIX)
}

#[tokio::test]
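The storage model in RedisCache also changes: instead of one TTL'd string key per twin (SET ... EX), all twins now live in a single hash named after the prefix, and invalidation is explicit via flush. A rough usage sketch under that reading; pool comes from the surrounding code, while twin_id, twin, and hit are placeholder names, not identifiers from this commit.

// Assumed usage sketch of the new cache semantics; `twin_id`, `twin` and `hit`
// are placeholders.
let cache = RedisCache::new(pool.clone(), "twin");
cache.set(twin_id, twin).await?;                    // HSET twin <id> <json>
let hit: Option<Twin> = cache.get(twin_id).await?;  // HGET twin <id>
cache.flush().await?;                               // DEL twin, clearing every cached twin at once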
105 changes: 105 additions & 0 deletions src/events/mod.rs
@@ -0,0 +1,105 @@
use std::{collections::LinkedList, time::Duration};

use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};

#[derive(Clone)]
pub struct Listener<C>
where
C: Cache<Twin>,
{
cache: C,
api: OnlineClient<PolkadotConfig>,
substrate_urls: LinkedList<String>,
}

impl<C> Listener<C>
where
C: Cache<Twin> + Clone,
{
pub async fn new(substrate_urls: Vec<String>, cache: C) -> Result<Self> {
let mut urls = LinkedList::from_iter(substrate_urls);

let api = Self::connect(&mut urls).await?;

cache.flush().await?;
Ok(Listener {
api,
cache,
substrate_urls: urls,
})
}

async fn connect(urls: &mut LinkedList<String>) -> Result<OnlineClient<PolkadotConfig>> {
let trials = urls.len() * 2;
for _ in 0..trials {
let url = match urls.front() {
Some(url) => url,
None => anyhow::bail!("substrate urls list is empty"),
};

match OnlineClient::<PolkadotConfig>::from_url(url).await {
Ok(client) => return Ok(client),
Err(err) => {
log::error!(
"failed to create substrate client with url \"{}\": {}",
url,
err
);
}
}

if let Some(front) = urls.pop_front() {
urls.push_back(front);
}
}

anyhow::bail!("failed to connect to substrate using the provided urls")
}

pub async fn listen(&mut self) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
if let Err(err) = self.handle_events().await {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
}
}
}
}

async fn handle_events(&self) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let events = block?.events().await?;
for evt in events.iter() {
let evt = match evt {
Err(err) => {
log::error!("failed to decode event {}", err);
continue;
}
Ok(e) => e,
};
if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) =
evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
}
}
}
Ok(())
}
}
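Listener::connect gives each configured endpoint at most two attempts overall, rotating a failing URL to the back of the list before trying the next one. A minimal standalone sketch of that rotation pattern, where try_connect stands in for OnlineClient::from_url and the endpoints are hypothetical:

use std::collections::LinkedList;

// Standalone sketch of the URL-rotation idea used by Listener::connect;
// `try_connect` stands in for OnlineClient::from_url.
fn pick_endpoint<F>(urls: &mut LinkedList<String>, try_connect: F) -> Option<String>
where
    F: Fn(&str) -> bool,
{
    let trials = urls.len() * 2; // every endpoint gets at most two tries
    for _ in 0..trials {
        let url = urls.front()?.clone();
        if try_connect(&url) {
            return Some(url);
        }
        // move the failing endpoint to the back and try the next one
        if let Some(front) = urls.pop_front() {
            urls.push_back(front);
        }
    }
    None
}

fn main() {
    let mut urls = LinkedList::from_iter([
        "wss://node-a.example".to_string(),
        "wss://node-b.example".to_string(),
    ]);
    // Pretend only node-b is reachable.
    let picked = pick_endpoint(&mut urls, |u| u.ends_with("node-b.example"));
    assert_eq!(picked.as_deref(), Some("wss://node-b.example"));
}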
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -3,10 +3,12 @@ extern crate anyhow;
extern crate mime;

pub mod cache;
pub mod events;
pub mod identity;
pub mod peer;
pub mod redis;
pub mod relay;
pub mod tfchain;
pub mod token;
pub mod twin;
pub mod types;
