Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: use json payloads #34

Merged
merged 11 commits into from Aug 1, 2022
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -21,10 +21,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
other instances to do the same. Also makes `Client::wait_network_initialized`
private, as it is included in `Client::new_and_init` now. See [PR 25].

- Use a JSON Payload instead of string in `publish` and `subscribe`. See [PR 34].

[PR 26]: https://github.com/testground/sdk-rust/pull/26
[PR 25]: https://github.com/testground/sdk-rust/pull/25
[PR 27]: https://github.com/testground/sdk-rust/pull/27
[PR 29]: https://github.com/testground/sdk-rust/pull/29
[PR 34]: https://github.com/testground/sdk-rust/pull/34

## [0.3.0]
### Added
Expand Down
6 changes: 6 additions & 0 deletions examples/example.rs
@@ -1,3 +1,5 @@
use std::borrow::Cow;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = testground::client::Client::new_and_init().await?;
Expand All @@ -11,6 +13,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap()
));

let json = serde_json::json!({"foo": "bar"});

client.publish("demonstration", Cow::Owned(json)).await?;

client.record_success().await?;

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions src/background.rs
Expand Up @@ -23,12 +23,12 @@ const WEBSOCKET_RECEIVER: &str = "Websocket Receiver";
pub enum Command {
Publish {
topic: String,
message: String,
message: serde_json::Value,
sender: oneshot::Sender<Result<u64, Error>>,
},
Subscribe {
topic: String,
stream: mpsc::Sender<Result<String, Error>>,
stream: mpsc::Sender<Result<serde_json::Value, Error>>,
},

SignalEntry {
Expand Down Expand Up @@ -89,7 +89,7 @@ enum PendingRequest {
sender: oneshot::Sender<Result<(), Error>>,
},
Subscribe {
stream: mpsc::Sender<Result<String, Error>>,
stream: mpsc::Sender<Result<serde_json::Value, Error>>,
},
}

Expand Down Expand Up @@ -233,7 +233,7 @@ impl BackgroundTask {
} => {
let topic = self.contextualize_topic(&topic);

self.publish(id, topic, PayloadType::String(message), sender)
self.publish(id, topic, PayloadType::Json(message), sender)
.await
}
Command::Subscribe { topic, stream } => {
Expand Down Expand Up @@ -406,7 +406,7 @@ impl BackgroundTask {
&mut self,
id: u64,
topic: String,
stream: mpsc::Sender<Result<String, Error>>,
stream: mpsc::Sender<Result<serde_json::Value, Error>>,
) {
let request = Request {
id: id.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions src/client.rs
Expand Up @@ -87,7 +87,7 @@ impl Client {
pub async fn publish(
&self,
topic: impl Into<Cow<'static, str>>,
message: impl Into<Cow<'static, str>>,
message: impl Into<Cow<'static, serde_json::Value>>,
) -> Result<u64, Error> {
let (sender, receiver) = oneshot::channel();

Expand All @@ -106,7 +106,7 @@ impl Client {
pub async fn subscribe(
&self,
topic: impl Into<Cow<'static, str>>,
) -> impl Stream<Item = Result<String, Error>> {
) -> impl Stream<Item = Result<serde_json::Value, Error>> {
let (stream, out) = mpsc::channel(1);

let cmd = Command::Subscribe {
Expand Down
2 changes: 1 addition & 1 deletion src/requests.rs
Expand Up @@ -17,7 +17,7 @@ pub struct Request {
pub enum PayloadType {
Event(EventType),

String(String),
Json(serde_json::Value),

Config(NetworkConfiguration),
}
Expand Down
54 changes: 44 additions & 10 deletions src/responses.rs
@@ -1,6 +1,24 @@
use serde::Deserialize;
use serde::{Deserialize, Deserializer};
use serde_with::rust::string_empty_as_none;

fn json_empty_string_is_none<'de, D>(data: D) -> Result<Option<serde_json::Value>, D::Error>
where
D: Deserializer<'de>,
{
let value: Option<serde_json::Value> = Deserialize::deserialize(data)?;

match value {
Some(v) => {
if v.is_string() && v.as_str().unwrap().is_empty() {
Ok(None)
} else {
Ok(Some(v))
}
}
None => Ok(None),
}
}

#[derive(Deserialize, Debug)]
pub struct SignalEntry {
pub seq: u64,
Expand All @@ -18,8 +36,8 @@ pub struct RawResponse {
#[serde(with = "string_empty_as_none")]
pub error: Option<String>,

#[serde(with = "string_empty_as_none")]
pub subscribe: Option<String>,
#[serde(deserialize_with = "json_empty_string_is_none")]
pub subscribe: Option<serde_json::Value>,

pub signal_entry: Option<SignalEntry>,

Expand All @@ -30,7 +48,7 @@ pub struct RawResponse {
pub enum ResponseType {
SignalEntry { seq: u64 },
Publish { seq: u64 },
Subscribe(String),
Subscribe(serde_json::Value),
Error(String),
Barrier,
}
Expand Down Expand Up @@ -58,11 +76,7 @@ impl From<RawResponse> for Response {
let error = serde_json::from_str(&error).expect("JSON Deserialization");
ResponseType::Error(error)
}
(None, Some(msg), None, None) => {
//Hack to remove extra escape characters
let msg = serde_json::from_str(&msg).expect("JSON Deserialization");
ResponseType::Subscribe(msg)
Comment on lines -62 to -64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the suggestion above, we would only do the following patch here:

-               //Hack to remove extra escape characters
-               let msg = serde_json::from_str(&msg).expect("JSON Deserialization");
                ResponseType::Subscribe(msg)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more and running the failing tests myself, I think instead of serializing and deserializing a JSON structure in sdk-rust, we should leave it up to the user of sdk-rust what to send, and instead do the serialization and deserialization in the concrete test plans.

The benefit is, that a user can publish any format and is not bound to JSON.

Could you share the benefits you see?
My thinking might be "tainted" by the go-sdk but I think defaulting to JSON is a good thing:

  • Test implementers don't have to think about serialization schemes,
  • SDKs can automate encoding/decoding (the go SDK uses reflection to convert JSON payloads from/to Go structures),
  • It's hard to create a JSON payload that cannot be used from another language
    • I'm seeing this now with the libp2p/ping go test, they publish a JSON we can use in Rust without much effort. If they had come up with their scheme, best case scenario we would use JSON, worst case we now have to support protocol buffer or a custom encoding.

For what it's worth, it looks like there is a type issue in the API that makes it confusing, but we don't have to leak this on the user side:
https://github.com/testground/sync-service/blob/d3e9e3e56403ed994f350942813d3d8aa746debc/types.go#L21
https://github.com/testground/sync-service/blob/d3e9e3e56403ed994f350942813d3d8aa746debc/types.go#L69
(string response)

}
(None, Some(msg), None, None) => ResponseType::Subscribe(msg),
(None, None, Some(signal), None) => ResponseType::SignalEntry { seq: signal.seq },
(None, None, None, Some(publish)) => ResponseType::Publish { seq: publish.seq },
(error, subscribe, signal_entry, publish) => {
Expand All @@ -89,7 +103,8 @@ mod tests {

#[test]
fn serde_test() {
let raw_response = "{\"id\":\"0\",\"error\":\"\",\"subscribe\":\"\",\"publish\":{\"seq\":1},\"signal_entry\":null}";
let raw_response =
"{\"id\":\"0\",\"error\":\"\",\"subscribe\":\"\",\"publish\":{\"seq\":1},\"signal_entry\":null}";

let response: RawResponse = serde_json::from_str(raw_response).unwrap();

Expand All @@ -103,4 +118,23 @@ mod tests {
response
);
}
#[test]
fn serde_test_complex_subscribe() {
let raw_response = "{\"id\":\"1\",\"error\":\"\",\"subscribe\":{\"Addrs\":[\"/ip4/16.3.0.3/tcp/45369\"],\"ID\":\"QmbSLMEMackm7vHiUGMB2EFAPbzeJNpeB9yTpzYKoojDWc\"}}";

let response: RawResponse = serde_json::from_str(raw_response).unwrap();

let response: Response = response.into();

assert_eq!(
Response {
id: "1".to_owned(),
response: ResponseType::Subscribe(serde_json::json!({
"Addrs": ["/ip4/16.3.0.3/tcp/45369"],
"ID": "QmbSLMEMackm7vHiUGMB2EFAPbzeJNpeB9yTpzYKoojDWc"
}))
},
response
);
}
}