Skip to content

Commit

Permalink
Fix race condition in consumeRoomData()
Browse files Browse the repository at this point in the history
QCoreApplication::processEvents() is well-known to be a _wrong_ solution
to the unresponsive UI problem; despite that, connection.cpp has long
had that call to let UI update itself while processing bulky room
updates (mainly from the initial sync). This commit finally fixes this,
after an (admittedly rare) race condition has been hit, as follows:
0. Pre-requisite: quotest runs all the tests and is about to leave
   the room; there's an ongoing sync request.
1. Quotest calls /leave
2. Sync returns, with the batch of _several_ rooms (that's important)
3. The above code handles the first room in the batch
4. processEvents() is called, just in time for the /leave response.
5. The /leave response handler in quotest ends up calling
   Connection::logout() (processEvents() still hasn't returned).
6. Connection::logout() calls abandon() on the ongoing SyncJob,
   pulling the rug from under onSyncSuccess()/consumeRoomData().
7. processEvents() returns and the above code proceeds to the next
   room - only to find that the roomDataList (that is a ref to
   a structure owned by SyncJob), is now pointing to garbage.

Morals of the story:
1. processEvents() effectively makes code multi-threaded: one flow is
   suspended and another one may run _on the same data_. After the first
   flow is resumed, it cannot make any assumptions regarding which data
   the second flow touched and/or changed.
2. The library had quite a few cases of using &&-refs, avoiding even
   move operations but also leaving ownership of the data with the
   original producer (SyncJob). If the lifetime of that producer ends
   too soon, those refs become dangling.

The fix makes two important things, respectively:
2. Ownership of room data is now transfered to the processing side,
   the moment it is scheduled (see below), in the form of moving
   into a lambda capture.
1. Instead of processEvents(), processing of room data is scheduled
   via QMetaObject::invokeMethod(), uncoupling the moment when the
   data was received in SyncJob from the moment they are processed
   in Room::updateData() (and all the numerous  signal-slots it calls).

Also: Room::baseStateLoaded now causes Connection::loadedRoomState, not
the other way round - this is more natural and doesn't need Connection
to keep firstTimeRooms map around.
  • Loading branch information
KitsuneRal committed May 8, 2022
1 parent 72ffe96 commit 0bd655c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 27 deletions.
24 changes: 13 additions & 11 deletions lib/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ class Connection::Private {
/// as of the last sync
QHash<QString, QString> roomAliasMap;
QVector<QString> roomIdsToForget;
QVector<Room*> firstTimeRooms;
QVector<QString> pendingStateRoomIds;
QMap<QString, User*> userMap;
DirectChatsMap directChats;
Expand Down Expand Up @@ -833,16 +832,14 @@ void Connection::Private::consumeRoomData(SyncDataList&& roomDataList,
}
if (auto* r = q->provideRoom(roomData.roomId, roomData.joinState)) {
pendingStateRoomIds.removeOne(roomData.roomId);
r->updateData(std::move(roomData), fromCache);
if (firstTimeRooms.removeOne(r)) {
emit q->loadedRoomState(r);
if (capabilities.roomVersions)
r->checkVersion();
// Otherwise, the version will be checked in reloadCapabilities()
}
// Update rooms one by one, giving time to update the UI.
QMetaObject::invokeMethod(
r,
[r, rd = std::move(roomData), fromCache] () mutable {
r->updateData(std::move(rd), fromCache);
},
Qt::QueuedConnection);
}
// Let UI update itself after updating each room
QCoreApplication::processEvents();
}
}

Expand Down Expand Up @@ -1707,9 +1704,14 @@ Room* Connection::provideRoom(const QString& id, Omittable<JoinState> joinState)
return nullptr;
}
d->roomMap.insert(roomKey, room);
d->firstTimeRooms.push_back(room);
connect(room, &Room::beforeDestruction, this,
&Connection::aboutToDeleteRoom);
connect(room, &Room::baseStateLoaded, this, [this, room] {
emit loadedRoomState(room);
if (d->capabilities.roomVersions)
room->checkVersion();
// Otherwise, the version will be checked in reloadCapabilities()
});
emit newRoom(room);
}
if (!joinState)
Expand Down
2 changes: 1 addition & 1 deletion lib/jobs/syncjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class SyncJob : public BaseJob {
explicit SyncJob(const QString& since, const Filter& filter,
int timeout = -1, const QString& presence = {});

SyncData&& takeData() { return std::move(d); }
SyncData takeData() { return std::move(d); }

protected:
Status prepareResult() override;
Expand Down
11 changes: 6 additions & 5 deletions lib/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,6 @@ Room::Room(Connection* connection, QString id, JoinState initialJoinState)
// https://marcmutz.wordpress.com/translated-articles/pimp-my-pimpl-%E2%80%94-reloaded/
d->q = this;
d->displayname = d->calculateDisplayname(); // Set initial "Empty room" name
connectUntil(connection, &Connection::loadedRoomState, this, [this](Room* r) {
if (this == r)
emit baseStateLoaded();
return this == r; // loadedRoomState fires only once per room
});
#ifdef Quotient_E2EE_ENABLED
connectSingleShot(this, &Room::encryption, this, [this, connection](){
connection->encryptionUpdate(this);
Expand Down Expand Up @@ -1820,6 +1815,9 @@ Room::Changes Room::Private::updateStatsFromSyncData(const SyncRoomData& data,

void Room::updateData(SyncRoomData&& data, bool fromCache)
{
qCDebug(MAIN) << "--- Updating room" << id() << "/" << objectName();
bool firstUpdate = d->baseState.empty();

if (d->prevBatch.isEmpty())
d->prevBatch = data.timelinePrevBatch;
setJoinState(data.joinState);
Expand All @@ -1845,6 +1843,9 @@ void Room::updateData(SyncRoomData&& data, bool fromCache)
emit namesChanged(this);

d->postprocessChanges(roomChanges, !fromCache);
if (firstUpdate)
emit baseStateLoaded();
qCDebug(MAIN) << "--- Finished updating room" << id() << "/" << objectName();
}

void Room::Private::postprocessChanges(Changes changes, bool saveState)
Expand Down
10 changes: 5 additions & 5 deletions lib/syncdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,26 @@ SyncData::SyncData(const QString& cacheFileName)
<< "is required; discarding the cache";
}

SyncDataList&& SyncData::takeRoomData() { return move(roomData); }
SyncDataList SyncData::takeRoomData() { return move(roomData); }

QString SyncData::fileNameForRoom(QString roomId)
{
roomId.replace(':', '_');
return roomId + ".json";
}

Events&& SyncData::takePresenceData() { return std::move(presenceData); }
Events SyncData::takePresenceData() { return std::move(presenceData); }

Events&& SyncData::takeAccountData() { return std::move(accountData); }
Events SyncData::takeAccountData() { return std::move(accountData); }

Events&& SyncData::takeToDeviceEvents() { return std::move(toDeviceEvents); }
Events SyncData::takeToDeviceEvents() { return std::move(toDeviceEvents); }

std::pair<int, int> SyncData::cacheVersion()
{
return { MajorCacheVersion, 2 };
}

DevicesList&& SyncData::takeDevicesList() { return std::move(devicesList); }
DevicesList SyncData::takeDevicesList() { return std::move(devicesList); }

QJsonObject SyncData::loadJson(const QString& fileName)
{
Expand Down
10 changes: 5 additions & 5 deletions lib/syncdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ class SyncData {
*/
void parseJson(const QJsonObject& json, const QString& baseDir = {});

Events&& takePresenceData();
Events&& takeAccountData();
Events&& takeToDeviceEvents();
Events takePresenceData();
Events takeAccountData();
Events takeToDeviceEvents();
const QHash<QString, int>& deviceOneTimeKeysCount() const
{
return deviceOneTimeKeysCount_;
}
SyncDataList&& takeRoomData();
DevicesList&& takeDevicesList();
SyncDataList takeRoomData();
DevicesList takeDevicesList();

QString nextBatch() const { return nextBatch_; }

Expand Down

0 comments on commit 0bd655c

Please sign in to comment.