Skip to content

Commit

Permalink
Refactor the library
Browse files Browse the repository at this point in the history
This commit refactors this library, almost from scratch, in order to:

1. Achieve more conventional and easier-to-use APIs;
2. More correct and flexible error handling;
3. Internal segmentation of batches inside of google publisher.

The initial motivation was originally 3, but then old way of implicit
error handling came up as a problem as well. I attempted to introduce
some mechanism to handle errors in a more granular way without adjusting
API of the library too much, but it turned out to be not very feasible.

The new API is centered around the new `hedwig::Message` trait which the
users would implement to associate data types with various information
that is necessary for the hewdig library – the topic name, the
validators applicable to the data, the encoding mechanisms, etc.
Furthermore, it now allows retaining all of auxiliary data alongside the
message data as well, giving further flexibility in how this library is
used by the users.

All in all, the trait was designed in a way that would allow to move the
message-encoding specific pieces of code away from the business logic as
much as possible. For now the implementation of this trait remains
manual, but there is nothing that would prevent it from being
`derive`able in some way.

---

The error handling was made entirely explicit in the new API. In
particular we no longer implicitly re-queue messages in the batch (nor
do we reuse the batches, anymore). Instead the users receive a `Stream`
of `(result, topic, validated message)`. They can then choose to `push`
the validated message back into a new batch they are constructing to
retry the message… or just handle the error in whatever other way that
makes sense to them.

One important improvement as part of this change is simplification of
behaviour when the futures/streams are cancelled. Previously dropping
the `publish` future would leave one in a fairly difficult to understand
state where the batch might have some of the messages re-queued and some
of them cancelled, etc.

Since there's no longer any state remaining in `hedwig`, dropping
streams to cancel starts making significantly more sense as well.

---

Google PubSub has various limitations on the API requests that are made
to its endpoints. As we are in charge of batching the requests up to
implement publishing more efficiently, it is also up to us to ensure
that we do not exceed any of the limits upstream has placed on us.

To do this we implement a batch segmentation algorithm in
`GoogleMessageSegmenter` that the GooglePubSubPublisher uses internally
to split up the messages.

Fixes #18
  • Loading branch information
nagisa committed Sep 15, 2020
1 parent 438f953 commit 8fb7ba8
Show file tree
Hide file tree
Showing 15 changed files with 1,492 additions and 1,171 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/hedwig.yml
Expand Up @@ -41,6 +41,10 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: doc
args: --all-features --manifest-path=Cargo.toml
env:
RUSTDOCFLAGS: --cfg docsrs
if: ${{ matrix.rust_toolchain == 'nightly' }}
- name: Build without features
uses: actions-rs/cargo@v1
with:
Expand Down
35 changes: 19 additions & 16 deletions Cargo.toml
Expand Up @@ -15,36 +15,39 @@ keywords = ["pubsub", "messagebus", "microservices"]
categories = ["asynchronous", "web-programming"]

[badges]
travis-ci = { repository = "standard-ai/hedwig-rust" }
maintenance = { status = "actively-developed" }

[features]
default = []
google = ["base64", "yup-oauth2", "hyper", "http"]
# Publishers
google = ["base64", "yup-oauth2", "hyper", "http", "serde_json", "serde", "serde/derive"]
# Validators
json-schema = ["valico", "serde_json", "serde"]
protobuf = ["prost"]

[[example]]
name = "publish"
required-features = ["google"]
required-features = ["google", "json-schema"]

[dependencies]
futures = "0.3"
serde_json = "^1"
serde = { version = "^1.0", features = ["derive"] }
thiserror = "1"
url = "2"
uuid = { version = "^0.8", features = ["serde", "v4"] }
valico = { version = "^3.2" }
futures-util = { version = "0.3", features = ["std"], default-features = false }
thiserror = { version = "1", default-features = false }
url = { version = "2", default-features = false }
uuid = { version = "^0.8", features = ["v4"], default-features = false }

base64 = { version = "^0.12", optional = true }
http = { version = "^0.2", optional = true }
hyper = { version = "^0.13.1", optional = true }
yup-oauth2 = { version = "4", optional = true }
serde = { version = "^1.0", optional = true, default-features = false }
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
valico = { version = "^3.2", optional = true, default-features = false }
base64 = { version = "^0.12", optional = true, default-features = false }
http = { version = "^0.2", optional = true, default-features = false }
hyper = { version = "^0.13.1", optional = true, default-features = false }
yup-oauth2 = { version = "4", optional = true, default-features = false }
prost = { version = "0.6", optional = true, default-features = false }

