Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
SAT>IP client: serialize request to server for consistency
  • Loading branch information
perexg committed Mar 25, 2016
1 parent 41707e1 commit e488066
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/input/mpegts/satip/satip.c
Expand Up @@ -588,6 +588,7 @@ satip_device_create( satip_device_info_t *info )
pthread_mutex_init(&sd->sd_tune_mutex, NULL);

TAILQ_INIT(&sd->sd_frontends);
TAILQ_INIT(&sd->sd_serialize_queue);

/* we may check if uuid matches, but the SHA hash should be enough */
if (sd->sd_info.uuid)
Expand Down
92 changes: 66 additions & 26 deletions src/input/mpegts/satip/satip_frontend.c
Expand Up @@ -29,6 +29,11 @@
#include <sys/socket.h>
#endif

/*
*
*/
static void satip_frontend_add_to_waiting( satip_frontend_t *lfe );

/*
*
*/
Expand Down Expand Up @@ -534,8 +539,10 @@ satip_frontend_is_enabled
return 0; /* master must be running */
return satip_frontend_match_satcfg(lfe2, mm, flags, weight);
}
if (lfe2->sf_master == lfe->sf_number && lfe2->sf_running)
return satip_frontend_match_satcfg(lfe2, mm, flags, weight);
if (lfe2->sf_master == lfe->sf_number) {
if (lfe2->sf_running)
return satip_frontend_match_satcfg(lfe2, mm, flags, weight);
}
}
return 1;
}
Expand Down Expand Up @@ -564,7 +571,7 @@ satip_frontend_stop_mux
mpegts_pid_done(&tr->sf_pids);
free(tr);
}
lfe->sf_running = 0;
lfe->sf_running = 0;
lfe->sf_req = NULL;
pthread_mutex_unlock(&lfe->sf_dvr_lock);
}
Expand All @@ -580,8 +587,10 @@ satip_frontend_warm_mux
if (r)
return r;

lfe->sf_netlimit = 0;
if (lfe->sf_positions > 0) {
lfe->sf_position = satip_satconf_get_position(lfe, mmi->mmi_mux, NULL, 2, 0, -1);
lfe->sf_position = satip_satconf_get_position(lfe, mmi->mmi_mux,
&lfe->sf_netlimit, 2, 0, -1);
if (lfe->sf_position <= 0)
return SM_CODE_TUNING_FAILED;
}
Expand Down Expand Up @@ -631,6 +640,9 @@ satip_frontend_start_mux
lfe->sf_status = SIGNAL_NONE;
pthread_mutex_unlock(&mmi->tii_stats_mutex);

if (lfe->sf_type == DVB_TYPE_S)
satip_frontend_add_to_waiting(lfe);

/* notify thread that we are ready */
tvh_write(lfe->sf_dvr_pipe.wr, "s", 1);

Expand Down Expand Up @@ -971,37 +983,65 @@ satip_frontend_pid_changed( http_client_t *rtsp,
return 0;
}

static void
satip_frontend_add_to_waiting( satip_frontend_t *lfe )
{
satip_device_t *sd = lfe->sf_device;
satip_frontend_t *lfe2;

if (lfe->sf_master) {
lfe2 = lfe;
} else {
TAILQ_FOREACH(lfe2, &lfe->sf_device->sd_frontends, sf_link)
if (lfe2 != lfe && lfe2->sf_master == lfe->sf_number)
break;
}
if (lfe2 == NULL && lfe->sf_netlimit <= 0)
return;

pthread_mutex_lock(&sd->sd_tune_mutex);
if (lfe->sf_serialize)
TAILQ_REMOVE(&sd->sd_serialize_queue, lfe, sf_serialize_link);
lfe->sf_serialize = 1;
TAILQ_INSERT_TAIL(&sd->sd_serialize_queue, lfe, sf_serialize_link);
tvhtrace("satip", "add to waiting: %s", lfe->mi_name);
pthread_mutex_unlock(&sd->sd_tune_mutex);
}

static int
satip_frontend_other_is_waiting( satip_frontend_t *lfe )
{
satip_frontend_t *lfe2;
satip_device_t *sd = lfe->sf_device;
int r;

TAILQ_FOREACH(lfe2, &lfe->sf_device->sd_frontends, sf_link) {
if (lfe == lfe2) continue;
pthread_mutex_lock(&lfe2->sf_dvr_lock);
r = lfe2->sf_wait_for & (1 << lfe->sf_number);
pthread_mutex_unlock(&lfe2->sf_dvr_lock);
if (r)
return 1;
}
return 0;
pthread_mutex_lock(&sd->sd_tune_mutex);
if (lfe->sf_serialize)
r = TAILQ_FIRST(&sd->sd_serialize_queue) != lfe;
else
r = 0;
pthread_mutex_unlock(&sd->sd_tune_mutex);
return r;
}

