Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
use monotonic thread conditions where appropriate
  • Loading branch information
perexg committed Mar 4, 2016
1 parent cfe0b1b commit 6ff6709
Show file tree
Hide file tree
Showing 30 changed files with 239 additions and 180 deletions.
19 changes: 8 additions & 11 deletions src/descrambler/capmt.c
Expand Up @@ -256,7 +256,7 @@ typedef struct capmt_adapter {
typedef struct capmt {
caclient_t;

pthread_cond_t capmt_cond;
tvh_cond_t capmt_cond;

struct capmt_service_list capmt_services;

Expand Down Expand Up @@ -1641,7 +1641,6 @@ capmt_thread(void *aux)
capmt_t *capmt = aux;
capmt_adapter_t *ca;
capmt_opaque_t *t;
struct timespec ts;
int d, i, j, fatal;

tvhlog(LOG_INFO, "capmt", "%s active", capmt_name(capmt));
Expand Down Expand Up @@ -1677,7 +1676,7 @@ capmt_thread(void *aux)
pthread_mutex_lock(&capmt->capmt_mutex);

while(capmt->capmt_running && capmt->cac_enabled == 0)
pthread_cond_wait(&capmt->capmt_cond, &capmt->capmt_mutex);
tvh_cond_wait(&capmt->capmt_cond, &capmt->capmt_mutex);

pthread_mutex_unlock(&capmt->capmt_mutex);

Expand Down Expand Up @@ -1770,12 +1769,10 @@ capmt_thread(void *aux)
d = 60;
}

clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += d;

tvhlog(LOG_INFO, "capmt", "%s: Automatic reconnection attempt in in %d seconds", idnode_get_title(&capmt->cac_id, NULL), d);

pthread_cond_timedwait(&capmt->capmt_cond, &capmt->capmt_mutex, &ts);
tvh_cond_timedwait(&capmt->capmt_cond, &capmt->capmt_mutex,
getmonoclock() + d * MONOCLOCK_RESOLUTION);

pthread_mutex_unlock(&capmt->capmt_mutex);
}
Expand Down Expand Up @@ -2195,7 +2192,7 @@ capmt_service_start(caclient_t *cac, service_t *s)
LIST_INSERT_HEAD(&capmt->capmt_services, ct, ct_link);

/* wake-up idle thread */
pthread_cond_signal(&capmt->capmt_cond);
tvh_cond_signal(&capmt->capmt_cond, 0);

