Skip to content

Commit

Permalink
Split SalmonWorker into smaller parts, move profile updating into ano…
Browse files Browse the repository at this point in the history
…ther job
  • Loading branch information
Gargron committed Apr 5, 2017
1 parent bafbf63 commit 5442083
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 25 deletions.
16 changes: 6 additions & 10 deletions app/services/follow_remote_account_service.rb
Expand Up @@ -45,13 +45,13 @@ def call(uri)
account.suspended = true if domain_block && domain_block.suspend?
account.silenced = true if domain_block && domain_block.silence?

xml = get_feed(account.remote_url)
hubs = get_hubs(xml)
body, xml = get_feed(account.remote_url)
hubs = get_hubs(xml)

account.uri = get_account_uri(xml)
account.hub_url = hubs.first.attribute('href').value

get_profile(xml, account)
get_profile(body, account)
account.save!

account
Expand All @@ -61,7 +61,7 @@ def call(uri)

def get_feed(url)
response = http_client.get(Addressable::URI.parse(url))
Nokogiri::XML(response)
[response.to_s, Nokogiri::XML(response)]
end

def get_hubs(xml)
Expand All @@ -82,12 +82,8 @@ def get_account_uri(xml)
author_uri.content
end

def get_profile(xml, account)
update_remote_profile_service.call(xml.at_xpath('/xmlns:feed'), account)
end

def update_remote_profile_service
@update_remote_profile_service ||= UpdateRemoteProfileService.new
def get_profile(body, account)
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), false)
end

def http_client
Expand Down
6 changes: 3 additions & 3 deletions app/services/process_feed_service.rb
Expand Up @@ -5,15 +5,15 @@ def call(body, account)
xml = Nokogiri::XML(body)
xml.encoding = 'utf-8'

update_author(xml, account)
update_author(body, xml, account)
process_entries(xml, account)
end

private

def update_author(xml, account)
def update_author(body, xml, account)
return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil?
UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS), account, true)
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)
end

def process_entries(xml, account)
Expand Down
10 changes: 3 additions & 7 deletions app/services/process_interaction_service.rb
Expand Up @@ -24,7 +24,7 @@ def call(envelope, target_account)
return if account.suspended?

if salmon.verify(envelope, account.keypair)
update_remote_profile_service.call(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS), account, true)
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)

case verb(xml)
when :follow
Expand Down Expand Up @@ -114,7 +114,7 @@ def delete_post!(xml, account)

return if status.nil?

remove_status_service.call(status) if account.id == status.account_id
RemovalWorker.perform_async(status.id) if account.id == status.account_id
end

def favourite!(xml, from_account)
Expand All @@ -130,7 +130,7 @@ def unfavourite!(xml, from_account)
end

def add_post!(body, account)
process_feed_service.call(body, account)
ProcessingWorker.perform_async(account.id, body.force_encoding('UTF-8'))
end

def status(xml)
Expand All @@ -153,10 +153,6 @@ def process_feed_service
@process_feed_service ||= ProcessFeedService.new
end

def update_remote_profile_service
@update_remote_profile_service ||= UpdateRemoteProfileService.new
end

def remove_status_service
@remove_status_service ||= RemoveStatusService.new
end
Expand Down
2 changes: 2 additions & 0 deletions app/workers/admin/suspension_worker.rb
Expand Up @@ -3,6 +3,8 @@
class Admin::SuspensionWorker
include Sidekiq::Worker

sidekiq_options queue: 'pull'

def perform(account_id)
SuspendAccountService.new.call(Account.find(account_id))
end
Expand Down
2 changes: 2 additions & 0 deletions app/workers/application_worker.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

class ApplicationWorker
def info(message)
Rails.logger.info("#{self.class.name} - #{message}")
Expand Down
5 changes: 1 addition & 4 deletions app/workers/distribution_worker.rb
Expand Up @@ -4,10 +4,7 @@ class DistributionWorker < ApplicationWorker
include Sidekiq::Worker

def perform(status_id)
status = Status.find(status_id)

FanOutOnWriteService.new.call(status)
WarmCacheService.new.call(status)
FanOutOnWriteService.new.call(Status.find(status_id))
rescue ActiveRecord::RecordNotFound
info("Couldn't find the status")
end
Expand Down
20 changes: 20 additions & 0 deletions app/workers/remote_profile_update_worker.rb
@@ -0,0 +1,20 @@
# frozen_string_literal: true

class RemoteProfileUpdateWorker
include Sidekiq::Worker

sidekiq_options queue: 'pull'

def perform(account_id, body, resubscribe)
account = Account.find(account_id)

xml = Nokogiri::XML(body)
xml.encoding = 'utf-8'

author_container = xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS) || xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS)

UpdateRemoteProfileService.new.call(author_container, account, resubscribe)
rescue ActiveRecord::RecordNotFound
true
end
end
2 changes: 1 addition & 1 deletion app/workers/salmon_worker.rb
Expand Up @@ -7,7 +7,7 @@ class SalmonWorker

def perform(account_id, body)
ProcessInteractionService.new.call(body, Account.find(account_id))
rescue ActiveRecord::RecordNotFound
rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound
true
end
end
1 change: 1 addition & 0 deletions spec/services/process_feed_service_spec.rb
Expand Up @@ -16,6 +16,7 @@
end

it 'updates remote user\'s account information' do
account.reload
expect(account.display_name).to eq '::1'
expect(account).to have_attached_file(:avatar)
end
Expand Down

0 comments on commit 5442083

Please sign in to comment.