Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
satip client: fix the network grouping handling
  • Loading branch information
perexg committed May 25, 2016
1 parent 8a35074 commit bb64db6
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 87 deletions.
165 changes: 87 additions & 78 deletions src/input/mpegts/satip/satip_frontend.c
Expand Up @@ -29,11 +29,6 @@
#include <sys/socket.h>
#endif

/*
*
*/
static void satip_frontend_add_to_waiting( satip_frontend_t *lfe );

/*
*
*/
Expand Down Expand Up @@ -592,10 +587,10 @@ satip_frontend_warm_mux
if (r)
return r;

lfe->sf_netlimit = 0;
if (lfe->sf_positions > 0) {
lfe->sf_position = satip_satconf_get_position(lfe, mmi->mmi_mux,
&lfe->sf_netlimit, 2, 0, -1);
&lfe->sf_netposhash,
2, 0, -1);
if (lfe->sf_position <= 0)
return SM_CODE_TUNING_FAILED;
}
Expand Down Expand Up @@ -635,6 +630,8 @@ satip_frontend_start_mux
if (lfe->sf_device->sd_can_weight)
tr->sf_weight = weight;

tr->sf_netposhash = lfe->sf_netposhash;

pthread_mutex_lock(&lfe->sf_dvr_lock);
lfe->sf_req = tr;
lfe->sf_running = 1;
Expand All @@ -645,9 +642,6 @@ satip_frontend_start_mux
lfe->sf_status = SIGNAL_NONE;
pthread_mutex_unlock(&mmi->tii_stats_mutex);

if (lfe->sf_type == DVB_TYPE_S)
satip_frontend_add_to_waiting(lfe);

/* notify thread that we are ready */
tvh_write(lfe->sf_dvr_pipe.wr, "s", 1);

Expand Down Expand Up @@ -988,67 +982,82 @@ satip_frontend_pid_changed( http_client_t *rtsp,
return 0;
}

static void
satip_frontend_add_to_waiting( satip_frontend_t *lfe )
static int
satip_frontend_other_is_waiting( satip_frontend_t *lfe )
{
satip_device_t *sd = lfe->sf_device;
satip_frontend_t *lfe2;
int r = 0, hash1, hash2;

if (lfe->sf_master) {
lfe2 = lfe;
} else {
TAILQ_FOREACH(lfe2, &lfe->sf_device->sd_frontends, sf_link)
if (lfe2 != lfe && lfe2->sf_master == lfe->sf_number)
break;
}
if (lfe2 == NULL && lfe->sf_netlimit <= 0)
return;
if (lfe->sf_type != DVB_TYPE_S)
return 0;

pthread_mutex_lock(&lfe->sf_dvr_lock);
hash1 = lfe->sf_req ? lfe->sf_req->sf_netposhash : 0;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

if (hash1 == 0)
return 0;

pthread_mutex_lock(&sd->sd_tune_mutex);
if (lfe->sf_serialize)
TAILQ_REMOVE(&sd->sd_serialize_queue, lfe, sf_serialize_link);
lfe->sf_serialize = 1;
TAILQ_INSERT_TAIL(&sd->sd_serialize_queue, lfe, sf_serialize_link);
tvhtrace("satip", "add to waiting: %s", lfe->mi_name);
TAILQ_FOREACH(lfe2, &lfe->sf_device->sd_frontends, sf_link) {
if (lfe2 == lfe)
continue;
if ((lfe->sf_master && lfe2->sf_number != lfe->sf_master) ||
(lfe2->sf_master && lfe->sf_number != lfe2->sf_master))
continue;
pthread_mutex_lock(&lfe2->sf_dvr_lock);
hash2 = lfe2->sf_req_thread ? lfe2->sf_req_thread->sf_netposhash : 0;
pthread_mutex_unlock(&lfe2->sf_dvr_lock);
if (hash2 == 0)
continue;
if (hash2 != hash1) {
r = 1;
break;
}
}
pthread_mutex_unlock(&sd->sd_tune_mutex);
return r;
}