fin:
pthread_mutex_unlock(&capmt->capmt_mutex);
Expand Down Expand Up @@ -2245,7 +2242,7 @@ capmt_conf_changed(caclient_t *cac)
}
pthread_mutex_lock(&capmt->capmt_mutex);
capmt->capmt_reconfigure = 1;
pthread_cond_signal(&capmt->capmt_cond);
tvh_cond_signal(&capmt->capmt_cond, 0);
pthread_mutex_unlock(&capmt->capmt_mutex);
tvh_write(capmt->capmt_pipe.wr, "", 1);
} else {
Expand All @@ -2254,7 +2251,7 @@ capmt_conf_changed(caclient_t *cac)
pthread_mutex_lock(&capmt->capmt_mutex);
capmt->capmt_running = 0;
capmt->capmt_reconfigure = 0;
pthread_cond_signal(&capmt->capmt_cond);
tvh_cond_signal(&capmt->capmt_cond, 0);
tid = capmt->capmt_tid;
pthread_mutex_unlock(&capmt->capmt_mutex);
tvh_write(capmt->capmt_pipe.wr, "", 1);
Expand Down Expand Up @@ -2319,7 +2316,7 @@ caclient_t *capmt_create(void)
capmt_t *capmt = calloc(1, sizeof(*capmt));

pthread_mutex_init(&capmt->capmt_mutex, NULL);
pthread_cond_init(&capmt->capmt_cond, NULL);
tvh_cond_init(&capmt->capmt_cond);
TAILQ_INIT(&capmt->capmt_writeq);
tvh_pipe(O_NONBLOCK, &capmt->capmt_pipe);

Expand Down
30 changes: 13 additions & 17 deletions src/descrambler/cwc.c
Expand Up @@ -198,11 +198,11 @@ typedef struct cwc {

pthread_t cwc_tid;

pthread_cond_t cwc_cond;
tvh_cond_t cwc_cond;

pthread_mutex_t cwc_mutex;
pthread_mutex_t cwc_writer_mutex;
pthread_cond_t cwc_writer_cond;
tvh_cond_t cwc_writer_cond;
int cwc_writer_running;
struct cwc_message_queue cwc_writeq;

Expand Down Expand Up @@ -443,7 +443,7 @@ cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid, int enq, uint1
cm->cm_len = len;
pthread_mutex_lock(&cwc->cwc_writer_mutex);
TAILQ_INSERT_TAIL(&cwc->cwc_writeq, cm, cm_link);
pthread_cond_signal(&cwc->cwc_writer_cond);
tvh_cond_signal(&cwc->cwc_writer_cond, 0);
pthread_mutex_unlock(&cwc->cwc_writer_mutex);
} else {
if (tvh_write(cwc->cwc_fd, buf, len))
Expand Down Expand Up @@ -1046,7 +1046,6 @@ cwc_writer_thread(void *aux)
{
cwc_t *cwc = aux;
cwc_message_t *cm;
struct timespec ts;
int r;

pthread_mutex_lock(&cwc->cwc_writer_mutex);
Expand All @@ -1068,10 +1067,9 @@ cwc_writer_thread(void *aux)

/* If nothing is to be sent in CWC_KEEPALIVE_INTERVAL seconds we
need to send a keepalive */
ts.tv_sec = time(NULL) + CWC_KEEPALIVE_INTERVAL;
ts.tv_nsec = 0;
r = pthread_cond_timedwait(&cwc->cwc_writer_cond,
&cwc->cwc_writer_mutex, &ts);
r = tvh_cond_timedwait(&cwc->cwc_writer_cond,
&cwc->cwc_writer_mutex,
getmonoclock() + CWC_KEEPALIVE_INTERVAL * MONOCLOCK_RESOLUTION);
if(r == ETIMEDOUT)
cwc_send_ka(cwc);
}
Expand Down Expand Up @@ -1146,7 +1144,7 @@ cwc_session(cwc_t *cwc)
* We do all requests from now on in a separate thread
*/
cwc->cwc_writer_running = 1;
pthread_cond_init(&cwc->cwc_writer_cond, NULL);
tvh_cond_init(&cwc->cwc_writer_cond);
pthread_mutex_init(&cwc->cwc_writer_mutex, NULL);
TAILQ_INIT(&cwc->cwc_writeq);
tvhthread_create(&writer_thread_id, NULL, cwc_writer_thread, cwc, "cwc-writer");
Expand All @@ -1168,7 +1166,7 @@ cwc_session(cwc_t *cwc)
*/
shutdown(cwc->cwc_fd, SHUT_RDWR);
cwc->cwc_writer_running = 0;
pthread_cond_signal(&cwc->cwc_writer_cond);
tvh_cond_signal(&cwc->cwc_writer_cond, 0);
pthread_join(writer_thread_id, NULL);
tvhlog(LOG_DEBUG, "cwc", "Write thread joined");
}
Expand All @@ -1184,7 +1182,6 @@ cwc_thread(void *aux)
char errbuf[100];
char hostname[256];
int port;
struct timespec ts;
int attempts = 0;

pthread_mutex_lock(&cwc->cwc_mutex);
Expand Down Expand Up @@ -1240,14 +1237,13 @@ cwc_thread(void *aux)
caclient_set_status((caclient_t *)cwc, CACLIENT_STATUS_DISCONNECTED);

d = 3;
ts.tv_sec = time(NULL) + d;
ts.tv_nsec = 0;

tvhlog(LOG_INFO, "cwc",
"%s:%i: Automatic connection attempt in %d seconds",
cwc->cwc_hostname, cwc->cwc_port, d-1);

pthread_cond_timedwait(&cwc->cwc_cond, &cwc->cwc_mutex, &ts);
tvh_cond_timedwait(&cwc->cwc_cond, &cwc->cwc_mutex,
getmonoclock() + d * MONOCLOCK_RESOLUTION);
}

