Skip to content

Commit

Permalink
completed spooler support in ruby
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto@oneiric64 committed Dec 8, 2011
1 parent fe4c6ba commit b48597d
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 32 deletions.
45 changes: 13 additions & 32 deletions plugins/python/uwsgi_pymodule.c
Expand Up @@ -148,24 +148,14 @@ char *uwsgi_encode_pydict(PyObject * pydict, uint16_t * size) {
keysize = PyString_Size(key);
valsize = PyString_Size(val);
if (bufptr + keysize + 2 + valsize + 2 <= buf + *size) {
#ifdef __BIG_ENDIAN__
keysize = uwsgi_swap16(keysize);
#endif
memcpy(bufptr, &keysize, 2);
bufptr += 2;
#ifdef __BIG_ENDIAN__
keysize = uwsgi_swap16(keysize);
#endif

*bufptr++ = (uint8_t) (keysize & 0xff);
*bufptr++ = (uint8_t) ((keysize >> 8) & 0xff);
memcpy(bufptr, PyString_AsString(key), keysize);
bufptr += keysize;
#ifdef __BIG_ENDIAN__
valsize = uwsgi_swap16(valsize);
#endif
memcpy(bufptr, &valsize, 2);
bufptr += 2;
#ifdef __BIG_ENDIAN__
valsize = uwsgi_swap16(valsize);
#endif

*bufptr++ = (uint8_t) (valsize & 0xff);
*bufptr++ = (uint8_t) ((valsize >> 8) & 0xff);
memcpy(bufptr, PyString_AsString(val), valsize);
bufptr += valsize;
}
Expand Down Expand Up @@ -1600,24 +1590,15 @@ PyObject *py_uwsgi_send_spool(PyObject * self, PyObject * args, PyObject *kw) {
valsize = PyString_Size(val);
if (cur_buf + keysize + 2 + valsize + 2 <= spool_buffer + UMAX16) {

#ifdef __BIG_ENDIAN__
keysize = uwsgi_swap16(keysize);
#endif
memcpy(cur_buf, &keysize, 2);
cur_buf += 2;
#ifdef __BIG_ENDIAN__
keysize = uwsgi_swap16(keysize);
#endif
*cur_buf++ = (uint8_t) (keysize & 0xff);
*cur_buf++ = (uint8_t) ((keysize >> 8) & 0xff);

memcpy(cur_buf, PyString_AsString(key), keysize);
cur_buf += keysize;
#ifdef __BIG_ENDIAN__
valsize = uwsgi_swap16(valsize);
#endif
memcpy(cur_buf, &valsize, 2);
cur_buf += 2;
#ifdef __BIG_ENDIAN__
valsize = uwsgi_swap16(valsize);
#endif

*cur_buf++ = (uint8_t) (valsize & 0xff);
*cur_buf++ = (uint8_t) ((valsize >> 8) & 0xff);

memcpy(cur_buf, PyString_AsString(val), valsize);
cur_buf += valsize;
}
Expand Down
114 changes: 114 additions & 0 deletions plugins/rack/rack_api.c
Expand Up @@ -747,6 +747,118 @@ VALUE uwsgi_ruby_signal(int argc, VALUE *argv, VALUE *class) {
return Qtrue;
}

int rack_uwsgi_build_spool(VALUE rbkey, VALUE rbval, VALUE argv) {
char **sa = (char **) argv;

char *cur_buf = sa[0];
char *watermark = sa[1];

if (TYPE(rbkey) != T_STRING || TYPE(rbval) != T_STRING) {
rb_raise(rb_eRuntimeError, "spool hash must contains only strings");
return ST_STOP;
}

char *key = RSTRING_PTR(rbkey); uint16_t keylen = RSTRING_LEN(rbkey);
char *val = RSTRING_PTR(rbval); uint16_t vallen = RSTRING_LEN(rbval);

if (cur_buf + (2+keylen+2+vallen) > watermark) {
rb_raise(rb_eRuntimeError, "spool hash size can be no more than 64K");
return ST_STOP;
}

*cur_buf++ = (uint8_t) (keylen & 0xff);
*cur_buf++ = (uint8_t) ((keylen >> 8) & 0xff);
memcpy(cur_buf, key, keylen); cur_buf += keylen;

*cur_buf++ = (uint8_t) (vallen & 0xff);
*cur_buf++ = (uint8_t) ((vallen >> 8) & 0xff);
memcpy(cur_buf, val, vallen); cur_buf += vallen;

// fix the ptr
sa[0] = cur_buf;

return ST_CONTINUE;
}


VALUE rack_uwsgi_send_spool(VALUE *class, VALUE args) {

char spool_filename[1024];
struct wsgi_request *wsgi_req = current_wsgi_req();
char *priority = NULL;
long numprio = 0;
time_t at = 0;
char *body = NULL;
size_t body_len= 0;

Check_Type(args, T_HASH);

// priority
#ifdef RUBY19
VALUE rbprio = rb_hash_lookup(args, rb_str_new2("priority"));
#else
VALUE rbprio = rb_hash_aref(args, rb_str_new2("priority"));
#endif
if (TYPE(rbprio) == T_FIXNUM) {
numprio = NUM2INT(rbprio);
rb_hash_delete(args, rb_str_new2("priority"));
}

// at
#ifdef RUBY19
VALUE rbat = rb_hash_lookup(args, rb_str_new2("at"));
#else
VALUE rbat = rb_hash_aref(args, rb_str_new2("at"));
#endif
if (TYPE(rbat) == T_FIXNUM) {
at = NUM2INT(rbat);
rb_hash_delete(args, rb_str_new2("at"));
}

// body
#ifdef RUBY19
VALUE rbbody = rb_hash_lookup(args, rb_str_new2("body"));
#else
VALUE rbbody = rb_hash_aref(args, rb_str_new2("body"));
#endif
if (TYPE(rbbody) == T_STRING) {
body = RSTRING_PTR(rbbody);
body_len = RSTRING_LEN(rbbody);
rb_hash_delete(args, rb_str_new2("body"));
}

char *spool_buffer = uwsgi_malloc(UMAX16);
char *argv[2];
argv[0] = spool_buffer;
argv[1] = spool_buffer + UMAX16 ;

rb_hash_foreach(args, rack_uwsgi_build_spool, (VALUE) argv);

if (numprio) {
priority = uwsgi_num2str(numprio);
}

int ret = spool_request(spool_filename, uwsgi.workers[0].requests + 1, wsgi_req->async_id, spool_buffer, argv[0] - spool_buffer, priority, at, body, body_len);

if (priority) {
free(priority);
}

free(spool_buffer);

if (ret > 0) {
char *slash = uwsgi_get_last_char(spool_filename, '/');
if (slash) {
return rb_str_new2(slash+1);
}
return rb_str_new2(spool_filename);
}

rb_raise(rb_eRuntimeError, "unable to spool job");
return Qnil;

}



void uwsgi_rack_init_api() {
Expand Down Expand Up @@ -783,6 +895,8 @@ void uwsgi_rack_init_api() {
uwsgi_rack_api("mule_id", rack_uwsgi_mule_id, 0);

uwsgi_rack_api("i_am_the_spooler", rack_uwsgi_i_am_the_spooler, 0);
uwsgi_rack_api("send_to_spooler", rack_uwsgi_send_spool, 1);
uwsgi_rack_api("spool", rack_uwsgi_send_spool, 1);

uwsgi_rack_api("log", rack_uwsgi_log, 1);
uwsgi_rack_api("logsize", rack_uwsgi_logsize, 0);
Expand Down

0 comments on commit b48597d

Please sign in to comment.