Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
timeshift: change time source - use PTS as the synchronization source
  • Loading branch information
perexg committed Dec 30, 2015
1 parent ffcdfe6 commit 582562f
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 178 deletions.
21 changes: 21 additions & 0 deletions src/atomic.h
Expand Up @@ -48,6 +48,21 @@ atomic_add_u64(volatile uint64_t *ptr, uint64_t incr)
#endif
}

static inline int64_t
atomic_add_s64(volatile int64_t *ptr, int64_t incr)
{
#if ENABLE_ATOMIC64
return __sync_fetch_and_add(ptr, incr);
#else
uint64_t ret;
pthread_mutex_lock(&atomic_lock);
ret = *ptr;
*ptr += incr;
pthread_mutex_unlock(&atomic_lock);
return ret;
#endif
}

static inline time_t
atomic_add_time_t(volatile time_t *ptr, time_t incr)
{
Expand Down Expand Up @@ -123,6 +138,12 @@ atomic_exchange_u64(volatile uint64_t *ptr, uint64_t new)
return __sync_lock_test_and_set(ptr, new);
}

static inline int
atomic_exchange_s64(volatile int64_t *ptr, int64_t new)
{
return __sync_lock_test_and_set(ptr, new);
}

static inline int
atomic_exchange_time_t(volatile time_t *ptr, int new)
{
Expand Down
46 changes: 24 additions & 22 deletions src/htsp_server.c
Expand Up @@ -2459,7 +2459,7 @@ htsp_method_skip(htsp_connection_t *htsp, htsmsg_t *in)
memset(&skip, 0, sizeof(skip));
if(!htsmsg_get_s64(in, "time", &s64)) {
skip.type = abs ? SMT_SKIP_ABS_TIME : SMT_SKIP_REL_TIME;
skip.time = hs->hs_90khz ? s64 : ts_rescale_i(s64, 1000000);
skip.time = hs->hs_90khz ? s64 : ts_rescale_inv(s64, 1000000);
tvhtrace("htsp-sub", "skip: %s %"PRId64" (%s)", abs ? "abs" : "rel",
skip.time, hs->hs_90khz ? "90kHz" : "1MHz");
} else if (!htsmsg_get_s64(in, "size", &s64)) {
Expand Down Expand Up @@ -4005,6 +4005,26 @@ htsp_subscription_speed(htsp_subscription_t *hs, int speed)
htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
}

/**
*
*/
#if ENABLE_TIMESHIFT
static void
htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "timeshiftStatus");
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_u32(m, "full", status->full);
htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000));
if (status->pts_start != PTS_UNSET)
htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ;
if (status->pts_end != PTS_UNSET)
htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ;
htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
}
#endif

/**
*
*/
Expand All @@ -4017,8 +4037,10 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);

/* Flush pkt buffers */
if (skip->type != SMT_SKIP_ERROR)
if (skip->type != SMT_SKIP_ERROR) {
htsp_flush_queue(hs->hs_htsp, &hs->hs_q, 0);
htsp_subscription_timeshift_status(hs, &skip->timeshift);
}

if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE)
htsmsg_add_u32(m, "absolute", 1);
Expand All @@ -4031,26 +4053,6 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
}

/**
*
*/
#if ENABLE_TIMESHIFT
static void
htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "timeshiftStatus");
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_u32(m, "full", status->full);
htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000));
if (status->pts_start != PTS_UNSET)
htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ;
if (status->pts_end != PTS_UNSET)
htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ;
htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
}
#endif

/**
*
*/
Expand Down
130 changes: 92 additions & 38 deletions src/timeshift.c
Expand Up @@ -24,6 +24,7 @@
#include "settings.h"
#include "atomic.h"
#include "access.h"
#include "atomic.h"

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -205,34 +206,80 @@ const idclass_t timeshift_conf_class = {
};

/*
* Decode initial time diff
*
* Gather some packets and select the lowest pts to identify
* the correct start. Note that for timeshift, the tsfix
* stream plugin is applied, so the starting pts should be
* near zero. If not - it's a bug.
* Process a packet with time sorting
*/

