Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
epgdb: deferred write
  • Loading branch information
perexg committed Apr 24, 2015
1 parent d4f1c10 commit 99920b8
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 49 deletions.
86 changes: 58 additions & 28 deletions src/epgdb.c
Expand Up @@ -31,6 +31,7 @@
#include "epggrab.h"

#define EPG_DB_VERSION 2
#define EPG_DB_ALLOC_STEP (1024*1024)

extern epg_object_tree_t epg_brands;
extern epg_object_tree_t epg_seasons;
Expand Down Expand Up @@ -261,7 +262,7 @@ void epg_done ( void )
* Save
* *************************************************************************/

static int _epg_write ( int fd, htsmsg_t *m )
static int _epg_write ( sbuf_t *sb, htsmsg_t *m )
{
int ret = 1;
size_t msglen;
Expand All @@ -270,7 +271,11 @@ static int _epg_write ( int fd, htsmsg_t *m )
int r = htsmsg_binary_serialize(m, &msgdata, &msglen, 0x10000);
htsmsg_destroy(m);
if (!r) {
ret = tvh_write(fd, msgdata, msglen);
ret = 0;
/* allocation helper - we fight with megabytes */
if (sb->sb_size - sb->sb_ptr < 32 * 1024)
sbuf_realloc(sb, (sb->sb_size - (sb->sb_size % EPG_DB_ALLOC_STEP)) + EPG_DB_ALLOC_STEP);
sbuf_append(sb, msgdata, msglen);
free(msgdata);
}
} else {
Expand All @@ -279,11 +284,31 @@ static int _epg_write ( int fd, htsmsg_t *m )
return ret;
}

static int _epg_write_sect ( int fd, const char *sect )
static int _epg_write_sect ( sbuf_t *sb, const char *sect )
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "__section__", sect);
return _epg_write(fd, m);
return _epg_write(sb, m);
}

static void epg_save_tsk_callback ( void *p, int dearmed )
{
sbuf_t *sb = p;
int fd, r;

tvhinfo("epgdb", "save start");
fd = hts_settings_open_file(1, "epgdb.v%d", EPG_DB_VERSION);
if (fd >= 0) {
r = tvh_write(fd, sb->sb_data, sb->sb_ptr);
close(fd);
if (r)
tvherror("epgdb", "write error (size %d)", sb->sb_ptr);
else
tvhinfo("epgdb", "stored (size %d)", sb->sb_ptr);
} else
tvherror("epgdb", "unable to open epgdb file");
sbuf_free(sb);
free(sb);
}

void epg_save_callback ( void *p )
Expand All @@ -293,63 +318,68 @@ void epg_save_callback ( void *p )

void epg_save ( void )
{
int fd;
sbuf_t *sb = malloc(sizeof(*sb));
epg_object_t *eo;
epg_broadcast_t *ebc;
channel_t *ch;
epggrab_stats_t stats;
extern gtimer_t epggrab_save_timer;

if (!sb)
return;

tvhinfo("epgdb", "snapshot start");

sbuf_init_fixed(sb, EPG_DB_ALLOC_STEP);

if (epggrab_epgdb_periodicsave)
gtimer_arm(&epggrab_save_timer, epg_save_callback, NULL, epggrab_epgdb_periodicsave);

fd = hts_settings_open_file(1, "epgdb.v%d", EPG_DB_VERSION);
if (fd < 0)
return;

memset(&stats, 0, sizeof(stats));
if ( _epg_write_sect(fd, "config") ) goto error;
if (_epg_write(fd, epg_config_serialize())) goto error;
if ( _epg_write_sect(fd, "brands") ) goto error;
if ( _epg_write_sect(sb, "config") ) goto error;
if (_epg_write(sb, epg_config_serialize())) goto error;
if ( _epg_write_sect(sb, "brands") ) goto error;
RB_FOREACH(eo, &epg_brands, uri_link) {
if (_epg_write(fd, epg_brand_serialize((epg_brand_t*)eo))) goto error;
if (_epg_write(sb, epg_brand_serialize((epg_brand_t*)eo))) goto error;
stats.brands.total++;
}
if ( _epg_write_sect(fd, "seasons") ) goto error;
if ( _epg_write_sect(sb, "seasons") ) goto error;
RB_FOREACH(eo, &epg_seasons, uri_link) {
if (_epg_write(fd, epg_season_serialize((epg_season_t*)eo))) goto error;
if (_epg_write(sb, epg_season_serialize((epg_season_t*)eo))) goto error;
stats.seasons.total++;
}
if ( _epg_write_sect(fd, "episodes") ) goto error;
if ( _epg_write_sect(sb, "episodes") ) goto error;
RB_FOREACH(eo, &epg_episodes, uri_link) {
if (_epg_write(fd, epg_episode_serialize((epg_episode_t*)eo))) goto error;
if (_epg_write(sb, epg_episode_serialize((epg_episode_t*)eo))) goto error;
stats.episodes.total++;
}
if ( _epg_write_sect(fd, "serieslinks") ) goto error;
if ( _epg_write_sect(sb, "serieslinks") ) goto error;
RB_FOREACH(eo, &epg_serieslinks, uri_link) {
if (_epg_write(fd, epg_serieslink_serialize((epg_serieslink_t*)eo))) goto error;
if (_epg_write(sb, epg_serieslink_serialize((epg_serieslink_t*)eo))) goto error;
stats.seasons.total++;
}
if ( _epg_write_sect(fd, "broadcasts") ) goto error;
if ( _epg_write_sect(sb, "broadcasts") ) goto error;
CHANNEL_FOREACH(ch) {
RB_FOREACH(ebc, &ch->ch_epg_schedule, sched_link) {
if (_epg_write(fd, epg_broadcast_serialize(ebc))) goto error;
if (_epg_write(sb, epg_broadcast_serialize(ebc))) goto error;
stats.broadcasts.total++;
}
}
close(fd);

tasklet_arm_alloc(epg_save_tsk_callback, sb);

/* Stats */
tvhlog(LOG_INFO, "epgdb", "saved");
tvhlog(LOG_INFO, "epgdb", " brands %d", stats.brands.total);
tvhlog(LOG_INFO, "epgdb", " seasons %d", stats.seasons.total);
tvhlog(LOG_INFO, "epgdb", " episodes %d", stats.episodes.total);
tvhlog(LOG_INFO, "epgdb", " broadcasts %d", stats.broadcasts.total);
tvhinfo("epgdb", "queued to save (size %d)", sb->sb_ptr);
tvhinfo("epgdb", " brands %d", stats.brands.total);
tvhinfo("epgdb", " seasons %d", stats.seasons.total);
tvhinfo("epgdb", " episodes %d", stats.episodes.total);
tvhinfo("epgdb", " broadcasts %d", stats.broadcasts.total);

return;

error:
tvhlog(LOG_ERR, "epgdb", "failed to store epg to disk");
hts_settings_remove("epgdb.v%d", EPG_DB_VERSION);
close(fd);
sbuf_free(sb);
free(sb);
}
15 changes: 8 additions & 7 deletions src/main.c
Expand Up @@ -1066,6 +1066,14 @@ main(int argc, char **argv)
tvhftrace("main", imagecache_done);
tvhftrace("main", lang_code_done);
tvhftrace("main", api_done);

