Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition(s) while unloading/loading tracks #2305

Merged
merged 5 commits into from Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 45 additions & 22 deletions src/engine/cachingreader.cpp
Expand Up @@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group,
// The capacity of the back channel must be equal to the number of
// allocated chunks, because the worker use writeBlocking(). Otherwise
// the worker could get stuck in a hot loop!!!
m_readerStatusFIFO(kNumberOfCachedChunksInMemory),
m_readerStatus(INVALID),
m_stateFIFO(kNumberOfCachedChunksInMemory),
m_state(State::Idle),
m_mruCachingReaderChunk(nullptr),
m_lruCachingReaderChunk(nullptr),
m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory),
m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusFIFO) {
m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) {

m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory);
// Divide up the allocated raw memory buffer into total_chunks
Expand Down Expand Up @@ -203,13 +203,37 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex
void CachingReader::newTrack(TrackPointer pTrack) {
m_worker.newTrack(pTrack);
m_worker.workReady();
// Don't accept any new read requests until the current
// track has been unloaded and the new track has been
// loaded!
m_state = State::TrackLoading;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set this bit before touching the worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed. The worker thread is not intercepting the reader. The reader polls for updates. But if it helps to understand what is happening I will reorder the instructions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...it is even desired to inform the worker asap. A new comment explains why.

// Free all chunks with sample data from the current track
freeAllChunks();
}

