Skip to content

Commit

Permalink
Fix posts from threads received out-of-order sometimes not being inse…
Browse files Browse the repository at this point in the history
…rted into timelines (mastodon#27653)
  • Loading branch information
ClearlyClaire authored and vmstan committed Dec 14, 2023
1 parent 24b9922 commit 642eb56
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 4 deletions.
8 changes: 6 additions & 2 deletions app/services/fan_out_on_write_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class FanOutOnWriteService < BaseService
# @param [Hash] options
# @option options [Boolean] update
# @option options [Array<Integer>] silenced_account_ids
# @option options [Boolean] skip_notifications
def call(status, options = {})
@status = status
@account = status.account
Expand Down Expand Up @@ -37,8 +38,11 @@ def check_race_condition!

def fan_out_to_local_recipients!
deliver_to_self!
notify_mentioned_accounts!
notify_about_update! if update?

unless @options[:skip_notifications]
notify_mentioned_accounts!
notify_about_update! if update?
end

case @status.visibility.to_sym
when :public, :unlisted, :private
Expand Down
9 changes: 7 additions & 2 deletions app/workers/thread_resolve_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ class ThreadResolveWorker
sidekiq_options queue: 'pull', retry: 3

def perform(child_status_id, parent_url, options = {})
child_status = Status.find(child_status_id)
parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
child_status = Status.find(child_status_id)
return if child_status.in_reply_to_id.present?

parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)

return if parent_status.nil?

child_status.thread = parent_status
child_status.save!

DistributionWorker.perform_async(child_status_id, { 'skip_notifications' => true }) if child_status.within_realtime_window?
rescue ActiveRecord::RecordNotFound
true
end
Expand Down
103 changes: 103 additions & 0 deletions spec/lib/activitypub/activity/create_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,109 @@
stub_request(:get, 'http://example.com/emojib.png').to_return(body: attachment_fixture('emojo.png'), headers: { 'Content-Type' => 'application/octet-stream' })
end

describe 'processing posts received out of order' do
let(:follower) { Fabricate(:account, username: 'bob') }

let(:object_json) do
{
id: [ActivityPub::TagManager.instance.uri_for(sender), 'post1'].join('/'),
type: 'Note',
to: [
'https://www.w3.org/ns/activitystreams#Public',
ActivityPub::TagManager.instance.uri_for(follower),
],
content: '@bob lorem ipsum',
published: 1.hour.ago.utc.iso8601,
updated: 1.hour.ago.utc.iso8601,
tag: {
type: 'Mention',
href: ActivityPub::TagManager.instance.uri_for(follower),
},
}
end

let(:reply_json) do
{
id: [ActivityPub::TagManager.instance.uri_for(sender), 'reply'].join('/'),
type: 'Note',
inReplyTo: object_json[:id],
to: [
'https://www.w3.org/ns/activitystreams#Public',
ActivityPub::TagManager.instance.uri_for(follower),
],
content: '@bob lorem ipsum',
published: Time.now.utc.iso8601,
updated: Time.now.utc.iso8601,
tag: {
type: 'Mention',
href: ActivityPub::TagManager.instance.uri_for(follower),
},
}
end

def activity_for_object(json)
{
'@context': 'https://www.w3.org/ns/activitystreams',
id: [json[:id], 'activity'].join('/'),
type: 'Create',
actor: ActivityPub::TagManager.instance.uri_for(sender),
object: json,
}.with_indifferent_access
end

before do
follower.follow!(sender)
end

around do |example|
Sidekiq::Testing.fake! do
example.run
Sidekiq::Worker.clear_all
end
end

it 'correctly processes posts and inserts them in timelines', :aggregate_failures do
# Simulate a temporary failure preventing from fetching the parent post
stub_request(:get, object_json[:id]).to_return(status: 500)

# When receiving the reply…
described_class.new(activity_for_object(reply_json), sender, delivery: true).perform

# NOTE: Refering explicitly to the workers is a bit awkward
DistributionWorker.drain
FeedInsertWorker.drain

# …it creates a status with an unknown parent
reply = Status.find_by(uri: reply_json[:id])
expect(reply.reply?).to be true
expect(reply.in_reply_to_id).to be_nil

# …and creates a notification
expect(LocalNotificationWorker.jobs.size).to eq 1

# …but does not insert it into timelines
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_nil

# When receiving the parent…
described_class.new(activity_for_object(object_json), sender, delivery: true).perform

Sidekiq::Worker.drain_all

# …it creates a status and insert it into timelines
parent = Status.find_by(uri: object_json[:id])
expect(parent.reply?).to be false
expect(parent.in_reply_to_id).to be_nil
expect(reply.reload.in_reply_to_id).to eq parent.id

# Check that the both statuses have been inserted into the home feed
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), parent.id)).to be_within(0.1).of(parent.id.to_f)
expect(redis.zscore(FeedManager.instance.key(:home, follower.id), reply.id)).to be_within(0.1).of(reply.id.to_f)

# Creates two notifications
expect(Notification.count).to eq 2
end
end

describe '#perform' do
context 'when fetching' do
subject { described_class.new(json, sender) }
Expand Down

0 comments on commit 642eb56

Please sign in to comment.