Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/solid_cache/cluster/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class Cluster
module Execution
def initialize(options = {})
super(options)
@background = Concurrent::SingleThreadExecutor.new(max_queue: 100, fallback_policy: :discard)
@background = Concurrent::FixedThreadPool.new(1, max_queue: 100, fallback_policy: :discard)
@active_record_instrumentation = options.fetch(:active_record_instrumentation, true)
end

Expand Down
17 changes: 17 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class ActiveSupport::TestCase
SolidCache::Entry.delete_all
end
end

teardown do
wait_for_background_tasks(@cache) if @cache
end
end

def lookup_store(options = {})
Expand All @@ -43,6 +47,19 @@ def send_entries_back_in_time(distance)
end
end

def wait_for_background_tasks(cache, timeout: 2)
timeout_at = Time.now + timeout
threadpools = cache.clusters.map { |cluster| cluster.instance_variable_get("@background") }

threadpools.each do |threadpool|
loop do
break if threadpool.completed_task_count == threadpool.scheduled_task_count
raise "Timeout waiting for cache background tasks" if Time.now > timeout_at
sleep 0.05
end
end
end

def uncached_entry_count
SolidCache.each_shard.sum { SolidCache::Entry.uncached { SolidCache::Entry.count } }
end
24 changes: 12 additions & 12 deletions test/unit/cluster_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ class ClusterTest < ActiveSupport::TestCase

test "writes to both clusters" do
@cache.write("foo", 1)
sleep 0.1
wait_for_background_tasks(@cache)
assert_equal 1, @cache.read("foo")
assert_equal 1, @primary_cache.read("foo")
assert_equal 1, @secondary_cache.read("foo")
end

test "reads from primary cluster" do
@cache.write("foo", 1)
sleep 0.1
wait_for_background_tasks(@cache)
assert_equal 1, @cache.read("foo")

@secondary_cache.delete("foo")
Expand All @@ -37,7 +37,7 @@ class ClusterTest < ActiveSupport::TestCase

test "fetch writes to both clusters" do
@cache.fetch("foo") { 1 }
sleep 0.1
wait_for_background_tasks(@cache)

assert_equal 1, @cache.read("foo")
assert_equal 1, @primary_cache.read("foo")
Expand All @@ -46,12 +46,12 @@ class ClusterTest < ActiveSupport::TestCase

test "fetch reads from primary clusters" do
@cache.fetch("foo") { 1 }
sleep 0.1
wait_for_background_tasks(@cache)
assert_equal 1, @cache.read("foo")

@primary_cache.delete("foo")
@cache.fetch("foo") { 2 }
sleep 0.1
wait_for_background_tasks(@cache)

assert_equal 2, @cache.read("foo")
assert_equal 2, @primary_cache.read("foo")
Expand All @@ -66,11 +66,11 @@ class ClusterTest < ActiveSupport::TestCase

test "deletes from both cluster" do
@cache.write("foo", 1)
sleep 0.1
wait_for_background_tasks(@cache)
assert_equal 1, @cache.read("foo")

@cache.delete("foo")
sleep 0.1
wait_for_background_tasks(@cache)

assert_nil @cache.read("foo")
assert_nil @primary_cache.read("foo")
Expand All @@ -80,22 +80,22 @@ class ClusterTest < ActiveSupport::TestCase
test "multi_writes to both clusters" do
values = { "foo" => "bar", "egg" => "spam" }
@cache.write_multi(values)
sleep 0.1
wait_for_background_tasks(@cache)
assert_equal values, @cache.read_multi("foo", "egg")
assert_equal values, @primary_cache.read_multi("foo", "egg")
assert_equal values, @secondary_cache.read_multi("foo", "egg")
end

test "increment and decrement hit both clusters" do
@cache.write("foo", 1, raw: true)
sleep 0.1
wait_for_background_tasks(@cache)

assert_equal 1, @cache.read("foo", raw: true).to_i
assert_equal 1, @primary_cache.read("foo", raw: true).to_i
assert_equal 1, @secondary_cache.read("foo", raw: true).to_i

@cache.increment("foo")
sleep 0.1
wait_for_background_tasks(@cache)

assert_equal 2, @cache.read("foo", raw: true).to_i
assert_equal 2, @primary_cache.read("foo", raw: true).to_i
Expand All @@ -104,7 +104,7 @@ class ClusterTest < ActiveSupport::TestCase
@secondary_cache.write("foo", 4, raw: true)

