Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
timeshift: write all 'temporary' packets (outside storage) when going…
… to live
  • Loading branch information
perexg committed Dec 30, 2015
1 parent 582562f commit 83ce30a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 2 deletions.
34 changes: 34 additions & 0 deletions src/timeshift.c
Expand Up @@ -282,6 +282,35 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
}
}

void
timeshift_packets_clone( timeshift_t *ts, struct streaming_message_queue *dst )
{
streaming_message_t *lowest, *sm;
struct streaming_message_queue *sq, *backlogs;
int i;

lock_assert(&ts->buffering_mutex);

/* init temporary queues */
backlogs = alloca(ts->backlog_max * sizeof(*backlogs));
for (i = 0; i < ts->backlog_max; i++)
TAILQ_INIT(&backlogs[i]);
/* push to destination (pts sorted) */
while (1) {
lowest = NULL;
for (i = 0; i < ts->backlog_max; i++) {
sq = &backlogs[i];
sm = TAILQ_FIRST(sq);
if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time))
lowest = sm;
}
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
TAILQ_INSERT_TAIL(dst, lowest, sm_link);
}
}

/*
* Receive data
*/
Expand Down Expand Up @@ -312,6 +341,8 @@ static void timeshift_input
ts->state = TS_LIVE;
}

pthread_mutex_lock(&ts->buffering_mutex);

/* Pass-thru */
if (ts->state <= TS_LIVE) {
if (sm->sm_type == SMT_START) {
Expand Down Expand Up @@ -367,6 +398,8 @@ static void timeshift_input
}
pktcont:

pthread_mutex_unlock(&ts->buffering_mutex);

/* Exit/Stop */
if (exit) {
timeshift_write_exit(ts->rd_pipe.wr);
Expand Down Expand Up @@ -455,6 +488,7 @@ streaming_target_t *timeshift_create
TAILQ_INIT(&ts->backlog[i]);
pthread_mutex_init(&ts->rdwr_mutex, NULL);
pthread_mutex_init(&ts->state_mutex, NULL);
pthread_mutex_init(&ts->buffering_mutex, NULL);

/* Initialise output */
tvh_pipe(O_NONBLOCK, &ts->rd_pipe);
Expand Down
7 changes: 7 additions & 0 deletions src/timeshift/private.h
Expand Up @@ -96,6 +96,7 @@ typedef struct timeshift {
int64_t ref_time; ///< Start time in us (monoclock)
struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
int backlog_max;///< Maximum component index in backlog
pthread_mutex_t buffering_mutex;///< Protect backlog / write queues

enum {
TS_INIT,
Expand Down Expand Up @@ -128,6 +129,11 @@ typedef struct timeshift {
extern uint64_t timeshift_total_size;
extern uint64_t timeshift_total_ram_size;

/*
*
*/
void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst );

/*
* Write functions
*/
Expand All @@ -142,6 +148,7 @@ ssize_t timeshift_write_exit ( int fd );
ssize_t timeshift_write_eof ( timeshift_file_t *tsf );

void timeshift_writer_flush ( timeshift_t *ts );
void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst );

/*
* Threads
Expand Down
32 changes: 30 additions & 2 deletions src/timeshift/timeshift_reader.c
Expand Up @@ -426,6 +426,24 @@ static int _timeshift_flush_to_live
return 0;
}

/*
* Write packets from temporary queues
*/
static void _timeshift_write_queues
( timeshift_t *ts )
{
struct streaming_message_queue sq;
streaming_message_t *sm;

TAILQ_INIT(&sq);
timeshift_writer_clone(ts, &sq);
timeshift_packets_clone(ts, &sq);
while ((sm = TAILQ_FIRST(&sq)) != NULL) {
TAILQ_REMOVE(&sq, sm, sm_link);
streaming_target_deliver2(ts->output, sm);
}
}

/*
* Send the status message
*/
Expand Down Expand Up @@ -850,11 +868,21 @@ void *timeshift_reader ( void *p )
streaming_target_deliver2(ts->output, ctrl);
ctrl = NULL;
tvhtrace("timeshift", "reader - set TS_LIVE");
ts->state = TS_LIVE;

/* Critical section - protect write / backlog queues */
pthread_mutex_lock(&ts->buffering_mutex);

/* Flush timeshift buffer to live */
if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1)
if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) {
pthread_mutex_unlock(&ts->buffering_mutex);
break;
}

/* Flush write / backlog queues */
_timeshift_write_queues(ts);

ts->state = TS_LIVE;
pthread_mutex_unlock(&ts->buffering_mutex);

/* Close file (if open) */
if (cur_file && cur_file->rfd >= 0) {
Expand Down
12 changes: 12 additions & 0 deletions src/timeshift/timeshift_writer.c
Expand Up @@ -384,3 +384,15 @@ void timeshift_writer_flush ( timeshift_t *ts )
pthread_mutex_unlock(&sq->sq_mutex);
}

void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst )
{
streaming_message_t *sm, *sm2;
streaming_queue_t *sq = &ts->wr_queue;

pthread_mutex_lock(&sq->sq_mutex);
TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) {
sm2 = streaming_msg_clone(sm);
TAILQ_INSERT_TAIL(dst, sm2, sm_link);
}
pthread_mutex_unlock(&sq->sq_mutex);
}

0 comments on commit 83ce30a

Please sign in to comment.