Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
[pvr.tvh] fill the VFS buffer on a separate thread. This means we can
Browse files Browse the repository at this point in the history
gradually increase the read chunk size (allowing for more jitter in the
latency between client and server) gradually without making the reader
wait for the full chunk to be received.

The read length is reset during seeking and if the buffer happens to
run dry (ffmpeg does around 10 short seeks for each seek request so it
helps to keep the read size small).
  • Loading branch information
Jalle19 committed Jan 21, 2015
1 parent eeb7352 commit 7cc5109
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 46 deletions.
162 changes: 119 additions & 43 deletions addons/pvr.hts/src/HTSPVFS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,48 @@ using namespace std;
using namespace ADDON;
using namespace PLATFORM;

/*
* The buffer thread
*/
void *CHTSPVFS::Process(void)
{
while (!IsStopped())
{
while (m_fileId && m_buffer.free() > 0)
{
if (!SendFileRead())
continue;

CLockObject lock(m_mutex);
m_bHasData = true;
m_condition.Broadcast();
}

// Take a break, we're either stopped or full
CLockObject lock(m_mutex);
m_condition.Wait(m_mutex, 1000);
}

return NULL;
}

/*
* VFS handler
*/
CHTSPVFS::CHTSPVFS ( CHTSPConnection &conn )
: m_conn(conn), m_path(""), m_fileId(0), m_offset(0)
: m_conn(conn), m_path(""), m_fileId(0), m_offset(0),
m_currentReadLength(INITAL_READ_LENGTH)
{
m_buffer.alloc(MAX_BUFFER_SIZE);

// Start the buffer thread
CreateThread();
}

CHTSPVFS::~CHTSPVFS ( void )
{
// Stop the buffer thread
StopThread();
}

void CHTSPVFS::Connected ( void )
Expand Down Expand Up @@ -90,10 +124,19 @@ void CHTSPVFS::Close ( void )
if (m_fileId != 0)
SendFileClose();

m_buffer.reset();
m_offset = 0;
m_fileId = 0;
m_path = "";

Reset();
}

void CHTSPVFS::Reset()
{
CLockObject lock(m_mutex);
m_buffer.reset();
m_bHasData = false;
m_currentReadLength = INITAL_READ_LENGTH;
}

int CHTSPVFS::Read ( unsigned char *buf, unsigned int len )
Expand All @@ -104,50 +147,22 @@ int CHTSPVFS::Read ( unsigned char *buf, unsigned int len )
if (!m_fileId)
return -1;

/* Fetch data */
if (m_buffer.avail() <= len)
/* Signal that we need more data in the buffer. Reset the read length to the
requested length so we don't wait unnecessarily long */
if (m_buffer.avail() < len)
{
htsmsg_t *m;
const void *buf;
size_t len;

/* Build */
m = htsmsg_create_map();
htsmsg_add_u32(m, "id", m_fileId);
htsmsg_add_s64(m, "size", m_buffer.free());

tvhtrace("vfs read id=%d size=%lld",
m_fileId, (long long)m_buffer.free());

/* Send */
{
CLockObject lock(m_conn.Mutex());
m = m_conn.SendAndWait("fileRead", m);
}

if (m == NULL)
return -1;

/* Process */
if (htsmsg_get_bin(m, "data", &buf, &len))
{
htsmsg_destroy(m);
tvherror("vfs fileRead malformed response");
return -1;
}

/* Store */
if (m_buffer.write((unsigned char*)buf, len) != (ssize_t)len)
{
htsmsg_destroy(m);
tvherror("vfs partial buffer write");
return -1;
}
htsmsg_destroy(m);
CLockObject lock(m_mutex);
m_bHasData = false;
m_currentReadLength = len;
m_condition.Broadcast();
}

/* Wait for data */
CLockObject lock(m_mutex);
m_condition.Wait(m_mutex, m_bHasData, 5000);

/* Read */
ret = m_buffer.read(buf, len);
ret = m_buffer.read(buf, len);
m_offset += ret;
return (int)ret;
}
Expand Down Expand Up @@ -277,10 +292,71 @@ long long CHTSPVFS::SendFileSeek ( int64_t pos, int whence, bool force )
{
tvhtrace("vfs seek offset=%lld", (long long)ret);
m_offset = ret;
m_buffer.reset();

Reset();
}
else
tvherror("vfs fileSeek failed");

return ret;
}

bool CHTSPVFS::SendFileRead()
{
htsmsg_t *m;
const void *buf;
size_t len;
size_t readLength;

{
CLockObject lock(m_mutex);

/* Determine read length */
if (m_currentReadLength > m_buffer.free())
readLength = m_buffer.free();
else
readLength = m_currentReadLength;
}

/* Build */
m = htsmsg_create_map();
htsmsg_add_u32(m, "id", m_fileId);
htsmsg_add_s64(m, "size", readLength);

tvhtrace("vfs read id=%d size=%d",
m_fileId, readLength);

/* Send */
{
CLockObject lock(m_conn.Mutex());
m = m_conn.SendAndWait("fileRead", m);
}

if (m == NULL)
return false;

/* Process */
if (htsmsg_get_bin(m, "data", &buf, &len))
{
htsmsg_destroy(m);
tvherror("vfs fileRead malformed response");
return false;
}

/* Store */
if (m_buffer.write((unsigned char*)buf, len) != (ssize_t)len)
{
htsmsg_destroy(m);
tvherror("vfs partial buffer write");
return false;
}

/* Gradually increase read length */
CLockObject lock(m_mutex);

if (m_currentReadLength * 2 < MAX_READ_LENGTH)
m_currentReadLength *= 2;

htsmsg_destroy(m);
return true;
}
19 changes: 16 additions & 3 deletions addons/pvr.hts/src/Tvheadend.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ class CHTSPDemuxer
/*
* HTSP VFS - recordings
*/
class CHTSPVFS
class CHTSPVFS
: public PLATFORM::CThread
{
friend class CTvheadend;

Expand All @@ -296,21 +297,33 @@ class CHTSPVFS
CHTSPConnection &m_conn;
CStdString m_path;
uint32_t m_fileId;
CCircBuffer m_buffer;
int64_t m_offset;

CCircBuffer m_buffer;
PLATFORM::CMutex m_mutex;
bool m_bHasData;
PLATFORM::CCondition<bool> m_condition;
size_t m_currentReadLength;

bool Open ( const PVR_RECORDING &rec );
void Close ( void );
int Read ( unsigned char *buf, unsigned int len );
long long Seek ( long long pos, int whence );
long long Tell ( void );
long long Size ( void );
void Reset ( void );

void *Process();

bool SendFileOpen ( bool force = false );
void SendFileClose ( void );
bool SendFileRead ( void );
long long SendFileSeek ( int64_t pos, int whence, bool force = false );

static const int MAX_BUFFER_SIZE = 1000000;
static const int MAX_BUFFER_SIZE = 5242880; // 5 MB
static const int INITAL_READ_LENGTH = 131072; // 128 KB
static const int MAX_READ_LENGTH = 1048576; // 1 MB

};

/*
Expand Down

0 comments on commit 7cc5109

Please sign in to comment.