Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,38 @@
# example-rust
# Examples

This repository houses an example Alvarium Publisher and Receiver application split into 2 packages.
They can be run from the same machine or not, but they are 2 independent packages that encompass
the lifecycle of a data source annotating and being scored on the other end.

## Publishing
The publisher package provides an example of an application that produces data from 2 mock sensors.
This data is generated randomly and produces an sdk instance that contains a set of core annotators
from the Alvarium rust [SDK](https://github.com/project-alvarium/alvarium-sdk-rust). Additionally, it creates
a custom Annotator that checks if the values generated are within a specific threshold range. This
Annotator implements the [Annotator](https://github.com/project-alvarium/alvarium-annotator/blob/main/src/annotator.rs#L3)
trait, and is compatible with the sdk as a result.

Data is transported through the Demia Distributed Oracle Network (DON for short) which uses the IOTA
distributed ledger protocol at its foundation, providing auditability and immutability.

To run this example, simply navigate to the alvarium_demo_pub directory and run
```
cargo run --release
```

The `--release` flag will ensure that the PoW for the publishing is conducted more efficiently than in dev mode.

_**Note:**_ _*Configurations are set so that the "provider" is the local application. A demia based streams
implementation is mocked, but a proper oracle would be used in production and the configs would be updated
accordingly to reflect the oracle address*_

## Subscribing
The subscriber package provides an example of a scoring application that retrieves messages from the publisher
channel, and proceeds to locally store and sort Readings and Annotations. It also spins up a localised web
application at port 8000 where a visualiser is provided that lets you navigate through the reading history and
see the evaluated scores and associated annotations for the data sources provided in the publishing process.

To run this example, simply navigate to the alvarium_demo_sub directory and run
```
cargo run --release
```
5 changes: 5 additions & 0 deletions alvarium_demo_pub/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/target
.*/

Cargo.lock
*.bin
25 changes: 25 additions & 0 deletions alvarium_demo_pub/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "alvarium_demo_pub"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
streams = { git = "https://github.com/demia-protocol/streams", branch = "develop", default-features = false, features = ["utangle-client", "did"] }
# match crypto library with streams crypto
iota-crypto = {version = "0.15.3", features = ["ed25519", "sha", "rand", "random"]}
alvarium-annotator = { git = "https://github.com/project-alvarium/alvarium-annotator" }
alvarium-sdk-rust = { git = "https://github.com/project-alvarium/alvarium-sdk-rust" }
gethostname = "0.4.3"
serde = "1.0.164"
serde_json = "1.0.96"
hex = "0.4.3"
tokio = "1.28.2"
rand = "0.8.5"
hyper = { version = "0.14.26", features = ["server"] }
lazy_static = "1.4.0"
chrono = {version = "0.4.31", features = ["serde"] }
thiserror = "1.0.40"
log = "0.4.19"
fern = "0.6.2"
42 changes: 42 additions & 0 deletions alvarium_demo_pub/config/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"annotators": [
"pki",
"source",
"tls",
"threshold"
],
"hash": {
"type": "sha256"
},
"signature": {
"public": {
"type": "ed25519",
"path": "./config/keys/public.key"
},
"private": {
"type": "ed25519",
"path": "./config/keys/private.key"
}
},
"stream": {
"type": "iota",
"config": {
"provider": {
"host": "localhost",
"protocol": "http",
"port": 8900
},
"tangle": {
"host": "nodes.02.demia-testing-domain.com",
"protocol": "http",
"port": 14102
},
"encoding": "utf-8",
"topic": "Publisher Unique Topic",
"backup": {
"path": "sdk_user.bin",
"password": "annotator user backup"
}
}
}
}
1 change: 1 addition & 0 deletions alvarium_demo_pub/config/keys/private.key
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
680455bcc4d5da7115dffd0a7b8d8683134fde24491056df89cac6e58ac7d37d
1 change: 1 addition & 0 deletions alvarium_demo_pub/config/keys/public.key
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a50130940d0e244d4a3815a5dd2e996770bd7811f557625fa87925189b5c8347
80 changes: 80 additions & 0 deletions alvarium_demo_pub/src/custom_annotator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::ops::Range;
use std::string::ToString;
use alvarium_annotator::{Annotation, Annotator, constants, derive_hash, serialise_and_sign};
use alvarium_annotator::constants::AnnotationType;
use alvarium_sdk_rust::{config, factories::new_hash_provider, providers::sign_provider::SignatureProviderWrap};
use alvarium_sdk_rust::config::Signable;
use alvarium_sdk_rust::factories::new_signature_provider;
use crate::mock_sensor::SensorReading;
use crate::errors::Result;


lazy_static! {
/// Annotation Type definition
pub static ref ANNOTATION_THRESHOLD: AnnotationType = AnnotationType("threshold".to_string());
}

/// Defines a new annotator type that will implement the Annotator trait
pub struct ThresholdAnnotator {
/// Hashing algorithm used for checksums
hash: constants::HashType,
/// Type of annotation (a wrapper around a string definition)
kind: AnnotationType,
/// Signature provider for signing data
sign: SignatureProviderWrap,
/// Threshold limits for custom annotation
range: Range<u8>,
}

impl ThresholdAnnotator {
pub fn new(cfg: &config::SdkInfo, range: Range<u8>) -> Result<impl Annotator<Error = alvarium_sdk_rust::errors::Error>> {
Ok(ThresholdAnnotator {
hash: cfg.hash.hash_type.clone(),
kind: ANNOTATION_THRESHOLD.clone(),
sign: new_signature_provider(&cfg.signature)?,
range,
})
}

}

/// Implementation of the annotate() function for generating a threshold Annotation
impl Annotator for ThresholdAnnotator {
type Error = alvarium_sdk_rust::errors::Error;
fn annotate(&mut self, data: &[u8]) -> alvarium_sdk_rust::errors::Result<Annotation> {
let hasher = new_hash_provider(&self.hash)?;
let signable: std::result::Result<Signable, serde_json::Error> = serde_json::from_slice(data);
let key = match signable {
Ok(signable) => derive_hash(hasher, signable.seed.as_bytes()),
Err(_) => derive_hash(hasher, data),
};
match gethostname::gethostname().to_str() {
Some(host) => {
// For the sake of this example we will use both a signable and non signable reading
// So we will need to do a match to check which one
let reading: std::result::Result<SensorReading, serde_json::Error> = serde_json::from_slice(data);
let within_threshold = match reading {
Ok(reading) => reading.value <= self.range.end && reading.value >= self.range.start,
Err(_) => {
let signable: std::result::Result<Signable, serde_json::Error> = serde_json::from_slice(data);
match signable {
Ok(signable) => {
let reading: SensorReading = serde_json::from_str(&signable.seed).unwrap();
reading.value <= self.range.end && reading.value >= self.range.start
}
Err(_) => false
}
}
};

let mut annotation = Annotation::new(&key, self.hash.clone(), host, self.kind.clone(), within_threshold);
let signature = serialise_and_sign(&self.sign, &annotation)?;
annotation.with_signature(&signature);
Ok(annotation)
},
None => {
Err(alvarium_sdk_rust::errors::Error::NoHostName.into())
}
}
}
}
32 changes: 32 additions & 0 deletions alvarium_demo_pub/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use thiserror::Error;
pub type Result<T> = core::result::Result<T, Error>;


#[derive(Debug, Error)]
pub enum Error {
#[error("Fern error: {0}")]
LoggerSetupError(log::SetLoggerError),
#[error("Core Alvarium error: {0}")]
AlvariumCoreError(alvarium_annotator::Error),

#[error("Alvarium error: {0}")]
AlvariumSdkError(alvarium_sdk_rust::errors::Error),
}

impl From<alvarium_annotator::Error> for Error {
fn from(e: alvarium_annotator::Error) -> Self {
Error::AlvariumCoreError(e)
}
}

impl From<alvarium_sdk_rust::errors::Error> for Error {
fn from(e: alvarium_sdk_rust::errors::Error) -> Self {
Error::AlvariumSdkError(e)
}
}

impl From<Error> for alvarium_sdk_rust::errors::Error {
fn from(e: Error) -> Self {
alvarium_sdk_rust::errors::Error::External(Box::new(e))
}
}
124 changes: 124 additions & 0 deletions alvarium_demo_pub/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::net::SocketAddr;
use std::sync::{Arc};
use hyper::{Body, header, Request, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use streams::{Address, User};
use streams::id::{Permissioned, Psk};
use streams::transport::utangle::Client;
use crate::BASE_TOPIC;
use std::str::FromStr;
use tokio::sync::Mutex;


type GenericError = Box<dyn std::error::Error + Send + Sync>;

/// Starts an http server for receiving subscription requests
pub async fn start(user: Arc<Mutex<User<Client>>>) -> Result<(), GenericError> {
let addr = SocketAddr::from(([0, 0, 0, 0], 8900));

let service = make_service_fn(move |_| {
let user = user.clone();
async {
Ok::<_, GenericError>(service_fn(move |req| {
handle_request(req, user.clone())
}))
}
});

let server = Server::bind(&addr).serve(service);
server.await?;

Ok(())
}

// Handler to manage the get_announcement_id() and subscribe() api calls
async fn handle_request(req: Request<Body>, user: Arc<Mutex<User<Client>>>) -> Result<Response<Body>, GenericError> {
match req.uri().path() {
// Returns the announcement id of the stream created by the publisher instance
"/get_announcement_id" => {
// Generate a new announcement ID and send it through the channel.
let announcement_id = user.lock().await.stream_address().unwrap();

#[derive(serde::Serialize, serde::Deserialize)]
struct AnnouncementResponse {
announcement_id: String
}

let announcement = serde_json::to_vec(&AnnouncementResponse {
announcement_id: announcement_id.to_string()
}).unwrap();

// Respond with the announcement ID.
let response = Response::new(Body::from(announcement));
Ok(response)
},
// Adds subscriber to publisher
"/subscribe" => subscribe_response(req, user).await,
_ => {
// Respond with a 404 Not Found for other paths.
let response = Response::builder()
.status(404)
.body(Body::empty())
.unwrap();
Ok(response)
}
}
}

// Attempts to unpack a subscription request, if successful the subscription message will be
// retrieved from the distributed network, and once processed, a new branch will be created for the
pub async fn subscribe_response(
req: Request<Body>,
user: Arc<Mutex<User<Client>>>,
) -> Result<Response<Body>, GenericError> {
let data = hyper::body::to_bytes(req.into_body()).await?;

let response;
let json_data: serde_json::Result<SubscriptionRequest> = serde_json::from_slice(&data);
match json_data {
Ok(sub_req) => {
let mut user = user.lock().await;
let sub_address = Address::from_str(&sub_req.address).unwrap();
let msg = user.receive_message(sub_address).await.unwrap();
let sub = msg.as_subscription().unwrap();
let _ = user.new_branch(BASE_TOPIC, sub_req.topic.as_str()).await;
let psk = Psk::from_seed("A pre shared key seed").to_pskid();
let keyload = user.send_keyload(
sub_req.topic.as_str(),
vec![Permissioned::Admin(sub.subscriber_identifier.clone())],
vec![psk]
)
.await
.unwrap();

response = Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from("Subscription processed, keyload link: ".to_owned() + &keyload.address().to_string()))
.unwrap();
},
Err(e) => {
dbg!("Error in formatting: {:?}", e);
response = Response::builder()
.status(StatusCode::BAD_REQUEST)
.header(header::CONTENT_TYPE, "application/json")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from("Malformed json request"))
.unwrap();
}
}

Ok(response)
}


// Subscription Request as
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct SubscriptionRequest {
address: String,
identifier: String,
#[serde(rename="idType")]
id_type: u8,
topic: String,
}
28 changes: 28 additions & 0 deletions alvarium_demo_pub/src/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use log::LevelFilter;

pub fn init() -> crate::errors::Result<()> {
fern::Dispatch::new()
.format(|out, message, record| {

let source = format!("{}:{}", record.target(), record.line().unwrap_or_default());
let gap = if source.len() < 35 {
" ".repeat(35 - source.len())
} else {
" ".to_string()
};

out.finish(format_args!(
"[{} | {:6}| {}]{} {}",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
record.level(),
source,
gap,
message
))
})
.level(LevelFilter::Info)
.chain(std::io::stdout())
.apply()
.map_err(|e| crate::errors::Error::LoggerSetupError(e))?;
Ok(())
}
Loading