Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[Cache] Add tags based invalidation + RedisTaggedAdapter
  • Loading branch information
nicolas-grekas committed Jun 17, 2016
1 parent 22f7ed7 commit 15c2bea
Show file tree
Hide file tree
Showing 12 changed files with 752 additions and 103 deletions.
217 changes: 217 additions & 0 deletions src/Symfony/Component/Cache/Adapter/AbstractTagAwareAdapter.php
@@ -0,0 +1,217 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Cache\Adapter;

use Psr\Cache\CacheItemInterface;
use Symfony\Component\Cache\CacheItem;

/**
* @author Nicolas Grekas <p@tchwork.com>
*/
abstract class AbstractTagAwareAdapter implements TagAwareAdapterInterface
{
private $adapter;
private $deferred = array();
private $createCacheItem;
private $getTagsByKey;

/**
* Removes tag-invalidated keys and returns the removed ones.
*
* @param array &$keys The keys to filter
*
* @return array The keys removed from $keys
*/
abstract protected function filterInvalidatedKeys(array &$keys);

/**
* Persists tags for cache keys.
*
* @param array $tags The tags for each cache keys as index
*
* @return bool True on success
*/
abstract protected function doSaveTags(array $tags);

public function __construct(AdapterInterface $adapter, $defaultLifetime)
{
$this->adapter = $adapter;
$this->createCacheItem = \Closure::bind(
function ($key) use ($defaultLifetime) {
$item = new CacheItem();
$item->key = $key;
$item->isHit = false;
$item->defaultLifetime = $defaultLifetime;

return $item;
},
null,
CacheItem::class
);
$this->getTagsByKey = \Closure::bind(
function ($deferred) {
$tagsByKey = array();
foreach ($deferred as $key => $item) {
$tagsByKey[$key] = $item->tags;
}

return $tagsByKey;
},
null,
CacheItem::class
);
}

/**
* {@inheritdoc}
*/
public function hasItem($key)
{
if ($this->deferred) {
$this->commit();
}
if (!$this->adapter->hasItem($key)) {
return false;
}
$keys = array($key);

return !$this->filterInvalidatedKeys($keys);
}

/**
* {@inheritdoc}
*/
public function getItem($key)
{
if ($this->deferred) {
$this->commit();
}
$keys = array($key);

if ($keys = $this->filterInvalidatedKeys($keys)) {
foreach ($this->generateItems(array(), $keys) as $item) {
return $item;
}
}

return $this->adapter->getItem($key);
}

/**
* {@inheritdoc}
*/
public function getItems(array $keys = array())
{
if ($this->deferred) {
$this->commit();
}
$invalids = $this->filterInvalidatedKeys($keys);
$items = $this->adapter->getItems($keys);

return $this->generateItems($items, $invalids);
}

/**
* {@inheritdoc}
*/
public function clear()
{
$this->deferred = array();

return $this->adapter->clear();
}

/**
* {@inheritdoc}
*/
public function deleteItem($key)
{
return $this->adapter->deleteItem($key);
}

/**
* {@inheritdoc}
*/
public function deleteItems(array $keys)
{
return $this->adapter->deleteItems($keys);
}

/**
* {@inheritdoc}
*/
public function save(CacheItemInterface $item)
{
if (!$item instanceof CacheItem) {
return false;
}
if ($this->deferred) {
$this->commit();
}
$this->deferred[$item->getKey()] = $item;

return $this->commit();
}

/**
* {@inheritdoc}
*/
public function saveDeferred(CacheItemInterface $item)
{
if (!$item instanceof CacheItem) {
return false;
}
$this->deferred[$item->getKey()] = $item;

return true;
}

/**
* {@inheritdoc}
*/
public function commit()
{
$ok = true;

if ($this->deferred) {
foreach ($this->deferred as $key => $item) {
if (!$this->adapter->saveDeferred($item)) {
unset($this->deferred[$key]);
$ok = false;
}
}
$f = $this->getTagsByKey;
$ok = $this->doSaveTags($f($this->deferred)) && $ok;
$this->deferred = array();
}

return $this->adapter->commit() && $ok;
}

public function __destruct()
{
$this->commit();
}

private function generateItems($items, $invalids)
{
foreach ($items as $key => $item) {
yield $key => $item;
}

$f = $this->createCacheItem;

foreach ($invalids as $key) {
yield $key => $f($key);
}
}
}
102 changes: 3 additions & 99 deletions src/Symfony/Component/Cache/Adapter/RedisAdapter.php
Expand Up @@ -12,8 +12,6 @@
namespace Symfony\Component\Cache\Adapter;