#define MAX_TIME_DELTA (2*1000000) /* 2 seconds */
#define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog)

static void
timeshift_set_pts_delta ( timeshift_t *ts, int64_t pts )
timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
{
th_pkt_t *pkt = sm->sm_data;
tvhtrace("timeshift",
"ts %d pkt buf - stream %d type %c pts %10"PRId64
" dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
ts->id,
pkt->pkt_componentindex,
pkt_frametype_to_char(pkt->pkt_frametype),
ts_rescale(pkt->pkt_pts, 1000000),
ts_rescale(pkt->pkt_dts, 1000000),
pkt->pkt_duration,
pktbuf_len(pkt->pkt_payload),
sm->sm_time);
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
}

static void
timeshift_packet_flush ( timeshift_t *ts, int64_t time )
{
streaming_message_t *lowest, *sm;
struct streaming_message_queue *sq;
int i;
int64_t smallest = INT64_MAX;

if (pts == PTS_UNSET)
while (1) {
lowest = NULL;
for (i = 0; i < ts->backlog_max; i++) {
sq = &ts->backlog[i];
sm = TAILQ_FIRST(sq);
if (sm && sm->sm_time + MAX_TIME_DELTA < time)
if (lowest == NULL || lowest->sm_time > sm->sm_time)
lowest = sm;
}
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
timeshift_packet_deliver(ts, lowest);
}
}

static void
timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
{
streaming_message_t *sm;
int64_t time;

if (pkt->pkt_componentindex >= TIMESHIFT_BACKLOG_MAX) {
pkt_ref_dec(pkt);
return;
}

for (i = 0; i < ARRAY_SIZE(ts->pts_val); i++) {
int64_t i64 = ts->pts_val[i];
if (i64 == PTS_UNSET) {
ts->pts_val[i] = pts;
break;
}
if (i64 < smallest)
smallest = i64;
sm = streaming_msg_create_pkt(pkt);

time = ts_rescale(pkt->pkt_pts, 1000000);
if (time > ts->last_time) {
atomic_exchange_s64(&ts->last_time, time);
timeshift_packet_flush(ts, time);
}

if (i >= ARRAY_SIZE(ts->pts_val))
ts->pts_delta = getmonoclock() - ts_rescale(smallest, 1000000);
sm->sm_time = time;
if (time + MAX_TIME_DELTA < ts->last_time) {
timeshift_packet_deliver(ts, sm);
} else {
if (pkt->pkt_componentindex >= ts->backlog_max)
ts->backlog_max = pkt->pkt_componentindex + 1;
TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link);
}
}

/*
Expand Down Expand Up @@ -281,25 +328,26 @@ static void timeshift_input
(sm->sm_type == SMT_STOP && sm->sm_code == 0))
exit = 1;

/* Record (one-off) PTS delta */
if (sm->sm_type == SMT_PACKET && ts->pts_delta == 0)
timeshift_set_pts_delta(ts, pkt->pkt_pts);
if (sm->sm_type == SMT_MPEGTS)
ts->packet_mode = 0;

