Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: use json payloads #34

Merged
merged 11 commits into from Aug 1, 2022
18 changes: 15 additions & 3 deletions .github/workflows/ci.yml
Expand Up @@ -114,7 +114,7 @@ jobs:
testground healthcheck --runner local:docker --fix;
shell: bash

- name: Run testground plan (sdk-rust)
- name: Run testground plan (case=example)
run: |
testground run single \
--plan=sdk-rust \
Expand All @@ -124,12 +124,24 @@ jobs:
--instances=1 \
--wait \
--collect \
--collect-file ./result.tgz
--collect-file ./result_example.tgz

- name: Run testground plan (case=publish-subscribe)
run: |
testground run single \
--plan=sdk-rust \
--testcase=publish-subscribe \
--builder=docker:generic \
--runner=local:docker \
--instances=2 \
--wait \
--collect \
--collect-file ./result_publish_subscribe.tgz

- uses: actions/upload-artifact@v3
if: ${{ always() }}
with:
name: testground-output
path: |
testground/daemon.*
result.tgz
result*.tgz
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -21,10 +21,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
other instances to do the same. Also makes `Client::wait_network_initialized`
private, as it is included in `Client::new_and_init` now. See [PR 25].

- Use a JSON Payload instead of string in `publish` and `subscribe`. See [PR 34].

[PR 26]: https://github.com/testground/sdk-rust/pull/26
[PR 25]: https://github.com/testground/sdk-rust/pull/25
[PR 27]: https://github.com/testground/sdk-rust/pull/27
[PR 29]: https://github.com/testground/sdk-rust/pull/29
[PR 34]: https://github.com/testground/sdk-rust/pull/34

## [0.3.0]
### Added
Expand Down
56 changes: 56 additions & 0 deletions examples/example.rs
@@ -1,7 +1,19 @@
use std::borrow::Cow;

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = testground::client::Client::new_and_init().await?;

match client.run_parameters().test_case.as_str() {
"example" => example(client).await,
"publish-subscribe" => publish_subscribe(client).await,
_ => panic!("Unknown test case: {}", client.run_parameters().test_case),
}
}

async fn example(client: testground::client::Client) -> Result<(), Box<dyn std::error::Error>> {
client.record_message(format!(
"{}, sdk-rust!",
client
Expand All @@ -11,7 +23,51 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap()
));

let json = serde_json::json!({"foo": "bar"});
client.publish("demonstration", Cow::Owned(json)).await?;
client.record_success().await?;

Ok(())
}

async fn publish_subscribe(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for providing an example along with your patch. This is great engineering.

Given that the old fn example is quite simple, I would be fine for fn publish_subscribe to replace it. Just an idea to reduce complexity. Not a strong opinion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback @mxinden!
If you don't mind I would keep it, not as a test case, I agree it's probably not super useful, but as an example for future users. I like the idea of having a bare minimum "here is how you write a test that passes in testground", before moving into more complex synchronization patterns.

client: testground::client::Client,
) -> Result<(), Box<dyn std::error::Error>> {
client.record_message("running the publish_subscribe test");

match client.global_seq() {
1 => {
client.record_message("I am instance 1: acting as the leader");

let json = serde_json::json!({"foo": "bar"});
client.publish("demonstration", Cow::Owned(json)).await?;
client.record_success().await?;
}
_ => {
client.record_message(format!(
"I am instance {}: acting as a follower",
client.global_seq()
));

let payload = client
.subscribe("demonstration")
.await
.take(1)
.map(|x| x.unwrap())
.next()
.await
.unwrap();

client.record_message(format!("I received the payload: {}", payload));

if payload["foo"].as_str() == Some("bar") {
client.record_success().await?;
} else {
client
.record_failure(format!("invalid payload: {}", payload))
.await?;
}
}
}
Ok(())
}
6 changes: 5 additions & 1 deletion manifest.toml
Expand Up @@ -15,4 +15,8 @@ name = "example"
instances = { min = 1, max = 1, default = 1 }

[testcases.params]
greeting = { type = "string", desc = "greeting", default = "Hello" }
greeting = { type = "string", desc = "greeting", default = "Hello" }

