Skip to content

Commit

Permalink
Fix single Redis connection being used across all threads (#18135)
Browse files Browse the repository at this point in the history
* Fix single Redis connection being used across all Sidekiq threads

* Fix tests
  • Loading branch information
Gargron committed Apr 28, 2022
1 parent 9bf04db commit 3917353
Show file tree
Hide file tree
Showing 44 changed files with 243 additions and 124 deletions.
8 changes: 5 additions & 3 deletions app/controllers/admin/dashboard_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

module Admin
class DashboardController < BaseController
include Redisable

def index
@system_checks = Admin::SystemCheck.perform
@time_period = (29.days.ago.to_date...Time.now.utc.to_date)
Expand All @@ -15,10 +17,10 @@ def index

def redis_info
@redis_info ||= begin
if Redis.current.is_a?(Redis::Namespace)
Redis.current.redis.info
if redis.is_a?(Redis::Namespace)
redis.redis.info
else
Redis.current.info
redis.info
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion app/controllers/media_proxy_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
class MediaProxyController < ApplicationController
include RoutingHelper
include Authorization
include Redisable

skip_before_action :store_current_location
skip_before_action :require_functional!
Expand Down Expand Up @@ -45,7 +46,7 @@ def version
end

def lock_options
{ redis: Redis.current, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds }
{ redis: redis, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds }
end

def reject_media?
Expand Down
3 changes: 2 additions & 1 deletion app/controllers/settings/exports_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

class Settings::ExportsController < Settings::BaseController
include Authorization
include Redisable

skip_before_action :require_functional!

Expand All @@ -28,6 +29,6 @@ def create
end

def lock_options
{ redis: Redis.current, key: "backup:#{current_user.id}" }
{ redis: redis, key: "backup:#{current_user.id}" }
end
end
4 changes: 3 additions & 1 deletion app/lib/access_token_extension.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module AccessTokenExtension
extend ActiveSupport::Concern

included do
include Redisable

after_commit :push_to_streaming_api
end

Expand All @@ -16,6 +18,6 @@ def update_last_used(request, clock = Time)
end

def push_to_streaming_api
Redis.current.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed?
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed?
end
end
2 changes: 1 addition & 1 deletion app/lib/activitypub/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def lock_or_return(key, expire_after = 2.hours.seconds)
end

def lock_or_fail(key, expire_after = 15.minutes.seconds)
RedisLock.acquire({ redis: Redis.current, key: key, autorelease: expire_after }) do |lock|
RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock|
if lock.acquired?
yield
else
Expand Down
18 changes: 11 additions & 7 deletions app/lib/delivery_failure_tracker.rb
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
# frozen_string_literal: true

class DeliveryFailureTracker
include Redisable

FAILURE_DAYS_THRESHOLD = 7

def initialize(url_or_host)
@host = url_or_host.start_with?('https://') || url_or_host.start_with?('http://') ? Addressable::URI.parse(url_or_host).normalized_host : url_or_host
end

def track_failure!
Redis.current.sadd(exhausted_deliveries_key, today)
redis.sadd(exhausted_deliveries_key, today)
UnavailableDomain.create(domain: @host) if reached_failure_threshold?
end

def track_success!
Redis.current.del(exhausted_deliveries_key)
redis.del(exhausted_deliveries_key)
UnavailableDomain.find_by(domain: @host)&.destroy
end

def clear_failures!
Redis.current.del(exhausted_deliveries_key)
redis.del(exhausted_deliveries_key)
end

def days
Redis.current.scard(exhausted_deliveries_key) || 0
redis.scard(exhausted_deliveries_key) || 0
end

def available?
!UnavailableDomain.where(domain: @host).exists?
end

def exhausted_deliveries_days
@exhausted_deliveries_days ||= Redis.current.smembers(exhausted_deliveries_key).sort.map { |date| Date.new(date.slice(0, 4).to_i, date.slice(4, 2).to_i, date.slice(6, 2).to_i) }
@exhausted_deliveries_days ||= redis.smembers(exhausted_deliveries_key).sort.map { |date| Date.new(date.slice(0, 4).to_i, date.slice(4, 2).to_i, date.slice(6, 2).to_i) }
end

alias reset! track_success!

class << self
include Redisable

def without_unavailable(urls)
unavailable_domains_map = Rails.cache.fetch('unavailable_domains') { UnavailableDomain.pluck(:domain).index_with(true) }

Expand All @@ -54,15 +58,15 @@ def reset!(url)
end

def warning_domains
domains = Redis.current.keys(exhausted_deliveries_key_by('*')).map do |key|
domains = redis.keys(exhausted_deliveries_key_by('*')).map do |key|
key.delete_prefix(exhausted_deliveries_key_by(''))
end

domains - UnavailableDomain.all.pluck(:domain)
end

def warning_domains_map
warning_domains.index_with { |domain| Redis.current.scard(exhausted_deliveries_key_by(domain)) }
warning_domains.index_with { |domain| redis.scard(exhausted_deliveries_key_by(domain)) }
end

private
Expand Down
47 changes: 47 additions & 0 deletions app/lib/redis_configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

class RedisConfiguration
class << self
def with
pool.with { |redis| yield redis }
end

def pool
@pool ||= ConnectionPool.new(size: pool_size) { new.connection }
end

def pool_size
if Sidekiq.server?
Sidekiq.options[:concurrency]
else
ENV['MAX_THREADS'] || 5
end
end
end

def connection
if namespace?
Redis::Namespace.new(namespace, redis: raw_connection)
else
raw_connection
end
end

def namespace?
namespace.present?
end

def namespace
ENV.fetch('REDIS_NAMESPACE', nil)
end

def url
ENV['REDIS_URL']
end

private

def raw_connection
Redis.new(url: url, driver: :hiredis)
end
end
4 changes: 3 additions & 1 deletion app/models/account_conversation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#

class AccountConversation < ApplicationRecord
include Redisable

after_commit :push_to_streaming_api

belongs_to :account
Expand Down Expand Up @@ -109,7 +111,7 @@ def push_to_streaming_api
end

def subscribed_to_timeline?
Redis.current.exists?("subscribed:#{streaming_channel}")
redis.exists?("subscribed:#{streaming_channel}")
end

def streaming_channel
Expand Down
4 changes: 3 additions & 1 deletion app/models/account_suggestions/global_source.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

class AccountSuggestions::GlobalSource < AccountSuggestions::Source
include Redisable

def key
:global
end
Expand Down Expand Up @@ -28,7 +30,7 @@ def scope(account)
end

def account_ids_for_locale(locale)
Redis.current.zrevrange("follow_recommendations:#{locale}", 0, -1).map(&:to_i)
redis.zrevrange("follow_recommendations:#{locale}", 0, -1).map(&:to_i)
end

def to_ordered_list_key(account)
Expand Down
2 changes: 1 addition & 1 deletion app/models/concerns/redisable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ module Redisable
private

def redis
Redis.current
Thread.current[:redis] ||= RedisConfiguration.new.connection
end
end
3 changes: 2 additions & 1 deletion app/models/custom_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class CustomFilter < ApplicationRecord
).freeze

