Skip to content

Commit

Permalink
more solid multithreading support for v8
Browse files Browse the repository at this point in the history
  • Loading branch information
unbit committed Apr 1, 2013
1 parent 41da717 commit bb3f6bc
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 64 deletions.
4 changes: 4 additions & 0 deletions plugins/v8/plugin.c
Expand Up @@ -5,6 +5,8 @@ void uwsgi_v8_apps(void);
void uwsgi_v8_configurator(char *, char **); void uwsgi_v8_configurator(char *, char **);
uint16_t uwsgi_v8_rpc(void *, uint8_t, char **, uint16_t *, char *); uint16_t uwsgi_v8_rpc(void *, uint8_t, char **, uint16_t *, char *);
int uwsgi_v8_signal_handler(uint8_t, void *); int uwsgi_v8_signal_handler(uint8_t, void *);
void uwsgi_v8_init_thread(int);
void uwsgi_v8_enable_threads();


static void uwsgi_v8_register(void) { static void uwsgi_v8_register(void) {
uwsgi_register_configurator(".js", uwsgi_v8_configurator); uwsgi_register_configurator(".js", uwsgi_v8_configurator);
Expand All @@ -26,4 +28,6 @@ struct uwsgi_plugin v8_plugin = {
.rpc = uwsgi_v8_rpc, .rpc = uwsgi_v8_rpc,
.request = uwsgi_v8_request, .request = uwsgi_v8_request,
.signal_handler = uwsgi_v8_signal_handler, .signal_handler = uwsgi_v8_signal_handler,
.enable_threads = uwsgi_v8_enable_threads,
.init_thread = uwsgi_v8_init_thread,
}; };
127 changes: 63 additions & 64 deletions plugins/v8/v8_uwsgi.cc
Expand Up @@ -4,24 +4,24 @@
// as we have isolates in multithread modes, we need to maintain // as we have isolates in multithread modes, we need to maintain
// special tables for the handlers (mules and spooler just run on the core 0) // special tables for the handlers (mules and spooler just run on the core 0)
struct uwsgi_v8_signal_table { struct uwsgi_v8_signal_table {
void *func; v8::Persistent<v8::Function> *func;
uint8_t registered;
}; };


struct uwsgi_v8_rpc_table { struct uwsgi_v8_rpc_table {
char *name; char *name;
v8::Persistent<v8::Function> *func; v8::Persistent<v8::Function> *func;
}; };


v8::Persistent<v8::Function> handler1;

struct uwsgi_v8 { struct uwsgi_v8 {
v8::Persistent<v8::Context> *contexts; v8::Persistent<v8::Context> *contexts;
v8::Isolate **isolates; v8::Isolate **isolates;
struct uwsgi_string_list *load; struct uwsgi_string_list *load;
struct uwsgi_v8_signal_table **sigtable; struct uwsgi_v8_signal_table *sigtable;
struct uwsgi_v8_rpc_table *rpctable; struct uwsgi_v8_rpc_table *rpctable;
int current_core; pthread_key_t current_core;
int preemptive; int preemptive;
uint64_t gc_freq;
} uv8; } uv8;


extern struct uwsgi_server uwsgi; extern struct uwsgi_server uwsgi;
Expand All @@ -41,10 +41,25 @@ static v8::Handle<v8::Value> uwsgi_v8_api_register_signal(const v8::Arguments& a


v8::Persistent<v8::Function> func = v8::Persistent<v8::Function>::New(v8::Handle<v8::Function>::Cast(args[2])); v8::Persistent<v8::Function> func = v8::Persistent<v8::Function>::New(v8::Handle<v8::Function>::Cast(args[2]));


if (uwsgi_register_signal(uwsgi_signal, *signal_kind, *func, v8_plugin.modifier1)) { int core_id = (long) pthread_getspecific(uv8.current_core);
struct uwsgi_v8_signal_table *uvst = &uv8.sigtable[uwsgi_signal];

int need_register = 1;
if (uvst->registered == 1) {
need_register = 0;
}

uvst->func[core_id] = func;

if (!need_register) {
return v8::True();
}

if (uwsgi_register_signal(uwsgi_signal, *signal_kind, uvst, v8_plugin.modifier1)) {
uwsgi_log("[uwsgi-v8] unable to register signal %d\n", uwsgi_signal); uwsgi_log("[uwsgi-v8] unable to register signal %d\n", uwsgi_signal);
return v8::Undefined(); return v8::Undefined();
} }
uvst->registered = 1 ;


return v8::True(); return v8::True();
} }
Expand All @@ -61,21 +76,9 @@ static v8::Handle<v8::Value> uwsgi_v8_api_register_rpc(const v8::Arguments& args
j_argc = args[2]->Uint32Value(); j_argc = args[2]->Uint32Value();
} }



