Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed an issue where the messages might not get reprocessed when they should #927

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions Session.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@
FD6A7A692818BE7300035AC1 /* RetrieveDefaultOpenGroupRoomsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6A7A682818BE7300035AC1 /* RetrieveDefaultOpenGroupRoomsJob.swift */; };
FD6A7A6B2818C17C00035AC1 /* UpdateProfilePictureJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6A7A6A2818C17C00035AC1 /* UpdateProfilePictureJob.swift */; };
FD6A7A6D2818C61500035AC1 /* _002_SetupStandardJobs.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6A7A6C2818C61500035AC1 /* _002_SetupStandardJobs.swift */; };
FD6DF00B2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */; };
FD6E4C8A2A1AEE4700C7C243 /* LegacyUnsubscribeRequest.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6E4C892A1AEE4700C7C243 /* LegacyUnsubscribeRequest.swift */; };
FD705A92278D051200F16121 /* ReusableView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD705A91278D051200F16121 /* ReusableView.swift */; };
FD7115EB28C5D78E00B47552 /* ThreadSettingsViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD7115EA28C5D78E00B47552 /* ThreadSettingsViewModel.swift */; };
Expand Down Expand Up @@ -1759,6 +1760,7 @@
FD6A7A682818BE7300035AC1 /* RetrieveDefaultOpenGroupRoomsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RetrieveDefaultOpenGroupRoomsJob.swift; sourceTree = "<group>"; };
FD6A7A6A2818C17C00035AC1 /* UpdateProfilePictureJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UpdateProfilePictureJob.swift; sourceTree = "<group>"; };
FD6A7A6C2818C61500035AC1 /* _002_SetupStandardJobs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _002_SetupStandardJobs.swift; sourceTree = "<group>"; };
FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _005_AddSnodeReveivedMessageInfoPrimaryKey.swift; sourceTree = "<group>"; };
FD6E4C892A1AEE4700C7C243 /* LegacyUnsubscribeRequest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LegacyUnsubscribeRequest.swift; sourceTree = "<group>"; };
FD705A91278D051200F16121 /* ReusableView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReusableView.swift; sourceTree = "<group>"; };
FD7115EA28C5D78E00B47552 /* ThreadSettingsViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadSettingsViewModel.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -3670,6 +3672,7 @@
FD6A7A6C2818C61500035AC1 /* _002_SetupStandardJobs.swift */,
FD17D7A327F40F8100122BE0 /* _003_YDBToGRDBMigration.swift */,
FD39353528F7C3390084DADA /* _004_FlagMessageHashAsDeletedOrInvalid.swift */,
FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */,
);
path = Migrations;
sourceTree = "<group>";
Expand Down Expand Up @@ -5765,6 +5768,7 @@
FD39353628F7C3390084DADA /* _004_FlagMessageHashAsDeletedOrInvalid.swift in Sources */,
FDF8489429405C1B007DCAE5 /* SnodeAPI.swift in Sources */,
FDF848C829405C5B007DCAE5 /* ONSResolveRequest.swift in Sources */,
FD6DF00B2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift in Sources */,
C3C2A5C2255385EE00C340D1 /* Configuration.swift in Sources */,
FDF848C929405C5B007DCAE5 /* SnodeRequest.swift in Sources */,
FDF848CF29405C5B007DCAE5 /* SendMessageRequest.swift in Sources */,
Expand Down
10 changes: 5 additions & 5 deletions SessionMessagingKit/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ public enum SNMessagingKit: MigratableTarget { // Just to make the external API
[
_001_InitialSetupMigration.self,
_002_SetupStandardJobs.self
],
], // Initial DB Creation
[
_003_YDBToGRDBMigration.self
],
], // YDB to GRDB Migration
[
_004_RemoveLegacyYDB.self
],
], // Legacy DB removal
[
_005_FixDeletedMessageReadState.self,
_006_FixHiddenModAdminSupport.self,
_007_HomeQueryOptimisationIndexes.self
],
], // Add job priorities
[
_008_EmojiReacts.self,
_009_OpenGroupPermission.self,
_010_AddThreadIdToFTS.self
], // Add job priorities
], // Fix thread FTS
[
_011_AddPendingReadReceipts.self,
_012_AddFTSIfNeeded.self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public class Poller {
let lastHashes: [String] = namespacedResults
.compactMap { $0.value.data?.lastHash }
let otherKnownHashes: [String] = namespacedResults
.filter { $0.key.shouldDedupeMessages }
.filter { $0.key.shouldFetchSinceLastHash }
.compactMap { $0.value.data?.messages.map { $0.info.hash } }
.reduce([], +)
var messageCount: Int = 0
Expand Down
12 changes: 8 additions & 4 deletions SessionSnodeKit/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ public enum SNSnodeKit: MigratableTarget { // Just to make the external API nice
[
_001_InitialSetupMigration.self,
_002_SetupStandardJobs.self
],
], // Initial DB Creation
[
_003_YDBToGRDBMigration.self
],
], // YDB to GRDB Migration
[
_004_FlagMessageHashAsDeletedOrInvalid.self
],
[] // Add job priorities
], // Legacy DB removal
[], // Add job priorities
[], // Fix thread FTS
[
_005_AddSnodeReveivedMessageInfoPrimaryKey.self
]
]
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ enum _001_InitialSetupMigration: Migration {
}