include Expireable
include Redisable

belongs_to :account

Expand Down Expand Up @@ -51,7 +52,7 @@ def clean_up_contexts

def remove_cache
Rails.cache.delete("filters:#{account_id}")
Redis.current.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed))
redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed))
end

def context_must_be_valid
Expand Down
3 changes: 2 additions & 1 deletion app/models/encrypted_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class EncryptedMessage < ApplicationRecord
self.inheritance_column = nil

include Paginable
include Redisable

scope :up_to, ->(id) { where(arel_table[:id].lteq(id)) }

Expand All @@ -38,7 +39,7 @@ def push_to_streaming_api
end

def subscribed_to_timeline?
Redis.current.exists?("subscribed:#{streaming_channel}")
redis.exists?("subscribed:#{streaming_channel}")
end

def streaming_channel
Expand Down
4 changes: 3 additions & 1 deletion app/models/follow_recommendation_filter.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

class FollowRecommendationFilter
include Redisable

KEYS = %i(
language
status
Expand All @@ -17,7 +19,7 @@ def results
if params['status'] == 'suppressed'
Account.joins(:follow_recommendation_suppression).order(FollowRecommendationSuppression.arel_table[:id].desc).to_a
else
account_ids = Redis.current.zrevrange("follow_recommendations:#{@language}", 0, -1).map(&:to_i)
account_ids = redis.zrevrange("follow_recommendations:#{@language}", 0, -1).map(&:to_i)
accounts = Account.where(id: account_ids).index_by(&:id)

account_ids.map { |id| accounts[id] }.compact
Expand Down
3 changes: 2 additions & 1 deletion app/models/user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class User < ApplicationRecord

include Settings::Extend
include UserRoles
include Redisable

# The home and list feeds will be stored in Redis for this amount
# of time, and status fan-out to followers will include only people
Expand Down Expand Up @@ -456,7 +457,7 @@ def notify_staff_about_pending_account!
end

def regenerate_feed!
RegenerationWorker.perform_async(account_id) if Redis.current.set("account:#{account_id}:regeneration", true, nx: true, ex: 1.day.seconds)
RegenerationWorker.perform_async(account_id) if redis.set("account:#{account_id}:regeneration", true, nx: true, ex: 1.day.seconds)
end

def needs_feed_update?
Expand Down
3 changes: 2 additions & 1 deletion app/services/activitypub/process_account_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
class ActivityPub::ProcessAccountService < BaseService
include JsonLdHelper
include DomainControlHelper
include Redisable

# Should be called with confirmed valid JSON
# and WebFinger-resolved username and domain
Expand Down Expand Up @@ -289,7 +290,7 @@ def protocol_changed?
end

def lock_options
{ redis: Redis.current, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds }
{ redis: redis, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds }
end

def process_tags
Expand Down
3 changes: 2 additions & 1 deletion app/services/activitypub/process_status_update_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

class ActivityPub::ProcessStatusUpdateService < BaseService
include JsonLdHelper
include Redisable

def call(status, json)
raise ArgumentError, 'Status has unsaved changes' if status.changed?
Expand Down Expand Up @@ -241,7 +242,7 @@ def expected_type?
end

def lock_options
{ redis: Redis.current, key: "create:#{@uri}", autorelease: 15.minutes.seconds }
{ redis: redis, key: "create:#{@uri}", autorelease: 15.minutes.seconds }
end

def record_previous_edit!
Expand Down
14 changes: 8 additions & 6 deletions app/services/fan_out_on_write_service.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

class FanOutOnWriteService < BaseService
include Redisable

# Push a status into home and mentions feeds
# @param [Status] status
# @param [Hash] options
Expand Down Expand Up @@ -99,20 +101,20 @@ def deliver_to_mentioned_followers!

def broadcast_to_hashtag_streams!
@status.tags.pluck(:name).each do |hashtag|
Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload)
Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local?
redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload)
redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local?
end
end

def broadcast_to_public_streams!
return if @status.reply? && @status.in_reply_to_account_id != @account.id

Redis.current.publish('timeline:public', anonymous_payload)
Redis.current.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload)
redis.publish('timeline:public', anonymous_payload)
redis.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload)

if @status.with_media?
Redis.current.publish('timeline:public:media', anonymous_payload)
Redis.current.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload)
redis.publish('timeline:public:media', anonymous_payload)
redis.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload)
end
end

Expand Down
4 changes: 3 additions & 1 deletion app/services/fetch_link_card_service.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

class FetchLinkCardService < BaseService
include Redisable

URL_PATTERN = %r{
(#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]}) # $1 preceding chars
( # $2 URL
Expand Down Expand Up @@ -155,6 +157,6 @@ def attempt_opengraph
end

def lock_options
{ redis: Redis.current, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds }
{ redis: redis, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds }
end
end

0 comments on commit 3917353

Please sign in to comment.