Skip to content

Commit

Permalink
simple hooks for processing incoming and outgoing rpcs
Browse files Browse the repository at this point in the history
svn:r466
  • Loading branch information
provos committed Nov 2, 2007
1 parent 18ac924 commit 65236aa
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 3 deletions.
1 change: 1 addition & 0 deletions ChangeLog
Expand Up @@ -29,3 +29,4 @@ Changes in current version:
o Fix implementation of strsep on platforms that lack it
o Fix implementation of getaddrinfo on platforms that lack it; mainly, this will make Windows http.c work better. Original patch by Lubomir Marinov.
o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa
o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication.
12 changes: 12 additions & 0 deletions evrpc-internal.h
Expand Up @@ -33,12 +33,24 @@ struct evrpc;

#define EVRPC_URI_PREFIX "/.rpc."

struct evrpc_hook {
TAILQ_ENTRY(evrpc_hook) (next);

/* returns -1; if the rpc should be aborted, is allowed to rewrite */
int (*process)(struct evhttp_request *, struct evbuffer *, void *);
void *process_arg;
};

struct evrpc_base {
/* the HTTP server under which we register our RPC calls */
struct evhttp* http_server;

/* a list of all RPCs registered with us */
TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs;

/* hooks for processing outbound and inbound rpcs */
TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks;
struct evrpc_hook_list output_hooks;
};

struct evrpc_req_generic;
Expand Down
108 changes: 105 additions & 3 deletions evrpc.c
Expand Up @@ -74,6 +74,8 @@ evrpc_init(struct evhttp *http_server)
evtag_init();

TAILQ_INIT(&base->registered_rpcs);
TAILQ_INIT(&base->input_hooks);
TAILQ_INIT(&base->output_hooks);
base->http_server = http_server;

return (base);
Expand All @@ -83,14 +85,95 @@ void
evrpc_free(struct evrpc_base *base)
{
struct evrpc *rpc;

struct evrpc_hook *hook;

while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
assert(evrpc_unregister_rpc(base, rpc->uri));
}

while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
assert(evrpc_remove_hook(base, INPUT, hook));
}
while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
assert(evrpc_remove_hook(base, OUTPUT, hook));
}
free(base);
}

void *
evrpc_add_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg)
{
struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
head = &base->input_hooks;
break;
case OUTPUT:
head = &base->output_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}

hook = calloc(1, sizeof(struct evrpc_hook));
assert(hook != NULL);

hook->process = cb;
hook->process_arg = cb_arg;
TAILQ_INSERT_TAIL(head, hook, next);

return (hook);
}

/*
* remove the hook specified by the handle
*/

int
evrpc_remove_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
void *handle)
{
struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
head = &base->input_hooks;
break;
case OUTPUT:
head = &base->output_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}

TAILQ_FOREACH(hook, head, next) {
if (hook == handle) {
TAILQ_REMOVE(head, hook, next);
free(hook);
return (1);
}
}

return (0);
}

static int
evrpc_process_hooks(struct evrpc_hook_list *head,
struct evhttp_request *req, struct evbuffer *evbuf)
{
struct evrpc_hook *hook;
TAILQ_FOREACH(hook, head, next) {
if (hook->process(req, evbuf, hook->process_arg) == -1)
return (-1);
}

return (0);
}

static void evrpc_pool_schedule(struct evrpc_pool *pool);
static void evrpc_request_cb(struct evhttp_request *, void *);
void evrpc_request_done(struct evrpc_req_generic*);
Expand Down Expand Up @@ -124,6 +207,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
{
char *constructed_uri = evrpc_construct_uri(rpc->uri);

rpc->base = base;
rpc->cb = cb;
rpc->cb_arg = cb_arg;

Expand Down Expand Up @@ -179,6 +263,15 @@ evrpc_request_cb(struct evhttp_request *req, void *arg)
EVBUFFER_LENGTH(req->input_buffer) <= 0)
goto error;

/*
* we might want to allow hooks to suspend the processing,
* but at the moment, we assume that they just act as simple
* filters.
*/
if (evrpc_process_hooks(&rpc->base->input_hooks,
req, req->input_buffer) == -1)
goto error;

rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
if (rpc_state == NULL)
goto error;
Expand Down Expand Up @@ -236,7 +329,7 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
{
struct evhttp_request *req = rpc_state->http_req;
struct evrpc *rpc = rpc_state->rpc;
struct evbuffer* data;
struct evbuffer* data = NULL;

if (rpc->reply_complete(rpc_state->reply) == -1) {
/* the reply was not completely filled in. error out */
Expand All @@ -251,6 +344,11 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
/* serialize the reply */
rpc->reply_marshal(data, rpc_state->reply);

/* do hook based tweaks to the request */
if (evrpc_process_hooks(&rpc->base->output_hooks,
req, data) == -1)
goto error;

evhttp_send_reply(req, HTTP_OK, "OK", data);

evbuffer_free(data);
Expand All @@ -260,6 +358,8 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
return;

error:
if (data != NULL)
evbuffer_free(data);
evrpc_reqstate_free(rpc_state);
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
return;
Expand Down Expand Up @@ -460,6 +560,8 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
event_del(&ctx->ev_timeout);

memset(&status, 0, sizeof(status));
status.http_req = req;

/* we need to get the reply now */
if (req != NULL) {
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
Expand Down
34 changes: 34 additions & 0 deletions evrpc.h
Expand Up @@ -99,6 +99,9 @@ struct evrpc {
/* the callback invoked for each received rpc */
void (*cb)(struct evrpc_req_generic *, void *);
void *cb_arg;

/* reference for further configuration */
struct evrpc_base *base;
};

#define EVRPC_STRUCT(rpcname) struct evrpc_req__##rpcname
Expand Down Expand Up @@ -140,6 +143,7 @@ EVRPC_STRUCT(rpcname) { \
struct reqstruct* request; \
struct rplystruct* reply; \
struct evrpc* rpc; \
struct evhttp_request* http_req; \
void (*done)(struct evrpc_status *, \
struct evrpc* rpc, void *request, void *reply); \
}; \
Expand Down Expand Up @@ -184,6 +188,11 @@ error: \
return (-1); \
}

/*
* Access to the underlying http object; can be used to look at headers or
* for getting the remote ip address
*/
#define EVRPC_REQUEST_HTTP(rpc_req) (rpc_req)->http_req

/*
* EVRPC_REQUEST_DONE is used to answer a request; the reply is expected
Expand Down Expand Up @@ -252,6 +261,9 @@ struct evrpc_status {
#define EVRPC_STATUS_ERR_BADPAYLOAD 2
#define EVRPC_STATUS_ERR_UNSTARTED 3
int error;

/* for looking at headers or other information */
struct evhttp_request *http_req;
};

struct evrpc_request_wrapper {
Expand Down Expand Up @@ -313,6 +325,28 @@ void evrpc_pool_add_connection(struct evrpc_pool *,
*/
void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);

/*
* Hooks for changing the input and output of RPCs; this can be used to
* implement compression, authentication, encryption, ...
*
* If a hook returns -1, the processing is aborted.
*
* The add functions return handles that can be used for removing hooks.
*/

enum EVRPC_HOOK_TYPE {
INPUT, OUTPUT
};

void *evrpc_add_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg);

int evrpc_remove_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
void *handle);

#ifdef __cplusplus
}
#endif
Expand Down
40 changes: 40 additions & 0 deletions test/regress_rpc.c
Expand Up @@ -91,11 +91,21 @@ EVRPC_HEADER(NeverReply, msg, kill);
EVRPC_GENERATE(Message, msg, kill);
EVRPC_GENERATE(NeverReply, msg, kill);

static int need_input_hook = 0;
static int need_output_hook = 0;

void
MessageCb(EVRPC_STRUCT(Message)* rpc, void *arg)
{
struct kill* kill_reply = rpc->reply;

if (need_input_hook) {
struct evhttp_request* req = EVRPC_REQUEST_HTTP(rpc);
const char *header = evhttp_find_header(
req->input_headers, "X-Hook");
assert(strcmp(header, "input") == 0);
}

/* we just want to fill in some non-sense */
EVTAG_ASSIGN(kill_reply, weapon, "dagger");
EVTAG_ASSIGN(kill_reply, action, "wave around like an idiot");
Expand Down Expand Up @@ -129,6 +139,9 @@ rpc_setup(struct evhttp **phttp, short *pport, struct evrpc_base **pbase)
*phttp = http;
*pport = port;
*pbase = base;

need_input_hook = 0;
need_output_hook = 0;
}

static void
Expand Down Expand Up @@ -330,6 +343,13 @@ GotKillCb(struct evrpc_status *status,
char *weapon;
char *action;

if (need_output_hook) {
struct evhttp_request *req = status->http_req;
const char *header = evhttp_find_header(
req->input_headers, "X-Hook");
assert(strcmp(header, "output") == 0);
}

if (status->error != EVRPC_STATUS_ERR_NONE)
goto done;

Expand Down Expand Up @@ -386,6 +406,18 @@ GotKillCbTwo(struct evrpc_status *status,
event_loopexit(NULL);
}

static int
rpc_hook_add_header(struct evhttp_request *req,
struct evbuffer *evbuf, void *arg)
{
const char *hook_type = arg;
if (strcmp("input", hook_type) == 0)
evhttp_add_header(req->input_headers, "X-Hook", hook_type);
else
evhttp_add_header(req->output_headers, "X-Hook", hook_type);
return (0);
}

static void
rpc_basic_client(void)
{
Expand All @@ -400,6 +432,14 @@ rpc_basic_client(void)

rpc_setup(&http, &port, &base);

need_input_hook = 1;
need_output_hook = 1;

assert(evrpc_add_hook(base, INPUT, rpc_hook_add_header, "input")
!= NULL);
assert(evrpc_add_hook(base, OUTPUT, rpc_hook_add_header, "output")
!= NULL);

pool = rpc_pool_with_connection(port);

/* set up the basic message */
Expand Down

0 comments on commit 65236aa

Please sign in to comment.