Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
timeshift: merge packet log code to one fcn, many fixes
  • Loading branch information
perexg committed Jan 3, 2016
1 parent debc4e7 commit 385b395
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 95 deletions.
55 changes: 32 additions & 23 deletions src/timeshift.c
Expand Up @@ -38,6 +38,27 @@ static int timeshift_index = 0;

struct timeshift_conf timeshift_conf;

/*
* Packet log
*/
void
timeshift_packet_log0
( const char *source, timeshift_t *ts, streaming_message_t *sm )
{
th_pkt_t *pkt = sm->sm_data;
tvhtrace("timeshift",
"ts %d pkt %s - stream %d type %c pts %10"PRId64
" dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
ts->id, source,
pkt->pkt_componentindex,
pkt_frametype_to_char(pkt->pkt_frametype),
ts_rescale(pkt->pkt_pts, 1000000),
ts_rescale(pkt->pkt_dts, 1000000),
pkt->pkt_duration,
pktbuf_len(pkt->pkt_payload),
sm->sm_time);
}

/*
* Safe values for RAM configuration
*/
Expand Down Expand Up @@ -214,23 +235,6 @@ const idclass_t timeshift_conf_class = {
#define MAX_TIME_DELTA (2*1000000) /* 2 seconds */
#define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog)

static void
timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
{
th_pkt_t *pkt = sm->sm_data;
tvhtrace("timeshift",
"ts %d pkt buf - stream %d type %c pts %10"PRId64
" dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
ts->id,
pkt->pkt_componentindex,
pkt_frametype_to_char(pkt->pkt_frametype),
ts_rescale(pkt->pkt_pts, 1000000),
ts_rescale(pkt->pkt_dts, 1000000),
pkt->pkt_duration,
pktbuf_len(pkt->pkt_payload),
sm->sm_time);
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
}

static void
timeshift_packet_flush ( timeshift_t *ts, int64_t time )
Expand All @@ -251,7 +255,9 @@ timeshift_packet_flush ( timeshift_t *ts, int64_t time )
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
timeshift_packet_deliver(ts, lowest);
ts->last_wr_time = lowest->sm_time;
timeshift_packet_log("wr ", ts, lowest);
streaming_target_deliver2(&ts->wr_queue.sq_st, lowest);
}
}

Expand All @@ -274,7 +280,9 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )

sm->sm_time = time;
if (time + MAX_TIME_DELTA < ts->last_time) {
timeshift_packet_deliver(ts, sm);
ts->last_wr_time = time;
timeshift_packet_log("wr2", ts, sm);
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else {
if (pkt->pkt_componentindex >= ts->backlog_max)
ts->backlog_max = pkt->pkt_componentindex + 1;
Expand Down Expand Up @@ -323,10 +331,11 @@ static void timeshift_input

/* Send to the writer thread */
if (ts->packet_mode) {
sm->sm_time = ts->last_time;
sm->sm_time = ts->last_wr_time;
if (type == SMT_PACKET) {
timeshift_packet(ts, pkt);
goto msg_free;
streaming_msg_free(sm);
goto _exit;
}
} else {
if (ts->ref_time == 0) {
Expand All @@ -337,10 +346,9 @@ static void timeshift_input
}
}
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
msg_free:
streaming_msg_free(sm);

/* Exit/Stop */
_exit:
if (ts->exit)
timeshift_write_exit(ts->rd_pipe.wr);
}
Expand Down Expand Up @@ -417,6 +425,7 @@ streaming_target_t *timeshift_create
ts->dobuf = ts->ondemand ? 0 : 1;
ts->packet_mode= 1;
ts->last_time = 0;
ts->last_wr_time = 0;
ts->buf_time = 0;
ts->start_pts = 0;
ts->ref_time = 0;
Expand Down
16 changes: 12 additions & 4 deletions src/timeshift/private.h
Expand Up @@ -19,7 +19,7 @@
#ifndef __TVH_TIMESHIFT_PRIVATE_H__
#define __TVH_TIMESHIFT_PRIVATE_H__

#define TIMESHIFT_PLAY_BUF 500000 //< us to buffer in TX
#define TIMESHIFT_PLAY_BUF 1000000 //< us to buffer in TX
#define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file
#define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams

Expand Down Expand Up @@ -94,6 +94,7 @@ typedef struct timeshift {
int packet_mode;///< Packet mode (otherwise MPEG-TS data mode)
int dobuf; ///< Buffer packets (store)
int64_t last_time; ///< Last time in us (PTS conversion)
int64_t last_wr_time;///< Last write time in us (PTS conversion)
int64_t start_pts; ///< Start time for packets (PTS)
int64_t ref_time; ///< Start time in us (monoclock)
int64_t buf_time; ///< Last buffered time in us (PTS conversion)
Expand Down Expand Up @@ -128,6 +129,16 @@ typedef struct timeshift {
extern uint64_t timeshift_total_size;
extern uint64_t timeshift_total_ram_size;

void timeshift_packet_log0
( const char *prefix, timeshift_t *ts, streaming_message_t *sm );

static inline void timeshift_packet_log
( const char *prefix, timeshift_t *ts, streaming_message_t *sm )
{
if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
timeshift_packet_log0(prefix, ts, sm);
}

/*
* Write functions
*/
Expand All @@ -141,9 +152,6 @@ ssize_t timeshift_write_stop ( int fd, int code );
ssize_t timeshift_write_exit ( int fd );
ssize_t timeshift_write_eof ( timeshift_file_t *tsf );

void timeshift_writer_flush ( timeshift_t *ts );
void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst );

/*
* Threads
*/
Expand Down
40 changes: 10 additions & 30 deletions src/timeshift/timeshift_reader.c
Expand Up @@ -413,25 +413,6 @@ static int _timeshift_read
return 0;
}

/*
* Trace packet
*/
static void timeshift_trace_pkt
( timeshift_t *ts, streaming_message_t *sm )
{
th_pkt_t *pkt = sm->sm_data;
tvhtrace("timeshift",
"ts %d pkt out - stream %d type %c pts %10"PRId64
" dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
ts->id,
pkt->pkt_componentindex,
pkt_frametype_to_char(pkt->pkt_frametype),
ts_rescale(pkt->pkt_pts, 1000000),
ts_rescale(pkt->pkt_dts, 1000000),
pkt->pkt_duration,
pktbuf_len(pkt->pkt_payload), sm->sm_time);
}

/*
* Flush all data to live
*/
Expand All @@ -444,8 +425,7 @@ static int _timeshift_flush_to_live
if (_timeshift_read(ts, cur_file, &sm, wait) == -1)
return -1;
if (!sm) break;
if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET)
timeshift_trace_pkt(ts, sm);
timeshift_packet_log("ouf", ts, sm);
streaming_target_deliver2(ts->output, sm);
}
return 0;
Expand Down Expand Up @@ -503,7 +483,7 @@ static void timeshift_status
void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
int nfds, end, run = 1, wait = -1;
int nfds, end, run = 1, wait = -1, state;
timeshift_file_t *cur_file = NULL, *tmp_file;
int cur_speed = 100, keyframe_mode = 0;
int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
Expand Down Expand Up @@ -567,7 +547,8 @@ void *timeshift_reader ( void *p )
if (cur_speed != speed) {

/* Live playback */
if (ts->state == TS_LIVE) {
state = ts->state;
if (state == TS_LIVE) {

/* Reject */
if (speed >= 100) {
Expand All @@ -579,7 +560,6 @@ void *timeshift_reader ( void *p )
} else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode",
ts->id);
timeshift_writer_flush(ts);
ts->dobuf = 1;
tmp_file = timeshift_filemgr_newest(ts);
if (tmp_file != NULL) {
Expand Down Expand Up @@ -613,14 +593,15 @@ void *timeshift_reader ( void *p )

/* Update */
cur_speed = speed;
if (speed != 100 || ts->state != TS_LIVE) {
if (speed != 100 || state != TS_LIVE) {
ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
tvhtrace("timeshift", "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY");
}
if (ts->state == TS_PLAY) {
if (ts->state == TS_PLAY && state != TS_PLAY) {
mono_play_time = mono_now;
tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time);
} else if (ts->state == TS_PAUSE) {
tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64,
mono_now, pause_time);
} else if (ts->state == TS_PAUSE && state != TS_PAUSE) {
pause_time = last_time;
}
tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed);
Expand Down Expand Up @@ -815,8 +796,7 @@ void *timeshift_reader ( void *p )
((cur_speed > 0) && (sm->sm_time <= deliver))))) {

last_time = sm->sm_time;
if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
timeshift_trace_pkt(ts, sm);
timeshift_packet_log("out", ts, sm);
streaming_target_deliver2(ts->output, sm);
sm = NULL;
wait = 0;
Expand Down
49 changes: 11 additions & 38 deletions src/timeshift/timeshift_writer.c
Expand Up @@ -253,15 +253,13 @@ static void _handle_sstart ( timeshift_t *ts, timeshift_file_t *tsf, streaming_m
* *************************************************************************/

static inline ssize_t _process_msg0
( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp )
( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t *sm )
{
ssize_t err;
streaming_message_t *sm = *smp;

if (sm->sm_type == SMT_START) {
err = 0;
_handle_sstart(ts, tsf, sm);
*smp = NULL;
_handle_sstart(ts, tsf, streaming_msg_clone(sm));
} else if (sm->sm_type == SMT_SIGNAL_STATUS)
err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
else if (sm->sm_type == SMT_PACKET) {
Expand Down Expand Up @@ -307,7 +305,7 @@ static void _process_msg
/* Terminate */
case SMT_EXIT:
if (run) *run = 0;
goto live;
break;
case SMT_STOP:
if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run)
*run = 0;
Expand All @@ -334,14 +332,19 @@ static void _process_msg
case SMT_PACKET:
pthread_mutex_lock(&ts->state_mutex);
ts->buf_time = sm->sm_time;
if (ts->state == TS_LIVE)
if (ts->state == TS_LIVE) {
streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
if (sm->sm_type == SMT_PACKET)
timeshift_packet_log("liv", ts, sm);
}
if (ts->dobuf) {
if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
if ((err = _process_msg0(ts, tsf, sm)) < 0) {
timeshift_filemgr_close(tsf);
tsf->bad = 1;
ts->full = 1; ///< Stop any more writing
} else {
timeshift_packet_log("sav", ts, sm);
}
tsf->refcount--;
}
Expand All @@ -352,6 +355,7 @@ static void _process_msg

/* Next */
streaming_msg_free(sm);
return;

live:
pthread_mutex_lock(&ts->state_mutex);
Expand Down Expand Up @@ -388,34 +392,3 @@ void *timeshift_writer ( void *aux )
pthread_mutex_unlock(&sq->sq_mutex);
return NULL;
}

/* **************************************************************************
* Utilities
* *************************************************************************/

void timeshift_writer_flush ( timeshift_t *ts )

{
streaming_message_t *sm;
streaming_queue_t *sq = &ts->wr_queue;

pthread_mutex_lock(&sq->sq_mutex);
while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
streaming_queue_remove(sq, sm);
_process_msg(ts, sm, NULL);
}
pthread_mutex_unlock(&sq->sq_mutex);
}

void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst )
{
streaming_message_t *sm, *sm2;
streaming_queue_t *sq = &ts->wr_queue;

pthread_mutex_lock(&sq->sq_mutex);
TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) {
sm2 = streaming_msg_clone(sm);
TAILQ_INSERT_TAIL(dst, sm2, sm_link);
}
pthread_mutex_unlock(&sq->sq_mutex);
}

0 comments on commit 385b395

Please sign in to comment.