static void
satip_frontend_wake_other_waiting( satip_frontend_t *lfe )
{
satip_frontend_t *lfe2;

TAILQ_FOREACH(lfe2, &lfe->sf_device->sd_frontends, sf_link) {
if (lfe == lfe2) continue;
pthread_mutex_lock(&lfe2->sf_dvr_lock);
if (lfe2->sf_running && lfe2->sf_wait_for & (1 << lfe->sf_number)) {
lfe2->sf_wait_for &= ~(1 << lfe->sf_number);
tvh_write(lfe2->sf_dvr_pipe.wr, "o", 1);
satip_device_t *sd = lfe->sf_device;
pthread_mutex_lock(&sd->sd_tune_mutex);
if (lfe->sf_serialize) {
tvhtrace("satip", "remove from waiting: %s", lfe->mi_name);
lfe->sf_serialize = 0;
TAILQ_REMOVE(&sd->sd_serialize_queue, lfe, sf_serialize_link);
lfe = TAILQ_FIRST(&sd->sd_serialize_queue);
if (lfe != NULL) {
tvhtrace("satip", "wake other waiting: %s", lfe->mi_name);
pthread_mutex_lock(&lfe->sf_dvr_lock);
if (lfe->sf_running)
tvh_write(lfe->sf_dvr_pipe.wr, "o", 1);
pthread_mutex_unlock(&lfe->sf_dvr_lock);
}
pthread_mutex_unlock(&lfe2->sf_dvr_lock);
}
pthread_mutex_unlock(&sd->sd_tune_mutex);
}

static void
Expand Down Expand Up @@ -1302,15 +1342,15 @@ satip_frontend_input_thread ( void *aux )
u64_2 = getfastmonoclock();

pthread_mutex_lock(&lfe->sf_dvr_lock);
if (lfe->sf_wait_for == 0) start |= 2; else start &= ~2;
if (!satip_frontend_other_is_waiting(lfe)) start |= 2; else start &= ~2;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

while (start != 3) {

nfds = tvhpoll_wait(efd, ev, 1, rtsp ? 55 : -1);

pthread_mutex_lock(&lfe->sf_dvr_lock);
if (lfe->sf_wait_for == 0) start |= 2; else start &= ~2;
if (!satip_frontend_other_is_waiting(lfe)) start |= 2; else start &= ~2;
pthread_mutex_unlock(&lfe->sf_dvr_lock);

if (!tvheadend_is_running()) { exit_flag = 1; goto done; }
Expand Down Expand Up @@ -1639,7 +1679,6 @@ satip_frontend_input_thread ( void *aux )
satip_frontend_tuning_error(lfe, tr);
fatal = 1;
} else {
satip_frontend_wake_other_waiting(lfe);
strncpy((char *)session, rtsp->hc_rtsp_session ?: "", sizeof(session));
session[sizeof(session)-1] = '\0';
stream_id = rtsp->hc_rtsp_stream_id;
Expand Down Expand Up @@ -1675,6 +1714,7 @@ satip_frontend_input_thread ( void *aux )
case RTSP_CMD_PLAY:
if (!running)
break;
satip_frontend_wake_other_waiting(lfe);
if (rtsp->hc_code == 200 && play2) {
play2 = 0;
if (satip_frontend_pid_changed(rtsp, lfe, buf) > 0) {
Expand Down
5 changes: 4 additions & 1 deletion src/input/mpegts/satip/satip_private.h
Expand Up @@ -97,6 +97,7 @@ struct satip_device
int sd_skip_ts;
int sd_disable_workarounds;
pthread_mutex_t sd_tune_mutex;
TAILQ_HEAD(,satip_frontend)sd_serialize_queue;
};

struct satip_tune_req {
Expand Down Expand Up @@ -156,7 +157,9 @@ struct satip_frontend
uint32_t sf_seq;
dvb_mux_t *sf_curmux;
time_t sf_last_data_tstamp;
uint32_t sf_wait_for;
int sf_netlimit;
int sf_serialize;
TAILQ_ENTRY(satip_frontend)sf_serialize_link;

/*
* Configuration
Expand Down
3 changes: 0 additions & 3 deletions src/input/mpegts/satip/satip_satconf.c
Expand Up @@ -167,9 +167,6 @@ satip_satconf_check_limits
return 1;
if (manage && lowest_lfe) {
/* free tuner with lowest weight */
pthread_mutex_lock(&lfe->sf_dvr_lock);
lfe->sf_wait_for |= 1 << lowest_lfe->sf_number;
pthread_mutex_unlock(&lfe->sf_dvr_lock);
mm2 = lowest_lfe->sf_req->sf_mmi->mmi_mux;
mm2->mm_stop(mm2, 1, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
goto retry;
Expand Down

0 comments on commit e488066

Please sign in to comment.