[dev-dependencies]
hyper-tls = "0.4.0"
strum = "^0.18"
strum_macros = "^0.18"
tokio = { version = "^0.2.4", features = ["macros"] }
serde = { version = "1", features = ["derive"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
94 changes: 46 additions & 48 deletions examples/publish.rs
@@ -1,29 +1,37 @@
use std::env;
use std::sync::Arc;

use hedwig::{
publishers::GooglePubSubPublisher, Hedwig, MajorVersion, Message, MinorVersion, Version,
};
use serde::Serialize;
use strum_macros::IntoStaticStr;

#[derive(Clone, Copy, Debug, IntoStaticStr, Hash, PartialEq, Eq)]
pub enum MessageType {
#[strum(serialize = "user.created")]
UserCreated,
}

#[derive(Serialize)]
struct UserCreatedData {
use futures_util::stream::StreamExt;
use hedwig::{publishers::GooglePubSubPublisher, Headers, Message, Publisher};
use std::{env, sync::Arc, time::SystemTime};

#[derive(serde::Serialize)]
struct UserCreatedMessage {
#[serde(skip)]
uuid: uuid::Uuid,
user_id: String,
}

const VERSION_1_0: Version = Version(MajorVersion(1), MinorVersion(0));
impl<'a> Message for &'a UserCreatedMessage {
type Error = hedwig::validators::JsonSchemaValidatorError;
type Validator = hedwig::validators::JsonSchemaValidator;
fn topic(&self) -> &'static str {
"user.created"
}
fn encode(self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator
.validate(
self.uuid,
SystemTime::now(),
"https://hedwig.corp/schema#/schemas/user.created/1.0",
Headers::new(),
self,
)
.unwrap())
}
}

const PUBLISHER: &str = "myapp";

const SCHEMA: &str = r#"{
"$id": "https://hedwig.standard.ai/schema",
"$id": "https://hedwig.corp/schema",
"$schema": "https://json-schema.org/draft-04/schema#",
"description": "Example Schema",
"schemas": {
Expand All @@ -39,7 +47,7 @@ const SCHEMA: &str = r#"{
],
"properties": {
"user_id": {
"$ref": "https://hedwig.standard.ai/schema#/definitions/UserId/1.0"
"$ref": "https://hedwig.corp/schema#/definitions/UserId/1.0"
}
}
}
Expand All @@ -54,13 +62,6 @@ const SCHEMA: &str = r#"{
}
}"#;

fn router(t: MessageType, v: MajorVersion) -> Option<&'static str> {
match (t, v) {
(MessageType::UserCreated, MajorVersion(1)) => Some("dev-user-created-v1"),
_ => None,
}
}

async fn run() -> Result<(), Box<dyn std::error::Error + 'static>> {
let google_project =
env::var("GOOGLE_CLOUD_PROJECT").expect("env var GOOGLE_CLOUD_PROJECT is required");
Expand All @@ -71,29 +72,26 @@ async fn run() -> Result<(), Box<dyn std::error::Error + 'static>> {
.expect("$GOOGLE_APPLICATION_CREDENTIALS is not a valid service account key");

let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
let authenticator = Arc::new(yup_oauth2::ServiceAccountAuthenticator::builder(secret)
.hyper_client(client.clone())
.build()
.await
.expect("could not create an authenticator"));

let publisher = GooglePubSubPublisher::new(google_project, client, authenticator);

let hedwig = Hedwig::new(SCHEMA, PUBLISHER, publisher, router)?;

let data = UserCreatedData {
let authenticator = Arc::new(
yup_oauth2::ServiceAccountAuthenticator::builder(secret)
.hyper_client(client.clone())
.build()
.await
.expect("could not create an authenticator"),
);

let publisher = GooglePubSubPublisher::new(PUBLISHER, google_project, client, authenticator);
let validator = hedwig::validators::JsonSchemaValidator::new(SCHEMA).unwrap();
let message = UserCreatedMessage {
uuid: uuid::Uuid::new_v4(),
user_id: "U_123".into(),
};

let message_id = uuid::Uuid::new_v4();
let mut builder = hedwig.build_batch();
builder.message(
Message::new(MessageType::UserCreated, VERSION_1_0, data)
.id(message_id)
.header("request_id", uuid::Uuid::new_v4().to_string()),
)?;

println!("Published messages {:?}", builder.publish().await?);
let topic = Message::topic(&&message);
let validated = message.encode(&validator).unwrap();
let mut publish = publisher.publish(topic, [validated].iter());
while let Some(r) = publish.next().await {
println!("publish result: {:?}", r?);
}

Ok(())
}
Expand Down

0 comments on commit 8fb7ba8

Please sign in to comment.