[[testcases]]
name = "publish-subscribe"
instances = { min = 2, max = 10, default = 2 }
10 changes: 5 additions & 5 deletions src/background.rs
Expand Up @@ -23,12 +23,12 @@ const WEBSOCKET_RECEIVER: &str = "Websocket Receiver";
pub enum Command {
Publish {
topic: String,
message: String,
message: serde_json::Value,
sender: oneshot::Sender<Result<u64, Error>>,
},
Subscribe {
topic: String,
stream: mpsc::Sender<Result<String, Error>>,
stream: mpsc::Sender<Result<serde_json::Value, Error>>,
},

SignalEntry {
Expand Down Expand Up @@ -89,7 +89,7 @@ enum PendingRequest {
sender: oneshot::Sender<Result<(), Error>>,
},
Subscribe {
stream: mpsc::Sender<Result<String, Error>>,
stream: mpsc::Sender<Result<serde_json::Value, Error>>,
},
}

Expand Down Expand Up @@ -233,7 +233,7 @@ impl BackgroundTask {
} => {
let topic = self.contextualize_topic(&topic);

self.publish(id, topic, PayloadType::String(message), sender)
self.publish(id, topic, PayloadType::Json(message), sender)
.await
}
Command::Subscribe { topic, stream } => {
Expand Down Expand Up @@ -406,7 +406,7 @@ impl BackgroundTask {
&mut self,
id: u64,
topic: String,
stream: mpsc::Sender<Result<String, Error>>,
stream: mpsc::Sender<Result<serde_json::Value, Error>>,
) {
let request = Request {
id: id.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions src/client.rs
Expand Up @@ -87,7 +87,7 @@ impl Client {
pub async fn publish(
&self,
topic: impl Into<Cow<'static, str>>,
message: impl Into<Cow<'static, str>>,
message: impl Into<Cow<'static, serde_json::Value>>,
) -> Result<u64, Error> {
let (sender, receiver) = oneshot::channel();

Expand All @@ -106,7 +106,7 @@ impl Client {
pub async fn subscribe(
&self,
topic: impl Into<Cow<'static, str>>,
) -> impl Stream<Item = Result<String, Error>> {
) -> impl Stream<Item = Result<serde_json::Value, Error>> {
let (stream, out) = mpsc::channel(1);

let cmd = Command::Subscribe {
Expand Down
2 changes: 1 addition & 1 deletion src/requests.rs
Expand Up @@ -17,7 +17,7 @@ pub struct Request {
pub enum PayloadType {
Event(EventType),

String(String),
Json(serde_json::Value),

Config(NetworkConfiguration),
}
Expand Down
32 changes: 26 additions & 6 deletions src/responses.rs
Expand Up @@ -30,7 +30,7 @@ pub struct RawResponse {
pub enum ResponseType {
SignalEntry { seq: u64 },
Publish { seq: u64 },
Subscribe(String),
Subscribe(serde_json::Value),
Error(String),
Barrier,
}
Expand All @@ -54,14 +54,14 @@ impl From<RawResponse> for Response {
let response = match (error, subscribe, signal_entry, publish) {
(None, None, None, None) => ResponseType::Barrier,
(Some(error), None, None, None) => {
//Hack to remove extra escape characters
// Hack to remove extra escape characters
let error = serde_json::from_str(&error).expect("JSON Deserialization");
ResponseType::Error(error)
}
(None, Some(msg), None, None) => {
//Hack to remove extra escape characters
let msg = serde_json::from_str(&msg).expect("JSON Deserialization");
ResponseType::Subscribe(msg)
Comment on lines -62 to -64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the suggestion above, we would only do the following patch here:

-               //Hack to remove extra escape characters
-               let msg = serde_json::from_str(&msg).expect("JSON Deserialization");
                ResponseType::Subscribe(msg)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more and running the failing tests myself, I think instead of serializing and deserializing a JSON structure in sdk-rust, we should leave it up to the user of sdk-rust what to send, and instead do the serialization and deserialization in the concrete test plans.

The benefit is, that a user can publish any format and is not bound to JSON.

Could you share the benefits you see?
My thinking might be "tainted" by the go-sdk but I think defaulting to JSON is a good thing:

  • Test implementers don't have to think about serialization schemes,
  • SDKs can automate encoding/decoding (the go SDK uses reflection to convert JSON payloads from/to Go structures),
  • It's hard to create a JSON payload that cannot be used from another language
    • I'm seeing this now with the libp2p/ping go test, they publish a JSON we can use in Rust without much effort. If they had come up with their scheme, best case scenario we would use JSON, worst case we now have to support protocol buffer or a custom encoding.

For what it's worth, it looks like there is a type issue in the API that makes it confusing, but we don't have to leak this on the user side:
https://github.com/testground/sync-service/blob/d3e9e3e56403ed994f350942813d3d8aa746debc/types.go#L21
https://github.com/testground/sync-service/blob/d3e9e3e56403ed994f350942813d3d8aa746debc/types.go#L69
(string response)

// the Subscribe payload is a json encoded string, so we need to deserialize it
laurentsenta marked this conversation as resolved.
Show resolved Hide resolved
let payload = serde_json::from_str(&msg).expect("JSON Deserialization");
ResponseType::Subscribe(payload)
}
(None, None, Some(signal), None) => ResponseType::SignalEntry { seq: signal.seq },
(None, None, None, Some(publish)) => ResponseType::Publish { seq: publish.seq },
Expand Down Expand Up @@ -89,7 +89,8 @@ mod tests {

#[test]
fn serde_test() {
let raw_response = "{\"id\":\"0\",\"error\":\"\",\"subscribe\":\"\",\"publish\":{\"seq\":1},\"signal_entry\":null}";
let raw_response =
"{\"id\":\"0\",\"error\":\"\",\"subscribe\":\"\",\"publish\":{\"seq\":1},\"signal_entry\":null}";

let response: RawResponse = serde_json::from_str(raw_response).unwrap();

Expand All @@ -103,4 +104,23 @@ mod tests {
response
);
}
#[test]
fn serde_test_complex_subscribe() {
let raw_response = "{\"id\":\"1\",\"error\":\"\",\"subscribe\":\"{\\\"Addrs\\\":[\\\"/ip4/16.3.0.3/tcp/45369\\\"],\\\"ID\\\":\\\"QmbSLMEMackm7vHiUGMB2EFAPbzeJNpeB9yTpzYKoojDWc\\\"}\"}";

let response: RawResponse = serde_json::from_str(raw_response).unwrap();

let response: Response = response.into();

assert_eq!(
Response {
id: "1".to_owned(),
response: ResponseType::Subscribe(serde_json::json!({
"Addrs": ["/ip4/16.3.0.3/tcp/45369"],
"ID": "QmbSLMEMackm7vHiUGMB2EFAPbzeJNpeB9yTpzYKoojDWc"
}))
},
response
);
}
}