static int
satip_frontend_other_is_waiting( satip_frontend_t *lfe )
static void
satip_frontend_wake_other_waiting
( satip_frontend_t *lfe, satip_tune_req_t *tr )
{
satip_device_t *sd = lfe->sf_device;
int r;
satip_frontend_t *lfe2;
int hash1, hash2;

pthread_mutex_lock(&sd->sd_tune_mutex);
if (lfe->sf_serialize)
r = TAILQ_FIRST(&sd->sd_serialize_queue) != lfe;
else
r = 0;
pthread_mutex_unlock(&sd->sd_tune_mutex);
return r;
if (tr == NULL)
return;

hash1 = tr->sf_netposhash;

TAILQ_FOREACH(lfe2, &lfe->sf_device->sd_frontends, sf_link) {
if (lfe2 == lfe)
continue;
if ((lfe->sf_master && lfe2->sf_number != lfe->sf_master) ||
(lfe2->sf_master && lfe->sf_number != lfe2->sf_master))
continue;
pthread_mutex_lock(&lfe2->sf_dvr_lock);
hash2 = lfe2->sf_req ? lfe2->sf_req->sf_netposhash : 0;
if (hash2 != 0 && hash1 == hash2 && lfe2->sf_running)
tvh_write(lfe2->sf_dvr_pipe.wr, "o", 1);
pthread_mutex_unlock(&lfe2->sf_dvr_lock);
}
}

static void
satip_frontend_wake_other_waiting( satip_frontend_t *lfe )
satip_frontend_request_cleanup
( satip_frontend_t *lfe, satip_tune_req_t *tr )
{
satip_device_t *sd = lfe->sf_device;
pthread_mutex_lock(&sd->sd_tune_mutex);
if (lfe->sf_serialize) {
tvhtrace("satip", "remove from waiting: %s", lfe->mi_name);
lfe->sf_serialize = 0;
TAILQ_REMOVE(&sd->sd_serialize_queue, lfe, sf_serialize_link);
lfe = TAILQ_FIRST(&sd->sd_serialize_queue);
if (lfe != NULL) {
tvhtrace("satip", "wake other waiting: %s", lfe->mi_name);
pthread_mutex_unlock(&sd->sd_tune_mutex);
pthread_mutex_lock(&lfe->sf_dvr_lock);
if (lfe->sf_running)
tvh_write(lfe->sf_dvr_pipe.wr, "o", 1);
pthread_mutex_unlock(&lfe->sf_dvr_lock);
return;
}
if (tr && tr != lfe->sf_req) {
mpegts_pid_done(&tr->sf_pids);
mpegts_pid_done(&tr->sf_pids_tuned);
free(tr);
}
pthread_mutex_unlock(&sd->sd_tune_mutex);
if (tr == lfe->sf_req_thread)
lfe->sf_req_thread = NULL;

}

static void
Expand Down Expand Up @@ -1112,7 +1121,8 @@ satip_frontend_extra_shutdown

static void
satip_frontend_shutdown
( satip_frontend_t *lfe, const char *name, http_client_t *rtsp, tvhpoll_t *efd )
( satip_frontend_t *lfe, const char *name, http_client_t *rtsp,
satip_tune_req_t *tr, tvhpoll_t *efd )
{
char b[32];
tvhpoll_event_t ev;
Expand Down Expand Up @@ -1144,6 +1154,10 @@ satip_frontend_shutdown
}
}
sbuf_free(&lfe->sf_sbuf);

pthread_mutex_lock(&lfe->sf_dvr_lock);
satip_frontend_request_cleanup(lfe, tr);
pthread_mutex_unlock(&lfe->sf_dvr_lock);
}

