|
4 | 4 | require "minitest/mock" |
5 | 5 | require "stubs/test_connection" |
6 | 6 | require "stubs/room" |
| 7 | +require "concurrent/atomic/cyclic_barrier" |
7 | 8 |
|
8 | 9 | module ActionCable::StreamTests |
9 | 10 | class Connection < ActionCable::Connection::Base |
@@ -280,6 +281,63 @@ class StreamTest < ActionCable::TestCase |
280 | 281 | end |
281 | 282 | end |
282 | 283 |
|
| 284 | + test "concurrent unsubscribe_from_channel and stream_from do not raise RuntimeError" do |
| 285 | + threads = [] |
| 286 | + run_in_eventmachine do |
| 287 | + connection = TestConnection.new |
| 288 | + connection.pubsub.unsubscribe_latency = 0.1 |
| 289 | + |
| 290 | + channel = ChatChannel.new connection, "{id: 1}", id: 1 |
| 291 | + channel.subscribe_to_channel |
| 292 | + |
| 293 | + # Set up initial streams |
| 294 | + channel.stream_from "room_one" |
| 295 | + channel.stream_from "room_two" |
| 296 | + wait_for_async |
| 297 | + |
| 298 | + # Create barriers to synchronize thread execution |
| 299 | + barrier = Concurrent::CyclicBarrier.new(2) |
| 300 | + |
| 301 | + exception_caught = nil |
| 302 | + |
| 303 | + # Thread 1: calls unsubscribe_from_channel |
| 304 | + thread1 = Thread.new do |
| 305 | + barrier.wait |
| 306 | + # Add a small delay to increase the chance of concurrent execution |
| 307 | + sleep 0.001 |
| 308 | + channel.unsubscribe_from_channel |
| 309 | + rescue => e |
| 310 | + exception_caught = e |
| 311 | + ensure |
| 312 | + barrier.wait |
| 313 | + end |
| 314 | + threads << thread1 |
| 315 | + |
| 316 | + # Thread 2: calls stream_from during unsubscribe_from_channel iteration |
| 317 | + thread2 = Thread.new do |
| 318 | + barrier.wait |
| 319 | + # Try to add streams while unsubscribe_from_channel is potentially iterating |
| 320 | + 10.times do |i| |
| 321 | + channel.stream_from "concurrent_room_#{i}" |
| 322 | + sleep 0.0001 # Small delay to interleave with unsubscribe_from_channel |
| 323 | + end |
| 324 | + rescue => e |
| 325 | + exception_caught = e |
| 326 | + ensure |
| 327 | + barrier.wait |
| 328 | + end |
| 329 | + threads << thread2 |
| 330 | + |
| 331 | + thread1.join |
| 332 | + thread2.join |
| 333 | + |
| 334 | + # Ensure no RuntimeError was raised during concurrent access |
| 335 | + assert_nil exception_caught, "Concurrent unsubscribe_from_channel and stream_from should not raise RuntimeError: #{exception_caught}" |
| 336 | + end |
| 337 | + ensure |
| 338 | + threads.each(&:kill) |
| 339 | + end |
| 340 | + |
283 | 341 | private |
284 | 342 | def subscribers_of(connection) |
285 | 343 | connection |
|
0 commit comments