Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
DVR: rewrite DVR thread to handle better EPG running flag
  • Loading branch information
perexg committed Oct 31, 2015
1 parent e85a3c7 commit 4400bb2
Showing 1 changed file with 222 additions and 87 deletions.
309 changes: 222 additions & 87 deletions src/dvr/dvr_rec.c
Expand Up @@ -1075,6 +1075,106 @@ dvr_streaming_restart(dvr_entry_t *de, int *run)
}
}

/**
*
*/
static int
dvr_thread_pkt_stats(dvr_entry_t *de, th_pkt_t *pkt, int payload)
{
th_subscription_t *ts;
int ret = 0;

if ((ts = de->de_s) != NULL) {
if (pkt->pkt_err) {
de->de_data_errors += pkt->pkt_err;
ret = 1;
}
if (payload && pkt->pkt_payload)
subscription_add_bytes_out(ts, pktbuf_len(pkt->pkt_payload));
}
return ret;
}

/**
*
*/
static int
dvr_thread_mpegts_stats(dvr_entry_t *de, void *sm_data)
{
th_subscription_t *ts;
pktbuf_t *pb = sm_data;
int ret;

if (pb == NULL)
return 0;
if ((ts = de->de_s) != NULL) {
if (pb->pb_err) {
de->de_data_errors += pb->pb_err;
ret = 1;
}
subscription_add_bytes_out(ts, pktbuf_len(pb));
}
return ret;
}

/**
*
*/
static int
dvr_thread_rec_start(dvr_entry_t *de, int started,
streaming_start_t *ss, int *run,
int64_t *dts_offset,
int epg_running, const char *postproc)
{
profile_chain_t *prch = de->de_chain;
int ret = 0;

if (started &&
muxer_reconfigure(prch->prch_muxer, ss) < 0) {
tvhlog(LOG_WARNING,
"dvr", "Unable to reconfigure \"%s\"",
dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));

// Try to restart the recording if the muxer doesn't
// support reconfiguration of the streams.
dvr_thread_epilog(de, postproc);
started = 0;
*dts_offset = PTS_UNSET;
if (epg_running) {
if (!dvr_thread_global_lock(de, run))
return 0;
if (de->de_config->dvr_clone)
de = dvr_entry_clone(de);
dvr_thread_global_unlock(de);
}
}

if (!started) {
if (!dvr_thread_global_lock(de, run))
return 0;
dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0);
if(dvr_rec_start(de, ss) == 0)
ret = 1;
else
dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0);
dvr_thread_global_unlock(de);
}
return ret;
}

/**
*
*/
static void
dvr_thread_backlog_free(struct streaming_message_queue *backlog)
{
streaming_message_t *sm;
while ((sm = TAILQ_FIRST(backlog)) != NULL) {
TAILQ_REMOVE(backlog, sm, sm_link);
streaming_msg_free(sm);
}
}

/**
*
*/
Expand All @@ -1084,12 +1184,14 @@ dvr_thread(void *aux)
dvr_entry_t *de = aux;
profile_chain_t *prch = de->de_chain;
streaming_queue_t *sq = &prch->prch_sq;
streaming_message_t *sm;
th_subscription_t *ts;
th_pkt_t *pkt;
int run = 1, started = 0, comm_skip, epg_running, rs;
struct streaming_message_queue backlog;
streaming_message_t *sm, *sm2;
th_pkt_t *pkt, *pkt2, *pkt3;
streaming_start_t *ss = NULL;
int run = 1, started = 0, muxing = 0, comm_skip, epg_running = 0, rs;
int commercial = COMMERCIAL_UNKNOWN;
int64_t packets = 0;
int64_t packets = 0, dts_offset = PTS_UNSET;
time_t start_time = 0;
char *postproc;

if (!dvr_thread_global_lock(de, &run))
Expand All @@ -1098,39 +1200,35 @@ dvr_thread(void *aux)
postproc = de->de_config->dvr_postproc ? strdup(de->de_config->dvr_postproc) : NULL;
dvr_thread_global_unlock(de);

pthread_mutex_lock(&sq->sq_mutex);
TAILQ_INIT(&backlog);