static void
Expand All @@ -1166,7 +1180,8 @@ satip_frontend_tuning_error ( satip_frontend_t *lfe, satip_tune_req_t *tr )

static void
satip_frontend_close_rtsp
( satip_frontend_t *lfe, const char *name, tvhpoll_t *efd, http_client_t **rtsp )
( satip_frontend_t *lfe, const char *name, tvhpoll_t *efd,
http_client_t **rtsp, satip_tune_req_t **tr )
{
tvhpoll_event_t ev;

Expand All @@ -1176,7 +1191,7 @@ satip_frontend_close_rtsp
ev.data.ptr = NULL;
tvhpoll_rem(efd, &ev, 1);

satip_frontend_shutdown(lfe, name, *rtsp, efd);
satip_frontend_shutdown(lfe, name, *rtsp, *tr, efd);

memset(&ev, 0, sizeof(ev));
ev.events = TVHPOLL_IN;
Expand All @@ -1185,8 +1200,9 @@ satip_frontend_close_rtsp
tvhpoll_add(efd, &ev, 1);

http_client_close(*rtsp);
satip_frontend_wake_other_waiting(lfe);
satip_frontend_wake_other_waiting(lfe, *tr);
*rtsp = NULL;
*tr = NULL;
}

static int
Expand Down Expand Up @@ -1332,7 +1348,7 @@ satip_frontend_input_thread ( void *aux )
lfe_master = NULL;

if (rtsp && !lfe->sf_device->sd_fast_switch)
satip_frontend_close_rtsp(lfe, buf, efd, &rtsp);
satip_frontend_close_rtsp(lfe, buf, efd, &rtsp, &tr);

if (rtsp)
rtsp->hc_rtp_data_received = NULL;
Expand All @@ -1348,22 +1364,18 @@ satip_frontend_input_thread ( void *aux )

u64_2 = getfastmonoclock();

pthread_mutex_lock(&lfe->sf_dvr_lock);
if (!satip_frontend_other_is_waiting(lfe)) start |= 2; else start &= ~2;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

while (start != 3) {

nfds = tvhpoll_wait(efd, ev, 1, rtsp ? 55 : -1);

pthread_mutex_lock(&lfe->sf_dvr_lock);
if (!satip_frontend_other_is_waiting(lfe)) start |= 2; else start &= ~2;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

if (!tvheadend_is_running()) { exit_flag = 1; goto done; }
if (rtsp && (getfastmonoclock() - u64_2 > 50000 || /* 50ms */
satip_frontend_other_is_waiting(lfe)))
satip_frontend_close_rtsp(lfe, buf, efd, &rtsp);
(start & 2) == 0))
satip_frontend_close_rtsp(lfe, buf, efd, &rtsp, &tr);
if (nfds <= 0) continue;

if (ev[0].data.ptr == NULL) {
Expand Down Expand Up @@ -1406,6 +1418,7 @@ satip_frontend_input_thread ( void *aux )
lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf));

pthread_mutex_lock(&lfe->sf_dvr_lock);
satip_frontend_request_cleanup(lfe, tr);
lfe->sf_req_thread = tr = lfe->sf_req;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