try db.create(table: SnodeReceivedMessageInfo.self) { t in
t.column(.id, .integer)
t.deprecatedColumn(name: "id", .integer) // stringlint:disable
.notNull()
.primaryKey(autoincrement: true)
t.column(.key, .text)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.

import Foundation
import GRDB
import SessionUtilitiesKit

enum _005_AddSnodeReveivedMessageInfoPrimaryKey: Migration {
static let target: TargetMigrations.Identifier = .snodeKit
static let identifier: String = "AddSnodeReveivedMessageInfoPrimaryKey" // stringlint:disable
static let needsConfigSync: Bool = false
static let fetchedTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]

/// This migration adds a flat to the `SnodeReceivedMessageInfo` so that when deleting interactions we can
/// ignore their hashes when subsequently trying to fetch new messages (which results in the storage server returning
/// messages from the beginning of time)
static let minExpectedRunDuration: TimeInterval = 0.2

static func migrate(_ db: Database) throws {
// SQLite doesn't support adding a new primary key after creation so we need to create a new table with
// the setup we want, copy data from the old table over, drop the old table and rename the new table
struct TmpSnodeReceivedMessageInfo: Codable, TableRecord, FetchableRecord, PersistableRecord, ColumnExpressible {
static var databaseTableName: String { "tmpSnodeReceivedMessageInfo" }

typealias Columns = CodingKeys
enum CodingKeys: String, CodingKey, ColumnExpression {
case key
case hash
case expirationDateMs
case wasDeletedOrInvalid
}

let key: String
let hash: String
let expirationDateMs: Int64
var wasDeletedOrInvalid: Bool?
}

try db.create(table: TmpSnodeReceivedMessageInfo.self) { t in
t.column(.key, .text).notNull()
t.column(.hash, .text).notNull()
t.column(.expirationDateMs, .integer).notNull()
t.column(.wasDeletedOrInvalid, .boolean)

t.primaryKey([.key, .hash])
}

// Insert into the new table, drop the old table and rename the new table to be the old one
let tmpInfo: TypedTableAlias<TmpSnodeReceivedMessageInfo> = TypedTableAlias()
let info: TypedTableAlias<SnodeReceivedMessageInfo> = TypedTableAlias()
try db.execute(literal: """
INSERT INTO \(tmpInfo)
SELECT \(info[.key]), \(info[.hash]), \(info[.expirationDateMs]), \(info[.wasDeletedOrInvalid])
FROM \(info)
""")

try db.drop(table: SnodeReceivedMessageInfo.self)
try db.rename(
table: TmpSnodeReceivedMessageInfo.databaseTableName,
to: SnodeReceivedMessageInfo.databaseTableName
)

// Need to create the indexes separately from creating 'TmpGroupMember' to ensure they
// have the correct names
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.key])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.hash])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.expirationDateMs])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.wasDeletedOrInvalid])