void CachingReader::process() {
ReaderStatusUpdate update;
while (m_readerStatusFIFO.read(&update, 1) == 1) {
while (m_stateFIFO.read(&update, 1) == 1) {
DEBUG_ASSERT(m_state != State::Idle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one always fails after ejecting a track and loading a new one.
This is normal for every update.status == TRACK_LOADED message.
So it can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • After TRACK_LOADED the state becomes State::TrackLoaded
  • After TRACK_UNLOADED the state becomes State::Idle and no new updates are expected.

While the reader is idle no update messages are expected!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The debug assertion is there for a reason. Idle means don't bother me with update messages or something is seriously wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be true after your recent changes in the other PR, but this assumption is here wrong.
Try it out. I am still wrapping my head around you new code but I want first understand the original issue.

Was this your assertion as well?

auto pChunk = update.takeFromWorker();
if (pChunk) {
// Response to a read request
DEBUG_ASSERT(
update.status == CHUNK_READ_SUCCESS ||
update.status == CHUNK_READ_EOF ||
update.status == CHUNK_READ_INVALID ||
update.status == CHUNK_READ_DISCARDED);
if (m_state == State::TrackLoading) {
// All chunks have been freed before loading the next track!
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
// Discard all pending read requests for the previous track
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Discard all results from pending read requests

// that are discarded by the worker before loading the next
// track.
freeChunk(pChunk);
continue;
}
DEBUG_ASSERT(m_state == State::TrackLoaded);
if (update.status == CHUNK_READ_SUCCESS) {
// Insert or freshen the chunk in the MRU/LRU list after
// obtaining ownership from the worker.
Expand All @@ -218,24 +242,23 @@ void CachingReader::process() {
// Discard chunks that don't carry any data
freeChunk(pChunk);
}
}
if (update.status == TRACK_NOT_LOADED) {
m_readerStatus = update.status;
} else if (update.status == TRACK_LOADED) {
m_readerStatus = update.status;
// Reset the max. readable frame index
m_readableFrameIndexRange = update.readableFrameIndexRange();
// Free all chunks with sample data from a previous track
freeAllChunks();
}
if (m_readerStatus == TRACK_LOADED) {
// Adjust the readable frame index range after loading or reading
m_readableFrameIndexRange = intersect(
m_readableFrameIndexRange,
update.readableFrameIndexRange());
// Adjust the readable frame index range (if available)
if (update.status != CHUNK_READ_DISCARDED) {
m_readableFrameIndexRange = intersect(
m_readableFrameIndexRange,
update.readableFrameIndexRange());
}
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that we hit always the else branch? We need to receive status update without a chunk.
I think it would be better to have that decoupled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is caused by the coupled design. But I won't change that. The debug assertions clearly indicate that there are two kinds of updates:

  • Read result with a chunk
  • State change track loaded/unloaded without a chunk

DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);
if (update.status == TRACK_LOADED) {
m_state = State::TrackLoaded;
} else {
DEBUG_ASSERT(update.status == TRACK_UNLOADED);
m_state = State::Idle;
}
// Reset the readable frame index range
m_readableFrameIndexRange = mixxx::IndexRange();
m_readableFrameIndexRange = update.readableFrameIndexRange();
}
}
}
Expand All @@ -259,7 +282,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,
}

// If no track is loaded, don't do anything.
if (m_readerStatus != TRACK_LOADED) {
if (m_state != State::TrackLoaded) {
return ReadResult::UNAVAILABLE;
}

Expand Down Expand Up @@ -456,7 +479,7 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,

void CachingReader::hintAndMaybeWake(const HintVector& hintList) {
// If no file is loaded, skip.
if (m_readerStatus != TRACK_LOADED) {
if (m_state != State::TrackLoaded) {
return;
}

Expand Down
9 changes: 7 additions & 2 deletions src/engine/cachingreader.h
Expand Up @@ -124,7 +124,7 @@ class CachingReader : public QObject {
// Thread-safe FIFOs for communication between the engine callback and
// reader thread.
FIFO<CachingReaderChunkReadRequest> m_chunkReadRequestFIFO;
FIFO<ReaderStatusUpdate> m_readerStatusFIFO;
FIFO<ReaderStatusUpdate> m_stateFIFO;

// Looks for the provided chunk number in the index of in-memory chunks and
// returns it if it is present. If not, returns nullptr. If it is present then
Expand All @@ -151,7 +151,12 @@ class CachingReader : public QObject {
// Gets a chunk from the free list, frees the LRU CachingReaderChunk if none available.
CachingReaderChunkForOwner* allocateChunkExpireLRU(SINT chunkIndex);

ReaderStatus m_readerStatus;
enum class State {
Idle,
TrackLoading,
TrackLoaded,
};
State m_state;

// Keeps track of all CachingReaderChunks we've allocated.
QVector<CachingReaderChunkForOwner*> m_chunks;
Expand Down
68 changes: 42 additions & 26 deletions src/engine/cachingreaderworker.cpp
Expand Up @@ -122,73 +122,89 @@ void CachingReaderWorker::run() {
}

void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
ReaderStatusUpdate update;
update.init(TRACK_NOT_LOADED);
// Discard all pending read requests
CachingReaderChunkReadRequest request;
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the fifo multi consumer aware? I think we need to move this into the run() method and the if (m_newTrackAvailable) { branch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No and it doesn't have to be. loadTrack() is private and only invoked from run() within the same thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I misread and took it for newTrack()

const auto update = ReaderStatusUpdate::readDiscarded(request.chunk);
m_pReaderStatusFIFO->writeBlocking(&update, 1);
}

// Unload the track
m_readableFrameIndexRange = mixxx::IndexRange();
m_pAudioSource.reset(); // Close open file handles

if (!pTrack) {
// Unload track
m_pAudioSource.reset(); // Close open file handles
m_readableFrameIndexRange = mixxx::IndexRange();
// If no new track is available then we are done
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
return;
}

// Emit that a new track is loading, stops the current track
emit(trackLoading());
emit trackLoading();

QString filename = pTrack->getLocation();
if (filename.isEmpty() || !pTrack->exists()) {
kLogger.warning()
<< m_group
<< "File not found"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit(trackLoadFailed(
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be found.")
.arg(QDir::toNativeSeparators(filename))));
.arg(QDir::toNativeSeparators(filename)));
return;
}

mixxx::AudioSource::OpenParams config;
config.setChannelCount(CachingReaderChunk::kChannels);
m_pAudioSource = SoundSourceProxy(pTrack).openAudioSource(config);
if (!m_pAudioSource) {
m_readableFrameIndexRange = mixxx::IndexRange();
kLogger.warning()
<< m_group
<< "Failed to open file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit(trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded.").arg(filename)));
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded").arg(filename));
return;
}

const SINT tempReadBufferSize = m_pAudioSource->frames2samples(CachingReaderChunk::kFrames);
if (m_tempReadBuffer.size() != tempReadBufferSize) {
mixxx::SampleBuffer(tempReadBufferSize).swap(m_tempReadBuffer);
}

// Initially assume that the complete content offered by audio source
// is available for reading. Later if read errors occur this value will
// be decreased to avoid repeated reading of corrupt audio data.
m_readableFrameIndexRange = m_pAudioSource->frameIndexRange();

update.init(TRACK_LOADED, nullptr, m_pAudioSource->frameIndexRange());
m_pReaderStatusFIFO->writeBlocking(&update, 1);

// Clear the chunks to read list.
CachingReaderChunkReadRequest request;
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
update.init(CHUNK_READ_INVALID, request.chunk);
if (m_readableFrameIndexRange.empty()) {
m_pAudioSource.reset(); // Close open file handles
kLogger.warning()
<< m_group
<< "Failed to open empty file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename));
return;
}

// Adjust the internal buffer
const SINT tempReadBufferSize =
m_pAudioSource->frames2samples(CachingReaderChunk::kFrames);
if (m_tempReadBuffer.size() != tempReadBufferSize) {
mixxx::SampleBuffer(tempReadBufferSize).swap(m_tempReadBuffer);
}

const auto update =
ReaderStatusUpdate::trackLoaded(m_readableFrameIndexRange);
m_pReaderStatusFIFO->writeBlocking(&update, 1);

// Emit that the track is loaded.
const SINT sampleCount =
CachingReaderChunk::frames2samples(
m_pAudioSource->frameLength());
emit(trackLoaded(pTrack, m_pAudioSource->sampleRate(), sampleCount));
m_readableFrameIndexRange.length());
emit trackLoaded(pTrack, m_pAudioSource->sampleRate(), sampleCount);
}