Expand Down Expand Up @@ -1721,7 +1734,7 @@ satip_frontend_input_thread ( void *aux )
case RTSP_CMD_PLAY:
if (!running)
break;
satip_frontend_wake_other_waiting(lfe);
satip_frontend_wake_other_waiting(lfe, tr);
if (rtsp->hc_code == 200 && play2) {
play2 = 0;
if (satip_frontend_pid_changed(rtsp, lfe, buf) > 0) {
Expand Down Expand Up @@ -1856,21 +1869,13 @@ satip_frontend_input_thread ( void *aux )
tvhpoll_rem(efd, ev, nfds);

if (exit_flag) {
satip_frontend_shutdown(lfe, buf, rtsp, efd);
satip_frontend_shutdown(lfe, buf, rtsp, tr, efd);
http_client_close(rtsp);
rtsp = NULL;
tr = NULL;
}

done:
pthread_mutex_lock(&lfe->sf_dvr_lock);
if (tr && tr != lfe->sf_req) {
mpegts_pid_done(&tr->sf_pids);
mpegts_pid_done(&tr->sf_pids_tuned);
free(tr);
}
lfe->sf_req_thread = tr = NULL;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

udp_close(rtcp);
udp_close(rtp);
rtcp = rtp = NULL;
Expand All @@ -1884,14 +1889,18 @@ satip_frontend_input_thread ( void *aux )
if (!exit_flag)
goto new_tune;

pthread_mutex_lock(&lfe->sf_dvr_lock);
satip_frontend_request_cleanup(lfe, tr);
pthread_mutex_unlock(&lfe->sf_dvr_lock);

if (rtsp)
http_client_close(rtsp);

tvhpoll_destroy(efd);
lfe->sf_display_name = NULL;
lfe->sf_curmux = NULL;

satip_frontend_wake_other_waiting(lfe);
satip_frontend_wake_other_waiting(lfe, tr);

return NULL;
#undef PKTS
Expand Down
8 changes: 5 additions & 3 deletions src/input/mpegts/satip/satip_private.h
Expand Up @@ -108,6 +108,8 @@ struct satip_tune_req {

int sf_weight;
int sf_weight_tuned;

int sf_netposhash;
};

struct satip_frontend
Expand Down Expand Up @@ -158,8 +160,7 @@ struct satip_frontend
dvb_mux_t *sf_curmux;
time_t sf_last_data_tstamp;
int sf_netlimit;
int sf_serialize;
TAILQ_ENTRY(satip_frontend)sf_serialize_link;
int sf_netposhash;

/*
* Configuration
Expand Down Expand Up @@ -256,7 +257,8 @@ int satip_satconf_get_grace
( satip_frontend_t *lfe, mpegts_mux_t *mm );

int satip_satconf_get_position
( satip_frontend_t *lfe, mpegts_mux_t *mm, int *netlimit, int check, int flags, int weight );
( satip_frontend_t *lfe, mpegts_mux_t *mm, int *hash,
int check, int flags, int weight );

/*
* RTSP part
Expand Down
13 changes: 7 additions & 6 deletions src/input/mpegts/satip/satip_satconf.c
Expand Up @@ -96,7 +96,7 @@ static int
satip_satconf_hash ( mpegts_mux_t *mm, int position )
{
dvb_mux_conf_t *mc = &((dvb_mux_t *)mm)->lm_tuning;
assert(position <= 0x7fff);
assert(position <= 0x7fff);
return 1 | (mc->dmc_fe_freq > 11700000 ? 2 : 0) |
((int)mc->u.dmc_fe_qpsk.polarisation << 8) |
(position << 16);
Expand Down Expand Up @@ -176,13 +176,14 @@ satip_satconf_check_limits

int
satip_satconf_get_position
( satip_frontend_t *lfe, mpegts_mux_t *mm, int *netlimit, int check, int flags, int weight )
( satip_frontend_t *lfe, mpegts_mux_t *mm, int *hash,
int check, int flags, int weight )
{
satip_satconf_t *sfc;
sfc = satip_satconf_find_ele(lfe, mm);
if (sfc && sfc->sfc_enabled) {
if (netlimit)
*netlimit = sfc->sfc_network_limit;
if (hash)
*hash = sfc->sfc_network_group > 0 ? satip_satconf_hash(mm, sfc->sfc_position) : 0;
if (!check)
return sfc->sfc_position;
if (check > 1) {
Expand All @@ -195,8 +196,8 @@ satip_satconf_get_position
return sfc->sfc_position;
}
} else {
if (netlimit)
*netlimit = 0;
if (hash)
*hash = 0;
}
return 0;
}
Expand Down

0 comments on commit bb64db6

Please sign in to comment.