Storage.update(progress: 1, for: self, in: target)
}
}
15 changes: 2 additions & 13 deletions SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,12 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist

public typealias Columns = CodingKeys
public enum CodingKeys: String, CodingKey, ColumnExpression {
case id
case key
case hash
case expirationDateMs
case wasDeletedOrInvalid
}

/// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into
/// the database yet this value will be `nil`
public var id: Int64? = nil

/// The key this message hash is associated to
///
/// This will be a combination of {address}.{port}.{publicKey} for new rows and just the {publicKey} for legacy rows
Expand All @@ -41,12 +36,6 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist
///
/// **Note:** When retrieving the `lastNotExpired` we will ignore any entries where this flag is true
public var wasDeletedOrInvalid: Bool?

// MARK: - Custom Database Interaction

public mutating func didInsert(_ inserted: InsertionSuccess) {
self.id = inserted.rowID
}
}

// MARK: - Convenience
Expand Down Expand Up @@ -133,7 +122,7 @@ public extension SnodeReceivedMessageInfo {
)
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs > SnodeAPI.currentOffsetTimestampMs())
.order(SnodeReceivedMessageInfo.Columns.id.desc)
.order(Column.rowID.desc)
.fetchOne(db)

// If we have a non-legacy hash then return it immediately (legacy hashes had a different
Expand All @@ -146,7 +135,7 @@ public extension SnodeReceivedMessageInfo {
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid == false
)
.filter(SnodeReceivedMessageInfo.Columns.key == publicKey)
.order(SnodeReceivedMessageInfo.Columns.id.desc)
.order(Column.rowID.desc)
.fetchOne(db)
}
}
Expand Down
11 changes: 5 additions & 6 deletions SessionSnodeKit/Types/SnodeAPINamespace.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
//
// stringlint:disable

import Foundation

Expand Down Expand Up @@ -40,12 +42,9 @@ public extension SnodeAPI {
public var shouldFetchSinceLastHash: Bool { true }

/// This flag indicates whether we should dedupe messages from the specified namespace, when `true` we will
/// store a `SnodeReceivedMessageInfo` record for the message and check for a matching record whenever
/// we receive a message from this namespace
///
/// **Note:** An additional side-effect of this flag is that when we poll for messages from the specified namespace
/// we will always retrieve **all** messages from the namespace (instead of just new messages since the last one
/// we have seen)
/// attempt to `insert` a `SnodeReceivedMessageInfo` record (which will fail if we had already processed this
/// message previously), when `false` we will still `upsert` a record so we don't run into the unique constraint allowing
/// re-processing of a previously processed message
public var shouldDedupeMessages: Bool {
switch self {
case .`default`, .legacyClosedGroup: return true
Expand Down
6 changes: 4 additions & 2 deletions SessionUIKit/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ public enum SNUIKit: MigratableTarget {
// SNUIKit migrations
[], // Initial DB Creation
[], // YDB to GRDB Migration
[], // YDB Removal
[], // Legacy DB removal
[
_001_ThemePreferences.self
] // Add job priorities
], // Add job priorities
[], // Fix thread FTS
[]
]
)
}
Expand Down
8 changes: 5 additions & 3 deletions SessionUtilitiesKit/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ public enum SNUtilitiesKit: MigratableTarget { // Just to make the external API
_001_InitialSetupMigration.self,
_002_SetupStandardJobs.self,
_003_YDBToGRDBMigration.self
],
[], // Other DB migrations
], // Initial DB Creation
[], // YDB to GRDB Migration
[], // Legacy DB removal
[
_004_AddJobPriority.self
]
], // Add job priorities
[], // Fix thread FTS
[]
]
)
}
Expand Down