Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
http client: add a lock (clang reported concurrency)
  • Loading branch information
perexg committed Mar 8, 2016
1 parent eaf5a79 commit dce8cd7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
6 changes: 4 additions & 2 deletions src/http.h
Expand Up @@ -268,6 +268,8 @@ struct http_client {

TAILQ_ENTRY(http_client) hc_link;

pthread_mutex_t hc_mutex;

int hc_id;
int hc_fd;
char *hc_scheme;
Expand Down Expand Up @@ -306,6 +308,8 @@ struct http_client {
size_t hc_chunk_alloc;
size_t hc_chunk_pos;
char *hc_location;
uint8_t hc_running; /* outside hc_mutex */
uint8_t hc_shutdown_wait;/* outside hc_mutex */
int hc_redirects;
int hc_result;
int hc_shutdown:1;
Expand All @@ -317,8 +321,6 @@ struct http_client {
int hc_in_rtp_data:1;
int hc_chunked:1;
int hc_chunk_trails:1;
int hc_running:1;
int hc_shutdown_wait:1;
int hc_handle_location:1; /* handle the redirection (location) requests */
int hc_pause:1;

Expand Down
67 changes: 53 additions & 14 deletions src/httpc.c
Expand Up @@ -152,8 +152,11 @@ http_client_shutdown ( http_client_t *hc, int force, int reconnect )
}
}
if (hc->hc_fd >= 0) {
if (hc->hc_conn_closed)
if (hc->hc_conn_closed) {
pthread_mutex_unlock(&hc->hc_mutex);
hc->hc_conn_closed(hc, -hc->hc_result);
pthread_mutex_lock(&hc->hc_mutex);
}
if (hc->hc_fd >= 0)
close(hc->hc_fd);
hc->hc_fd = -1;
Expand Down Expand Up @@ -663,11 +666,15 @@ http_client_finish( http_client_t *hc )
tvhlog_hexdump("httpc", hc->hc_data, hc->hc_csize);
}
if (hc->hc_in_rtp_data && hc->hc_rtp_data_complete) {
pthread_mutex_unlock(&hc->hc_mutex);
res = hc->hc_rtp_data_complete(hc);
pthread_mutex_lock(&hc->hc_mutex);
if (res < 0)
return http_client_flush(hc, res);
} else if (hc->hc_data_complete) {
pthread_mutex_unlock(&hc->hc_mutex);
res = hc->hc_data_complete(hc);
pthread_mutex_lock(&hc->hc_mutex);
if (res < 0)
return http_client_flush(hc, res);
}
Expand Down Expand Up @@ -731,7 +738,9 @@ http_client_data_copy( http_client_t *hc, char *buf, size_t len )
int res;

if (hc->hc_data_received) {
pthread_mutex_unlock(&hc->hc_mutex);
res = hc->hc_data_received(hc, buf, len);
pthread_mutex_lock(&hc->hc_mutex);
if (res < 0)
return res;
} else {
Expand Down Expand Up @@ -882,17 +891,14 @@ http_client_data_received( http_client_t *hc, char *buf, ssize_t len, int hdr )
return end ? 1 : 0;
}

int
http_client_run( http_client_t *hc )
static int
http_client_run0( http_client_t *hc )
{
char *buf, *saveptr, *argv[3], *d, *p;
int ver, res, delimsize = 4;
ssize_t r;
size_t len;

if (hc == NULL)
return 0;

if (hc->hc_shutdown) {
if (hc->hc_ssl && hc->hc_ssl->shutdown) {
r = http_client_ssl_shutdown(hc);
Expand Down Expand Up @@ -1038,7 +1044,9 @@ http_client_run( http_client_t *hc )
if (p)
hc->hc_chunked = strcasecmp(p, "chunked") == 0;
if (hc->hc_hdr_received) {
pthread_mutex_unlock(&hc->hc_mutex);
res = hc->hc_hdr_received(hc);
pthread_mutex_lock(&hc->hc_mutex);
if (res < 0)
return http_client_flush(hc, res);
}
Expand Down Expand Up @@ -1084,7 +1092,9 @@ http_client_run( http_client_t *hc )
return HTTP_CON_RECEIVING;
}
if (hc->hc_rtp_data_received) {
pthread_mutex_unlock(&hc->hc_mutex);
res = hc->hc_rtp_data_received(hc, hc->hc_rbuf + r, hc->hc_csize);
pthread_mutex_lock(&hc->hc_mutex);
if (res < 0)
return res;
} else {
Expand All @@ -1107,6 +1117,19 @@ http_client_run( http_client_t *hc )
return res;
}

static int
http_client_run( http_client_t *hc )
{
int r;

if (hc == NULL)
return 0;
pthread_mutex_lock(&hc->hc_mutex);
r = http_client_run0(hc);
pthread_mutex_unlock(&hc->hc_mutex);
return r;
}

/*
*
*/
Expand Down Expand Up @@ -1290,13 +1313,17 @@ int
http_client_simple( http_client_t *hc, const url_t *url )
{
http_arg_list_t h;
int r;

pthread_mutex_lock(&hc->hc_mutex);
http_arg_init(&h);
hc->hc_hdr_create(hc, &h, url, 0);
free(hc->hc_url);
hc->hc_url = url->raw ? strdup(url->raw) : NULL;
return http_client_send(hc, HTTP_CMD_GET, url->path, url->query,
&h, NULL, 0);
r = http_client_send(hc, HTTP_CMD_GET, url->path, url->query,
&h, NULL, 0);
pthread_mutex_unlock(&hc->hc_mutex);
return r;
}

void
Expand Down Expand Up @@ -1330,10 +1357,10 @@ http_client_thread ( void *p )
http_client_t *hc;
char c;

while (http_running) {
while (atomic_get(&http_running)) {
n = tvhpoll_wait(http_poll, &ev, 1, -1);
if (n < 0) {
if (http_running && !ERRNO_AGAIN(errno))
if (atomic_get(&http_running) && !ERRNO_AGAIN(errno))
tvherror("httpc", "tvhpoll_wait() error");
} else if (n > 0) {
if (&http_pipe == ev.data.ptr) {
Expand Down Expand Up @@ -1398,11 +1425,12 @@ http_client_reconnect
struct http_client_ssl *ssl;
char errbuf[256];

pthread_mutex_lock(&hc->hc_mutex);
free(hc->hc_scheme);
free(hc->hc_host);

if (scheme == NULL || host == NULL)
return -EINVAL;
goto errnval;

port = http_port(hc, scheme, port);
hc->hc_pevents = 0;
Expand All @@ -1414,7 +1442,7 @@ http_client_reconnect
hc->hc_fd = tcp_connect(host, port, hc->hc_bindaddr, errbuf, sizeof(errbuf), -1);
if (hc->hc_fd < 0) {
tvhlog(LOG_ERR, "httpc", "%04X: Unable to connect to %s:%i - %s", shortid(hc), host, port, errbuf);
return -EINVAL;
goto errnval;
}
hc->hc_einprogress = 1;
tvhtrace("httpc", "%04X: Connected to %s:%i", shortid(hc), host, port);
Expand Down Expand Up @@ -1448,18 +1476,25 @@ http_client_reconnect
}
}

pthread_mutex_unlock(&hc->hc_mutex);
return 0;

err4:
SSL_free(ssl->ssl);
ssl->ssl = NULL;
err3:
BIO_free(ssl->rbio);
BIO_free(ssl->wbio);
ssl->rbio = ssl->wbio = NULL;
err2:
SSL_CTX_free(ssl->ctx);
ssl->ctx = NULL;
err1:
close(hc->hc_fd);
hc->hc_fd = -1;
free(ssl);
errnval:
pthread_mutex_unlock(&hc->hc_mutex);
return -EINVAL;
}

Expand All @@ -1472,6 +1507,7 @@ http_client_connect
static int tally;

hc = calloc(1, sizeof(http_client_t));
pthread_mutex_init(&hc->hc_mutex, NULL);
hc->hc_id = ++tally;
hc->hc_aux = aux;
hc->hc_io_size = 1024;
Expand Down Expand Up @@ -1539,13 +1575,16 @@ http_client_close ( http_client_t *hc )
}
pthread_mutex_unlock(&http_lock);
}
pthread_mutex_lock(&hc->hc_mutex);
http_client_shutdown(hc, 1, 0);
http_client_flush(hc, 0);
tvhtrace("httpc", "%04X: Closed", shortid(hc));
while ((wcmd = TAILQ_FIRST(&hc->hc_wqueue)) != NULL)
http_client_cmd_destroy(hc, wcmd);
http_client_ssl_free(hc);
rtsp_clear_session(hc);
pthread_mutex_unlock(&hc->hc_mutex);
pthread_mutex_destroy(&hc->hc_mutex);
free(hc->hc_url);
free(hc->hc_location);
free(hc->hc_rbuf);
Expand Down Expand Up @@ -1587,7 +1626,7 @@ http_client_init ( const char *user_agent )
tvhpoll_add(http_poll, &ev, 1);

/* Setup thread */
http_running = 1;
atomic_set(&http_running, 1);
tvhthread_create(&http_client_tid, NULL, http_client_thread, NULL, "httpc");
#if HTTPCLIENT_TESTSUITE
http_client_testsuite_run();
Expand All @@ -1599,7 +1638,7 @@ http_client_done ( void )
{
http_client_t *hc;

http_running = 0;
atomic_set(&http_running, 0);
tvh_write(http_pipe.wr, "", 1);
pthread_join(http_client_tid, NULL);
tvh_pipe_close(&http_pipe);
Expand Down

0 comments on commit dce8cd7

Please sign in to comment.