Skip to content

Commit

Permalink
Fix signal dispatching on RedisPublisher.request(…) #1837
Browse files Browse the repository at this point in the history
RedisPublisher's NO_DEMAND.request(…) while switching to DEMAND now calls onDataAvailable(…) on the currently active state to ensure data signalling if there's (still) demand.

Previously, we called onDataAvailable(…) on the current NO_DEMAND state anticipating that if there's no data yet, then someone will read data from the channel and once it's there, we will be notified to emit it. Apparently, we can have data already in our buffer and so, upon requesting data, no one will notify us. Data was lingering in the publisher buffer and wasn't emitted. That caused downstream subscribers to hang indefinitely.

Data emission (and command completion) can happen when responses for a command. The request size doesn't correlate with the response that we receive from Redis. Redis can respond with less, exactly or more items. In case we receive from Redis more items than were requested, we buffer these to comply with RS back pressure semantics. After emitting the last requested item to the subscription, we switch from DEMAND (READING) to NO_DEMAND. And the issue exactly starts there.

We now call onDataAvailable(…) on the current state to ensure that when the state is DEMAND, that we will emit all data we have in our buffers.
  • Loading branch information
mp911de committed Sep 14, 2021
1 parent 694f258 commit 6ff2441
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/RedisPublisher.java
Expand Up @@ -542,7 +542,7 @@ void request(RedisSubscription<?> subscription, long n) {
}

subscription.potentiallyReadMore();
onDataAvailable(subscription);
subscription.state().onDataAvailable(subscription);
} else {
onError(subscription, Exceptions.nullOrNegativeRequestException(n));
}
Expand Down
83 changes: 83 additions & 0 deletions src/test/kotlin/io/lettuce/core/CoroutinesIntegrationTests.kt
@@ -0,0 +1,83 @@
/*
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core

import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.coroutines
import io.lettuce.test.LettuceExtension
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import javax.inject.Inject

/**
* Integration tests for Coroutines.
*
* @author Mark Paluch
*/
@ExtendWith(LettuceExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@OptIn(ExperimentalLettuceCoroutinesApi::class)
internal class CoroutinesIntegrationTests @Inject constructor(private val connection: StatefulRedisConnection<String, String>) :
TestSupport() {

@Test
@Timeout(10)
fun shouldRepeatCoroutinesFlowExternalLoop() {

// The usage of 128 here is meaningful, as mentioned above this is only
// observed with multiples of 64 in the size of the requested set of keys.
val array = Array(128) { "111" }

repeat(1000) {
runBlocking {

connection.coroutines().mget(
*array
).mapNotNull { result ->
if (result.hasValue()) {
result.value as String
} else null
}.toList()
}
}
}

@Test
@Timeout(10)
fun shouldRepeatCoroutinesFlowInternalLoop() {

// The usage of 128 here is meaningful, as mentioned above this is only
// observed with multiples of 64 in the size of the requested set of keys.
val array = Array(128) { "111" }

runBlocking {
repeat(1000) {
connection.coroutines().mget(
*array
).mapNotNull { result ->
if (result.hasValue()) {
result.value as String
} else null
}.toList()
}
}
}
}

0 comments on commit 6ff2441

Please sign in to comment.