Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
timeshift: reduce and improve the logic - move more packet handling l…
…ogic to writer thread
  • Loading branch information
perexg committed Jan 3, 2016
1 parent 4449a20 commit ea752fe
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 253 deletions.
3 changes: 0 additions & 3 deletions src/packet.h
Expand Up @@ -56,9 +56,6 @@ typedef struct th_pkt {
uint8_t pkt_componentindex;
uint8_t pkt_frametype;
uint8_t pkt_field; // Set if packet is only a half frame (a field)
#if ENABLE_TIMESHIFT
uint8_t pkt_delivered;
#endif

uint8_t pkt_channels;
uint8_t pkt_sri;
Expand Down
159 changes: 33 additions & 126 deletions src/timeshift.c
Expand Up @@ -233,7 +233,7 @@ timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
}

static void
timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver )
timeshift_packet_flush ( timeshift_t *ts, int64_t time )
{
streaming_message_t *lowest, *sm;
struct streaming_message_queue *sq;
Expand All @@ -251,15 +251,12 @@ timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver )
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
if (deliver)
timeshift_packet_deliver(ts, lowest);
else
streaming_msg_free(lowest);
timeshift_packet_deliver(ts, lowest);
}
}

static void
timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver )
timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
{
streaming_message_t *sm;
int64_t time;
Expand All @@ -272,61 +269,16 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver )
time = ts_rescale(pkt->pkt_pts, 1000000);
if (time > ts->last_time) {
ts->last_time = time;
timeshift_packet_flush(ts, time, deliver);
timeshift_packet_flush(ts, time);
}

sm->sm_time = time;
if (time + MAX_TIME_DELTA < ts->last_time) {
if (deliver)
timeshift_packet_deliver(ts, sm);
timeshift_packet_deliver(ts, sm);
} else {
if (pkt->pkt_componentindex >= ts->backlog_max)
ts->backlog_max = pkt->pkt_componentindex + 1;
TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link);
pkt->pkt_delivered = ts->state <= TS_LIVE;
}
}

void
timeshift_packets_clone
( timeshift_t *ts, struct streaming_message_queue *dst, int delivered )
{
streaming_message_t *lowest, *sm, *sm2;
struct streaming_message_queue *sq, *sq2, *backlogs;
th_pkt_t *pkt;
int i;

lock_assert(&ts->state_mutex);

/* init temporary queues and copy the backlog data */
backlogs = alloca(ts->backlog_max * sizeof(*backlogs));
for (i = 0; i < ts->backlog_max; i++) {
sq = &backlogs[i];
sq2 = &ts->backlog[i];
TAILQ_INIT(sq);
TAILQ_FOREACH(sm, sq2, sm_link) {
if (!delivered) {
pkt = sm->sm_data;
if (pkt->pkt_delivered)
continue;
}
sm2 = streaming_msg_clone(sm);
TAILQ_INSERT_TAIL(sq, sm2, sm_link);
}
}
/* push to destination (pts sorted) */
while (1) {
lowest = NULL;
for (i = 0; i < ts->backlog_max; i++) {
sq = &backlogs[i];
sm = TAILQ_FIRST(sq);
if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time))
lowest = sm;
}
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
TAILQ_INSERT_TAIL(dst, lowest, sm_link);
}
}

Expand All @@ -336,105 +288,62 @@ timeshift_packets_clone
static void timeshift_input
( void *opaque, streaming_message_t *sm )
{
int exit = 0, type = sm->sm_type;
int type = sm->sm_type;
timeshift_t *ts = opaque;
th_pkt_t *pkt = sm->sm_data, *pkt2;

pthread_mutex_lock(&ts->state_mutex);

/* Control */
if (type == SMT_SKIP) {
if (ts->state >= TS_LIVE)
timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
streaming_msg_free(sm);
} else if (type == SMT_SPEED) {
if (ts->state >= TS_LIVE)
timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
streaming_msg_free(sm);
}

else {
} else {

/* Start */
if (type == SMT_START && ts->state == TS_INIT)
ts->state = TS_LIVE;
if (type == SMT_START)
timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA);

/* Change PTS/DTS offsets */
if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) {
else if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) {
pkt2 = pkt_copy_shallow(pkt);
pkt_ref_dec(pkt);
sm->sm_data = pkt = pkt2;
pkt->pkt_pts += ts->start_pts;
pkt->pkt_dts += ts->start_pts;
}

/* Pass-thru */
if (ts->state <= TS_LIVE) {
if (type == SMT_START) {
if (ts->smt_start)
streaming_start_unref(ts->smt_start);
ts->smt_start = sm->sm_data;
atomic_add(&ts->smt_start->ss_refcount, 1);
if (ts->packet_mode) {
timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA + 1000, ts->dobuf);
if (ts->last_time)
ts->start_pts = ts->last_time + 1000;
}
}
streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
}

