Skip to content

Commit

Permalink
Queue events on their subscription object instead of adding them to t…
Browse files Browse the repository at this point in the history
…he thread pool immediately.

Events destined for a non-responding control point would flood the thread
pool and prevent correct dispatching to other clients, sometimes to the
point of disabling the device. The new code queues events without
allocating thread resources and properly discards events when a client is
not accepting them.

Signed-off-by: Marcelo Roberto Jimenez <mroberto@users.sourceforge.net>
  • Loading branch information
medoc92 authored and mrjimenez committed Jun 16, 2017
1 parent 26cdf14 commit c54ecdd
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 41 deletions.
164 changes: 123 additions & 41 deletions upnp/src/gena/gena_device.c
Expand Up @@ -39,6 +39,8 @@
#if EXCLUDE_GENA == 0
#ifdef INCLUDE_DEVICE_APIS

#include <assert.h>

#include "gena.h"
#include "httpparser.h"
#include "httpreadwrite.h"
Expand All @@ -54,6 +56,8 @@
#define snprintf _snprintf
#endif

#define STALE_JOBID (INVALID_JOB_ID -1)

/*!
* \brief Unregisters a device.
*
Expand Down Expand Up @@ -314,9 +318,6 @@ static void genaNotifyThread(
notify_thread_struct *in = (notify_thread_struct *) input;
int return_code;
struct Handle_Info *handle_info;
ThreadPoolJob job;

memset(&job, 0, sizeof(job));

/* This should be a HandleLock and not a HandleReadLock otherwise if there
* is a lot of notifications, then multiple threads will acquire a read
Expand All @@ -340,22 +341,6 @@ static void genaNotifyThread(
HandleUnlock();
return;
}
#ifdef UPNP_ENABLE_NOTIFICATION_REORDERING
/*If the event is out of order push it back to the job queue */
if (in->eventKey != sub->ToSendEventKey) {
TPJobInit(&job, (start_routine) genaNotifyThread, input);
TPJobSetFreeFunction(&job, (free_function) free_notify_struct);
TPJobSetPriority(&job, MED_PRIORITY);
/* Sleep a little before creating another thread otherwise if there is
* a lot of notifications to send, the device will take 100% of the CPU
* to create threads and push them back to the job queue. */
imillisleep(1);
ThreadPoolAdd(&gSendThreadPool, &job, NULL);
freeSubscription(&sub_copy);
HandleUnlock();
return;
}
#endif

HandleUnlock();

