Skip to content

Commit

Permalink
Merge pull request #217 from earlchew/issues-215
Browse files Browse the repository at this point in the history
Only process pdus once when propagating cancellations or errors
  • Loading branch information
sahlberg committed May 12, 2017
2 parents a25e050 + 6afd405 commit 165e51a
Showing 1 changed file with 33 additions and 39 deletions.
72 changes: 33 additions & 39 deletions lib/init.c
Expand Up @@ -262,32 +262,51 @@ char *rpc_get_error(struct rpc_context *rpc)
return rpc->error_string;
}

void rpc_error_all_pdus(struct rpc_context *rpc, const char *error)
static void rpc_purge_all_pdus(struct rpc_context *rpc, int status, const char *error)
{
struct rpc_queue outqueue;
struct rpc_pdu *pdu;
unsigned int i;
int i;

assert(rpc->magic == RPC_CONTEXT_MAGIC);

while ((pdu = rpc->outqueue.head) != NULL) {
rpc->outqueue.head = pdu->next;
pdu->cb(rpc, RPC_STATUS_ERROR, (void *)error, pdu->private_data);
/* Remove all entries from each queue before cancellation to prevent
* the callbacks manipulating entries that are about to be removed.
*
* This code assumes that the callbacks will not enqueue any new
* pdus when called.
*/

outqueue = rpc->outqueue;

rpc_reset_queue(&rpc->outqueue);
while ((pdu = outqueue.head) != NULL) {
outqueue.head = pdu->next;
pdu->next = NULL;
pdu->cb(rpc, status, (void *) error, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
rpc->outqueue.tail = NULL;

for (i = 0; i < HASHES; i++) {
struct rpc_queue *q = &rpc->waitpdu[i];
struct rpc_queue waitqueue = rpc->waitpdu[i];

while((pdu = q->head) != NULL) {
q->head = pdu->next;
pdu->cb(rpc, RPC_STATUS_ERROR, (void *)error,
pdu->private_data);
rpc_reset_queue(&rpc->waitpdu[i]);
while((pdu = waitqueue.head) != NULL) {
waitqueue.head = pdu->next;
pdu->next = NULL;
pdu->cb(rpc, status, (void *) error, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
q->tail = NULL;
}
rpc->waitpdu_len = 0;

assert(!rpc->outqueue.head);
for (i = 0; i < HASHES; i++)
assert(!rpc->waitpdu[i].head);
}

void rpc_error_all_pdus(struct rpc_context *rpc, const char *error)
{
rpc_purge_all_pdus(rpc, RPC_STATUS_ERROR, error);
}

static void rpc_free_fragment(struct rpc_fragment *fragment)
Expand Down Expand Up @@ -335,34 +354,9 @@ int rpc_add_fragment(struct rpc_context *rpc, char *data, uint32_t size)

void rpc_destroy_context(struct rpc_context *rpc)
{
struct rpc_pdu *pdu;
unsigned int i;

assert(rpc->magic == RPC_CONTEXT_MAGIC);

/* If we are a server context, free all registered endpoints. */
while (rpc->endpoints != NULL) {
struct rpc_endpoint *next = rpc->endpoints->next;

free(rpc->endpoints);
rpc->endpoints = next;
}

while((pdu = rpc->outqueue.head) != NULL) {
LIBNFS_LIST_REMOVE(&rpc->outqueue.head, pdu);
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}

for (i = 0; i < HASHES; i++) {
struct rpc_queue *q = &rpc->waitpdu[i];

while((pdu = q->head) != NULL) {
LIBNFS_LIST_REMOVE(&q->head, pdu);
pdu->cb(rpc, RPC_STATUS_CANCEL, NULL, pdu->private_data);
rpc_free_pdu(rpc, pdu);
}
}
rpc_purge_all_pdus(rpc, RPC_STATUS_CANCEL, NULL);

rpc_free_all_fragments(rpc);

Expand Down

0 comments on commit 165e51a

Please sign in to comment.