Skip to content
This repository has been archived by the owner on Nov 16, 2019. It is now read-only.

Multi #3

Merged
merged 20 commits into from Jan 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions ext/curly/curly.c
@@ -1,11 +1,13 @@
#include <ruby.h>
#include "request.h"
#include "response.h"
#include "multi.h"

void Init_curly(void)
{
VALUE curly_mod = rb_define_module("Curly");

Init_curly_response(curly_mod);
Init_curly_request(curly_mod);
Init_curly_multi(curly_mod);
}
53 changes: 53 additions & 0 deletions ext/curly/exstr.c
@@ -0,0 +1,53 @@
#include "exstr.h"
#include <stdlib.h>
#include <string.h>

static int exstr_grow(exstr* s, size_t at_least);

int exstr_alloc(exstr* s, size_t capacity)
{
s->capacity = capacity;
s->length = 0;
s->value = malloc(capacity);

return s->value ? 0 : -1;
}

void exstr_free(exstr* s)
{
free(s->value);
}

int exstr_append(exstr* s, char* buffer, size_t length)
{
if (exstr_grow(s, s->length + length) < 0) {
return -1;
}

memcpy(&s->value[s->length], buffer, length);
s->length += length;

return 0;
}

static int exstr_grow(exstr* s, size_t at_least)
{
size_t new_capacity;
char* new_value;

if (s->capacity <= at_least) {
new_capacity = 2 * at_least;
new_value = realloc(s->value, new_capacity);

if (!new_value) {
/* realloc failed, did not grow */
return -1;
}

s->value = new_value;
s->capacity = new_capacity;

}

return 0;
}
14 changes: 14 additions & 0 deletions ext/curly/exstr.h
@@ -0,0 +1,14 @@
#ifndef _CURLY_EXSTR_H_
#define _CURLY_EXSTR_H_
#include <stdlib.h>

typedef struct exstr_ {
size_t capacity;
size_t length;
char* value;
} exstr;

int exstr_alloc(exstr* s, size_t capacity);
void exstr_free(exstr* s);
int exstr_append(exstr* s, char* buffer, size_t length);
#endif
99 changes: 99 additions & 0 deletions ext/curly/multi.c
@@ -0,0 +1,99 @@
#include "multi.h"
#include "native.h"
#include "request.h"
#include "response.h"
#include <curl/curl.h>
#include <unistd.h>

VALUE multi_run(VALUE self);

void Init_curly_multi(VALUE curly)
{
VALUE multi = rb_define_class_under(curly, "Multi", rb_cObject);

rb_define_method(multi, "run", multi_run, 0);
}

static VALUE multi_run_unblocked(void* m)
{
CURLM* c = (CURLM*)m;
int pending = 0;
struct timeval timeout;
int rc;
fd_set fr, fw, fx;
int maxfd;
long curl_timeout;

curl_multi_perform(c, &pending);

do {
FD_ZERO(&fr);
FD_ZERO(&fw);
FD_ZERO(&fx);

maxfd = -1;
curl_timeout = -1;

timeout.tv_sec = 1;
timeout.tv_usec = 0;

curl_multi_timeout(c, &curl_timeout);

if (curl_timeout >= 0) {
timeout.tv_sec = curl_timeout / 1000;
timeout.tv_sec = timeout.tv_sec > 1 ? 1 : (curl_timeout % 1000) * 1000;
}

curl_multi_fdset(c, &fr, &fw, &fx, &maxfd);
rc = select(maxfd, &fr, &fw, &fx, &timeout);

curl_multi_perform(c, &pending);

} while (pending);

return Qnil;
}