Expand All @@ -380,6 +365,25 @@ static void genaNotifyThread(
if (sub->ToSendEventKey < 0)
/* wrap to 1 for overflow */
sub->ToSendEventKey = 1;

/* Remove head of event queue. Possibly activate next */
{
ListNode *node = ListHead(&sub->outgoing);
if (node)
ListDelNode(&sub->outgoing, node, 1);
if (ListSize(&sub->outgoing) > 0) {
ThreadPoolJob *job;
ListNode *node = ListHead(&sub->outgoing);
job = (ThreadPoolJob*)node->item;
/* The new head of queue should not have already been
added to the pool, else something is very wrong */
assert(job->jobId != STALE_JOBID);

ThreadPoolAdd(&gSendThreadPool, job, NULL);
job->jobId = STALE_JOBID;
}
}

if (return_code == GENA_E_NOTIFY_UNACCEPTED_REMOVE_SUB)
RemoveSubscriptionSID(in->sid, service);
free_notify_struct(in);
Expand Down Expand Up @@ -442,6 +446,29 @@ static char *AllocGenaHeaders(
return headers;
}

void freeSubscriptionQueuedEvents(subscription *sub)
{
if (ListSize(&sub->outgoing) > 0) {
/* The first event is discarded without dealing
notify_thread_struct: there is a mirror ThreadPool entry for
this one, and it will take care of the refcount etc. Other
entries must be fully cleaned-up here */
int first = 1;
ListNode *node = ListHead(&sub->outgoing);
while (node) {
ThreadPoolJob *job = (ThreadPoolJob *)node->item;
if (first) {
first = 0;
} else {
free_notify_struct((notify_thread_struct *)job->arg);
}
free(node->item);
ListDelNode(&sub->outgoing, node, 0);
node = ListHead(&sub->outgoing);
}
}
}

/* We take ownership of propertySet and will free it */
static int genaInitNotifyCommon(
UpnpDevice_Handle device_handle,
Expand All @@ -462,13 +489,19 @@ static int genaInitNotifyCommon(
subscription *sub = NULL;
service_info *service = NULL;
struct Handle_Info *handle_info;
ThreadPoolJob job;

memset(&job, 0, sizeof(job));
ThreadPoolJob *job = NULL;

UpnpPrintf(UPNP_INFO, GENA, __FILE__, __LINE__,
"GENA BEGIN INITIAL NOTIFY COMMON");

job = (ThreadPoolJob *)malloc(sizeof(ThreadPoolJob));
if (job == NULL) {
line = __LINE__;
ret = UPNP_E_OUTOF_MEMORY;
goto ExitFunction;
}
memset(job, 0, sizeof(ThreadPoolJob));

reference_count = (int *)malloc(sizeof (int));
if (reference_count == NULL) {
line = __LINE__;
Expand Down Expand Up @@ -545,24 +578,32 @@ static int genaInitNotifyCommon(
thread_struct->reference_count = reference_count;
thread_struct->device_handle = device_handle;

TPJobInit(&job, (start_routine)genaNotifyThread, thread_struct);
TPJobSetFreeFunction(&job, (free_routine)free_notify_struct);
TPJobSetPriority(&job, MED_PRIORITY);
TPJobInit(job, (start_routine)genaNotifyThread, thread_struct);
TPJobSetFreeFunction(job, (free_routine)free_notify_struct);
TPJobSetPriority(job, MED_PRIORITY);

ret = ThreadPoolAdd(&gSendThreadPool, &job, NULL);
ret = ThreadPoolAdd(&gSendThreadPool, job, NULL);
if (ret != 0) {
if (ret == EOUTOFMEM) {
line = __LINE__;
ret = UPNP_E_OUTOF_MEMORY;
}
} else {
line = __LINE__;
ret = GENA_SUCCESS;
ListNode *node = ListAddTail(&sub->outgoing, job);
if (node != NULL) {
((ThreadPoolJob *)node->item)->jobId = STALE_JOBID;
line = __LINE__;
ret = GENA_SUCCESS;
} else {
line = __LINE__;
ret = UPNP_E_OUTOF_MEMORY;
}
}
}

ExitFunction:
if (ret != GENA_SUCCESS) {
free(job);
free(thread_struct);
free(headers);
ixmlFreeDOMString(propertySet);
Expand Down Expand Up @@ -684,13 +725,11 @@ static int genaNotifyAllCommon(
subscription *finger = NULL;
service_info *service = NULL;
struct Handle_Info *handle_info;
ThreadPoolJob job;

memset(&job, 0, sizeof(job));

UpnpPrintf(UPNP_INFO, GENA, __FILE__, __LINE__,
"GENA BEGIN NOTIFY ALL COMMON");

/* Keep this allocation first */
reference_count = (int *)malloc(sizeof (int));
if (reference_count == NULL) {
line = __LINE__;
Expand Down Expand Up @@ -730,6 +769,9 @@ static int genaNotifyAllCommon(
if (service != NULL) {
finger = GetFirstSubscription(service);
while (finger) {
ThreadPoolJob *job = NULL;
ListNode *node;

thread_struct = (notify_thread_struct *)malloc(sizeof (notify_thread_struct));
if (thread_struct == NULL) {
line = __LINE__;
Expand All @@ -754,19 +796,50 @@ static int genaNotifyAllCommon(
finger->eventKey = 1;
}

TPJobInit(&job, (start_routine)genaNotifyThread, thread_struct);
TPJobSetFreeFunction(&job, (free_routine)free_notify_struct);
TPJobSetPriority(&job, MED_PRIORITY);
ret = ThreadPoolAdd(&gSendThreadPool, &job, NULL);
if (ret != 0) {
if (ListSize(&finger->outgoing) >=
MAX_SUBSCRIPTION_QUEUED_EVENTS) {
/* We discard the second event: first non-active */
ListNode *node = ListHead(&finger->outgoing);
if (node)
node = node->next;
if (node) {
/* We delete the node ourselves because we also need
to do the right thing about the thread struct */
job = (ThreadPoolJob *)node->item;
free_notify_struct((notify_thread_struct *)job->arg);
free(node->item);
ListDelNode(&finger->outgoing, node, 0);
}
}

job = (ThreadPoolJob *)malloc(sizeof(ThreadPoolJob));
if (job == NULL) {
line = __LINE__;
if (ret == EOUTOFMEM) {
ret = UPNP_E_OUTOF_MEMORY;
break;
}
memset(job, 0, sizeof(ThreadPoolJob));
TPJobInit(job, (start_routine)genaNotifyThread, thread_struct);
TPJobSetFreeFunction(job, (free_routine)free_notify_struct);
TPJobSetPriority(job, MED_PRIORITY);
node = ListAddTail(&finger->outgoing, job);

/* If there is only one element on the list (which we just
added), need to kickstart the threadpool */
if (ListSize(&finger->outgoing) == 1) {
ret = ThreadPoolAdd(&gSendThreadPool, job, NULL);
if (ret != 0) {
line = __LINE__;
ret = UPNP_E_OUTOF_MEMORY;
if (ret == EOUTOFMEM) {
line = __LINE__;
ret = UPNP_E_OUTOF_MEMORY;
}
break;
}
if (node) {
((ThreadPoolJob *)(node->item))->jobId = STALE_JOBID;
}
break;
}

finger = GetNextSubscription(service, finger);
}
} else {
Expand All @@ -776,7 +849,11 @@ static int genaNotifyAllCommon(
}

ExitFunction:
if (ret != GENA_SUCCESS || *reference_count == 0) {
/* The only case where we want to free memory here is if the
struct was never queued. Else, let the normal cleanup take place.
reference_count is allocated first so it's ok to do nothing if it's 0
*/
if (reference_count && *reference_count == 0) {
free(headers);
ixmlFreeDOMString(propertySet);
free(servId_copy);
Expand Down Expand Up @@ -1122,6 +1199,11 @@ void gena_process_subscription_request(
sub->DeliveryURLs.size = 0;
sub->DeliveryURLs.URLs = NULL;
sub->DeliveryURLs.parsedURLs = NULL;
if (ListInit(&sub->outgoing, 0, free) != 0) {
error_respond(info, HTTP_INTERNAL_SERVER_ERROR, request);
HandleUnlock();
goto exit_function;
}

/* check for valid callbacks */
if (httpmsg_find_hdr( request, HDR_CALLBACK, &callback_hdr) == NULL) {
Expand Down
2 changes: 2 additions & 0 deletions upnp/src/genlib/service_table/service_table.c
Expand Up @@ -74,6 +74,7 @@ copy_subscription( subscription * in,
copy_URL_list( &in->DeliveryURLs, &out->DeliveryURLs ) )
!= HTTP_SUCCESS )
return return_code;
ListInit(&out->outgoing, 0, 0);
out->next = NULL;
return HTTP_SUCCESS;
}
Expand Down Expand Up @@ -207,6 +208,7 @@ void freeSubscription(subscription *sub)
{
if (sub) {
free_URL_list(&sub->DeliveryURLs);
freeSubscriptionQueuedEvents(sub);
}
}

Expand Down
12 changes: 12 additions & 0 deletions upnp/src/inc/config.h
Expand Up @@ -142,6 +142,18 @@
#define MAX_JOBS_TOTAL 100
/* @} */

/*! \name MAX_SUBSCRIPTION_QUEUED_EVENTS
*
* The {\tt MAX_SUBSCRIPTION_QUEUED_EVENTS} determines the maximum number of
* events which can be queued for a given subscription before events begin
* to be discarded. This limits the amount of memory used for a
* non-responding subscribed entity.
*
* @{
*/
#define MAX_SUBSCRIPTION_QUEUED_EVENTS 5
/* @} */


/*!
* \name DEFAULT_SOAP_CONTENT_LENGTH
Expand Down
7 changes: 7 additions & 0 deletions upnp/src/inc/service_table.h
Expand Up @@ -61,9 +61,16 @@ typedef struct SUBSCRIPTION {
time_t expireTime;
int active;
URL_list DeliveryURLs;
/* List of queued events for this subscription. Only one event job
at a time goes into the thread pool. The first element in the
list is a copy of the active job. Others are activated on job
completion. */
LinkedList outgoing;
struct SUBSCRIPTION *next;
} subscription;

extern void freeSubscriptionQueuedEvents(subscription *sub);

typedef struct SERVICE_INFO {
DOMString serviceType;
DOMString serviceId;
Expand Down

0 comments on commit c54ecdd

Please sign in to comment.