Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change sidekiq-bulk's batch size from 10,000 to 1,000 jobs in one Redis call #24034

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/lib/activitypub/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def forwardable?
end

def forward!
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
[payload, signature_account_id, inbox_url]
end
end
Expand Down
4 changes: 2 additions & 2 deletions app/services/delete_account_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ def purge_association(association_name)
end

def delete_actor!
ActivityPub::DeliveryWorker.push_bulk(delivery_inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(delivery_inboxes, limit: 1_000) do |inbox_url|
[delete_actor_json, @account.id, inbox_url]
end

ActivityPub::LowPriorityDeliveryWorker.push_bulk(low_priority_delivery_inboxes) do |inbox_url|
ActivityPub::LowPriorityDeliveryWorker.push_bulk(low_priority_delivery_inboxes, limit: 1_000) do |inbox_url|
[delete_actor_json, @account.id, inbox_url]
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/services/remove_status_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def remove_from_remote_reach

status_reach_finder = StatusReachFinder.new(@status, unsafe: true)

ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes, limit: 1_000) do |inbox_url|
[signed_activity_json, @account.id, inbox_url]
end
end
Expand Down
12 changes: 6 additions & 6 deletions app/services/suspend_account_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ def reject_remote_follows!
# counterpart to this operation, i.e. you can't then force a remote
# account to re-follow you, so this part is not reversible.

follows = Follow.where(account: @account).to_a
Follow.where(account: @account).find_in_batches do |follows|
ActivityPub::DeliveryWorker.push_bulk(follows) do |follow|
[Oj.dump(serialize_payload(follow, ActivityPub::RejectFollowSerializer)), follow.target_account_id, @account.inbox_url]
end

ActivityPub::DeliveryWorker.push_bulk(follows) do |follow|
[Oj.dump(serialize_payload(follow, ActivityPub::RejectFollowSerializer)), follow.target_account_id, @account.inbox_url]
follows.each(&:destroy)
end

follows.each(&:destroy)
Comment on lines -34 to -40
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one changes more than just the push_bulk call by processing follows in batches in the first place.

end

def distribute_update_actor!
return unless @account.local?

account_reach_finder = AccountReachFinder.new(@account)

ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes, limit: 1_000) do |inbox_url|
[signed_activity_json, @account.id, inbox_url]
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/services/unsuspend_account_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def distribute_update_actor!

account_reach_finder = AccountReachFinder.new(@account)

ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes, limit: 1_000) do |inbox_url|
[signed_activity_json, @account.id, inbox_url]
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/services/update_account_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def call(account, params, raise_error: false)
def authorize_all_follow_requests(account)
follow_requests = FollowRequest.where(target_account: account)
follow_requests = follow_requests.preload(:account).select { |req| !req.account.silenced? }
AuthorizeFollowWorker.push_bulk(follow_requests) do |req|
AuthorizeFollowWorker.push_bulk(follow_requests, limit: 1_000) do |req|
[req.account_id, req.target_account_id]
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/workers/activitypub/distribute_poll_update_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def perform(status_id)

return unless @status.preloadable_poll

ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
[payload, @account.id, inbox_url]
end

Expand Down
2 changes: 1 addition & 1 deletion app/workers/activitypub/move_distribution_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def perform(migration_id)
@migration = AccountMigration.find(migration_id)
@account = @migration.account

ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
[signed_payload, @account.id, inbox_url]
end

Expand Down
2 changes: 1 addition & 1 deletion app/workers/activitypub/raw_distribution_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def perform(json, source_account_id, exclude_inboxes = [])
def distribute!
return if inboxes.empty?

ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
[payload, source_account_id, inbox_url, options]
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def self_destruct
json = Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(account))

unless options[:dry_run]
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
[json, account.id, inbox_url]
end

Expand Down