Skip to content

Commit

Permalink
We are asynchronous now
Browse files Browse the repository at this point in the history
  • Loading branch information
nagisa committed Feb 11, 2020
1 parent b1b4425 commit ca11c1b
Show file tree
Hide file tree
Showing 6 changed files with 644 additions and 428 deletions.
25 changes: 12 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,33 @@ maintenance = { status = "actively-developed" }

[features]
default = []
google = ["base64", "google-pubsub1", "yup-oauth2"]
mock = []
google = ["base64", "yup-oauth2", "hyper", "http"]

[[example]]
name = "publish"
required-features = ["google"]

[dependencies]
base64 = { version = "^0.10", optional = true }
thiserror = "1"
google-pubsub1 = { version = "^1.0.8", optional = true }
# This project intentionally uses an old version of Hyper. See
# https://github.com/Byron/google-apis-rs/issues/173 for more
# information.
hyper = "^0.10"
hyper-rustls = "^0.6"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
url = "^1.7"
serde_json = "^1"
uuid = { version = "^0.7", features = ["serde", "v4"] }
valico = { version = "^3.1" }
yup-oauth2 = { version = "^1.0", optional = true }
valico = { version = "^3.2" }
url = "2"
futures = "0.3"

base64 = { version = "^0.10", optional = true }
yup-oauth2 = { version = "4", optional = true }
hyper = { version = "0.13.2", optional = true }
http = { version = "0.2", optional = true }

[dev-dependencies]
assert_matches = "^1.3"
rust-embed="^4.3"
strum = "^0.15"
strum_macros = "^0.15"
hyper-openssl = "0.8.0"
tokio = "0.2"

[package.metadata.docs.rs]
all-features = true
72 changes: 51 additions & 21 deletions examples/publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::env;

use hedwig::{GooglePublisher, Hedwig, MajorVersion, Message, MinorVersion, Version};
use hedwig::{
publishers::GooglePubSubPublisher, Hedwig, MajorVersion, Message, MinorVersion, Version,
};
use serde::Serialize;
use strum_macros::IntoStaticStr;

Expand All @@ -19,21 +21,7 @@ const VERSION_1_0: Version = Version(MajorVersion(1), MinorVersion(0));

const PUBLISHER: &str = "myapp";

fn router(t: MessageType, v: MajorVersion) -> Option<&'static str> {
match (t, v) {
(MessageType::UserCreated, MajorVersion(1)) => Some("dev-user-created-v1"),
_ => None,
}
}

fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
let google_credentials = env::var("GOOGLE_APPLICATION_CREDENTIALS")
.expect("env var GOOGLE_APPLICATION_CREDENTIALS is required");
let google_project =
env::var("GOOGLE_CLOUD_PROJECT").expect("env var GOOGLE_CLOUD_PROJECT is required");

let schema = r#"
{
const SCHEMA: &str = r#"{
"$id": "https://hedwig.standard.ai/schema",
"$schema": "https://json-schema.org/draft-04/schema#",
"description": "Example Schema",
Expand Down Expand Up @@ -65,24 +53,66 @@ fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
}
}"#;

let publisher = GooglePublisher::new(google_credentials, google_project)?;
fn router(t: MessageType, v: MajorVersion) -> Option<&'static str> {
match (t, v) {
(MessageType::UserCreated, MajorVersion(1)) => Some("dev-user-created-v1"),
_ => None,
}
}

async fn run() -> Result<(), Box<dyn std::error::Error + 'static>> {
let google_project =
env::var("GOOGLE_CLOUD_PROJECT").expect("env var GOOGLE_CLOUD_PROJECT is required");
let google_credentials = env::var("GOOGLE_APPLICATION_CREDENTIALS")
.expect("env var GOOGLE_APPLICATION_CREDENTIALS is required");
let secret = yup_oauth2::read_service_account_key(google_credentials)
.await
.expect("$GOOGLE_APPLICATION_CREDENTIALS is not a valid service account key");

let client = hyper::Client::builder().build(hyper_openssl::HttpsConnector::new()?);
let authenticator = yup_oauth2::ServiceAccountAuthenticator::builder(secret)
.hyper_client(client.clone())
.build()
.await
.expect("could not create an authenticator");

