Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Move Agent thread to also have a Ruby thread representation

This is needed, because for example the object walker expects a native
environment setup if it walks C-API objects that end up calling
rb_gc_mark_maybe.

Fixes #2328
  • Loading branch information...
commit 6f86a41af8734ce07f93eca131abebfd1a4f9b78 1 parent 6affa69
@dbussink dbussink authored
View
98 vm/agent.cpp
@@ -30,6 +30,9 @@
#include "builtin/nativemethod.hpp"
#include "builtin/io.hpp"
+#include "builtin/thread.hpp"
+
+#include "dtrace/dtrace.h"
#include <ostream>
#include <sstream>
@@ -43,19 +46,27 @@ namespace rubinius {
// Ignore any errors, this is happening at shutdown.
}
+ Object* query_agent_tramp(STATE) {
+ state->shared().query_agent()->perform(state);
+ return cNil;
+ }
+
QueryAgent::QueryAgent(STATE)
: AuxiliaryThread()
, shared_(state->shared())
- , thread_(0)
+ , vm_(NULL)
+ , thread_(state)
, port_(0)
, server_fd_(-1)
, verbose_(false)
+ , exit_(false)
, max_fd_(0)
, vars_(0)
, local_only_(true)
, tmp_key_(0)
{
shared_.auxiliary_threads()->register_thread(this);
+ shared_.set_query_agent(this);
initialize(state);
start_thread(state);
@@ -419,26 +430,31 @@ namespace rubinius {
void QueryAgent::start_thread(STATE) {
SYNC(state);
- if(thread_) return;
- thread_ = new Thread(this);
+ if(vm_) return;
+
+ vm_ = state->shared().new_vm();
+ exit_ = false;
+ thread_.set(Thread::create(state, vm_, G(thread), query_agent_tramp, false, true));
+ run(state);
}
void QueryAgent::stop_thread(STATE) {
SYNC(state);
- if(!thread_) return;
+ if(!vm_) return;
- thread_->stop();
+ pthread_t os = vm_->os_thread();
+ exit_ = true;
wakeup();
-
- thread_->join();
- delete thread_;
- thread_ = NULL;
+ void* return_value;
+ pthread_join(os, &return_value);
+ vm_ = NULL;
}
void QueryAgent::wakeup() {
char buf = '!';
+ atomic::memory_barrier();
if(write(write_control(), &buf, 1) < 0) {
std::cerr << "[QA: Write error: " << strerror(errno) << "]\n";
}
@@ -470,6 +486,11 @@ namespace rubinius {
start_thread(state);
}
+ void QueryAgent::run(STATE) {
+ int error = thread_.get()->fork_attached(state);
+ if(error) rubinius::bug("Unable to start agent thread");
+ }
+
void QueryAgent::cleanup() {
remove_tmp_path();
}
@@ -527,24 +548,20 @@ namespace rubinius {
}
}
- QueryAgent::Thread::Thread(QueryAgent* agent)
- : utilities::thread::Thread(0, false)
- , agent_(agent)
- , exit_(false)
- {
- run();
- }
+ void QueryAgent::perform(STATE) {
+ GCTokenImpl gct;
+ const char* thread_name = "rbx.agent";
+ utilities::thread::Thread::set_os_name(thread_name);
- void QueryAgent::Thread::stop() {
- exit_ = true;
- }
+ RUBINIUS_THREAD_START(thread_name, state->vm()->thread_id(), 1);
+
+ state->vm()->thread->hard_unlock(state, gct, 0);
- void QueryAgent::Thread::perform() {
- utilities::thread::Thread::set_os_name("rbx.agent");
while(1) {
- fd_set read_fds = agent_->fds();
+ fd_set read_fds = fds();
- int ret = select(agent_->max_fd() + 1, &read_fds, 0, 0, 0);
+ state->gc_independent(gct, 0);
+ int ret = select(max_fd() + 1, &read_fds, 0, 0, 0);
if(exit_) return;
if(ret < 0) {
@@ -555,21 +572,21 @@ namespace rubinius {
continue;
}
- if(FD_ISSET(agent_->read_control(), &read_fds)) {
+ if(FD_ISSET(read_control(), &read_fds)) {
// Noop, just a wake up!
// Read the one byte written though so we don't clog up the
// pipe and have to use the ponies later.
char buf;
- if(read(agent_->read_control(), &buf, 1) < 0) {
+ if(read(read_control(), &buf, 1) < 0) {
std::cerr << "[QA: Read error: " << strerror(errno) << "]\n";
}
- } else if(agent_->server_fd() > 0 && FD_ISSET(agent_->server_fd(), &read_fds)) {
+ } else if(server_fd() > 0 && FD_ISSET(server_fd(), &read_fds)) {
// now accept an incoming connection
struct sockaddr_in sin;
socklen_t addr_size = sizeof(sin);
- int client = accept(agent_->server_fd(), (struct sockaddr *)&sin, &addr_size);
+ int client = accept(server_fd(), (struct sockaddr *)&sin, &addr_size);
- if(agent_->local_only()) {
+ if(local_only()) {
if(sin.sin_addr.s_addr != inet_addr("127.0.0.1")) {
std::cerr << "[QA: Reject connection from " << inet_ntoa(sin.sin_addr)
<< ":" << ntohs(sin.sin_port) << "]\n";
@@ -579,7 +596,7 @@ namespace rubinius {
Client cl(client);
- cl.begin_auth(agent_->incr_tmp_key());
+ cl.begin_auth(incr_tmp_key());
std::stringstream name;
name << "/tmp/agent-auth." << getuid() << "-" << getpid() << "." << cl.auth_key;
@@ -593,55 +610,56 @@ namespace rubinius {
close(file);
- if(agent_->verbose()) {
+ if(verbose()) {
std::cerr << "[QA: Requesting file auth from " << inet_ntoa(sin.sin_addr)
<< ":" << ntohs(sin.sin_port) << "]\n";
}
request_auth(client, name.str());
- agent_->add_fd(client);
+ add_fd(client);
- agent_->add_socket(cl);
+ add_socket(cl);
continue;
- } else if(agent_->use_password()) {
+ } else if(use_password()) {
Client cl(client);
cl.begin_auth(0);
- if(agent_->verbose()) {
+ if(verbose()) {
std::cerr << "[QA: Requesting password auth from " << inet_ntoa(sin.sin_addr)
<< ":" << ntohs(sin.sin_port) << "]\n";
}
request_password(client);
- agent_->add_fd(client);
+ add_fd(client);
- agent_->add_socket(cl);
+ add_socket(cl);
continue;
}
- if(agent_->verbose()) {
+ if(verbose()) {
std::cerr << "[QA: Connection from " << inet_ntoa(sin.sin_addr)
<< ":" << ntohs(sin.sin_port) << "]\n";
}
write_welcome(client);
- agent_->add_fd(client);
+ add_fd(client);
Client cl(client);
cl.set_running();
- agent_->add_socket(cl);
+ add_socket(cl);
} else {
- if(!agent_->process_clients(read_fds)) {
+ if(!process_clients(read_fds)) {
// TODO: cleanup
break;
}
}
}
+ RUBINIUS_THREAD_STOP(thread_name, state->vm()->thread_id(), 1);
}
}
View
22 vm/agent.hpp
@@ -5,12 +5,15 @@
#include "auxiliary_threads.hpp"
#include "gc/managed.hpp"
+#include "gc/root.hpp"
#include "util/thread.hpp"
#include "windows_compat.h"
namespace rubinius {
class SharedState;
+ class VM;
+ class Thread;
namespace agent {
class VariableAccess;
@@ -49,24 +52,14 @@ namespace rubinius {
};
- class Thread : public utilities::thread::Thread {
- private:
- QueryAgent* agent_;
- bool exit_;
-
- public:
- Thread(QueryAgent* agent);
-
- void stop();
- virtual void perform();
- };
-
private:
SharedState& shared_;
- Thread* thread_;
+ VM* vm_;
+ TypedRoot<Thread*> thread_;
int port_;
int server_fd_;
bool verbose_;
+ bool exit_;
fd_set fds_;
int max_fd_;
@@ -172,5 +165,8 @@ namespace rubinius {
void before_fork(STATE);
void after_fork_parent(STATE);
void after_fork_child(STATE);
+
+ void perform(STATE);
+ void run(STATE);
};
}
View
2  vm/environment.cpp
@@ -878,6 +878,7 @@ namespace rubinius {
load_argv(argc_, argv_);
+ start_signals();
state->vm()->initialize_config();
load_tool();
@@ -897,7 +898,6 @@ namespace rubinius {
load_kernel(runtime);
shared->finalizer_handler()->start_thread(state);
- start_signals();
run_file(runtime + "/loader.rbc");
state->vm()->thread_state()->clear();
View
2  vm/gc/finalize.hpp
@@ -53,8 +53,6 @@ namespace rubinius {
typedef std::list<FinalizeObject> FinalizeObjects;
typedef std::list<FinalizeObjects*> FinalizeObjectsList;
- Object* handle_tramp(STATE);
-
class FinalizerHandler : public AuxiliaryThread, public Lockable {
public:
class iterator {
View
1  vm/shared_state.cpp
@@ -30,6 +30,7 @@ namespace rubinius {
: auxiliary_threads_(0)
, signal_handler_(0)
, finalizer_handler_(0)
+ , query_agent_(0)
, world_(new WorldState(&check_global_interrupts_))
, method_count_(0)
, class_count_(0)
View
9 vm/shared_state.hpp
@@ -69,6 +69,7 @@ namespace rubinius {
AuxiliaryThreads* auxiliary_threads_;
SignalHandler* signal_handler_;
FinalizerHandler* finalizer_handler_;
+ QueryAgent* query_agent_;
CApiConstantNameMap capi_constant_name_map_;
CApiConstantHandleMap capi_constant_handle_map_;
@@ -154,6 +155,14 @@ namespace rubinius {
finalizer_handler_ = thr;
}
+ QueryAgent* query_agent() {
+ return query_agent_;
+ }
+
+ void set_query_agent(QueryAgent* thr) {
+ query_agent_ = thr;
+ }
+
VM* new_vm();
void remove_vm(VM*);
View
6 vm/signal.hpp
@@ -14,8 +14,6 @@ namespace rubinius {
struct CallFrame;
class Thread;
- Object* handle_tramp(STATE);
-
class SignalHandler : public AuxiliaryThread, public Lockable {
SharedState& shared_;
VM* target_;
@@ -45,7 +43,7 @@ namespace rubinius {
SignalHandler(STATE);
virtual ~SignalHandler();
- void perform(State*);
+ void perform(STATE);
void add_signal(State*, int sig, HandlerType type = eCustom);
void handle_signal(int sig);
@@ -64,7 +62,7 @@ namespace rubinius {
void after_fork_parent(STATE);
void after_fork_child(STATE);
- void run(State*);
+ void run(STATE);
};
}
Please sign in to comment.
Something went wrong with that request. Please try again.