VALUE multi_run(VALUE self)
{
CURLM* c = curl_multi_init();
VALUE requests = rb_iv_get(self, "@requests");
size_t req_len = RARRAY_LEN(requests);
VALUE* req_ptr = RARRAY_PTR(requests);
size_t i;
native_curly* n = malloc(sizeof(native_curly) * req_len);
VALUE resp;
long code;

for(i = 0; i < req_len; ++i) {
native_curly_alloc(&n[i]);
request_prepare(req_ptr[i], &n[i]);
curl_multi_add_handle(c, n[i].handle);
}

rb_thread_blocking_region(multi_run_unblocked, c, NULL, NULL);

/* cleanup */
for(i = 0; i < req_len; ++i) {
resp = response_new();

rb_iv_set(resp, "@body", rb_str_new(n[i].body.value, n[i].body.length));
rb_iv_set(resp, "@head", rb_str_new(n[i].head.value, n[i].head.length));
rb_iv_set(resp, "@curl_code", INT2NUM(n[i].curl_rc));

if (n[i].curl_rc == CURLE_OK) {
curl_easy_getinfo(n[i].handle, CURLINFO_RESPONSE_CODE, &code);
rb_iv_set(resp, "@status", LONG2NUM(code));
} else {
rb_iv_set(resp, "@curl_error", rb_str_new2(curl_easy_strerror(n[i].curl_rc)));
}
rb_iv_set(req_ptr[i], "@response", resp);

curl_multi_remove_handle(c, n[i].handle);
native_curly_free(&n[i]);
}

curl_multi_cleanup(c);

return Qnil;
}
6 changes: 6 additions & 0 deletions ext/curly/multi.h
@@ -0,0 +1,6 @@
#ifndef _CURLY_MULTI_H_
#define _CURLY_MULTI_H_
#include <ruby.h>

void Init_curly_multi(VALUE curly);
#endif
88 changes: 88 additions & 0 deletions ext/curly/native.c
@@ -0,0 +1,88 @@
#include <curl/curl.h>
#include "native.h"

#define INITIAL_BODY_CAPACITY 4096
#define INITIAL_HEAD_CAPACITY 512

static size_t header_callback(void* buffer, size_t size, size_t count, void* n_);
static size_t data_callback (void* buffer, size_t size, size_t count, void* n_);

int native_curly_alloc(native_curly* n)
{
n->curl_rc = 0;
n->handle = curl_easy_init();

if (exstr_alloc(&n->body, INITIAL_BODY_CAPACITY) < 0) {
return -1;
}

if (exstr_alloc(&n->head, INITIAL_HEAD_CAPACITY) < 0) {
exstr_free(&n->body);
return -1;
}

n->req_headers = NULL;

return 0;
}

void native_curly_free(native_curly* n)
{
curl_easy_cleanup(n->handle);
curl_slist_free_all(n->req_headers);

exstr_free(&n->body);
exstr_free(&n->head);
}

void native_curly_add_header(native_curly* n, const char* hdr)
{
n->req_headers = curl_slist_append(n->req_headers, hdr);
}

void native_curly_prepare(native_curly* n, const char* url, long timeout,
const char* body)
{
/* invoke header_callback when a header is received and pass `resp` */
curl_easy_setopt(n->handle, CURLOPT_HEADERFUNCTION, header_callback);
curl_easy_setopt(n->handle, CURLOPT_WRITEHEADER, n);

/* invoke data_callback when a chunk of data is received and pass `resp` */
curl_easy_setopt(n->handle, CURLOPT_WRITEFUNCTION, data_callback);
curl_easy_setopt(n->handle, CURLOPT_WRITEDATA, n);

curl_easy_setopt(n->handle, CURLOPT_URL, url);
curl_easy_setopt(n->handle, CURLOPT_HTTPHEADER, n->req_headers);

if (timeout > 0) {
curl_easy_setopt(n->handle, CURLOPT_TIMEOUT_MS, timeout);
}

if (body) {
curl_easy_setopt(n->handle, CURLOPT_POSTFIELDS, body);
}

}

void native_curly_run_simple(native_curly* n)
{
n->curl_rc = curl_easy_perform(n->handle);
}

/* invoked by curl_easy_perform when a header is available */
static size_t header_callback(void* buffer, size_t size, size_t count, void* n_)
{
native_curly* n = (native_curly*)n_;
exstr_append(&n->head, buffer, size * count);

return size * count;
}

/* invoked by curl_easy_perform when data is available */
static size_t data_callback(void* buffer, size_t size, size_t count, void* n_)
{
native_curly* n = (native_curly*)n_;
exstr_append(&n->body, buffer, size * count);

return size * count;
}
24 changes: 24 additions & 0 deletions ext/curly/native.h
@@ -0,0 +1,24 @@
#ifndef _CURLY_NATIVE_H_
#define _CURLY_NATIVE_H_

#include <curl/curl.h>
#include "exstr.h"

typedef struct native_curly_ {
CURL* handle;
int curl_rc;
exstr body;
exstr head;

struct curl_slist* req_headers;
} native_curly;

int native_curly_alloc(native_curly* n);
void native_curly_free(native_curly* n);
void native_curly_run_simple(native_curly* n);
void native_curly_add_header(native_curly* n, const char* hdr);
void native_curly_prepare(native_curly* n, const char* url, long timeout,
const char* body);


#endif