Skip to content

Commit

Permalink
Don't perform blocking connect inside the BlockingConnectionQueue Con…
Browse files Browse the repository at this point in the history
…dition variable. (#2997)
  • Loading branch information
kristjanvalur committed Oct 11, 2023
1 parent 054caf3 commit d1dbb15
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,21 +1027,25 @@ def can_get_connection(self) -> bool:
)

async def get_connection(self, command_name, *keys, **options):
"""Get a connection from the pool"""
"""Get a connected connection from the pool"""
connection = self.get_available_connection()
try:
await self.ensure_connection(connection)
except BaseException:
await self.release(connection)
raise

return connection

def get_available_connection(self):
"""Get a connection from the pool, without making sure it is connected"""
try:
connection = self._available_connections.pop()
except IndexError:
if len(self._in_use_connections) >= self.max_connections:
raise ConnectionError("Too many connections") from None
connection = self.make_connection()
self._in_use_connections.add(connection)

try:
await self.ensure_connection(connection)
except BaseException:
await self.release(connection)
raise

return connection

def get_encoder(self):
Expand Down Expand Up @@ -1166,13 +1170,21 @@ def __init__(
async def get_connection(self, command_name, *keys, **options):
"""Gets a connection from the pool, blocking until one is available"""
try:
async with async_timeout(self.timeout):
async with self._condition:
async with self._condition:
async with async_timeout(self.timeout):
await self._condition.wait_for(self.can_get_connection)
return await super().get_connection(command_name, *keys, **options)
connection = super().get_available_connection()
except asyncio.TimeoutError as err:
raise ConnectionError("No connection available.") from err

# We now perform the connection check outside of the lock.
try:
await self.ensure_connection(connection)
return connection
except BaseException:
await self.release(connection)
raise

async def release(self, connection: AbstractConnection):
"""Releases the connection back to the pool."""
async with self._condition:
Expand Down

0 comments on commit d1dbb15

Please sign in to comment.