let publisher = GooglePubSubPublisher::new(google_project, client, authenticator);

let hedwig = Hedwig::new(schema, PUBLISHER, publisher, router)?;
let hedwig = Hedwig::new(SCHEMA, PUBLISHER, publisher, router)?;

let data = UserCreatedData {
user_id: "U_123".into(),
};

let message_id = uuid::Uuid::new_v4();
let mut builder = hedwig.build_publish();
let mut builder = hedwig.build_batch();
builder.message(
Message::new(MessageType::UserCreated, VERSION_1_0, data)
.id(message_id)
.header("request_id", uuid::Uuid::new_v4().to_string()),
)?;
builder.publish()?;

println!("Published message {}", message_id);
println!("Published messages {:?}", builder.publish().await?);

Ok(())
}

fn main() {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("runtime builds");
match rt.block_on(run()) {
Ok(_) => std::process::exit(0),
Err(e) => {
eprintln!("error: {}", e);
let mut source = e.source();
while let Some(src) = source {
eprintln!(" caused by: {}", src);
source = src.source();
}
std::process::exit(1);
}
}
}
208 changes: 208 additions & 0 deletions src/google_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use base64;
use futures::{TryFutureExt, TryStreamExt};
use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc, task};
use yup_oauth2::authenticator::Authenticator;

use crate::{PublisherResult, ValidatedMessage};