/* Check for exit */
if (type == SMT_EXIT ||
else if (type == SMT_EXIT ||
(type == SMT_STOP && sm->sm_code != SM_CODE_SOURCE_RECONFIGURED))
exit = 1;
ts->exit = 1;

if (type == SMT_MPEGTS)
else if (type == SMT_MPEGTS)
ts->packet_mode = 0;

/* Buffer to disk */
if ((ts->state > TS_LIVE) || (ts->dobuf && (ts->state == TS_LIVE))) {
if (ts->packet_mode) {
sm->sm_time = ts->last_time;
if (type == SMT_PACKET) {
timeshift_packet(ts, pkt, 1);
goto msg_free;
}
} else {
if (ts->ref_time == 0) {
ts->ref_time = getmonoclock();
sm->sm_time = 0;
} else {
sm->sm_time = getmonoclock() - ts->ref_time;
}
/* Send to the writer thread */
if (ts->packet_mode) {
sm->sm_time = ts->last_time;
if (type == SMT_PACKET) {
timeshift_packet(ts, pkt);
goto msg_free;
}
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else {
if (type == SMT_PACKET) {
timeshift_packet(ts, pkt, 0);
tvhtrace("timeshift",
"ts %d pkt in - stream %d type %c pts %10"PRId64
" dts %10"PRId64" dur %10d len %6zu",
ts->id,
pkt->pkt_componentindex,
pkt_frametype_to_char(pkt->pkt_frametype),
ts_rescale(pkt->pkt_pts, 1000000),
ts_rescale(pkt->pkt_dts, 1000000),
pkt->pkt_duration,
pktbuf_len(pkt->pkt_payload));
if (ts->ref_time == 0) {
ts->ref_time = getmonoclock();
sm->sm_time = 0;
} else {
sm->sm_time = getmonoclock() - ts->ref_time;
}
msg_free:
streaming_msg_free(sm);
}
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
msg_free:
streaming_msg_free(sm);

/* Exit/Stop */
if (exit) {
if (ts->exit)
timeshift_write_exit(ts->rd_pipe.wr);
ts->state = TS_EXIT;
}
}

pthread_mutex_unlock(&ts->state_mutex);
}

/**
Expand Down Expand Up @@ -474,10 +383,6 @@ timeshift_destroy(streaming_target_t *pad)
/* Flush files */
timeshift_filemgr_flush(ts, NULL);

/* Release SMT_START index */
if (ts->smt_start)
streaming_start_unref(ts->smt_start);

if (ts->path)
free(ts->path);
free(ts);
Expand All @@ -503,14 +408,16 @@ streaming_target_t *timeshift_create
ts->output = out;
ts->path = NULL;
ts->max_time = max_time;
ts->state = TS_INIT;
ts->state = TS_LIVE;
ts->exit = 0;
ts->full = 0;
ts->vididx = -1;
ts->id = timeshift_index;
ts->ondemand = timeshift_conf.ondemand;
ts->dobuf = ts->ondemand ? 0 : 1;
ts->packet_mode= 1;
ts->last_time = 0;
ts->buf_time = 0;
ts->start_pts = 0;
ts->ref_time = 0;
for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
Expand Down
12 changes: 3 additions & 9 deletions src/timeshift/private.h
Expand Up @@ -19,7 +19,7 @@
#ifndef __TVH_TIMESHIFT_PRIVATE_H__
#define __TVH_TIMESHIFT_PRIVATE_H__

#define TIMESHIFT_PLAY_BUF 2000000 //< us to buffer in TX
#define TIMESHIFT_PLAY_BUF 500000 //< us to buffer in TX
#define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file
#define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams

Expand Down Expand Up @@ -96,21 +96,20 @@ typedef struct timeshift {
int64_t last_time; ///< Last time in us (PTS conversion)
int64_t start_pts; ///< Start time for packets (PTS)
int64_t ref_time; ///< Start time in us (monoclock)
int64_t buf_time; ///< Last buffered time in us (PTS conversion)
struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
int backlog_max;///< Maximum component index in backlog

enum {
TS_INIT,
TS_EXIT,
TS_LIVE,
TS_PAUSE,
TS_PLAY,
} state; ///< Play state
pthread_mutex_t state_mutex; ///< Protect state changes
uint8_t exit; ///< Exit from the main input thread
uint8_t full; ///< Buffer is full

streaming_start_t *smt_start; ///< Current stream makeup

streaming_queue_t wr_queue; ///< Writer queue
pthread_t wr_thread; ///< Writer thread

Expand All @@ -129,11 +128,6 @@ typedef struct timeshift {
extern uint64_t timeshift_total_size;
extern uint64_t timeshift_total_ram_size;

/*
*
*/
void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered );

/*
* Write functions
*/
Expand Down

0 comments on commit ea752fe

Please sign in to comment.