@cache.decrement("foo")
sleep 0.1
wait_for_background_tasks(@cache)

assert_equal 1, @cache.read("foo", raw: true).to_i
assert_equal 1, @primary_cache.read("foo", raw: true).to_i
Expand All @@ -121,7 +121,7 @@ class ClusterTest < ActiveSupport::TestCase
@secondary_cache = lookup_store(expires_in: 60, cluster: secondary_cluster)

@cache.write("foo", 1)
sleep 0.1
wait_for_background_tasks(@cache)
assert_equal 1, @cache.read("foo")
assert_equal 1, @primary_cache.read("foo")
assert_equal 1, @secondary_cache.read("foo")
Expand Down
10 changes: 5 additions & 5 deletions test/unit/execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def test_async_errors_are_reported
end

sleep 0.1

assert_equal 1, error_subscriber.errors.count
assert_equal "Boom!", error_subscriber.errors.first[0].message
if Rails.version >= "7.1"
Expand All @@ -32,6 +31,7 @@ def test_async_errors_are_reported
end
ensure
Rails.error.unsubscribe(error_subscriber) if Rails.error.respond_to?(:unsubscribe)
@cache = nil #  to avoid waiting for background tasks as the error one won't have completed
end

def test_active_record_instrumention_instrumented
Expand Down Expand Up @@ -77,13 +77,13 @@ def test_active_record_instrumention_expiry
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
sleep 0.1
wait_for_background_tasks(uninstrumented_cache)

assert_no_changes -> { calls } do
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
sleep 0.1
wait_for_background_tasks(uninstrumented_cache)
end
end

Expand All @@ -92,13 +92,13 @@ def test_active_record_instrumention_expiry
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
sleep 0.1
wait_for_background_tasks(instrumented_cache)

assert_changes -> { calls } do
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
sleep 0.1
wait_for_background_tasks(instrumented_cache)
end
end
end
Expand Down
18 changes: 9 additions & 9 deletions test/unit/expiry_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SolidCache::ExpiryTest < ActiveSupport::TestCase
@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

sleep 0.1
wait_for_background_tasks(@cache)
perform_enqueued_jobs

assert_nil @cache.read(default_shard_keys[0])
Expand All @@ -44,12 +44,12 @@ class SolidCache::ExpiryTest < ActiveSupport::TestCase
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)

sleep 0.1
wait_for_background_tasks(@cache)

@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

sleep 0.1
wait_for_background_tasks(@cache)
perform_enqueued_jobs

# Two records have been deleted
Expand All @@ -65,12 +65,12 @@ class SolidCache::ExpiryTest < ActiveSupport::TestCase
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)

sleep 0.1
wait_for_background_tasks(@cache)

@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

sleep 0.1
wait_for_background_tasks(@cache)
perform_enqueued_jobs

# Three records have been deleted
Expand All @@ -86,7 +86,7 @@ class SolidCache::ExpiryTest < ActiveSupport::TestCase
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)

sleep 0.1
wait_for_background_tasks(@cache)
perform_enqueued_jobs

assert_equal 0, SolidCache.each_shard.sum { SolidCache::Entry.count }
Expand All @@ -101,7 +101,7 @@ class SolidCache::ExpiryTest < ActiveSupport::TestCase
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)

sleep 0.1
wait_for_background_tasks(@cache)
perform_enqueued_jobs

assert_equal 2, SolidCache.each_shard.sum { SolidCache::Entry.count }
Expand All @@ -124,15 +124,15 @@ class SolidCache::ExpiryTest < ActiveSupport::TestCase
assert_equal 3, @cache.read(shard_one_keys[0])
assert_equal 4, @cache.read(shard_one_keys[1])

sleep 0.1 # ensure they are marked as read
wait_for_background_tasks(@cache)
send_entries_back_in_time(3.weeks)

@cache.write(default_shard_keys[2], 5)
@cache.write(default_shard_keys[3], 6)
@cache.write(shard_one_keys[2], 7)
@cache.write(shard_one_keys[3], 8)

sleep 0.1
wait_for_background_tasks(@cache)
perform_enqueued_jobs

assert_nil @cache.read(default_shard_keys[0])
Expand Down