use Predis\Connection\Factory;
use Predis\Connection\Aggregate\PredisCluster;
use Predis\Connection\Aggregate\RedisCluster;
use Symfony\Component\Cache\Exception\InvalidArgumentException;

/**
Expand All @@ -22,29 +20,23 @@
*/
class RedisAdapter extends AbstractAdapter
{
use RedisAdapterTrait;

private static $defaultConnectionOptions = array(
'class' => null,
'persistent' => 0,
'timeout' => 0,
'read_timeout' => 0,
'retry_interval' => 0,
);
private $redis;

/**
* @param \Redis|\RedisArray|\RedisCluster|\Predis\Client $redisClient
*/
public function __construct($redisClient, $namespace = '', $defaultLifetime = 0)
{
parent::__construct($namespace, $defaultLifetime);

if (preg_match('#[^-+_.A-Za-z0-9]#', $namespace, $match)) {
throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0]));
}
if (!$redisClient instanceof \Redis && !$redisClient instanceof \RedisArray && !$redisClient instanceof \RedisCluster && !$redisClient instanceof \Predis\Client) {
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given', __METHOD__, is_object($redisClient) ? get_class($redisClient) : gettype($redisClient)));
}
$this->redis = $redisClient;
$this->setRedis($redisClient, $namespace);
}

/**
Expand Down Expand Up @@ -157,51 +149,6 @@ protected function doHave($id)
return (bool) $this->redis->exists($id);
}

/**
* {@inheritdoc}
*/
protected function doClear($namespace)
{
// When using a native Redis cluster, clearing the cache cannot work and always returns false.
// Clearing the cache should then be done by any other means (e.g. by restarting the cluster).

$hosts = array($this->redis);
$evalArgs = array(array($namespace), 0);

if ($this->redis instanceof \Predis\Client) {
$evalArgs = array(0, $namespace);

$connection = $this->redis->getConnection();
if ($connection instanceof PredisCluster) {
$hosts = array();
foreach ($connection as $c) {
$hosts[] = new \Predis\Client($c);
}
} elseif ($connection instanceof RedisCluster) {
return false;
}
} elseif ($this->redis instanceof \RedisArray) {
foreach ($this->redis->_hosts() as $host) {
$hosts[] = $this->redis->_instance($host);
}
} elseif ($this->redis instanceof \RedisCluster) {
return false;
}
foreach ($hosts as $host) {
if (!isset($namespace[0])) {
$host->flushDb();
} else {
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
// can hang your server when it is executed against large databases (millions of items).
// Whenever you hit this scale, it is advised to deploy one Redis database per cache pool
// instead of using namespaces, so that FLUSHDB is used instead.
$host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end", $evalArgs[0], $evalArgs[1]);
}
}

return true;
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -248,47 +195,4 @@ protected function doSave(array $values, $lifetime)

return $failed;
}

private function execute($command, $id, array $args, $redis = null)
{
array_unshift($args, $id);
call_user_func_array(array($redis ?: $this->redis, $command), $args);
}

private function pipeline(\Closure $callback)
{
$redis = $this->redis;

try {
if ($redis instanceof \Predis\Client) {
$redis->pipeline(function ($pipe) use ($callback) {
$this->redis = $pipe;
$callback(array($this, 'execute'));
});
} elseif ($redis instanceof \RedisArray) {
$connections = array();
$callback(function ($command, $id, $args) use (&$connections) {
if (!isset($connections[$h = $this->redis->_target($id)])) {
$connections[$h] = $this->redis->_instance($h);
$connections[$h]->multi(\Redis::PIPELINE);
}
$this->execute($command, $id, $args, $connections[$h]);
});
foreach ($connections as $c) {
$c->exec();
}
} else {
$pipe = $redis->multi(\Redis::PIPELINE);
try {
$callback(array($this, 'execute'));
} finally {
if ($pipe) {
$redis->exec();
}
}
}
} finally {
$this->redis = $redis;
}
}
}

0 comments on commit 15c2bea

Please sign in to comment.