Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cachingreader fix2 "dedicated thread for CachingReaderWorker" #30

Merged
merged 13 commits into from
Jul 3, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/depends.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ def sources(self, build):
"engine/clockcontrol.cpp",
"engine/readaheadmanager.cpp",
"cachingreader.cpp",
"cachingreaderworker.cpp",

"analyserrg.cpp",
"analyserqueue.cpp",
Expand Down
271 changes: 63 additions & 208 deletions src/cachingreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,72 +11,30 @@
#include "trackinfoobject.h"
#include "soundsourceproxy.h"
#include "sampleutil.h"


// There's a little math to this, but not much: 48khz stereo audio is 384kb/sec
// if using float samples. We want the chunk size to be a power of 2 so it's
// easier to memory align, and roughly 1/2 - 1/4th of a second of audio. 2**17
// and 2**16 are nice candidates. 2**16 is 170ms of audio, which is well above
// (hopefully) the latencies people are seeing. at 10ms latency, one chunk is
// enough for 17 callbacks. We may need to tweak this later.

// Must be divisible by 8, 4, and 2. Just pick a power of 2.
#define CHUNK_LENGTH 65536
//#define CHUNK_LENGTH 524288

const int CachingReader::kChunkLength = CHUNK_LENGTH;
const int CachingReader::kSamplesPerChunk = CHUNK_LENGTH / sizeof(CSAMPLE);

CachingReader::CachingReader(const char* _group,
ConfigObject<ConfigValue>* _config) :
m_pGroup(_group),
m_pConfig(_config),
m_chunkReadRequestFIFO(1024),
m_readerStatusFIFO(1024),
m_readerStatus(INVALID),
m_mruChunk(NULL),
m_lruChunk(NULL),
m_pRawMemoryBuffer(NULL),
m_pCurrentSoundSource(NULL),
m_iTrackSampleRate(0),
m_iTrackNumSamples(0),
m_iTrackNumSamplesCallbackSafe(0),
m_pSample(NULL) {
initialize();
}

CachingReader::~CachingReader() {
m_freeChunks.clear();
m_allocatedChunks.clear();
m_lruChunk = m_mruChunk = NULL;

for (int i=0; i < m_chunks.size(); i++) {
Chunk* c = m_chunks[i];
delete c;
}

delete [] m_pSample;

delete [] m_pRawMemoryBuffer;
m_pRawMemoryBuffer = NULL;

delete m_pCurrentSoundSource;
}