v8::Persistent<v8::Function> func = v8::Persistent<v8::Function>::New(v8::Handle<v8::Function>::Cast(args[1]));
//v8::Persistent<v8::Function> func = v8::Persistent<v8::Function>::New(v8::Handle<v8::Function>::Cast(args[1]));
v8::Local<v8::Function> l_func = v8::Local<v8::Function>::Cast(args[1]);
v8::Persistent<v8::Function> func = v8::Persistent<v8::Function>::New(l_func);

int core_id = uv8.current_core;
if (core_id < 0) {
struct wsgi_request *wsgi_req = current_wsgi_req();
core_id = wsgi_req->async_id;
}


if (core_id == 1) { int core_id = (long) pthread_getspecific(uv8.current_core);
uwsgi_log("OOOOps\n");
handler1 = v8::Persistent<v8::Function>::New(l_func);
}


// get the rpc slot // get the rpc slot
int i; int i;
Expand Down Expand Up @@ -151,15 +154,13 @@ static v8::Handle<v8::Value> uwsgi_v8_api_log(const v8::Arguments& args) {
} }




static v8::Persistent<v8::Context> uwsgi_v8_new_isolate(int core) { static v8::Persistent<v8::Context> uwsgi_v8_new_isolate(int core_id) {
if (core > 0) { // create a new isolate
// create a new isolate v8::Isolate *isolate = v8::Isolate::New();
v8::Isolate *isolate = v8::Isolate::New(); // set as the current isolate
// set as the current isolate isolate->Enter();
isolate->Enter();
}


uv8.isolates[core] = v8::Isolate::GetCurrent(); uv8.isolates[core_id] = v8::Isolate::GetCurrent();


v8::HandleScope handle_scope; v8::HandleScope handle_scope;


Expand All @@ -182,39 +183,34 @@ extern "C" int uwsgi_v8_init(){
uwsgi_log("Initializing V8 %s environment... (%d Isolates)\n", v8::V8::GetVersion(), uwsgi.cores); uwsgi_log("Initializing V8 %s environment... (%d Isolates)\n", v8::V8::GetVersion(), uwsgi.cores);
uv8.isolates = (v8::Isolate **) uwsgi_malloc( sizeof(v8::Isolate *) * uwsgi.cores ); uv8.isolates = (v8::Isolate **) uwsgi_malloc( sizeof(v8::Isolate *) * uwsgi.cores );
uv8.contexts = (v8::Persistent<v8::Context> *) uwsgi_malloc( sizeof(v8::Persistent<v8::Context>) * uwsgi.cores ); uv8.contexts = (v8::Persistent<v8::Context> *) uwsgi_malloc( sizeof(v8::Persistent<v8::Context>) * uwsgi.cores );
for(i=0;i<uwsgi.cores;i++) {
uv8.contexts[i] = uwsgi_v8_new_isolate(i);
}
// allocates rpc and signal tables // allocates rpc and signal tables
uv8.rpctable = (struct uwsgi_v8_rpc_table *) uwsgi_calloc(sizeof(struct uwsgi_v8_rpc_table) * uwsgi.rpc_max); uv8.rpctable = (struct uwsgi_v8_rpc_table *) uwsgi_calloc(sizeof(struct uwsgi_v8_rpc_table) * uwsgi.rpc_max);
for(i=0;i<(int)uwsgi.rpc_max;i++) { for(i=0;i<(int)uwsgi.rpc_max;i++) {
uv8.rpctable[i].func = (v8::Persistent<v8::Function>*) uwsgi_calloc(sizeof(v8::Persistent<v8::Function>) * uwsgi.cores); uv8.rpctable[i].func = (v8::Persistent<v8::Function>*) uwsgi_calloc(sizeof(v8::Persistent<v8::Function>) * uwsgi.cores);
} }
/* uv8.sigtable = (struct uwsgi_v8_signal_table *) uwsgi_calloc(sizeof(struct uwsgi_v8_signal_table) * 256);
uv8.sigtable = (struct uwsgi_v8_signal_table **) uwsgi_calloc(sizeof(struct uwsgi_v8_signal_table *) * 256);
for(i=0;i<256;i++) { for(i=0;i<256;i++) {
uv8.sigtable[i] = (struct uwsgi_v8_signal_table *) uwsgi_calloc(sizeof(struct uwsgi_v8_signal_table) * uwsgi.cores); uv8.sigtable[i].func = (v8::Persistent<v8::Function>*) uwsgi_calloc(sizeof(v8::Persistent<v8::Function>) * uwsgi.cores);
} }
*/
pthread_key_create(&uv8.current_core, NULL);
pthread_setspecific(uv8.current_core, (void *) 0);
uv8.contexts[0] = uwsgi_v8_new_isolate(0);


return 0; return 0;
} }


extern "C" void uwsgi_v8_apps() { static void uwsgi_v8_apps_do(int core_id) {

if (!uv8.load) return; if (!uv8.load) return;

struct uwsgi_string_list *usl = uv8.load;
int i; while(usl) {
for(i=0;i<uwsgi.cores;i++) { uwsgi_v8_load_file(core_id, usl->value);
uv8.current_core = i; usl = usl->next;
struct uwsgi_string_list *usl = uv8.load;
while(usl) {
uwsgi_v8_load_file(i, usl->value);
usl = usl->next;
}
} }
// inform the system to use current_wsgi_req }
uv8.current_core = -1;
extern "C" void uwsgi_v8_apps() {
uwsgi_v8_apps_do(0);
} }


extern "C" void uwsgi_v8_configurator(char *filename, char *magic_table[]) { extern "C" void uwsgi_v8_configurator(char *filename, char *magic_table[]) {
Expand Down Expand Up @@ -274,11 +270,7 @@ extern "C" void uwsgi_v8_configurator(char *filename, char *magic_table[]) {


extern "C" uint16_t uwsgi_v8_rpc(void * func, uint8_t argc, char **argv, uint16_t argvs[], char *buffer) { extern "C" uint16_t uwsgi_v8_rpc(void * func, uint8_t argc, char **argv, uint16_t argvs[], char *buffer) {


int core_id = 0; int core_id = (long) pthread_getspecific(uv8.current_core);
if (uwsgi.mywid > 0) {
struct wsgi_request *wsgi_req = current_wsgi_req();
core_id = wsgi_req->async_id;
}


uv8.isolates[core_id]->Enter(); uv8.isolates[core_id]->Enter();
uv8.contexts[core_id]->Enter(); uv8.contexts[core_id]->Enter();
Expand All @@ -294,11 +286,6 @@ extern "C" uint16_t uwsgi_v8_rpc(void * func, uint8_t argc, char **argv, uint16_


v8::Persistent<v8::Function> l_func = uvrt->func[core_id]; v8::Persistent<v8::Function> l_func = uvrt->func[core_id];


if (core_id == 1) {
uwsgi_log("111111\n");
l_func = handler1;
}

v8::Handle<v8::Value> result = l_func->Call(uv8.contexts[core_id]->Global(), argc, argj); v8::Handle<v8::Value> result = l_func->Call(uv8.contexts[core_id]->Global(), argc, argj);
if (result.IsEmpty()) { if (result.IsEmpty()) {
return 0; return 0;
Expand All @@ -318,15 +305,27 @@ extern "C" uint16_t uwsgi_v8_rpc(void * func, uint8_t argc, char **argv, uint16_


} }


extern "C" void uwsgi_v8_init_thread(int core_id) {
pthread_setspecific(uv8.current_core, (void *) ((long)core_id));
uv8.contexts[core_id] = uwsgi_v8_new_isolate(core_id);
uwsgi_v8_apps_do(core_id);
}

extern "C" void uwsgi_v8_enable_threads() {
}

extern "C" int uwsgi_v8_signal_handler(uint8_t sig, void *handler) { extern "C" int uwsgi_v8_signal_handler(uint8_t sig, void *handler) {
int ret = 0; int ret = 0;
v8::HandleScope handle_scope; int core_id = (long) pthread_getspecific(uv8.current_core);
struct wsgi_request *wsgi_req = current_wsgi_req();
v8::Context::Scope context_scope(uv8.contexts[wsgi_req->async_id]); uv8.isolates[core_id]->Enter();
v8::Persistent<v8::Function> l_func = static_cast<v8::Function*> (handler); uv8.contexts[core_id]->Enter();
v8::Handle<v8::Value> argj[1]; v8::HandleScope handle_scope;
v8::Handle<v8::Value> argj[1];
argj[0] = v8::Number::New(sig); argj[0] = v8::Number::New(sig);
v8::Handle<v8::Value> result = l_func->Call(l_func, 1, argj); struct uwsgi_v8_signal_table *uvst = (struct uwsgi_v8_signal_table *) handler;
v8::Persistent<v8::Function> l_func = uvst->func[core_id];
v8::Handle<v8::Value> result = l_func->Call(uv8.contexts[core_id]->Global(), 1, argj);
if (result.IsEmpty()) ret = -1; if (result.IsEmpty()) ret = -1;
while(!v8::V8::IdleNotification()) {}; while(!v8::V8::IdleNotification()) {};
return ret; return ret;
Expand Down

0 comments on commit bb3f6bc

Please sign in to comment.