tvhtrace("main", "tasklet enter");
pthread_cond_signal(&tasklet_cond);
pthread_join(tasklet_tid, NULL);
tvhtrace("main", "tasklet thread end");
tasklet_flush();
tvhtrace("main", "tasklet leave");

tvhftrace("main", hts_settings_done);
tvhftrace("main", dvb_done);
tvhftrace("main", lang_str_done);
Expand All @@ -1076,13 +1084,6 @@ main(int argc, char **argv)
tvhftrace("main", idnode_done);
tvhftrace("main", spawn_done);

tvhtrace("main", "tasklet enter");
pthread_cond_signal(&tasklet_cond);
pthread_join(tasklet_tid, NULL);
tvhtrace("main", "tasklet thread end");
tasklet_flush();
tvhtrace("main", "tasklet leave");

tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend");
tvhlog_end();

Expand Down
2 changes: 2 additions & 0 deletions src/tvheadend.h
Expand Up @@ -696,6 +696,8 @@ static inline void sbuf_alloc(sbuf_t *sb, int len)
sbuf_alloc_(sb, len);
}

void sbuf_realloc(sbuf_t *sb, int len);

void sbuf_append(sbuf_t *sb, const void *data, int len);

void sbuf_cut(sbuf_t *sb, int off);
Expand Down
34 changes: 20 additions & 14 deletions src/utils.c
Expand Up @@ -333,20 +333,7 @@ sbuf_reset(sbuf_t *sb, int max_len)
void
sbuf_reset_and_alloc(sbuf_t *sb, int len)
{
if (sb->sb_data) {
if (len != sb->sb_size) {
void *n = realloc(sb->sb_data, len);
if (n) {
sb->sb_data = n;
sb->sb_size = len;
}
}
} else {
sb->sb_data = malloc(len);
sb->sb_size = len;
}
if (sb->sb_data == NULL)
sbuf_alloc_fail(len);
sbuf_realloc(sb, len);
sb->sb_ptr = sb->sb_err = 0;
}

Expand All @@ -366,6 +353,25 @@ sbuf_alloc_(sbuf_t *sb, int len)
sbuf_alloc_fail(sb->sb_size);
}

void
sbuf_realloc(sbuf_t *sb, int len)
{
if (sb->sb_data) {
if (len != sb->sb_size) {
void *n = realloc(sb->sb_data, len);
if (n) {
sb->sb_data = n;
sb->sb_size = len;
}
}
} else {
sb->sb_data = malloc(len);
sb->sb_size = len;
}
if (sb->sb_data == NULL)
sbuf_alloc_fail(len);
}

void
sbuf_append(sbuf_t *sb, const void *data, int len)
{
Expand Down

0 comments on commit 99920b8

Please sign in to comment.