void CachingReader::initialize() {
#include "util/counter.h"


CachingReader::CachingReader(const char* group,
ConfigObject<ConfigValue>* config)
: m_pConfig(config),
m_chunkReadRequestFIFO(1024),
m_readerStatusFIFO(1024),
m_readerStatus(INVALID),
m_mruChunk(NULL),
m_lruChunk(NULL),
m_pRawMemoryBuffer(NULL),
m_iTrackNumSamplesCallbackSafe(0) {
int memory_to_use = 5000000; // 5mb, TODO
Q_ASSERT(memory_to_use >= kChunkLength);
Q_ASSERT(memory_to_use >= CachingReaderWorker::kChunkLength);

// Only allocate as many bytes as we will actually use.
memory_to_use -= (memory_to_use % kChunkLength);
memory_to_use -= (memory_to_use % CachingReaderWorker::kChunkLength);

m_pSample = new SAMPLE[kSamplesPerChunk];

int total_chunks = memory_to_use / kChunkLength;
int total_chunks = memory_to_use / CachingReaderWorker::kChunkLength;

//qDebug() << "CachingReader using" << memory_to_use << "bytes.";

int rawMemoryBufferLength = kSamplesPerChunk * total_chunks;
int rawMemoryBufferLength = CachingReaderWorker::kSamplesPerChunk * total_chunks;
m_pRawMemoryBuffer = new CSAMPLE[rawMemoryBufferLength];

m_allocatedChunks.reserve(total_chunks);
Expand All @@ -97,8 +55,43 @@ void CachingReader::initialize() {
m_chunks.push_back(c);
m_freeChunks.push_back(c);

bufferStart += kSamplesPerChunk;
bufferStart += CachingReaderWorker::kSamplesPerChunk;
}

m_pWorker = new CachingReaderWorker(group,
&m_chunkReadRequestFIFO,
&m_readerStatusFIFO);

// Forward signals from worker
connect(m_pWorker, SIGNAL(trackLoading()),
this, SIGNAL(trackLoading()),
Qt::DirectConnection);
connect(m_pWorker, SIGNAL(trackLoaded(TrackPointer, int, int)),
this, SIGNAL(trackLoaded(TrackPointer, int, int)),
Qt::DirectConnection);
connect(m_pWorker, SIGNAL(trackLoadFailed(TrackPointer, QString)),
this, SIGNAL(trackLoadFailed(TrackPointer, QString)),
Qt::DirectConnection);

m_pWorker->start();
}


CachingReader::~CachingReader() {

m_pWorker->quitWait();
delete m_pWorker;
m_freeChunks.clear();
m_allocatedChunks.clear();
m_lruChunk = m_mruChunk = NULL;

for (int i=0; i < m_chunks.size(); i++) {
Chunk* c = m_chunks[i];
delete c;
}

delete [] m_pRawMemoryBuffer;
m_pRawMemoryBuffer = NULL;
}

// static
Expand Down Expand Up @@ -239,54 +232,9 @@ Chunk* CachingReader::lookupChunk(int chunk_number) {
return chunk;
}

void CachingReader::processChunkReadRequest(ChunkReadRequest* request,
ReaderStatusUpdate* update) {
int chunk_number = request->chunk->chunk_number;
//qDebug() << "Processing ChunkReadRequest for" << chunk_number;
update->chunk = request->chunk;
update->chunk->length = 0;

if (m_pCurrentSoundSource == NULL || chunk_number < 0) {
update->status = CHUNK_READ_INVALID;
return;
}

// Stereo samples
int sample_position = sampleForChunk(chunk_number);
int samples_remaining = m_iTrackNumSamples - sample_position;
int samples_to_read = math_min(kSamplesPerChunk, samples_remaining);

// Bogus chunk number
if (samples_to_read <= 0) {
update->status = CHUNK_READ_EOF;
return;
}

m_pCurrentSoundSource->seek(sample_position);
int samples_read = m_pCurrentSoundSource->read(samples_to_read,
m_pSample);

// If we've run out of music, the SoundSource can return 0 samples.
// Remember that SoundSourc->getLength() (which is m_iTrackNumSamples) can
// lie to us about the length of the song!
if (samples_read <= 0) {
update->status = CHUNK_READ_EOF;
return;
}

// TODO(XXX) This loop can't be done with a memcpy, but could be done with
// SSE.
CSAMPLE* buffer = request->chunk->data;
//qDebug() << "Reading into " << buffer;
SampleUtil::convert(buffer, m_pSample, samples_read);
update->status = CHUNK_READ_SUCCESS;
update->chunk->length = samples_read;
}

void CachingReader::newTrack(TrackPointer pTrack) {
m_newTrackMutex.lock();
m_newTrack = pTrack;
m_newTrackMutex.unlock();
m_pWorker->newTrack(pTrack);
m_pWorker->workReady();
}

void CachingReader::process() {
Expand Down Expand Up @@ -422,15 +370,16 @@ int CachingReader::read(int sample, int num_samples, CSAMPLE* buffer) {
// If the chunk is not in cache, then we must return an error.
if (current == NULL) {
// qDebug() << "Couldn't get chunk " << chunk_num
// << " in read() of [" << sample << "," << sample+num_samples
// << " in read() of [" << sample << "," << sample + num_samples
// << "] chunks " << start_chunk << "-" << end_chunk;

// Something is wrong. Break out of the loop, that should fill the
// samples requested with zeroes.
Counter("CachingReader::read cache miss")++;
break;
}

int chunk_start_sample = sampleForChunk(chunk_num);
int chunk_start_sample = CachingReaderWorker::sampleForChunk(chunk_num);
int chunk_offset = current_sample - chunk_start_sample;
int chunk_remaining_samples = current->length - chunk_offset;

Expand Down Expand Up @@ -506,13 +455,13 @@ int CachingReader::read(int sample, int num_samples, CSAMPLE* buffer) {
return zerosWritten + num_samples - samples_remaining;
}

void CachingReader::hintAndMaybeWake(QList<Hint>& hintList) {
void CachingReader::hintAndMaybeWake(const QVector<Hint>& hintList) {
// If no file is loaded, skip.
if (m_readerStatus != TRACK_LOADED) {
return;
}

QListIterator<Hint> iterator(hintList);
QVectorIterator<Hint> iterator(hintList);

// To prevent every bit of code having to guess how many samples
// forward it makes sense to keep in memory, the hinter can provide
Expand Down Expand Up @@ -544,10 +493,10 @@ void CachingReader::hintAndMaybeWake(QList<Hint>& hintList) {
continue;
}
int start_sample = math_max(0, math_min(
m_iTrackNumSamplesCallbackSafe, hint.sample));
m_iTrackNumSamplesCallbackSafe, hint.sample));
int start_chunk = chunkForSample(start_sample);
int end_sample = math_max(0, math_min(
m_iTrackNumSamplesCallbackSafe, hint.sample + hint.length - 1));
m_iTrackNumSamplesCallbackSafe, hint.sample + hint.length - 1));
int end_chunk = chunkForSample(end_sample);

for (int current = start_chunk; current <= end_chunk; ++current) {
Expand Down Expand Up @@ -587,100 +536,6 @@ void CachingReader::hintAndMaybeWake(QList<Hint>& hintList) {

// If there are chunks to be read, wake up.
if (shouldWake) {
wake();
}
}

void CachingReader::run() {
TrackPointer pLoadTrack;

m_newTrackMutex.lock();
if (m_newTrack) {
pLoadTrack = m_newTrack;
m_newTrack = TrackPointer();
}
m_newTrackMutex.unlock();

if (pLoadTrack) {
loadTrack(pLoadTrack);
} else {
// Read the requested chunks.
ChunkReadRequest request;
ReaderStatusUpdate status;
while (m_chunkReadRequestFIFO.read(&request, 1) == 1) {
processChunkReadRequest(&request, &status);
m_readerStatusFIFO.writeBlocking(&status, 1);
}
m_pWorker->workReady();
}

// Notify the EngineWorkerScheduler that the work we did is done.
setActive(false);
}

void CachingReader::wake() {
//qDebug() << m_pGroup << "CachingReader::wake()";
workReady();
}

void CachingReader::loadTrack(TrackPointer pTrack) {
//qDebug() << m_pGroup << "CachingReader::loadTrack() lock acquired for load.";

// Emit that a new track is loading, stops the current track
emit(trackLoading());

ReaderStatusUpdate status;
status.status = TRACK_LOADED;
status.chunk = NULL;
status.trackNumSamples = 0;

if (m_pCurrentSoundSource != NULL) {
delete m_pCurrentSoundSource;
m_pCurrentSoundSource = NULL;
}
m_iTrackSampleRate = 0;
m_iTrackNumSamples = 0;

QString filename = pTrack->getLocation();

if (filename.isEmpty() || !pTrack->exists()) {
// Must unlock before emitting to avoid deadlock
qDebug() << m_pGroup << "CachingReader::loadTrack() load failed for\""
<< filename << "\", unlocked reader lock";
status.status = TRACK_NOT_LOADED;
m_readerStatusFIFO.writeBlocking(&status, 1);
emit(trackLoadFailed(
pTrack, QString("The file '%1' could not be found.").arg(filename)));
return;
}

m_pCurrentSoundSource = new SoundSourceProxy(pTrack);
bool openSucceeded = (m_pCurrentSoundSource->open() == OK); //Open the song for reading
m_iTrackSampleRate = m_pCurrentSoundSource->getSampleRate();
m_iTrackNumSamples = status.trackNumSamples =
m_pCurrentSoundSource->length();

if (!openSucceeded || m_iTrackNumSamples == 0 || m_iTrackSampleRate == 0) {
// Must unlock before emitting to avoid deadlock
qDebug() << m_pGroup << "CachingReader::loadTrack() load failed for\""
<< filename << "\", file invalid, unlocked reader lock";
status.status = TRACK_NOT_LOADED;
m_readerStatusFIFO.writeBlocking(&status, 1);
emit(trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded.").arg(filename)));
return;
}

m_readerStatusFIFO.writeBlocking(&status, 1);

// Clear the chunks to read list.
ChunkReadRequest request;
while (m_chunkReadRequestFIFO.read(&request, 1) == 1) {
qDebug() << "Skipping read request for " << request.chunk->chunk_number;
status.status = CHUNK_READ_INVALID;
status.chunk = request.chunk;
m_readerStatusFIFO.writeBlocking(&status, 1);
}

// Emit that the track is loaded.
emit(trackLoaded(pTrack, m_iTrackSampleRate, m_iTrackNumSamples));
}
Loading