void CachingReaderWorker::quitWait() {
Expand Down
33 changes: 27 additions & 6 deletions src/engine/cachingreaderworker.h
Expand Up @@ -26,12 +26,12 @@ typedef struct CachingReaderChunkReadRequest {
} CachingReaderChunkReadRequest;

enum ReaderStatus {
INVALID,
TRACK_NOT_LOADED,
TRACK_LOADED,
TRACK_UNLOADED,
CHUNK_READ_SUCCESS,
CHUNK_READ_EOF,
CHUNK_READ_INVALID
CHUNK_READ_INVALID,
CHUNK_READ_DISCARDED, // response without frame index range!
};

// POD with trivial ctor/dtor/copy for passing through FIFO
Expand All @@ -45,15 +45,36 @@ typedef struct ReaderStatusUpdate {
ReaderStatus status;

void init(
ReaderStatus statusArg = INVALID,
CachingReaderChunk* chunkArg = nullptr,
const mixxx::IndexRange& readableFrameIndexRangeArg = mixxx::IndexRange()) {
ReaderStatus statusArg,
CachingReaderChunk* chunkArg,
const mixxx::IndexRange& readableFrameIndexRangeArg) {
status = statusArg;
chunk = chunkArg;
readableFrameIndexRangeStart = readableFrameIndexRangeArg.start();
readableFrameIndexRangeEnd = readableFrameIndexRangeArg.end();
}

static ReaderStatusUpdate readDiscarded(
CachingReaderChunk* chunk) {
ReaderStatusUpdate update;
update.init(CHUNK_READ_DISCARDED, chunk, mixxx::IndexRange());
return update;
}

static ReaderStatusUpdate trackLoaded(
const mixxx::IndexRange& readableFrameIndexRange) {
DEBUG_ASSERT(!readableFrameIndexRange.empty());
ReaderStatusUpdate update;
update.init(TRACK_LOADED, nullptr, readableFrameIndexRange);
return update;
}

static ReaderStatusUpdate trackNotLoaded() {
ReaderStatusUpdate update;
update.init(TRACK_UNLOADED, nullptr, mixxx::IndexRange());
return update;
}

CachingReaderChunkForOwner* takeFromWorker() {
CachingReaderChunkForOwner* pChunk = nullptr;
if (chunk) {
Expand Down