Skip to content

Commit

Permalink
chore: [rust] use cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai132 committed Jan 4, 2024
1 parent 67276ba commit 96d58f5
Show file tree
Hide file tree
Showing 25 changed files with 461 additions and 317 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: cargo fmt -- --check
run: cargo fmt -- --check

- name: build
working-directory: rust
run: cargo build
Expand Down
5 changes: 4 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rpc-core"
version = "0.3.1"
version = "0.3.2"
description = "a tiny rpc library for rust"
categories = ["rpc", "rpc-core"]
homepage = "https://github.com/shuai132/rpc_core/tree/master/rust"
Expand All @@ -26,6 +26,9 @@ default = []
full = ["net"]
net = ["tokio", "tokio-stream"]

[package.metadata.docs.rs]
features = ["full"]

[dev-dependencies]
env_logger = "0.10.0"

Expand Down
2 changes: 1 addition & 1 deletion rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Or add the following line to your Cargo.toml:

```toml
[dependencies]
rpc-core = { version = "0.3.1", features = ["net"] }
rpc-core = { version = "0.3.2", features = ["net"] }
```

# Example
Expand Down
21 changes: 10 additions & 11 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,21 @@ impl Connection for DefaultConnection {
pub struct LoopbackConnection;

impl LoopbackConnection {
pub fn new() -> (Rc<RefCell<DefaultConnection>>, Rc<RefCell<DefaultConnection>>) {
pub fn new() -> (
Rc<RefCell<DefaultConnection>>,
Rc<RefCell<DefaultConnection>>,
) {
let c1 = Rc::new(RefCell::new(DefaultConnection::default()));
let c1_weak = Rc::downgrade(&c1);
let c2 = Rc::new(RefCell::new(DefaultConnection::default()));
let c2_weak = Rc::downgrade(&c2);

c1.borrow_mut().send_package_impl = Some(
Box::new(move |package: Vec<u8>| {
c2_weak.upgrade().unwrap().borrow().on_recv_package(package);
})
);
c2.borrow_mut().send_package_impl = Some(
Box::new(move |package: Vec<u8>| {
c1_weak.upgrade().unwrap().borrow().on_recv_package(package);
})
);
c1.borrow_mut().send_package_impl = Some(Box::new(move |package: Vec<u8>| {
c2_weak.upgrade().unwrap().borrow().on_recv_package(package);
}));
c2.borrow_mut().send_package_impl = Some(Box::new(move |package: Vec<u8>| {
c1_weak.upgrade().unwrap().borrow().on_recv_package(package);
}));
(c1, c2)
}
}
5 changes: 4 additions & 1 deletion rust/src/detail/coder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub fn deserialize(payload: &Vec<u8>) -> Option<MsgWrapper> {
cmd_len = *(p as *const u16);
p = p.add(2);

msg.cmd = String::from_utf8_unchecked(Vec::from(std::slice::from_raw_parts(p as *mut u8, cmd_len as usize)));
msg.cmd = String::from_utf8_unchecked(Vec::from(std::slice::from_raw_parts(
p as *mut u8,
cmd_len as usize,
)));
p = p.add(cmd_len as usize);

msg.type_ = MsgType::from_bits(*p).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions rust/src/detail/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub(crate) mod msg_wrapper;
pub(crate) mod msg_dispatcher;
pub(crate) mod coder;
pub(crate) mod msg_dispatcher;
pub(crate) mod msg_wrapper;
61 changes: 41 additions & 20 deletions rust/src/detail/msg_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ impl MsgDispatcher {
let this_weak = Rc::downgrade(&dispatcher);
dispatcher.borrow_mut().this = this_weak.clone();

conn.borrow_mut().set_recv_package_impl(Box::new(
move |payload| {
conn.borrow_mut()
.set_recv_package_impl(Box::new(move |payload| {
if this_weak.strong_count() == 0 {
return;
}
Expand All @@ -46,8 +46,7 @@ impl MsgDispatcher {
} else {
error!("deserialize error");
}
}
));
}));
dispatcher
}
}
Expand All @@ -65,25 +64,38 @@ impl MsgDispatcher {
}
}

pub fn subscribe_rsp(&mut self, seq: SeqType, rsp_handle: Rc<RspHandle>, timeout_cb: Option<Rc<TimeoutCb>>, timeout_ms: u32) {
pub fn subscribe_rsp(
&mut self,
seq: SeqType,
rsp_handle: Rc<RspHandle>,
timeout_cb: Option<Rc<TimeoutCb>>,
timeout_ms: u32,
) {
self.rsp_handle_map.insert(seq, rsp_handle);
if let Some(timer_impl) = &self.timer_impl {
let this_weak = self.this.clone();
timer_impl(timeout_ms, Box::new(move || {
if this_weak.strong_count() == 0 {
debug!("seq:{} timeout after destroy", seq);
return;
}
timer_impl(
timeout_ms,
Box::new(move || {
if this_weak.strong_count() == 0 {
debug!("seq:{} timeout after destroy", seq);
return;
}

let this = this_weak.upgrade().unwrap();
let mut this = this.borrow_mut();
if let Some(_) = this.rsp_handle_map.remove(&seq) {
if let Some(timeout_cb) = &timeout_cb {
timeout_cb();
let this = this_weak.upgrade().unwrap();
let mut this = this.borrow_mut();
if let Some(_) = this.rsp_handle_map.remove(&seq) {
if let Some(timeout_cb) = &timeout_cb {
timeout_cb();
}
trace!(
"Timeout seq={}, rsp_handle_map.size={}",
seq,
this.rsp_handle_map.len()
);
}
trace!("Timeout seq={}, rsp_handle_map.size={}", seq, this.rsp_handle_map.len());
}
}));
}),
);
} else {
warn!("no timeout will cause memory leak!");
}
Expand Down Expand Up @@ -131,7 +143,15 @@ impl MsgDispatcher {
}
} else if msg.type_.contains(MsgType::Response) {
// pong or response
debug!("<= seq:{} type:{}", msg.seq, if msg.type_.contains(MsgType::Pong) { "pong" } else {"rsp"});
debug!(
"<= seq:{} type:{}",
msg.seq,
if msg.type_.contains(MsgType::Pong) {
"pong"
} else {
"rsp"
}
);
if let Some(handle) = self.rsp_handle_map.remove(&msg.seq) {
if handle(msg) {
trace!("rsp_handle_map.size={}", self.rsp_handle_map.len());
Expand All @@ -148,7 +168,8 @@ impl MsgDispatcher {
}

pub fn set_timer_impl<F>(&mut self, timer_impl: F)
where F: Fn(u32, Box<TimeoutCb>) + 'static
where
F: Fn(u32, Box<TimeoutCb>) + 'static,
{
self.timer_impl = Some(Rc::new(timer_impl));
}
Expand Down
13 changes: 10 additions & 3 deletions rust/src/detail/msg_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@ impl MsgWrapper {
}

pub fn dump(&self) -> String {
format!("seq:{}, type:{}, cmd:{}", self.seq, self.type_.bits(), self.cmd)
format!(
"seq:{}, type:{}, cmd:{}",
self.seq,
self.type_.bits(),
self.cmd
)
}

pub fn unpack_as<'a, T>(&'a self) -> Result<T, Error>
where T: serde::Deserialize<'a>
where
T: serde::Deserialize<'a>,
{
let r = serde_json::from_slice::<T>(self.data.as_slice());
if r.is_err() {
Expand All @@ -49,7 +55,8 @@ impl MsgWrapper {
}

pub fn make_rsp<R>(seq: SeqType, rsp: R) -> MsgWrapper
where R: serde::Serialize
where
R: serde::Serialize,
{
let mut msg = MsgWrapper::new();
msg.type_ = MsgType::Response;
Expand Down
4 changes: 1 addition & 3 deletions rust/src/dispose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ pub struct Dispose {

impl Dispose {
pub fn new() -> Dispose {
Dispose {
requests: vec![],
}
Dispose { requests: vec![] }
}

pub fn dismiss(&mut self) {
Expand Down
68 changes: 36 additions & 32 deletions rust/src/examples/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,41 @@ fn main() {

runtime.block_on(async {
let local = tokio::task::LocalSet::new();
local.run_until(async move {
let rpc = Rpc::new(None);
let client = rpc_client::RpcClient::new(RpcConfigBuilder::new().rpc(Some(rpc.clone())).build());
client.on_open(|_: Rc<Rpc>| {
info!("on_open");
});
client.on_open_failed(|e| {
info!("on_open_failed: {:?}", e);
});
client.on_close(|| {
info!("on_close");
});
client.set_reconnect(1000);
client.open("localhost", 6666);
info!("start...");

loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

info!("usage: callback...");
rpc.cmd("cmd")
.msg("hello")
.rsp(|msg: String| {
info!("### rsp: {msg}");
})
.call();

info!("usage: future...");
let result = rpc.cmd("cmd").msg("hello").future::<String>().await;
info!("### rsp: {result:?}");
}
}).await;
local
.run_until(async move {
let rpc = Rpc::new(None);
let client = rpc_client::RpcClient::new(
RpcConfigBuilder::new().rpc(Some(rpc.clone())).build(),
);
client.on_open(|_: Rc<Rpc>| {
info!("on_open");
});
client.on_open_failed(|e| {
info!("on_open_failed: {:?}", e);
});
client.on_close(|| {
info!("on_close");
});
client.set_reconnect(1000);
client.open("localhost", 6666);
info!("start...");

loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

info!("usage: callback...");
rpc.cmd("cmd")
.msg("hello")
.rsp(|msg: String| {
info!("### rsp: {msg}");
})
.call();

info!("usage: future...");
let result = rpc.cmd("cmd").msg("hello").future::<String>().await;
info!("### rsp: {result:?}");
}
})
.await;
});
}
44 changes: 24 additions & 20 deletions rust/src/examples/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,32 @@ fn main() {

runtime.block_on(async {
let local = tokio::task::LocalSet::new();
local.run_until(async move {
let rpc = Rpc::new(None);
rpc.subscribe("cmd", |msg: String| -> String {
info!("cmd: {msg}");
"world".to_string()
});
local
.run_until(async move {
let rpc = Rpc::new(None);
rpc.subscribe("cmd", |msg: String| -> String {
info!("cmd: {msg}");
"world".to_string()
});

let server = rpc_server::RpcServer::new(6666, RpcConfigBuilder::new().rpc(Some(rpc.clone())).build());
server.on_session(move |session| {
info!("on_open");
let session = session.upgrade().unwrap();
session.on_close(|| {
info!("on_close");
let server = rpc_server::RpcServer::new(
6666,
RpcConfigBuilder::new().rpc(Some(rpc.clone())).build(),
);
server.on_session(move |session| {
info!("on_open");
let session = session.upgrade().unwrap();
session.on_close(|| {
info!("on_close");
});
});
});

info!("start...");
server.start();
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1000)).await;
}
}).await;
info!("start...");
server.start();
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1000)).await;
}
})
.await;
});
}

0 comments on commit 96d58f5

Please sign in to comment.