Skip to content

Commit

Permalink
BUG 1283: erlang_js uses non-thread-safe driver function
Browse files Browse the repository at this point in the history
Using `driver_output_term()` in an async worker pool thread
is a big no-no.  A solution is to use `driver_output_term()`
in the `ready_async()` function, which executes in the proper
thread context.

Tested via:

* Run with a debug SMP build of the Erlang VM.  There are no
  `beam/global.h:1345: Lock check assertion` errors.
* Use the following to run a large number of Map/Reduce jobs to
  try to provoke a memory leak.

The following can be cut-and-paste'd into a Riak console/shell.
On my MBP, it runs for (very roughly) an hour.  Perhaps it should
run even longer to try to find very small memory leaks?

    {ok, C} = riak:local_client().
    [C:put(riak_object:new(<<"test">>, list_to_binary(integer_to_list(X)), list_to_binary(integer_to_list(X)))) || X <- lists:seq(1,500)].
    [C:put(riak_object:new(<<"test">>, list_to_binary(integer_to_list(X)) ,<<"98123489123498123498123498123893489712348974123789432789234178942318794213897234178912348791234789412378923417894123789412378941237894123789412387943128794312879123478941237894123789412378941239781243789213487914237891423789142378914234">>)) || X <- lists:seq(1,250)].
    io:format("Start time = ~p ~p\n", [time(), now()]), [ spawn(fun() -> {ok, C2} = riak:local_client(), [{ok,[_]} = C2:mapred_bucket(<<"test">>, [{map, {jsfun, <<"Riak.mapValuesJson">>}, none, false}, {reduce, {jsfun, <<"Riak.reduceSum">>}, none, true}]) || _ <- lists:seq(1,6*15)] , if YY == 1 -> os:cmd("say I am done now"), io:format("End time = ~p ~p\n", [time(), now()]); true -> ok end end) || YY <- lists:seq(1,200)].
  • Loading branch information
slfritchie committed Nov 13, 2011
1 parent 1fd579d commit 8122b34
Showing 1 changed file with 69 additions and 26 deletions.
95 changes: 69 additions & 26 deletions c_src/spidermonkey_drv.c
Expand Up @@ -14,6 +14,7 @@
limitations under the License. */

#include <string.h>
#include <assert.h>
#include <erl_driver.h>

#include "spidermonkey.h"
Expand All @@ -32,6 +33,10 @@ typedef struct _spidermonkey_drv_t {
typedef struct _js_call_t {
spidermonkey_drv_t *driver_data;
ErlDrvBinary *args;
ErlDrvTermData return_terms[20];
char return_call_id[32];
int return_term_count;
const char *return_string;
} js_call;

typedef void (*asyncfun)(void *);
Expand All @@ -42,6 +47,7 @@ static ErlDrvData start(ErlDrvPort port, char *cmd);
static int init();
static void stop(ErlDrvData handle);
static void process(ErlDrvData handle, ErlIOVec *ev);
static void ready_async(ErlDrvData handle, ErlDrvThreadData async_data);

static ErlDrvEntry spidermonkey_drv_entry = {
init, /* init */
Expand All @@ -56,7 +62,7 @@ static ErlDrvEntry spidermonkey_drv_entry = {
NULL, /* control */
NULL, /* timeout */
process, /* process */
NULL, /* ready_async */
ready_async, /* ready_async */
NULL, /* flush */
NULL, /* call */
NULL, /* event */
Expand All @@ -67,46 +73,67 @@ static ErlDrvEntry spidermonkey_drv_entry = {
};


void send_output(ErlDrvPort port, ErlDrvTermData *terms, int term_count) {
driver_output_term(port, terms, term_count);
void send_immediate_ok_response(spidermonkey_drv_t *dd, const char *call_id) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY, (ErlDrvTermData) call_id, strlen(call_id),
ERL_DRV_ATOM, dd->atom_ok,
ERL_DRV_TUPLE, 2};
driver_output_term(dd->port, terms, sizeof(terms) / sizeof(terms[0]));
}

void send_ok_response(spidermonkey_drv_t *dd, const char *call_id) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY, (ErlDrvTermData) call_id, strlen(call_id),
#define COPY_DATA(CD, CID, TERMS) \
do { \
assert(strlen(CID) < sizeof(CD->return_call_id) - 1); \
strcpy(CD->return_call_id, CID); \
assert(sizeof(TERMS) <= sizeof(CD->return_terms)); \
memcpy(CD->return_terms, TERMS, sizeof(TERMS)); \
CD->return_term_count = sizeof(TERMS) / sizeof(TERMS[0]); \
} while (0)

void send_ok_response(spidermonkey_drv_t *dd, js_call *call_data,
const char *call_id) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY,
(ErlDrvTermData) call_data->return_call_id,strlen(call_id),
ERL_DRV_ATOM, dd->atom_ok,
ERL_DRV_TUPLE, 2};
send_output(dd->port, terms, sizeof(terms) / sizeof(terms[0]));
COPY_DATA(call_data, call_id, terms);
}

void send_error_string_response(spidermonkey_drv_t *dd, const char *call_id, const char *msg) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY, (ErlDrvTermData) call_id, strlen(call_id),
void send_error_string_response(spidermonkey_drv_t *dd, js_call *call_data,
const char *call_id, const char *msg) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY,
(ErlDrvTermData) call_data->return_call_id,strlen(call_id),
ERL_DRV_ATOM, dd->atom_error,
ERL_DRV_BUF2BINARY, (ErlDrvTermData) msg, strlen(msg),
ERL_DRV_TUPLE, 3};
send_output(dd->port, terms, sizeof(terms) / sizeof(terms[0]));
COPY_DATA(call_data, call_id, terms);
call_data->return_string = msg;
}