pthread_mutex_lock(&sq->sq_mutex);
while(run) {
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
streaming_queue_remove(sq, sm);

if ((ts = de->de_s) != NULL && started) {
pktbuf_t *pb = NULL;
if (sm->sm_type == SMT_PACKET) {
pb = ((th_pkt_t*)sm->sm_data)->pkt_payload;
if (((th_pkt_t*)sm->sm_data)->pkt_err) {
de->de_data_errors += ((th_pkt_t*)sm->sm_data)->pkt_err;
dvr_notify(de);
}
}
else if (sm->sm_type == SMT_MPEGTS) {
pb = sm->sm_data;
if (pb->pb_err) {
de->de_data_errors += pb->pb_err;
dvr_notify(de);
if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) {
if (de->de_running_start > de->de_running_stop) {
epg_running = 1;
} else if (de->de_running_start == 0 && de->de_running_stop == 0) {
if (start_time + 2 >= dispatch_clock) {
epg_running = 0;
TAILQ_INSERT_TAIL(&backlog, sm, sm_link);
continue;
} else {
if (TAILQ_FIRST(&backlog))
dvr_thread_backlog_free(&backlog);
epg_running = 1;
}
} else {
epg_running = 0;
}
if (pb)
subscription_add_bytes_out(ts, pktbuf_len(pb));
}

streaming_queue_remove(sq, sm);

epg_running = de->de_running_start > de->de_running_stop ||
(de->de_running_start == 0 && de->de_running_stop == 0);
pthread_mutex_unlock(&sq->sq_mutex);

switch(sm->sm_type) {
Expand Down Expand Up @@ -1161,67 +1259,97 @@ dvr_thread(void *aux)

commercial = pkt->pkt_commercial;

if (started) {
muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
sm->sm_data = NULL;
dvr_notify(de);
packets++;
if (!started)
break;

if (muxing == 0 &&
!dvr_thread_rec_start(de, started, ss, &run, &dts_offset,
epg_running, postproc))
break;

muxing = 1;
while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) {
TAILQ_REMOVE(&backlog, sm2, sm_link);
if (pkt->pkt_dts != PTS_UNSET) {
if (dts_offset == PTS_UNSET) {
pkt2 = sm2->sm_data;
dts_offset = pkt2->pkt_dts;
}
pkt3 = (th_pkt_t *)sm2->sm_data;
if (dts_offset != PTS_UNSET && pkt->pkt_dts >= dts_offset) {
pkt3 = pkt_copy_shallow(pkt3);
pkt3->pkt_dts -= dts_offset;
if (pkt3->pkt_pts != PTS_UNSET)
pkt3->pkt_pts -= dts_offset;
dvr_thread_pkt_stats(de, pkt3, 1);
muxer_write_pkt(prch->prch_muxer, sm2->sm_type, pkt3);
} else {
dvr_thread_pkt_stats(de, pkt3, 0);
}
}
streaming_msg_free(sm2);
}
if (dts_offset == PTS_UNSET && pkt->pkt_dts != PTS_UNSET)
dts_offset = pkt->pkt_dts;
if (pkt->pkt_dts != PTS_UNSET && dts_offset != PTS_UNSET &&
pkt->pkt_dts >= dts_offset) {
pkt3 = pkt_copy_shallow(pkt);
pkt3->pkt_dts -= dts_offset;
if (pkt3->pkt_pts != PTS_UNSET)
pkt3->pkt_pts -= dts_offset;
dvr_thread_pkt_stats(de, pkt3, 1);
muxer_write_pkt(prch->prch_muxer, sm->sm_type, pkt3);
} else {
dvr_thread_pkt_stats(de, pkt, 0);
}
dvr_notify(de);
packets++;
break;

case SMT_MPEGTS:
dvr_rec_set_state(de, !epg_running ? DVR_RS_EPG_WAIT : DVR_RS_RUNNING, 0);
if(started) {
if (!epg_running) {
if (packets) {
dvr_streaming_restart(de, &run);
packets = 0;
started = 0;
}
break;

if (!started)
break;

if (!epg_running) {
if (packets) {
dvr_streaming_restart(de, &run);
packets = 0;
started = 0;
}
muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
sm->sm_data = NULL;
dvr_notify(de);
packets++;
break;
}
break;

case SMT_START:
packets = 0;
if(started &&
muxer_reconfigure(prch->prch_muxer, sm->sm_data) < 0) {
tvhlog(LOG_WARNING,
"dvr", "Unable to reconfigure \"%s\"",
dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
if (muxing == 0 &&
!dvr_thread_rec_start(de, started, ss, &run, &dts_offset,
epg_running, postproc))
break;

// Try to restart the recording if the muxer doesn't
// support reconfiguration of the streams.
dvr_thread_epilog(de, postproc);
started = 0;
if (epg_running) {
if (!dvr_thread_global_lock(de, &run))
break;
if (de->de_config->dvr_clone)
de = dvr_entry_clone(de);
dvr_thread_global_unlock(de);
}
muxing = 1;
while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) {
TAILQ_REMOVE(&backlog, sm2, sm_link);
dvr_thread_mpegts_stats(de, sm2->sm_data);
muxer_write_pkt(prch->prch_muxer, sm2->sm_type, sm2->sm_data);
sm2->sm_data = NULL;
streaming_msg_free(sm2);
}
dvr_thread_mpegts_stats(de, sm->sm_data);
muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
sm->sm_data = NULL;
dvr_notify(de);
packets++;
break;

if(!started) {
if (!dvr_thread_global_lock(de, &run))
break;
dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0);
if(dvr_rec_start(de, sm->sm_data) == 0)
started = 1;
else
dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0);
dvr_thread_global_unlock(de);
}
case SMT_START:
start_time = dispatch_clock;
packets = 0;
started = 1;
ss = streaming_start_copy((streaming_start_t *)sm->sm_data);
break;

case SMT_STOP:
if(sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) {
if (sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) {
// Subscription is restarting, wait for SMT_START

} else if(sm->sm_code == 0) {
Expand All @@ -1232,27 +1360,32 @@ dvr_thread(void *aux)
"dvr", "Recording completed: \"%s\"",
dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));

dvr_thread_epilog(de, postproc);
started = 0;
goto fin;

}else if(de->de_last_error != sm->sm_code) {
} else if (de->de_last_error != sm->sm_code) {
// Error during recording

dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code);
tvhlog(LOG_ERR,
"dvr", "Recording stopped: \"%s\": %s",
dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL),
streaming_code2txt(sm->sm_code));
dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code);
tvhlog(LOG_ERR,
"dvr", "Recording stopped: \"%s\": %s",
dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL),
streaming_code2txt(sm->sm_code));

dvr_thread_epilog(de, postproc);
started = 0;
fin:
dvr_thread_backlog_free(&backlog);
dvr_thread_epilog(de, postproc);
start_time = 0;
started = 0;
muxing = 0;
streaming_start_unref(ss);
ss = NULL;
}
break;

case SMT_SERVICE_STATUS:
if(sm->sm_code & TSS_PACKETS) {
if (sm->sm_code & TSS_PACKETS) {

} else if(sm->sm_code & TSS_ERRORS) {
} else if (sm->sm_code & TSS_ERRORS) {

int code = SM_CODE_UNDEFINED_ERROR;

Expand All @@ -1274,7 +1407,7 @@ dvr_thread(void *aux)

case SMT_NOSTART:

if(de->de_last_error != sm->sm_code) {
if (de->de_last_error != sm->sm_code) {
dvr_rec_set_state(de, DVR_RS_PENDING, sm->sm_code);

tvhlog(LOG_ERR,
Expand Down Expand Up @@ -1303,7 +1436,9 @@ dvr_thread(void *aux)
}
pthread_mutex_unlock(&sq->sq_mutex);

if(prch->prch_muxer)
dvr_thread_backlog_free(&backlog);

if (prch->prch_muxer)
dvr_thread_epilog(de, postproc);

free(postproc);
Expand Down

0 comments on commit 4400bb2

Please sign in to comment.