Skip to content
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ functions.
Named locks are offered. PostgreSQL locking functions require integers but the
conversion is handled automatically.

No timeouts are supported. If the connection to the database server is lost or
It supports timeouts. If the connection to the database server is lost or
interrupted, the lock is automatically released.

```php
Expand Down
1 change: 1 addition & 0 deletions src/Mutex/FlockMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class FlockMutex extends AbstractLockMutex
/** @var resource */
private $fileHandle;

/** In seconds */
private float $acquireTimeout;

/** @var self::STRATEGY_* */
Expand Down
4 changes: 2 additions & 2 deletions src/Mutex/MySQLMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class MySQLMutex extends AbstractLockMutex
/**
* @param float $acquireTimeout In seconds
*/
public function __construct(\PDO $PDO, string $name, float $acquireTimeout = 0)
public function __construct(\PDO $pdo, string $name, float $acquireTimeout = 0)
{
$this->pdo = $PDO;
$this->pdo = $pdo;

$namePrefix = LockUtil::getInstance()->getKeyPrefix() . ':';

Expand Down
40 changes: 35 additions & 5 deletions src/Mutex/PostgreSQLMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Malkusch\Lock\Mutex;

use Malkusch\Lock\Util\LockUtil;
use Malkusch\Lock\Util\Loop;

class PostgreSQLMutex extends AbstractLockMutex
{
Expand All @@ -13,9 +14,16 @@ class PostgreSQLMutex extends AbstractLockMutex
/** @var array{int, int} */
private array $key;

public function __construct(\PDO $PDO, string $name)
/** In seconds */
private float $acquireTimeout;

/**
* @param float $acquireTimeout In seconds
*/
public function __construct(\PDO $pdo, string $name, float $acquireTimeout = \INF)
{
$this->pdo = $PDO;
$this->pdo = $pdo;
$this->acquireTimeout = $acquireTimeout;

[$keyBytes1, $keyBytes2] = str_split(md5(LockUtil::getInstance()->getKeyPrefix() . ':' . $name, true), 4);

Expand All @@ -32,14 +40,36 @@ public function __construct(\PDO $PDO, string $name)
];
}

#[\Override]
protected function lock(): void
private function lockBlocking(): void
{
$statement = $this->pdo->prepare('SELECT pg_advisory_lock(?, ?)');

$statement->execute($this->key);
}

private function lockBusy(): void
{
$loop = new Loop();

$loop->execute(function () use ($loop): void {
$statement = $this->pdo->prepare('SELECT pg_try_advisory_lock(?, ?)');
$statement->execute($this->key);

if ($statement->fetchColumn()) {
$loop->end();
}
}, $this->acquireTimeout);
}

#[\Override]
protected function lock(): void
{
if ($this->acquireTimeout === \INF) {
$this->lockBlocking();
} else {
$this->lockBusy();
}
}

#[\Override]
protected function unlock(): void
{
Expand Down
7 changes: 7 additions & 0 deletions tests/Mutex/MutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ static function ($uri) {

return new PostgreSQLMutex($pdo, 'test');
}];

yield 'PostgreSQLMutexWithTimoutLoop' => [static function () {
$pdo = new \PDO(getenv('PGSQL_DSN'), getenv('PGSQL_USER'), getenv('PGSQL_PASSWORD'));
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

return new PostgreSQLMutex($pdo, 'test', 3);
}];
}
}
}
45 changes: 44 additions & 1 deletion tests/Mutex/PostgreSQLMutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Malkusch\Lock\Tests\Mutex;

use Eloquent\Liberator\Liberator;
use Malkusch\Lock\Exception\LockAcquireTimeoutException;
use Malkusch\Lock\Mutex\PostgreSQLMutex;
use PHPUnit\Framework\Constraint\IsType;
use PHPUnit\Framework\MockObject\MockObject;
Expand All @@ -24,7 +26,7 @@ protected function setUp(): void

$this->pdo = $this->createMock(\PDO::class);

$this->mutex = new PostgreSQLMutex($this->pdo, 'test-one-negative-key');
$this->mutex = Liberator::liberate(new PostgreSQLMutex($this->pdo, 'test-one-negative-key')); // @phpstan-ignore assign.propertyType
}

private function isPhpunit9x(): bool
Expand Down Expand Up @@ -97,4 +99,45 @@ public function testReleaseLock(): void

\Closure::bind(static fn ($mutex) => $mutex->unlock(), null, PostgreSQLMutex::class)($this->mutex);
}

public function testAcquireTimeoutOccurs(): void
{
$statement = $this->createMock(\PDOStatement::class);

$this->pdo->expects(self::atLeastOnce())
->method('prepare')
->with('SELECT pg_try_advisory_lock(?, ?)')
->willReturn($statement);

$statement->expects(self::atLeastOnce())
->method('execute')
->with(self::logicalAnd(
new IsType(IsType::TYPE_ARRAY),
self::countOf(2),
self::callback(function (...$arguments) {
if ($this->isPhpunit9x()) { // https://github.com/sebastianbergmann/phpunit/issues/5891
$arguments = $arguments[0];
}

foreach ($arguments as $v) {
self::assertLessThan(1 << 32, $v);
self::assertGreaterThanOrEqual(-(1 << 32), $v);
self::assertIsInt($v);
}

return true;
}),
[533558444, -1716795572]
));

$statement->expects(self::atLeastOnce())
->method('fetchColumn')
->willReturn(false);

$this->mutex->acquireTimeout = 1.0; // @phpstan-ignore property.private

$this->expectException(LockAcquireTimeoutException::class);
$this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded');
\Closure::bind(static fn ($mutex) => $mutex->lock(), null, PostgreSQLMutex::class)($this->mutex);
}
}