Skip to content
Permalink
Browse files

inspector: fix process._debugEnd() for inspector

This change ensures that the WebSocket server can be stopped
(and restarted if needed) buy calling process._debugEnd.

PR-URL: #12777
Fixes: #12559
Reviewed-By: Sam Roberts <vieuxtech@gmail.com>
Reviewed-By: Refael Ackermann <refack@gmail.com>
  • Loading branch information...
eugeneo authored and jasnell committed May 1, 2017
1 parent e6c03c7 commit 77d5e6f8dae671081c6f0eba3ae36ebc33c36a1f
@@ -38,6 +38,18 @@ using v8_inspector::V8Inspector;
static uv_sem_t inspector_io_thread_semaphore;
static uv_async_t start_inspector_thread_async;

class StartIoTask : public v8::Task {
public:
explicit StartIoTask(Agent* agent) : agent(agent) {}

void Run() override {
agent->StartIoThread(false);
}

private:
Agent* agent;
};

std::unique_ptr<StringBuffer> ToProtocolString(Isolate* isolate,
Local<Value> value) {
TwoByteValue buffer(isolate, value);
@@ -46,9 +58,14 @@ std::unique_ptr<StringBuffer> ToProtocolString(Isolate* isolate,

// Called from the main thread.
void StartInspectorIoThreadAsyncCallback(uv_async_t* handle) {
static_cast<Agent*>(handle->data)->StartIoThread();
static_cast<Agent*>(handle->data)->StartIoThread(false);
}

void StartIoCallback(Isolate* isolate, void* agent) {
static_cast<Agent*>(agent)->StartIoThread(false);
}


#ifdef __POSIX__
static void EnableInspectorIOThreadSignalHandler(int signo) {
uv_sem_post(&inspector_io_thread_semaphore);
@@ -57,7 +74,9 @@ static void EnableInspectorIOThreadSignalHandler(int signo) {
inline void* InspectorIoThreadSignalThreadMain(void* unused) {
for (;;) {
uv_sem_wait(&inspector_io_thread_semaphore);
uv_async_send(&start_inspector_thread_async);
Agent* agent = static_cast<Agent*>(start_inspector_thread_async.data);
if (agent != nullptr)
agent->RequestIoStart();
}
return nullptr;
}
@@ -103,7 +122,9 @@ static int RegisterDebugSignalHandler() {

#ifdef _WIN32
DWORD WINAPI EnableDebugThreadProc(void* arg) {
uv_async_send(&start_inspector_thread_async);
Agent* agent = static_cast<Agent*>(start_inspector_thread_async.data);
if (agent != nullptr)
agent->RequestIoStart();
return 0;
}

@@ -387,29 +408,29 @@ bool Agent::Start(v8::Platform* platform, const char* path,
new NodeInspectorClient(parent_env_, platform));
inspector_->contextCreated(parent_env_->context(), "Node.js Main Context");
platform_ = platform;
CHECK_EQ(0, uv_async_init(uv_default_loop(),
&start_inspector_thread_async,
StartInspectorIoThreadAsyncCallback));
start_inspector_thread_async.data = this;
uv_unref(reinterpret_cast<uv_handle_t*>(&start_inspector_thread_async));

RegisterDebugSignalHandler();
if (options.inspector_enabled()) {
return StartIoThread();
} else {
CHECK_EQ(0, uv_async_init(uv_default_loop(),
&start_inspector_thread_async,
StartInspectorIoThreadAsyncCallback));
start_inspector_thread_async.data = this;
uv_unref(reinterpret_cast<uv_handle_t*>(&start_inspector_thread_async));

RegisterDebugSignalHandler();
return true;
return StartIoThread(options.wait_for_connect());
}
return true;
}

bool Agent::StartIoThread() {
bool Agent::StartIoThread(bool wait_for_connect) {
if (io_ != nullptr)
return true;

CHECK_NE(inspector_, nullptr);

enabled_ = true;
io_ = std::unique_ptr<InspectorIo>(
new InspectorIo(parent_env_, platform_, path_, debug_options_));
new InspectorIo(parent_env_, platform_, path_, debug_options_,
wait_for_connect));
if (!io_->Start()) {
inspector_.reset();
return false;
@@ -440,8 +461,10 @@ bool Agent::StartIoThread() {
}

void Agent::Stop() {
if (io_ != nullptr)
if (io_ != nullptr) {
io_->Stop();
io_.reset();
}
}

void Agent::Connect(InspectorSessionDelegate* delegate) {
@@ -502,6 +525,18 @@ void Agent::InitJSBindings(Local<Object> target, Local<Value> unused,
if (agent->debug_options_.wait_for_connect())
env->SetMethod(target, "callAndPauseOnStart", CallAndPauseOnStart);
}

void Agent::RequestIoStart() {
// We need to attempt to interrupt V8 flow (in case Node is running
// continuous JS code) and to wake up libuv thread (in case Node is wating
// for IO events)
uv_async_send(&start_inspector_thread_async);
v8::Isolate* isolate = parent_env_->isolate();
platform_->CallOnForegroundThread(isolate, new StartIoTask(this));
isolate->RequestInterrupt(StartIoCallback, this);
uv_async_send(&start_inspector_thread_async);
}

} // namespace inspector
} // namespace node

@@ -51,7 +51,6 @@ class Agent {

bool Start(v8::Platform* platform, const char* path,
const DebugOptions& options);
bool StartIoThread();
void Stop();

bool IsStarted();
@@ -72,6 +71,13 @@ class Agent {
v8::Local<v8::Context> context,
void* priv);

bool StartIoThread(bool wait_for_connect);
InspectorIo* io() {
return io_.get();
}
// Can be called from any thread
void RequestIoStart();

private:
node::Environment* parent_env_;
std::unique_ptr<NodeInspectorClient> inspector_;
@@ -20,6 +20,7 @@
namespace node {
namespace inspector {
namespace {
using AsyncAndAgent = std::pair<uv_async_t, Agent*>;
using v8_inspector::StringBuffer;
using v8_inspector::StringView;

@@ -96,6 +97,12 @@ int CloseAsyncAndLoop(uv_async_t* async) {
return uv_loop_close(async->loop);
}

void ReleasePairOnAsyncClose(uv_handle_t* async) {
AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first,
reinterpret_cast<uv_async_t*>(async));
delete pair;
}

} // namespace

std::unique_ptr<StringBuffer> Utf8ToStringView(const std::string& message) {
@@ -127,6 +134,9 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate {
std::string GetTargetTitle(const std::string& id) override;
std::string GetTargetUrl(const std::string& id) override;
bool IsConnected() { return connected_; }
void ServerDone() override {
io_->ServerDone();
}
private:
InspectorIo* io_;
bool connected_;
@@ -137,53 +147,70 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate {
bool waiting_;
};

void InterruptCallback(v8::Isolate*, void* io) {
static_cast<InspectorIo*>(io)->DispatchMessages();
void InterruptCallback(v8::Isolate*, void* agent) {
InspectorIo* io = static_cast<Agent*>(agent)->io();
if (io != nullptr)
io->DispatchMessages();
}

class DispatchOnInspectorBackendTask : public v8::Task {
class DispatchMessagesTask : public v8::Task {
public:
explicit DispatchOnInspectorBackendTask(InspectorIo* io) : io_(io) {}
explicit DispatchMessagesTask(Agent* agent) : agent_(agent) {}

void Run() override {
io_->DispatchMessages();
InspectorIo* io = agent_->io();
if (io != nullptr)
io->DispatchMessages();
}

private:
InspectorIo* io_;
Agent* agent_;
};

InspectorIo::InspectorIo(Environment* env, v8::Platform* platform,
const std::string& path, const DebugOptions& options)
const std::string& path, const DebugOptions& options,
bool wait_for_connect)
: options_(options), thread_(), delegate_(nullptr),
shutting_down_(false), state_(State::kNew),
parent_env_(env), io_thread_req_(),
platform_(platform), dispatching_messages_(false),
session_id_(0), script_name_(path) {
CHECK_EQ(0, uv_async_init(env->event_loop(), &main_thread_req_,
state_(State::kNew), parent_env_(env),
io_thread_req_(), platform_(platform),
dispatching_messages_(false), session_id_(0),
script_name_(path),
wait_for_connect_(wait_for_connect) {
main_thread_req_ = new AsyncAndAgent({uv_async_t(), env->inspector_agent()});
CHECK_EQ(0, uv_async_init(env->event_loop(), &main_thread_req_->first,
InspectorIo::MainThreadAsyncCb));
uv_unref(reinterpret_cast<uv_handle_t*>(&main_thread_req_));
uv_unref(reinterpret_cast<uv_handle_t*>(&main_thread_req_->first));
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
}

InspectorIo::~InspectorIo() {
uv_sem_destroy(&start_sem_);
uv_close(reinterpret_cast<uv_handle_t*>(&main_thread_req_->first),
ReleasePairOnAsyncClose);
}

bool InspectorIo::Start() {
CHECK_EQ(state_, State::kNew);
CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadCbIO, this), 0);
uv_sem_wait(&start_sem_);

if (state_ == State::kError) {
Stop();
return false;
}
state_ = State::kAccepting;
if (options_.wait_for_connect()) {
if (wait_for_connect_) {
DispatchMessages();
}
return true;
}

void InspectorIo::Stop() {
CHECK(state_ == State::kAccepting || state_ == State::kConnected);
Write(TransportAction::kKill, 0, StringView());
int err = uv_thread_join(&thread_);
CHECK_EQ(err, 0);
state_ = State::kShutDown;
DispatchMessages();
}

bool InspectorIo::IsConnected() {
@@ -195,8 +222,10 @@ bool InspectorIo::IsStarted() {
}

void InspectorIo::WaitForDisconnect() {
if (state_ == State::kAccepting)
state_ = State::kDone;
if (state_ == State::kConnected) {
shutting_down_ = true;
state_ = State::kShutDown;
Write(TransportAction::kStop, 0, StringView());
fprintf(stderr, "Waiting for the debugger to disconnect...\n");
fflush(stderr);
@@ -222,6 +251,9 @@ void InspectorIo::WriteCbIO(uv_async_t* async) {
io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_messages);
for (const auto& outgoing : outgoing_messages) {
switch (std::get<0>(outgoing)) {
case TransportAction::kKill:
io_and_transport->first->TerminateConnections();
// Fallthrough
case TransportAction::kStop:
io_and_transport->first->Stop(nullptr);
break;
@@ -253,7 +285,7 @@ void InspectorIo::WorkerRunIO() {
uv_fs_req_cleanup(&req);
}
InspectorIoDelegate delegate(this, script_path, script_name_,
options_.wait_for_connect());
wait_for_connect_);
delegate_ = &delegate;
InspectorSocketServer server(&delegate,
options_.host_name(),
@@ -266,14 +298,12 @@ void InspectorIo::WorkerRunIO() {
uv_sem_post(&start_sem_);
return;
}
if (!options_.wait_for_connect()) {
if (!wait_for_connect_) {
uv_sem_post(&start_sem_);
}
uv_run(&loop, UV_RUN_DEFAULT);
io_thread_req_.data = nullptr;
server.Stop(nullptr);
server.TerminateConnections(nullptr);
CHECK_EQ(CloseAsyncAndLoop(&io_thread_req_), 0);
CHECK_EQ(uv_loop_close(&loop), 0);
delegate_ = nullptr;
}

@@ -298,11 +328,12 @@ void InspectorIo::PostIncomingMessage(InspectorAction action, int session_id,
const std::string& message) {
if (AppendMessage(&incoming_message_queue_, action, session_id,
Utf8ToStringView(message))) {
Agent* agent = main_thread_req_->second;
v8::Isolate* isolate = parent_env_->isolate();
platform_->CallOnForegroundThread(isolate,
new DispatchOnInspectorBackendTask(this));
isolate->RequestInterrupt(InterruptCallback, this);
CHECK_EQ(0, uv_async_send(&main_thread_req_));
new DispatchMessagesTask(agent));
isolate->RequestInterrupt(InterruptCallback, agent);
CHECK_EQ(0, uv_async_send(&main_thread_req_->first));
}
NotifyMessageReceived();
}
@@ -344,7 +375,7 @@ void InspectorIo::DispatchMessages() {
break;
case InspectorAction::kEndSession:
CHECK_NE(session_delegate_, nullptr);
if (shutting_down_) {
if (state_ == State::kShutDown) {
state_ = State::kDone;
} else {
state_ = State::kAccepting;
@@ -363,12 +394,18 @@ void InspectorIo::DispatchMessages() {

// static
void InspectorIo::MainThreadAsyncCb(uv_async_t* req) {
InspectorIo* io = node::ContainerOf(&InspectorIo::main_thread_req_, req);
io->DispatchMessages();
AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first, req);
// Note that this may be called after io was closed or even after a new
// one was created and ran.
InspectorIo* io = pair->second->io();
if (io != nullptr)
io->DispatchMessages();
}

void InspectorIo::Write(TransportAction action, int session_id,
const StringView& inspector_message) {
if (state_ == State::kShutDown)
return;
AppendMessage(&outgoing_message_queue_, action, session_id,
StringBuffer::create(inspector_message));
int err = uv_async_send(&io_thread_req_);

0 comments on commit 77d5e6f

Please sign in to comment.
You can’t perform that action at this time.