/* Buffer to disk */
if ((ts->state > TS_LIVE) || (!ts->ondemand && (ts->state == TS_LIVE))) {
sm->sm_time = getmonoclock();
if (sm->sm_type == SMT_PACKET) {
tvhtrace("timeshift",
"ts %d pkt buf - stream %d type %c pts %10"PRId64
" dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
ts->id,
pkt->pkt_componentindex,
pkt_frametype_to_char(pkt->pkt_frametype),
ts_rescale(pkt->pkt_pts, 1000000),
ts_rescale(pkt->pkt_dts, 1000000),
pkt->pkt_duration,
pktbuf_len(pkt->pkt_payload),
sm->sm_time - ts->pts_delta);
if (ts->packet_mode) {
sm->sm_time = ts->last_time;
if (sm->sm_type == SMT_PACKET) {
timeshift_packet(ts, pkt);
sm->sm_data = NULL;
streaming_msg_free(sm);
goto pktcont;
}
} else {
if (ts->ref_time == 0) {
ts->ref_time = getmonoclock();
sm->sm_time = 0;
} else {
sm->sm_time = getmonoclock() - ts->ref_time;
}
}
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else {
Expand All @@ -317,6 +365,7 @@ static void timeshift_input
}
streaming_msg_free(sm);
}
pktcont:

/* Exit/Stop */
if (exit) {
Expand All @@ -336,6 +385,7 @@ timeshift_destroy(streaming_target_t *pad)
{
timeshift_t *ts = (timeshift_t*)pad;
streaming_message_t *sm;
int i;

/* Must hold global lock */
lock_assert(&global_lock);
Expand All @@ -355,6 +405,8 @@ timeshift_destroy(streaming_target_t *pad)

/* Shut stuff down */
streaming_queue_deinit(&ts->wr_queue);
for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
streaming_queue_clear(&ts->backlog[i]);

close(ts->rd_pipe.rd);
close(ts->rd_pipe.wr);
Expand Down Expand Up @@ -396,9 +448,11 @@ streaming_target_t *timeshift_create
ts->vididx = -1;
ts->id = timeshift_index;
ts->ondemand = timeshift_conf.ondemand;
ts->pts_delta = 0;
for (i = 0; i < ARRAY_SIZE(ts->pts_val); i++)
ts->pts_val[i] = PTS_UNSET;
ts->packet_mode= 1;
ts->last_time = 0;
ts->ref_time = 0;
for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
TAILQ_INIT(&ts->backlog[i]);
pthread_mutex_init(&ts->rdwr_mutex, NULL);
pthread_mutex_init(&ts->state_mutex, NULL);

Expand Down
8 changes: 0 additions & 8 deletions src/timeshift.h
Expand Up @@ -40,14 +40,6 @@ typedef struct timeshift_conf {
extern struct timeshift_conf timeshift_conf;
extern const idclass_t timeshift_conf_class;

typedef struct timeshift_status
{
int full;
int64_t shift;
int64_t pts_start;
int64_t pts_end;
} timeshift_status_t;

void timeshift_init ( void );
void timeshift_term ( void );

Expand Down
16 changes: 10 additions & 6 deletions src/timeshift/private.h
Expand Up @@ -19,8 +19,9 @@
#ifndef __TVH_TIMESHIFT_PRIVATE_H__
#define __TVH_TIMESHIFT_PRIVATE_H__

#define TIMESHIFT_PLAY_BUF 200000 // us to buffer in TX
#define TIMESHIFT_FILE_PERIOD 60 // number of secs in each buffer file
#define TIMESHIFT_PLAY_BUF 2000000 //< us to buffer in TX
#define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file
#define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams

/**
* Indexes of import data in the stream
Expand Down Expand Up @@ -55,7 +56,7 @@ typedef struct timeshift_file
int rfd; ///< Read descriptor
char *path; ///< Full path to file

time_t time; ///< Files coarse timestamp
int64_t time; ///< Files coarse timestamp
size_t size; ///< Current file size;
int64_t last; ///< Latest timestamp
off_t woff; ///< Write offset
Expand Down Expand Up @@ -90,8 +91,11 @@ typedef struct timeshift {
char *path; ///< Directory containing buffer
time_t max_time; ///< Maximum period to shift
int ondemand; ///< Whether this is an on-demand timeshift
int64_t pts_delta; ///< Delta between system clock and PTS
int64_t pts_val[6]; ///< Decision PTS values for multiple packets
int packet_mode;///< Packet mode (otherwise MPEG-TS data mode)
int64_t last_time; ///< Last time in us (PTS conversion)
int64_t ref_time; ///< Start time in us (monoclock)
struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
int backlog_max;///< Maximum component index in backlog

enum {
TS_INIT,
Expand Down Expand Up @@ -153,7 +157,7 @@ void timeshift_filemgr_term ( void );
int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len );

timeshift_file_t *timeshift_filemgr_get
( timeshift_t *ts, int create );
( timeshift_t *ts, int64_t start_time );
timeshift_file_t *timeshift_filemgr_oldest
( timeshift_t *ts );
timeshift_file_t *timeshift_filemgr_newest
Expand Down

0 comments on commit 582562f

Please sign in to comment.