
Commit

Simpler
piscisaureus committed Apr 15, 2019
1 parent 1f95d79 commit d8053d8
Showing 1 changed file with 34 additions and 41 deletions.
core/isolate.rs: 75 changes (34 additions, 41 deletions)
@@ -137,6 +137,7 @@ pub struct Isolate<B: Dispatch> {
   needs_init: bool,
   shared: SharedQueue,
   pending_ops: FuturesUnordered<PendingOp>,
+  have_unpolled_ops: bool,
 }
 
 unsafe impl<B: Dispatch> Send for Isolate<B> {}
@@ -187,6 +188,7 @@ impl<B: Dispatch> Isolate<B> {
       shared,
       needs_init,
       pending_ops: FuturesUnordered::new(),
+      have_unpolled_ops: false,
     };
 
     // If we want to use execute this has to happen here sadly.
@@ -253,6 +255,7 @@ impl<B: Dispatch> Isolate<B> {
       let _ = isolate.respond(Some(&res_record));
     } else {
       isolate.pending_ops.push(PendingOp { op, zero_copy_id });
+      isolate.have_unpolled_ops = true;
     }
   }
 
@@ -476,62 +479,52 @@ impl<B: Dispatch> Future for Isolate<B> {
     // Lock the current thread for V8.
     let _locker = LockerScope::new(self.libdeno_isolate);
 
-    let mut poll_again = false;
     let mut overflow_response: Option<Buf> = None;
 
-    for _ in 0..1 {
-      poll_again = false;
-
-      while !self.pending_ops.is_empty() {
-        match self.pending_ops.poll() {
-          Err(_) => panic!("unexpected op error"),
-          Ok(Ready(None)) => panic!("unexpected end of op stream"),
-          Ok(NotReady) => break,
-          Ok(Ready(Some(r))) => {
-            if r.zero_copy_id > 0 {
-              self.zero_copy_release(r.zero_copy_id);
-            }
+    loop {
+      self.have_unpolled_ops = false;
+      match self.pending_ops.poll() {
+        Err(_) => panic!("unexpected op error"),
+        Ok(Ready(None)) => break,
+        Ok(NotReady) => break,
+        Ok(Ready(Some(r))) => {
+          if r.zero_copy_id > 0 {
+            self.zero_copy_release(r.zero_copy_id);
+          }
 
-            let successful_push = self.shared.push(&r.buf);
-            if !successful_push {
-              // If we couldn't push the response to the shared queue, because
-              // there wasn't enough size, we will return the buffer via the
-              // legacy route, using the argument of deno_respond.
-              overflow_response = Some(r.buf);
-              break;
-            }
-          }
-        }
-      }
+          let successful_push = self.shared.push(&r.buf);
+          if !successful_push {
+            // If we couldn't push the response to the shared queue, because
+            // there wasn't enough size, we will return the buffer via the
+            // legacy route, using the argument of deno_respond.
+            overflow_response = Some(r.buf);
+            break;
+          }
+        }
+      }
+    }
 
-      if self.shared.size() > 0 {
-        poll_again = true;
-        self.respond(None)?;
-        // The other side should have shifted off all the messages.
-        assert_eq!(self.shared.size(), 0);
-      }
+    if self.shared.size() > 0 {
+      self.respond(None)?;
+      // The other side should have shifted off all the messages.
+      assert_eq!(self.shared.size(), 0);
+    }
 
-      if overflow_response.is_some() {
-        poll_again = true;
-        let buf = overflow_response.take().unwrap();
-        self.respond(Some(&buf))?;
-      }
+    if overflow_response.is_some() {
+      let buf = overflow_response.take().unwrap();
+      self.respond(Some(&buf))?;
+    }
 
-      self.check_promise_errors();
-      if let Some(err) = self.last_exception() {
-        return Err(err);
-      }
+    self.check_promise_errors();
+    if let Some(err) = self.last_exception() {
+      return Err(err);
+    }
 
-      if !poll_again {
-        break;
-      }
-    }
-
     // We're idle if pending_ops is empty.
     if self.pending_ops.is_empty() {
       Ok(futures::Async::Ready(()))
     } else {
-      if poll_again {
+      if self.have_unpolled_ops {
         task::current().notify();
       }
       Ok(futures::Async::NotReady)
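
The diff above interleaves the removed and the added version of Isolate::poll, which makes the new control flow a little hard to follow. Below is a distilled, self-contained sketch of that flow, assuming the futures 0.1 crate this file used at the time; DemoIsolate, push_op, and the u64 op results are hypothetical stand-ins, not Deno APIs. The point is the single drain loop over the FuturesUnordered plus the have_unpolled_ops flag: any op enqueued after the loop last reset the flag has never been polled, so poll() calls task::current().notify() to guarantee another turn instead of relying on the old poll_again bookkeeping.

// A distilled sketch (not the Deno source), assuming futures = "0.1".
// DemoIsolate, push_op, and the u64 op results are hypothetical stand-ins.
extern crate futures;

use futures::stream::{FuturesUnordered, Stream};
use futures::{task, Async, Future, Poll};

type OpFuture = Box<dyn Future<Item = u64, Error = ()> + Send>;

struct DemoIsolate {
  pending_ops: FuturesUnordered<OpFuture>,
  have_unpolled_ops: bool,
}

impl DemoIsolate {
  fn new() -> Self {
    Self {
      pending_ops: FuturesUnordered::new(),
      have_unpolled_ops: false,
    }
  }

  /// Called from the dispatch path whenever an async op is started.
  fn push_op(&mut self, op: OpFuture) {
    self.pending_ops.push(op);
    // The new future has never been polled; remember that so poll() below
    // can schedule another turn instead of stalling.
    self.have_unpolled_ops = true;
  }
}

impl Future for DemoIsolate {
  type Item = ();
  type Error = ();

  fn poll(&mut self) -> Poll<(), ()> {
    loop {
      self.have_unpolled_ops = false;
      match self.pending_ops.poll() {
        Err(_) => panic!("unexpected op error"),
        Ok(Async::Ready(None)) => break, // no pending ops at all
        Ok(Async::NotReady) => break,    // nothing ready right now
        Ok(Async::Ready(Some(result))) => {
          // Handle one completed op. In isolate.rs this is where the result
          // is pushed onto the shared queue and handed back to JS, which may
          // re-enter push_op and set have_unpolled_ops again.
          println!("op completed: {}", result);
        }
      }
    }

    if self.pending_ops.is_empty() {
      Ok(Async::Ready(()))
    } else {
      if self.have_unpolled_ops {
        // Ops were enqueued after the drain loop last reset the flag, so they
        // have not been polled yet; ask the executor for another turn.
        task::current().notify();
      }
      Ok(Async::NotReady)
    }
  }
}

fn main() {
  let mut iso = DemoIsolate::new();
  iso.push_op(Box::new(futures::future::ok::<u64, ()>(42)));
  // Drive the demo future to completion on the current thread.
  iso.wait().unwrap();
}

Running the sketch prints the completed op and finishes once pending_ops is empty, mirroring the Ready(()) path of the real poll implementation.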