void send_string_response(spidermonkey_drv_t *dd, const char *call_id, const char *result) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY, (ErlDrvTermData) call_id, strlen(call_id),
void send_string_response(spidermonkey_drv_t *dd, js_call *call_data,
const char *call_id, const char *result) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY,
(ErlDrvTermData) call_data->return_call_id,strlen(call_id),
ERL_DRV_ATOM, dd->atom_ok,
ERL_DRV_BUF2BINARY, (ErlDrvTermData) result, strlen(result),
ERL_DRV_TUPLE, 3};
send_output(dd->port, terms, sizeof(terms) / sizeof(terms[0]));
COPY_DATA(call_data, call_id, terms);
call_data->return_string = result;
}

void unknown_command(spidermonkey_drv_t *dd, const char *call_id) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY, (ErlDrvTermData) call_id, strlen(call_id),
void unknown_command(spidermonkey_drv_t *dd, js_call *call_data,
const char *call_id) {
ErlDrvTermData terms[] = {ERL_DRV_BUF2BINARY,
(ErlDrvTermData) call_data->return_call_id,strlen(call_id),
ERL_DRV_ATOM, dd->atom_error,
ERL_DRV_ATOM, dd->atom_unknown_cmd,
ERL_DRV_TUPLE, 3};
send_output(dd->port, terms, sizeof(terms) / sizeof(terms[0]));
COPY_DATA(call_data, call_id, terms);
}

void run_js(void *jsargs) {
js_call *call_data = (js_call *) jsargs;
spidermonkey_drv_t *dd = call_data->driver_data;
ErlDrvBinary *args = call_data->args;
driver_free(call_data);
char *data = args->orig_bytes;
char *command = read_command(&data);
char *call_id = read_string(&data);
Expand All @@ -116,39 +143,36 @@ void run_js(void *jsargs) {
char *code = read_string(&data);
result = sm_eval(dd->vm, filename, code, 1);
if ((strncmp(result, "[{\"error\":\"notfound\"}]", 22) == 0) || (strncmp(result, "{\"error\"", 8) == 0)) {
send_error_string_response(dd, call_id, result);
send_error_string_response(dd, call_data, call_id, result);
}
else {
send_string_response(dd, call_id, result);
send_string_response(dd, call_data, call_id, result);
}
driver_free(filename);
driver_free(code);
driver_free(result);
}
else if (strncmp(command, "dj", 2) == 0) {
char *filename = read_string(&data);
char *code = read_string(&data);
result = sm_eval(dd->vm, filename, code, 0);
if (result == NULL) {
send_ok_response(dd, call_id);
send_ok_response(dd, call_data, call_id);
}
else {
send_error_string_response(dd, call_id, result);
driver_free(result);
send_error_string_response(dd, call_data, call_id, result);
}
driver_free(filename);
driver_free(code);
}
else if (strncmp(command, "sd", 2) == 0) {
dd->shutdown = 1;
send_ok_response(dd, call_id);
send_ok_response(dd, call_data, call_id);
}
else {
unknown_command(dd, call_id);
unknown_command(dd, call_data, call_id);
}
driver_free(command);
driver_free(call_id);
driver_binary_dec_refc(args);
}

DRIVER_INIT(spidermonkey_drv) {
Expand Down Expand Up @@ -183,6 +207,7 @@ static void stop(ErlDrvData handle) {

static void process(ErlDrvData handle, ErlIOVec *ev) {
spidermonkey_drv_t *dd = (spidermonkey_drv_t *) handle;

char *data = ev->binv[1]->orig_bytes;
char *command = read_command(&data);
if (strncmp(command, "ij", 2) == 0) {
Expand All @@ -194,17 +219,35 @@ static void process(ErlDrvData handle, ErlIOVec *ev) {
thread_stack = thread_stack * (1024 * 1024);
int heap_size = read_int32(&data) * (1024 * 1024);
dd->vm = sm_initialize(thread_stack, heap_size);
send_ok_response(dd, call_id);
send_immediate_ok_response(dd, call_id);
driver_free(call_id);
}
else {
js_call *call_data = (js_call *) driver_alloc(sizeof(js_call));
call_data->driver_data = dd;
call_data->args = ev->binv[1];
call_data->return_terms[0] = 0;
call_data->return_term_count = 0;
call_data->return_string = NULL;
driver_binary_inc_refc(call_data->args);
ErlDrvPort port = dd->port;
unsigned long thread_key = (unsigned long) port;
driver_async(dd->port, (unsigned int *) &thread_key, (asyncfun) run_js, (void *) call_data, NULL);
}
driver_free(command);
}

static void
ready_async(ErlDrvData handle, ErlDrvThreadData async_data)
{
spidermonkey_drv_t *dd = (spidermonkey_drv_t *) handle;
js_call *call_data = (js_call *) async_data;

driver_output_term(dd->port,
call_data->return_terms, call_data->return_term_count);
driver_binary_dec_refc(call_data->args);
if (call_data->return_string != NULL) {
driver_free((void *) call_data->return_string);
}
driver_free(call_data);
}

0 comments on commit 8122b34

Please sign in to comment.