Skip to content

Commit

Permalink
Fix queue mutation lock release mixup
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Jun 2, 2021
1 parent 0f0a2f4 commit 1fc1890
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
4 changes: 1 addition & 3 deletions examples/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ use dstore::Queue;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
print!("x");
let mut queue = Queue::connect("[::1]:50051").await?;
print!("x");
queue
.push_back(Bytes::from("Hello"), Bytes::from("World"))
.await?;
let popped = queue.pop_front(Bytes::from("Hello")).await?.to_vec();
println!("Hello, {}", String::from_utf8(popped)?);
eprintln!("Hello, {}", String::from_utf8(popped)?);

Ok(())
}
12 changes: 5 additions & 7 deletions src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,13 @@ impl Dstore for Global {
/// RPC to push_back VALUEs onto a queue corresponding to a given KEY
async fn en_queue(&self, args: Request<KeyValue>) -> Result<Response<Null>, Status> {
let KeyValue { key, value } = args.into_inner();
match self.queues.lock().await.get(&key[..]) {
let mut queues = self.queues.lock().await;
match queues.get(&key[..]) {
Some(queue) => queue.lock().await.push_back(Bytes::from(value)),
None => {
let mut queue = VecDeque::new();
queue.push_back(Bytes::from(value));
self.queues
.lock()
.await
.insert(Bytes::from(key), Mutex::new(queue));
queues.insert(Bytes::from(key), Mutex::new(queue));
}
}

Expand All @@ -194,8 +192,8 @@ impl Dstore for Global {

/// RPC to pop_front VALUEs from a queue corresponding to a given KEY
async fn de_queue(&self, args: Request<Byte>) -> Result<Response<Byte>, Status> {
let Byte { body } = args.into_inner();
match self.queues.lock().await.get(&body[..]) {
let Byte { body: key } = args.into_inner();
match self.queues.lock().await.get(&key[..]) {
Some(queue) => match queue.lock().await.pop_front() {
Some(value) => Ok(Response::new(Byte {
body: value.to_vec(),
Expand Down

0 comments on commit 1fc1890

Please sign in to comment.