tvhlog(LOG_INFO, "cwc", "%s:%i inactive",
Expand Down Expand Up @@ -1737,14 +1733,14 @@ cwc_conf_changed(caclient_t *cac)
cwc->cwc_reconfigure = 1;
if(cwc->cwc_fd >= 0)
shutdown(cwc->cwc_fd, SHUT_RDWR);
pthread_cond_signal(&cwc->cwc_cond);
tvh_cond_signal(&cwc->cwc_cond, 0);
pthread_mutex_unlock(&cwc->cwc_mutex);
} else {
if (!cwc->cwc_running)
return;
pthread_mutex_lock(&cwc->cwc_mutex);
cwc->cwc_running = 0;
pthread_cond_signal(&cwc->cwc_cond);
tvh_cond_signal(&cwc->cwc_cond, 0);
tid = cwc->cwc_tid;
if (cwc->cwc_fd >= 0)
shutdown(cwc->cwc_fd, SHUT_RDWR);
Expand Down Expand Up @@ -1872,7 +1868,7 @@ caclient_t *cwc_create(void)
cwc_t *cwc = calloc(1, sizeof(*cwc));

pthread_mutex_init(&cwc->cwc_mutex, NULL);
pthread_cond_init(&cwc->cwc_cond, NULL);
tvh_cond_init(&cwc->cwc_cond);
cwc->cac_free = cwc_free;
cwc->cac_start = cwc_service_start;
cwc->cac_conf_changed = cwc_conf_changed;
Expand Down
2 changes: 1 addition & 1 deletion src/dvr/dvr_rec.c
Expand Up @@ -1266,7 +1266,7 @@ dvr_thread(void *aux)
while(run) {
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
streaming_queue_remove(sq, sm);
Expand Down
8 changes: 4 additions & 4 deletions src/htsp_server.c
Expand Up @@ -164,7 +164,7 @@ typedef struct htsp_connection {
struct htsp_msg_q_queue htsp_active_output_queues;

pthread_mutex_t htsp_out_mutex;
pthread_cond_t htsp_out_cond;
tvh_cond_t htsp_out_cond;

htsp_msg_q_t htsp_hmq_ctrl;
htsp_msg_q_t htsp_hmq_epg;
Expand Down Expand Up @@ -415,7 +415,7 @@ htsp_send(htsp_connection_t *htsp, htsmsg_t *m, pktbuf_t *pb,

hmq->hmq_length++;
hmq->hmq_payload += payloadsize;
pthread_cond_signal(&htsp->htsp_out_cond);
tvh_cond_signal(&htsp->htsp_out_cond, 0);
pthread_mutex_unlock(&htsp->htsp_out_mutex);
}

Expand Down Expand Up @@ -3150,7 +3150,7 @@ htsp_write_scheduler(void *aux)

if((hmq = TAILQ_FIRST(&htsp->htsp_active_output_queues)) == NULL) {
/* Nothing to be done, go to sleep */
pthread_cond_wait(&htsp->htsp_out_cond, &htsp->htsp_out_mutex);
tvh_cond_wait(&htsp->htsp_out_cond, &htsp->htsp_out_mutex);
continue;
}

Expand Down Expand Up @@ -3268,7 +3268,7 @@ htsp_serve(int fd, void **opaque, struct sockaddr_storage *source,

pthread_mutex_lock(&htsp.htsp_out_mutex);
htsp.htsp_writer_run = 0;
pthread_cond_signal(&htsp.htsp_out_cond);
tvh_cond_signal(&htsp.htsp_out_cond, 0);
pthread_mutex_unlock(&htsp.htsp_out_mutex);

pthread_join(htsp.htsp_writer_thread, NULL);
Expand Down
10 changes: 5 additions & 5 deletions src/httpc.c
Expand Up @@ -85,7 +85,7 @@ static int http_running;
static tvhpoll_t *http_poll;
static TAILQ_HEAD(,http_client) http_clients;
static pthread_mutex_t http_lock;
static pthread_cond_t http_cond;
static tvh_cond_t http_cond;
static th_pipe_t http_pipe;
static char *http_user_agent;

Expand Down Expand Up @@ -1352,7 +1352,7 @@ http_client_thread ( void *p )
continue;
}
if (hc->hc_shutdown_wait) {
pthread_cond_broadcast(&http_cond);
tvh_cond_signal(&http_cond, 1);
/* Disable the poll looping for this moment */
http_client_poll_dir(hc, 0, 0);
pthread_mutex_unlock(&http_lock);
Expand All @@ -1364,7 +1364,7 @@ http_client_thread ( void *p )
pthread_mutex_lock(&http_lock);
hc->hc_running = 0;
if (hc->hc_shutdown_wait)
pthread_cond_broadcast(&http_cond);
tvh_cond_signal(&http_cond, 1);
pthread_mutex_unlock(&http_lock);
}
}
Expand Down Expand Up @@ -1529,7 +1529,7 @@ http_client_close ( http_client_t *hc )
pthread_mutex_lock(&http_lock);
hc->hc_shutdown_wait = 1;
while (hc->hc_running)
pthread_cond_wait(&http_cond, &http_lock);
tvh_cond_wait(&http_cond, &http_lock);
if (hc->hc_efd) {
memset(&ev, 0, sizeof(ev));
ev.fd = hc->hc_fd;
Expand Down Expand Up @@ -1572,7 +1572,7 @@ http_client_init ( const char *user_agent )

/* Setup list */
pthread_mutex_init(&http_lock, NULL);
pthread_cond_init(&http_cond, NULL);
tvh_cond_init(&http_cond);
TAILQ_INIT(&http_clients);

/* Setup pipe */
Expand Down
10 changes: 5 additions & 5 deletions src/idnode.c
Expand Up @@ -46,7 +46,7 @@ static RB_HEAD(,idclass_link) idclasses;
static RB_HEAD(,idclass_link) idrootclasses;
static TAILQ_HEAD(,idnode_save) idnodes_save;

pthread_cond_t save_cond;
tvh_cond_t save_cond;
pthread_t save_tid;
static int save_running;
static gtimer_t save_timer;
Expand Down Expand Up @@ -1096,7 +1096,7 @@ idnode_savefn ( idnode_t *self, char *filename, size_t fsize )
static void
idnode_save_trigger_thread_cb( void *aux )
{
pthread_cond_signal(&save_cond);
tvh_cond_signal(&save_cond, 0);
}

static void
Expand Down Expand Up @@ -1703,7 +1703,7 @@ save_thread ( void *aux )
if (ise)
gtimer_arm(&save_timer, idnode_save_trigger_thread_cb, NULL,
(ise->ise_reqtime + IDNODE_SAVE_DELAY) - dispatch_clock);
pthread_cond_wait(&save_cond, &global_lock);
tvh_cond_wait(&save_cond, &global_lock);
continue;
}
m = idnode_savefn(ise->ise_node, filename, sizeof(filename));
Expand Down Expand Up @@ -1749,7 +1749,7 @@ idnode_init(void)
RB_INIT(&idrootclasses);
TAILQ_INIT(&idnodes_save);

pthread_cond_init(&save_cond, NULL);
tvh_cond_init(&save_cond);
save_running = 1;
tvhthread_create(&save_tid, NULL, save_thread, NULL, "save");
}
Expand All @@ -1760,7 +1760,7 @@ idnode_done(void)
idclass_link_t *il;

save_running = 0;
pthread_cond_signal(&save_cond);
tvh_cond_signal(&save_cond, 0);
pthread_join(save_tid, NULL);
gtimer_disarm(&save_timer);

Expand Down

0 comments on commit 6ff6709

Please sign in to comment.