#[derive(Debug, thiserror::Error)]
enum GooglePubsubError {
#[error("Could not get authentication token")]
GetAuthToken(#[source] yup_oauth2::Error),
#[error("Could not POST the request with messages")]
PostMessages(#[source] hyper::Error),
#[error("Could not construct the request URI")]
ConstructRequestUri(#[source] http::uri::InvalidUri),
#[error("Could not construct the request")]
ConstructRequest(#[source] http::Error),
#[error("Could not serialize a publish request")]
SerializePublishRequest(#[source] serde_json::Error),
#[error("Publish request failed with status code {1}")]
ResponseStatus(
#[source] Option<Box<dyn std::error::Error + Sync + Send>>,
http::StatusCode,
),
#[error("Could not parse the response body")]
ResponseParse(#[source] serde_json::Error),
#[error("Could not receive the response body")]
ResponseBodyReceive(#[source] hyper::Error),
}

/// A publisher that uses the Google Cloud Pub/Sub service as a message transport
///
/// # Examples
///
/// ```no_run
/// async {
/// let google_project =
/// std::env::var("GOOGLE_CLOUD_PROJECT").unwrap();
/// let google_credentials = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").unwrap();
/// let secret = yup_oauth2::read_service_account_key(google_credentials)
/// .await
/// .expect("$GOOGLE_APPLICATION_CREDENTIALS is not a valid service account key");
/// let client = hyper::Client::builder().build(hyper_openssl::HttpsConnector::new()?);
/// let authenticator = yup_oauth2::ServiceAccountAuthenticator::builder(secret)
/// .hyper_client(client.clone())
/// .build()
/// .await
/// .expect("could not create an authenticator");
/// Ok::<_, Box<dyn std::error::Error>>(
/// hedwig::publishers::GooglePubSubPublisher::new(google_project, client, authenticator)
/// )
/// };
/// ```
#[allow(missing_debug_implementations)]
pub struct GooglePubSubPublisher<C>(Arc<PublisherInner<C>>);

struct PublisherInner<C> {
google_cloud_project: Cow<'static, str>,
client: hyper::Client<C>,
authenticator: Authenticator<C>,
}

impl<C> GooglePubSubPublisher<C> {
/// Create a new Google Cloud Pub/Sub publisher
pub fn new<P>(
project: P,
client: hyper::Client<C>,
authenticator: Authenticator<C>,
) -> GooglePubSubPublisher<C>
where
P: Into<Cow<'static, str>>,
{
GooglePubSubPublisher(Arc::new(PublisherInner {
google_cloud_project: project.into(),
client,
authenticator,
}))
}
}

impl<C> crate::Publisher for GooglePubSubPublisher<C>
where
C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
{
type MessageId = String;
type PublishFuture = GooglePubSubPublishFuture;

fn publish(&self, topic: &'static str, messages: Vec<ValidatedMessage>) -> Self::PublishFuture {
let arc = self.0.clone();
GooglePubSubPublishFuture(Box::pin(async move {
let result = async {
const AUTH_SCOPES: [&str; 1] = ["https://www.googleapis.com/auth/pubsub"];
let token = arc
.authenticator
.token(&AUTH_SCOPES)
.await
.map_err(GooglePubsubError::GetAuthToken)?;
let uri = http::Uri::from_maybe_shared(format!(
"https://pubsub.googleapis.com/v1/projects/{0}/topics/{1}:publish",
arc.google_cloud_project, topic
))
.map_err(GooglePubsubError::ConstructRequestUri)?;
let data = serde_json::to_vec(&PubsubPublishRequestSchema {
messages: &messages,
})
.map_err(GooglePubsubError::SerializePublishRequest)?;
let request = http::Request::post(uri)
.header(
http::header::AUTHORIZATION,
format!("Bearer {}", token.as_str()),
)
.header(http::header::ACCEPT, "application/json")
.body(hyper::Body::from(data))
.map_err(GooglePubsubError::ConstructRequest)?;
let response = arc
.client
.request(request)
.map_err(GooglePubsubError::PostMessages)
.await?;
let (parts, body) = response.into_parts();
let body_data = body
.map_ok(|v| v.to_vec())
.try_concat()
.map_err(GooglePubsubError::ResponseBodyReceive)
.await?;
if !parts.status.is_success() {
let src = serde_json::from_slice(&body_data)
.ok()
.map(|v: PubsubPublishFailResponseSchema| v.error.message.into());
return Err(GooglePubsubError::ResponseStatus(src, parts.status));
}
let rsp: PubsubPublishResponseSchema =
serde_json::from_slice(&body_data).map_err(GooglePubsubError::ResponseParse)?;
Ok(rsp)
}
.await;
match result {
Ok(PubsubPublishResponseSchema { message_ids }) => {
PublisherResult::Success(message_ids)
}
Err(e) => PublisherResult::OneError(e.into(), messages),
}
}))
}
}

/// The `GooglePubSubPublisher::publish` future
pub struct GooglePubSubPublishFuture(
Pin<Box<dyn Future<Output = PublisherResult<String>> + Send + 'static>>,
);

impl Future for GooglePubSubPublishFuture {
type Output = PublisherResult<String>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}

/// Schema for the Google PubsubMessage REST API type
#[derive(serde::Serialize)]
struct PubsubMessageSchema<'a> {
data: &'a str,
attributes: &'a crate::Headers,
}

/// Schema for the Google PubsubRequest REST API type
#[derive(serde::Serialize)]
struct PubsubPublishRequestSchema<'a> {
#[serde(serialize_with = "serialize_validated_messages")]
messages: &'a [ValidatedMessage],
}

fn serialize_validated_messages<S: serde::Serializer>(
msgs: &[ValidatedMessage],
serializer: S,
) -> Result<S::Ok, S::Error> {
let mut seq = serializer.serialize_seq(Some(msgs.len()))?;
for element in msgs {
// Would also be happy with `S::to_string(&element)` if it was a thing...?
let raw_message = base64::encode(&serde_json::to_string(&element).expect("welp"));
serde::ser::SerializeSeq::serialize_element(
&mut seq,
&PubsubMessageSchema {
data: &raw_message,
attributes: &element.metadata.headers,
},
)?;
}
serde::ser::SerializeSeq::end(seq)
}

/// Schema for the Google PubsubResponse REST API type
#[derive(serde::Deserialize)]
struct PubsubPublishResponseSchema {
#[serde(rename = "messageIds")]
message_ids: Vec<String>,
}

#[derive(serde::Deserialize)]
struct PubsubPublishFailResponseSchema {
error: PubsubPublishErrorSchema,
}

#[derive(serde::Deserialize)]
struct PubsubPublishErrorSchema {
message: String,
}
Loading

0 comments on commit ca11c1b

Please sign in to comment.