From 2c89543914cfe9421483abd3acd46b9550228f30 Mon Sep 17 00:00:00 2001 From: Thomas Steur Date: Thu, 14 Jan 2016 00:42:48 +0000 Subject: [PATCH] refs #23 support for Redis sentinel by using a different backend --- Queue/Backend.php | 2 + Queue/Backend/Redis.php | 32 +- Queue/Backend/Sentinel.php | 36 + Queue/Factory.php | 14 +- Queue/Lock.php | 3 +- Queue/Manager.php | 3 +- README.md | 17 +- libs/credis/Client.php | 1284 +++++++++++++++++ libs/credis/Cluster.php | 317 ++++ libs/credis/LICENSE | 23 + libs/credis/README.markdown | 171 +++ libs/credis/Sentinel.php | 255 ++++ libs/credis/composer.json | 23 + plugin.json | 2 +- .../Queue/Backend/SentinelTest.php | 35 + tests/Integration/Queue/FactoryTest.php | 10 + 16 files changed, 2208 insertions(+), 19 deletions(-) create mode 100644 Queue/Backend/Sentinel.php create mode 100755 libs/credis/Client.php create mode 100755 libs/credis/Cluster.php create mode 100755 libs/credis/LICENSE create mode 100755 libs/credis/README.markdown create mode 100755 libs/credis/Sentinel.php create mode 100755 libs/credis/composer.json create mode 100644 tests/Integration/Queue/Backend/SentinelTest.php diff --git a/Queue/Backend.php b/Queue/Backend.php index 90b2ef5..e104375 100644 --- a/Queue/Backend.php +++ b/Queue/Backend.php @@ -32,4 +32,6 @@ public function deleteIfKeyHasValue($key, $value); public function expireIfKeyHasValue($key, $value, $ttlInSeconds); public function get($key); + + public function getKeysMatchingPattern($pattern); } diff --git a/Queue/Backend/Redis.php b/Queue/Backend/Redis.php index c9b9828..64b1a5e 100644 --- a/Queue/Backend/Redis.php +++ b/Queue/Backend/Redis.php @@ -11,23 +11,22 @@ use Piwik\Log; use Piwik\Plugins\QueuedTracking\Queue\Backend; use Piwik\Tracker; -use Piwik\Translate; class Redis implements Backend { /** * @var \Redis */ - private $redis; - private $host; - private $port; - private $timeout; - private $password; + protected $redis; + protected $host; + protected $port; + protected $timeout; + protected $password; /** * @var int */ - private $database; + protected $database; public function testConnection() { @@ -162,8 +161,14 @@ public function deleteIfKeyHasValue($key, $value) else return 0 end'; + // ideally we would use evalSha to reduce bandwidth! - return (bool) $this->redis->eval($script, array($key, $value), 1); + return (bool) $this->evalScript($script, array($key), array($value)); + } + + protected function evalScript($script, $keys, $args) + { + return $this->redis->eval($script, array_merge($keys, $args), count($keys)); } public function getKeysMatchingPattern($pattern) @@ -187,7 +192,7 @@ public function expireIfKeyHasValue($key, $value, $ttlInSeconds) return 0 end'; // ideally we would use evalSha to reduce bandwidth! - return (bool) $this->redis->eval($script, array($key, $value, (int) $ttlInSeconds), 1); + return (bool) $this->evalScript($script, array($key), array($value, (int) $ttlInSeconds)); } public function get($key) @@ -213,7 +218,7 @@ private function connectIfNeeded() } } - private function connect() + protected function connect() { $this->redis = new \Redis(); $success = $this->redis->connect($this->host, $this->port, $this->timeout, null, 100); @@ -235,8 +240,11 @@ public function setConfig($host, $port, $timeout, $password) $this->host = $host; $this->port = $port; - $this->timeout = $timeout; - $this->password = $password; + $this->timeout = $timeout; + + if (!empty($password)) { + $this->password = $password; + } } private function disconnect() diff --git a/Queue/Backend/Sentinel.php b/Queue/Backend/Sentinel.php new file mode 100644 index 0000000..6a42c96 --- /dev/null +++ b/Queue/Backend/Sentinel.php @@ -0,0 +1,36 @@ +host, $this->port, $this->timeout, $persistent = false, $this->database, $this->password); + $this->redis = new \Credis_Sentinel($client); + $client->connect(); + + $this->redis = $client; + + return true; + } + + protected function evalScript($script, $keys, $args) + { + return $this->redis->eval($script, $keys, $args); + } + +} diff --git a/Queue/Factory.php b/Queue/Factory.php index f158f9d..317e1dd 100644 --- a/Queue/Factory.php +++ b/Queue/Factory.php @@ -9,6 +9,7 @@ namespace Piwik\Plugins\QueuedTracking\Queue; +use Piwik\Config; use Piwik\Container\StaticContainer; use Piwik\Plugins\QueuedTracking\Queue; use Piwik\Plugins\QueuedTracking\Settings; @@ -48,6 +49,11 @@ public static function getSettings() return StaticContainer::get('Piwik\Plugins\QueuedTracking\Settings'); } + private static function getConfig() + { + return Config::getInstance(); + } + private static function makeBackendFromSettings(Settings $settings) { $host = $settings->redisHost->getValue(); @@ -56,7 +62,13 @@ private static function makeBackendFromSettings(Settings $settings) $password = $settings->redisPassword->getValue(); $database = $settings->redisDatabase->getValue(); - $redis = new Queue\Backend\Redis(); + $queuedTracking = self::getConfig()->QueuedTracking; + if (!empty($queuedTracking['backend']) && $queuedTracking['backend'] === 'sentinel') { + $redis = new Queue\Backend\Sentinel(); + } else { + $redis = new Queue\Backend\Redis(); + } + $redis->setConfig($host, $port, $timeout, $password); $redis->setDatabase($database); diff --git a/Queue/Lock.php b/Queue/Lock.php index 780028a..9d47893 100644 --- a/Queue/Lock.php +++ b/Queue/Lock.php @@ -11,12 +11,11 @@ use Piwik\Common; use Piwik\Plugins\QueuedTracking\Queue\Backend; use Piwik\Tracker; -use Piwik\Plugins\QueuedTracking\Queue\Backend\Redis; class Lock { /** - * @var Redis + * @var Backend */ private $backend; diff --git a/Queue/Manager.php b/Queue/Manager.php index 069b83c..c106dc5 100644 --- a/Queue/Manager.php +++ b/Queue/Manager.php @@ -13,12 +13,11 @@ use Piwik\Plugins\QueuedTracking\Queue\Backend; use Piwik\Tracker\RequestSet; use Piwik\Tracker; -use Piwik\Plugins\QueuedTracking\Queue\Backend\Redis; class Manager { /** - * @var Redis + * @var Backend */ private $backend; diff --git a/README.md b/README.md index 078123f..5bb4aca 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ __How should the redis server be configured?__ Make sure to have enough memory to save all tracking requests in the queue. One tracking request in the queue takes about 2KB, 20.000 tracking requests take about 50MB. All tracking requests of all websites are stored in the same queue. There should be only one Redis server to make sure the data will be replayed in the same order as they were recorded. -If you want to configure Redis HA (High Availability) it should be possible to use Redis Cluser, Redis Sentinel, ... +If you want to configure Redis HA (High Availability) it is possible to use Redis Sentinel see further down. We currently write into the Redis default database by default but you can configure to use a different one. __Why do some tests fail on my local Piwik instance?__ @@ -124,6 +124,17 @@ __I am using the Log Importer in combination with Queued Tracking, is there some Yes, we recommend to set the "Number of requests to process" to `1` as the log importer usually sends multiple requests at once using bulk tracking already. +__How can I configure the QueuedTracking plugin to use Sentinel?__ + +Add the following configuration to your `config/config.ini.php` to enable Sentinel feature: + +``` +[QueuedTracking] +backend=sentinel +``` + +In this case the `phpredis` extension is not needed as it uses a PHP class to connect to your Redis. Please note that calls to Redis might be a little bit slower. + __Are there any known issues?__ * In case you are using bulk tracking the bulk tracking response varies compared to the regular one. We will always return @@ -134,6 +145,10 @@ __Are there any known issues?__ ## Changelog +0.3.0 + +- Added support to use Redis Sentinel for automatic failover + 0.2.5 - Use a better random number generator if available on the system to more evenly process queues. diff --git a/libs/credis/Client.php b/libs/credis/Client.php new file mode 100755 index 0000000..b04744f --- /dev/null +++ b/libs/credis/Client.php @@ -0,0 +1,1284 @@ + + * @copyright 2011 Colin Mollenhour + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Credis_Client + */ + +if( ! defined('CRLF')) define('CRLF', sprintf('%s%s', chr(13), chr(10))); + +/** + * Credis-specific errors, wraps native Redis errors + */ +class CredisException extends Exception +{ + + const CODE_TIMED_OUT = 1; + const CODE_DISCONNECTED = 2; + + public function __construct($message, $code = 0, $exception = NULL) + { + if ($exception && get_class($exception) == 'RedisException' && $message == 'read error on connection') { + $code = CredisException::CODE_DISCONNECTED; + } + parent::__construct($message, $code, $exception); + } + +} + +/** + * Credis_Client, a lightweight Redis PHP standalone client and phpredis wrapper + * + * Server/Connection: + * @method Credis_Client pipeline() + * @method Credis_Client multi() + * @method array exec() + * @method string flushAll() + * @method string flushDb() + * @method array info() + * @method bool|array config(string $setGet, string $key, string $value = null) + * + * Keys: + * @method int del(string $key) + * @method int exists(string $key) + * @method int expire(string $key, int $seconds) + * @method int expireAt(string $key, int $timestamp) + * @method array keys(string $key) + * @method int persist(string $key) + * @method bool rename(string $key, string $newKey) + * @method bool renameNx(string $key, string $newKey) + * @method array sort(string $key, string $arg1, string $valueN = null) + * @method int ttl(string $key) + * @method string type(string $key) + * + * Scalars: + * @method int append(string $key, string $value) + * @method int decr(string $key) + * @method int decrBy(string $key, int $decrement) + * @method bool|string get(string $key) + * @method int getBit(string $key, int $offset) + * @method string getRange(string $key, int $start, int $end) + * @method string getSet(string $key, string $value) + * @method int incr(string $key) + * @method int incrBy(string $key, int $decrement) + * @method array mGet(array $keys) + * @method bool mSet(array $keysValues) + * @method int mSetNx(array $keysValues) + * @method bool set(string $key, string $value) + * @method int setBit(string $key, int $offset, int $value) + * @method bool setEx(string $key, int $seconds, string $value) + * @method int setNx(string $key, string $value) + * @method int setRange(string $key, int $offset, int $value) + * @method int strLen(string $key) + * + * Sets: + * @method int sAdd(string $key, mixed $value, string $valueN = null) + * @method int sRem(string $key, mixed $value, string $valueN = null) + * @method array sMembers(string $key) + * @method array sUnion(mixed $keyOrArray, string $valueN = null) + * @method array sInter(mixed $keyOrArray, string $valueN = null) + * @method array sDiff(mixed $keyOrArray, string $valueN = null) + * @method string sPop(string $key) + * @method int sCard(string $key) + * @method int sIsMember(string $key, string $member) + * @method int sMove(string $source, string $dest, string $member) + * @method string|array sRandMember(string $key, int $count = null) + * @method int sUnionStore(string $dest, string $key1, string $key2 = null) + * @method int sInterStore(string $dest, string $key1, string $key2 = null) + * @method int sDiffStore(string $dest, string $key1, string $key2 = null) + * + * Hashes: + * @method bool|int hSet(string $key, string $field, string $value) + * @method bool hSetNx(string $key, string $field, string $value) + * @method bool|string hGet(string $key, string $field) + * @method bool|int hLen(string $key) + * @method bool hDel(string $key, string $field) + * @method array hKeys(string $key, string $field) + * @method array hVals(string $key) + * @method array hGetAll(string $key) + * @method bool hExists(string $key, string $field) + * @method int hIncrBy(string $key, string $field, int $value) + * @method bool hMSet(string $key, array $keysValues) + * @method array hMGet(string $key, array $fields) + * + * Lists: + * @method array|null blPop(string $keyN, int $timeout) + * @method array|null brPop(string $keyN, int $timeout) + * @method array|null brPoplPush(string $source, string $destination, int $timeout) + * @method string|null lIndex(string $key, int $index) + * @method int lInsert(string $key, string $beforeAfter, string $pivot, string $value) + * @method int lLen(string $key) + * @method string|null lPop(string $key) + * @method int lPush(string $key, mixed $value, mixed $valueN = null) + * @method int lPushX(string $key, mixed $value) + * @method array lRange(string $key, int $start, int $stop) + * @method int lRem(string $key, int $count, mixed $value) + * @method bool lSet(string $key, int $index, mixed $value) + * @method bool lTrim(string $key, int $start, int $stop) + * @method string|null rPop(string $key) + * @method string|null rPoplPush(string $source, string $destination) + * @method int rPush(string $key, mixed $value, mixed $valueN = null) + * @method int rPushX(string $key, mixed $value) + * + * Sorted Sets: + * @method array zrangebyscore(string $key, mixed $start, mixed $stop, array $args = null) + * TODO + * + * Pub/Sub + * @method int publish(string $channel, string $message) + * @method int|array pubsub(string $subCommand, $arg = NULL) + * + * Scripting: + * @method string|int script(string $command, string $arg1 = null) + * @method string|int|array|bool eval(string $script, array $keys = NULL, array $args = NULL) + * @method string|int|array|bool evalSha(string $script, array $keys = NULL, array $args = NULL) + */ +class Credis_Client { + + const TYPE_STRING = 'string'; + const TYPE_LIST = 'list'; + const TYPE_SET = 'set'; + const TYPE_ZSET = 'zset'; + const TYPE_HASH = 'hash'; + const TYPE_NONE = 'none'; + const FREAD_BLOCK_SIZE = 8192; + + /** + * Socket connection to the Redis server or Redis library instance + * @var resource|Redis + */ + protected $redis; + protected $redisMulti; + + /** + * Host of the Redis server + * @var string + */ + protected $host; + + /** + * Port on which the Redis server is running + * @var integer + */ + protected $port; + + /** + * Timeout for connecting to Redis server + * @var float + */ + protected $timeout; + + /** + * Timeout for reading response from Redis server + * @var float + */ + protected $readTimeout; + + /** + * Unique identifier for persistent connections + * @var string + */ + protected $persistent; + + /** + * @var bool + */ + protected $closeOnDestruct = TRUE; + + /** + * @var bool + */ + protected $connected = FALSE; + + /** + * @var bool + */ + protected $standalone; + + /** + * @var int + */ + protected $maxConnectRetries = 0; + + /** + * @var int + */ + protected $connectFailures = 0; + + /** + * @var bool + */ + protected $usePipeline = FALSE; + + /** + * @var array + */ + protected $commandNames; + + /** + * @var string + */ + protected $commands; + + /** + * @var bool + */ + protected $isMulti = FALSE; + + /** + * @var bool + */ + protected $isWatching = FALSE; + + /** + * @var string + */ + protected $authPassword; + + /** + * @var int + */ + protected $selectedDb = 0; + + /** + * Aliases for backwards compatibility with phpredis + * @var array + */ + protected $wrapperMethods = array('delete' => 'del', 'getkeys' => 'keys', 'sremove' => 'srem'); + + /** + * @var array + */ + protected $renamedCommands; + + /** + * @var int + */ + protected $requests = 0; + + /** + * @var bool + */ + protected $subscribed = false; + + + /** + * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}. + * $host may also be a path to a unix socket or a string in the form of tcp://[hostname]:[port] or unix://[path] + * + * @param string $host The hostname of the Redis server + * @param integer $port The port number of the Redis server + * @param float $timeout Timeout period in seconds + * @param string $persistent Flag to establish persistent connection + * @param int $db The selected datbase of the Redis server + * @param string $password The authentication password of the Redis server + */ + public function __construct($host = '127.0.0.1', $port = 6379, $timeout = null, $persistent = '', $db = 0, $password = null) + { + $this->host = (string) $host; + $this->port = (int) $port; + $this->timeout = $timeout; + $this->persistent = (string) $persistent; + $this->standalone = ! extension_loaded('redis'); + $this->authPassword = $password; + $this->selectedDb = (int)$db; + $this->convertHost(); + } + + public function __destruct() + { + if ($this->closeOnDestruct) { + $this->close(); + } + } + + /** + * @return bool + */ + public function isSubscribed() + { + return $this->subscribed; + } + + /** + * Return the host of the Redis instance + * @return string + */ + public function getHost() + { + return $this->host; + } + /** + * Return the port of the Redis instance + * @return int + */ + public function getPort() + { + return $this->port; + } + + /** + * Return the selected database + * @return int + */ + public function getSelectedDb() + { + return $this->selectedDb; + } + /** + * @return string + */ + public function getPersistence() + { + return $this->persistent; + } + /** + * @throws CredisException + * @return Credis_Client + */ + public function forceStandalone() + { + if ($this->standalone) { + return $this; + } + if($this->connected) { + throw new CredisException('Cannot force Credis_Client to use standalone PHP driver after a connection has already been established.'); + } + $this->standalone = TRUE; + return $this; + } + + /** + * @param int $retries + * @return Credis_Client + */ + public function setMaxConnectRetries($retries) + { + $this->maxConnectRetries = $retries; + return $this; + } + + /** + * @param bool $flag + * @return Credis_Client + */ + public function setCloseOnDestruct($flag) + { + $this->closeOnDestruct = $flag; + return $this; + } + protected function convertHost() + { + if (preg_match('#^(tcp|unix)://(.*)$#', $this->host, $matches)) { + if($matches[1] == 'tcp') { + if ( ! preg_match('#^([^:]+)(:([0-9]+))?(/(.+))?$#', $matches[2], $matches)) { + throw new CredisException('Invalid host format; expected tcp://host[:port][/persistence_identifier]'); + } + $this->host = $matches[1]; + $this->port = (int) (isset($matches[3]) ? $matches[3] : 6379); + $this->persistent = isset($matches[5]) ? $matches[5] : ''; + } else { + $this->host = $matches[2]; + $this->port = NULL; + if (substr($this->host,0,1) != '/') { + throw new CredisException('Invalid unix socket format; expected unix:///path/to/redis.sock'); + } + } + } + if ($this->port !== NULL && substr($this->host,0,1) == '/') { + $this->port = NULL; + } + } + /** + * @throws CredisException + * @return Credis_Client + */ + public function connect() + { + if ($this->connected) { + return $this; + } + if ($this->standalone) { + $flags = STREAM_CLIENT_CONNECT; + $remote_socket = $this->port === NULL + ? 'unix://'.$this->host + : 'tcp://'.$this->host.':'.$this->port; + if ($this->persistent) { + if ($this->port === NULL) { // Unix socket + throw new CredisException('Persistent connections to UNIX sockets are not supported in standalone mode.'); + } + $remote_socket .= '/'.$this->persistent; + $flags = $flags | STREAM_CLIENT_PERSISTENT; + } + $result = $this->redis = @stream_socket_client($remote_socket, $errno, $errstr, $this->timeout !== null ? $this->timeout : 2.5, $flags); + } + else { + if ( ! $this->redis) { + $this->redis = new Redis; + } + $result = $this->persistent + ? $this->redis->pconnect($this->host, $this->port, $this->timeout, $this->persistent) + : $this->redis->connect($this->host, $this->port, $this->timeout); + } + + // Use recursion for connection retries + if ( ! $result) { + $this->connectFailures++; + if ($this->connectFailures <= $this->maxConnectRetries) { + return $this->connect(); + } + $failures = $this->connectFailures; + $this->connectFailures = 0; + throw new CredisException("Connection to Redis failed after $failures failures." . (isset($errno) && isset($errstr) ? "Last Error : ({$errno}) {$errstr}" : "")); + } + + $this->connectFailures = 0; + $this->connected = TRUE; + + // Set read timeout + if ($this->readTimeout) { + $this->setReadTimeout($this->readTimeout); + } + + if($this->authPassword !== null) { + $this->auth($this->authPassword); + } + if($this->selectedDb !== 0) { + $this->select($this->selectedDb); + } + return $this; + } + /** + * @return bool + */ + public function isConnected() + { + return $this->connected; + } + /** + * Set the read timeout for the connection. Use 0 to disable timeouts entirely (or use a very long timeout + * if not supported). + * + * @param int $timeout 0 (or -1) for no timeout, otherwise number of seconds + * @throws CredisException + * @return Credis_Client + */ + public function setReadTimeout($timeout) + { + if ($timeout < -1) { + throw new CredisException('Timeout values less than -1 are not accepted.'); + } + $this->readTimeout = $timeout; + if ($this->connected) { + if ($this->standalone) { + $timeout = $timeout <= 0 ? 315360000 : $timeout; // Ten-year timeout + stream_set_blocking($this->redis, TRUE); + stream_set_timeout($this->redis, (int) floor($timeout), ($timeout - floor($timeout)) * 1000000); + } else if (defined('Redis::OPT_READ_TIMEOUT')) { + // supported in phpredis 2.2.3 + // a timeout value of -1 means reads will not timeout + $timeout = $timeout == 0 ? -1 : $timeout; + $this->redis->setOption(Redis::OPT_READ_TIMEOUT, $timeout); + } + } + return $this; + } + + /** + * @return bool + */ + public function close() + { + $result = TRUE; + if ($this->connected && ! $this->persistent) { + try { + $result = $this->standalone ? fclose($this->redis) : $this->redis->close(); + $this->connected = FALSE; + } catch (Exception $e) { + ; // Ignore exceptions on close + } + } + return $result; + } + + /** + * Enabled command renaming and provide mapping method. Supported methods are: + * + * 1. renameCommand('foo') // Salted md5 hash for all commands -> md5('foo'.$command) + * 2. renameCommand(function($command){ return 'my'.$command; }); // Callable + * 3. renameCommand('get', 'foo') // Single command -> alias + * 4. renameCommand(['get' => 'foo', 'set' => 'bar']) // Full map of [command -> alias] + * + * @param string|callable|array $command + * @param string|null $alias + * @return $this + */ + public function renameCommand($command, $alias = NULL) + { + if ( ! $this->standalone) { + $this->forceStandalone(); + } + if ($alias === NULL) { + $this->renamedCommands = $command; + } else { + if ( ! $this->renamedCommands) { + $this->renamedCommands = array(); + } + $this->renamedCommands[$command] = $alias; + } + return $this; + } + + /** + * @param $command + */ + public function getRenamedCommand($command) + { + static $map; + + // Command renaming not enabled + if ($this->renamedCommands === NULL) { + return $command; + } + + // Initialize command map + if ($map === NULL) { + if (is_array($this->renamedCommands)) { + $map = $this->renamedCommands; + } else { + $map = array(); + } + } + + // Generate and return cached result + if ( ! isset($map[$command])) { + // String means all commands are hashed with salted md5 + if (is_string($this->renamedCommands)) { + $map[$command] = md5($this->renamedCommands.$command); + } + // Would already be set in $map if it was intended to be renamed + else if (is_array($this->renamedCommands)) { + return $command; + } + // User-supplied function + else if (is_callable($this->renamedCommands)) { + $map[$command] = call_user_func($this->renamedCommands, $command); + } + } + return $map[$command]; + } + + /** + * @param string $password + * @return bool + */ + public function auth($password) + { + $response = $this->__call('auth', array($password)); + $this->authPassword = $password; + return $response; + } + + /** + * @param int $index + * @return bool + */ + public function select($index) + { + $response = $this->__call('select', array($index)); + $this->selectedDb = (int) $index; + return $response; + } + + /** + * @param string|array $pattern + * @return array + */ + public function pUnsubscribe() + { + list($command, $channel, $subscribedChannels) = $this->__call('punsubscribe', func_get_args()); + $this->subscribed = $subscribedChannels > 0; + return array($command, $channel, $subscribedChannels); + } + + /** + * @param int $Iterator + * @param string $pattern + * @param int $Iterator + * @return bool | Array + */ + public function scan(&$Iterator, $pattern = null, $count = null) + { + return $this->__call('scan', array(&$Iterator, $pattern, $count)); + } + + /** + * @param int $Iterator + * @param string $pattern + * @param int $Iterator + * @return bool | Array + */ + public function hscan(&$Iterator, $pattern = null, $count = null) + { + return $this->__call('hscan', array(&$Iterator, $pattern, $count)); + } + + /** + * @param int $Iterator + * @param string $pattern + * @param int $Iterator + * @return bool | Array + */ + public function sscan(&$Iterator, $pattern = null, $count = null) + { + return $this->__call('sscan', array(&$Iterator, $pattern, $count)); + } + + /** + * @param int $Iterator + * @param string $pattern + * @param int $Iterator + * @return bool | Array + */ + public function zscan(&$Iterator, $pattern = null, $count = null) + { + return $this->__call('zscan', array(&$Iterator, $pattern, $count)); + } + + /** + * @param string|array $patterns + * @param $callback + * @return $this|array|bool|Credis_Client|mixed|null|string + * @throws CredisException + */ + public function pSubscribe($patterns, $callback) + { + if ( ! $this->standalone) { + return $this->__call('pSubscribe', array((array)$patterns, $callback)); + } + + // Standalone mode: use infinite loop to subscribe until timeout + $patternCount = is_array($patterns) ? count($patterns) : 1; + while ($patternCount--) { + if (isset($status)) { + list($command, $pattern, $status) = $this->read_reply(); + } else { + list($command, $pattern, $status) = $this->__call('psubscribe', array($patterns)); + } + $this->subscribed = $status > 0; + if ( ! $status) { + throw new CredisException('Invalid pSubscribe response.'); + } + } + try { + while ($this->subscribed) { + list($type, $pattern, $channel, $message) = $this->read_reply(); + if ($type != 'pmessage') { + throw new CredisException('Received non-pmessage reply.'); + } + $callback($this, $pattern, $channel, $message); + } + } catch (CredisException $e) { + if ($e->getCode() == CredisException::CODE_TIMED_OUT) { + try { + list($command, $pattern, $status) = $this->pUnsubscribe($patterns); + while ($status !== 0) { + list($command, $pattern, $status) = $this->read_reply(); + } + } catch (CredisException $e2) { + throw $e2; + } + } + throw $e; + } + } + + /** + * @param string|array $pattern + * @return array + */ + public function unsubscribe() + { + list($command, $channel, $subscribedChannels) = $this->__call('unsubscribe', func_get_args()); + $this->subscribed = $subscribedChannels > 0; + return array($command, $channel, $subscribedChannels); + } + + /** + * @param string|array $channels + * @param $callback + * @throws CredisException + * @return $this|array|bool|Credis_Client|mixed|null|string + */ + public function subscribe($channels, $callback) + { + if ( ! $this->standalone) { + return $this->__call('subscribe', array((array)$channels, $callback)); + } + + // Standalone mode: use infinite loop to subscribe until timeout + $channelCount = is_array($channels) ? count($channels) : 1; + while ($channelCount--) { + if (isset($status)) { + list($command, $channel, $status) = $this->read_reply(); + } else { + list($command, $channel, $status) = $this->__call('subscribe', array($channels)); + } + $this->subscribed = $status > 0; + if ( ! $status) { + throw new CredisException('Invalid subscribe response.'); + } + } + try { + while ($this->subscribed) { + list($type, $channel, $message) = $this->read_reply(); + if ($type != 'message') { + throw new CredisException('Received non-message reply.'); + } + $callback($this, $channel, $message); + } + } catch (CredisException $e) { + if ($e->getCode() == CredisException::CODE_TIMED_OUT) { + try { + list($command, $channel, $status) = $this->unsubscribe($channels); + while ($status !== 0) { + list($command, $channel, $status) = $this->read_reply(); + } + } catch (CredisException $e2) { + throw $e2; + } + } + throw $e; + } + } + + public function __call($name, $args) + { + // Lazy connection + $this->connect(); + + $name = strtolower($name); + + // Send request via native PHP + if($this->standalone) + { + switch ($name) { + case 'eval': + case 'evalsha': + $script = array_shift($args); + $keys = (array) array_shift($args); + $eArgs = (array) array_shift($args); + $args = array($script, count($keys), $keys, $eArgs); + break; + case 'set': + // The php redis module has different behaviour with ttl + // https://github.com/phpredis/phpredis#set + if (count($args) === 3 && is_int($args[2])) { + $args = array($args[0], $args[1], array('EX', $args[2])); + } elseif (count($args) === 3 && is_array($args[2])) { + $tmp_args = $args; + $args = array($tmp_args[0], $tmp_args[1]); + foreach ($tmp_args[2] as $k=>$v) { + if (is_string($k)) { + $args[] = array($k,$v); + } elseif (is_int($k)) { + $args[] = $v; + } + } + unset($tmp_args); + } + break; + case 'scan': + case 'sscan': + case 'hscan': + case 'zscan': + $ref =& $args[0]; + if (empty($ref)) + { + $ref = 0; + } + $eArgs = array($ref); + if (!empty($args[1])) + { + $eArgs[] = 'MATCH'; + $eArgs[] = $args[1]; + } + if (!empty($args[2])) + { + $eArgs[] = 'COUNT'; + $eArgs[] = $args[2]; + } + $args = $eArgs; + break; + case 'zrangebyscore': + if (isset($args[3]) && is_array($args[3])) { + // map options + $cArgs = array(); + if (!empty($args[3]['withscores'])) { + $cArgs[] = 'withscores'; + } + if (array_key_exists('limit', $args[3])) { + $cArgs[] = array('limit' => $args[3]['limit']); + } + $args[3] = $cArgs; + } + break; + } + // Flatten arguments + $args = self::_flattenArguments($args); + + // In pipeline mode + if($this->usePipeline) + { + if($name == 'pipeline') { + throw new CredisException('A pipeline is already in use and only one pipeline is supported.'); + } + else if($name == 'exec') { + if($this->isMulti) { + $this->commandNames[] = $name; + $this->commands .= self::_prepare_command(array($this->getRenamedCommand($name))); + } + + // Write request + if($this->commands) { + $this->write_command($this->commands); + } + $this->commands = NULL; + + // Read response + $response = array(); + foreach($this->commandNames as $command) { + $response[] = $this->read_reply($command); + } + $this->commandNames = NULL; + + if($this->isMulti) { + $response = array_pop($response); + } + $this->usePipeline = $this->isMulti = FALSE; + return $response; + } + else { + if($name == 'multi') { + $this->isMulti = TRUE; + } + array_unshift($args, $this->getRenamedCommand($name)); + $this->commandNames[] = $name; + $this->commands .= self::_prepare_command($args); + return $this; + } + } + + // Start pipeline mode + if($name == 'pipeline') + { + $this->usePipeline = TRUE; + $this->commandNames = array(); + $this->commands = ''; + return $this; + } + + // If unwatching, allow reconnect with no error thrown + if($name == 'unwatch') { + $this->isWatching = FALSE; + } + + // Non-pipeline mode + array_unshift($args, $this->getRenamedCommand($name)); + $command = self::_prepare_command($args); + $this->write_command($command); + $response = $this->read_reply($name); + + switch($name) + { + case 'scan': + case 'sscan': + case 'hscan': + case 'zscan': + $ref = array_shift($response); + if (empty($ref)) + { + $response = false; + } + break; + case 'zrangebyscore'; + if (in_array('withscores', $args, true)) { + // Map array of values into key=>score list like phpRedis does + $item = null; + $out = array(); + foreach ($response as $value) { + if ($item == null) { + $item = $value; + } else { + // 2nd value is the score + $out[$item] = (float) $value; + $item = null; + } + } + $response = $out; + } + break; + } + + // Watch mode disables reconnect so error is thrown + if($name == 'watch') { + $this->isWatching = TRUE; + } + // Transaction mode + else if($this->isMulti && ($name == 'exec' || $name == 'discard')) { + $this->isMulti = FALSE; + } + // Started transaction + else if($this->isMulti || $name == 'multi') { + $this->isMulti = TRUE; + $response = $this; + } + } + + // Send request via phpredis client + else + { + // Tweak arguments + switch($name) { + case 'get': // optimize common cases + case 'set': + case 'hget': + case 'hset': + case 'setex': + case 'mset': + case 'msetnx': + case 'hmset': + case 'hmget': + case 'del': + case 'zrangebyscore': + break; + case 'mget': + if(isset($args[0]) && ! is_array($args[0])) { + $args = array($args); + } + break; + case 'lrem': + $args = array($args[0], $args[2], $args[1]); + break; + case 'eval': + case 'evalsha': + if (isset($args[1]) && is_array($args[1])) { + $cKeys = $args[1]; + } elseif (isset($args[1]) && is_string($args[1])) { + $cKeys = array($args[1]); + } else { + $cKeys = array(); + } + if (isset($args[2]) && is_array($args[2])) { + $cArgs = $args[2]; + } elseif (isset($args[2]) && is_string($args[2])) { + $cArgs = array($args[2]); + } else { + $cArgs = array(); + } + $args = array($args[0], array_merge($cKeys, $cArgs), count($cKeys)); + break; + case 'subscribe': + case 'psubscribe': + break; + case 'scan': + case 'sscan': + case 'hscan': + case 'zscan': + // allow phpredis to see the caller's reference + //$param_ref =& $args[0]; + break; + default: + // Flatten arguments + $args = self::_flattenArguments($args); + } + + try { + // Proxy pipeline mode to the phpredis library + if($name == 'pipeline' || $name == 'multi') { + if($this->isMulti) { + return $this; + } else { + $this->isMulti = TRUE; + $this->redisMulti = call_user_func_array(array($this->redis, $name), $args); + } + } + else if($name == 'exec' || $name == 'discard') { + $this->isMulti = FALSE; + $response = $this->redisMulti->$name(); + $this->redisMulti = NULL; + #echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n"; + return $response; + } + + // Use aliases to be compatible with phpredis wrapper + if(isset($this->wrapperMethods[$name])) { + $name = $this->wrapperMethods[$name]; + } + + // Multi and pipeline return self for chaining + if($this->isMulti) { + call_user_func_array(array($this->redisMulti, $name), $args); + return $this; + } + + // Send request, retry one time when using persistent connections on the first request only + $this->requests++; + try { + $response = call_user_func_array(array($this->redis, $name), $args); + } catch (RedisException $e) { + if ($this->persistent && $this->requests == 1 && $e->getMessage() == 'read error on connection') { + $this->connected = FALSE; + $this->connect(); + $response = call_user_func_array(array($this->redis, $name), $args); + } else { + throw $e; + } + } + } + // Wrap exceptions + catch(RedisException $e) { + $code = 0; + if ( ! ($result = $this->redis->IsConnected())) { + $this->connected = FALSE; + $code = CredisException::CODE_DISCONNECTED; + } + throw new CredisException($e->getMessage(), $code, $e); + } + + #echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n"; + + // change return values where it is too difficult to minim in standalone mode + switch($name) + { + case 'hmget': + $response = array_values($response); + break; + + case 'type': + $typeMap = array( + self::TYPE_NONE, + self::TYPE_STRING, + self::TYPE_SET, + self::TYPE_LIST, + self::TYPE_ZSET, + self::TYPE_HASH, + ); + $response = $typeMap[$response]; + break; + + // Handle scripting errors + case 'eval': + case 'evalsha': + case 'script': + $error = $this->redis->getLastError(); + $this->redis->clearLastError(); + if ($error && substr($error,0,8) == 'NOSCRIPT') { + $response = NULL; + } else if ($error) { + throw new CredisException($error); + } + break; + default: + $error = $this->redis->getLastError(); + $this->redis->clearLastError(); + if ($error) { + throw new CredisException($error); + } + break; + } + } + + return $response; + } + + protected function write_command($command) + { + // Reconnect on lost connection (Redis server "timeout" exceeded since last command) + if(feof($this->redis)) { + $this->close(); + // If a watch or transaction was in progress and connection was lost, throw error rather than reconnect + // since transaction/watch state will be lost. + if(($this->isMulti && ! $this->usePipeline) || $this->isWatching) { + $this->isMulti = $this->isWatching = FALSE; + throw new CredisException('Lost connection to Redis server during watch or transaction.'); + } + $this->connected = FALSE; + $this->connect(); + if($this->authPassword) { + $this->auth($this->authPassword); + } + if($this->selectedDb != 0) { + $this->select($this->selectedDb); + } + } + + $commandLen = strlen($command); + $lastFailed = FALSE; + for ($written = 0; $written < $commandLen; $written += $fwrite) { + $fwrite = fwrite($this->redis, substr($command, $written)); + if ($fwrite === FALSE || ($fwrite == 0 && $lastFailed)) { + $this->connected = FALSE; + throw new CredisException('Failed to write entire command to stream'); + } + $lastFailed = !!$fwrite; + } + } + + protected function read_reply($name = '') + { + $reply = fgets($this->redis); + if($reply === FALSE) { + $info = stream_get_meta_data($this->redis); + if ($info['timed_out']) { + throw new CredisException('Read operation timed out.', CredisException::CODE_TIMED_OUT); + } else { + $this->connected = FALSE; + throw new CredisException('Lost connection to Redis server.', CredisException::CODE_DISCONNECTED); + } + } + $reply = rtrim($reply, CRLF); + #echo "> $name: $reply\n"; + $replyType = substr($reply, 0, 1); + switch ($replyType) { + /* Error reply */ + case '-': + if($this->isMulti || $this->usePipeline) { + $response = FALSE; + } else if ($name == 'evalsha' && substr($reply,0,9) == '-NOSCRIPT') { + $response = NULL; + } else { + throw new CredisException(substr($reply,0,4) == '-ERR' ? substr($reply, 5) : substr($reply,1)); + } + break; + /* Inline reply */ + case '+': + $response = substr($reply, 1); + if($response == 'OK' || $response == 'QUEUED') { + return TRUE; + } + break; + /* Bulk reply */ + case '$': + if ($reply == '$-1') return FALSE; + $size = (int) substr($reply, 1); + $response = stream_get_contents($this->redis, $size + 2); + if( ! $response) { + $this->connected = FALSE; + throw new CredisException('Error reading reply.'); + } + $response = substr($response, 0, $size); + break; + /* Multi-bulk reply */ + case '*': + $count = substr($reply, 1); + if ($count == '-1') return FALSE; + + $response = array(); + for ($i = 0; $i < $count; $i++) { + $response[] = $this->read_reply(); + } + break; + /* Integer reply */ + case ':': + $response = intval(substr($reply, 1)); + break; + default: + throw new CredisException('Invalid response: '.print_r($reply, TRUE)); + break; + } + + // Smooth over differences between phpredis and standalone response + switch($name) + { + case '': // Minor optimization for multi-bulk replies + break; + case 'config': + case 'hgetall': + $keys = $values = array(); + while($response) { + $keys[] = array_shift($response); + $values[] = array_shift($response); + } + $response = count($keys) ? array_combine($keys, $values) : array(); + break; + case 'info': + $lines = explode(CRLF, trim($response,CRLF)); + $response = array(); + foreach($lines as $line) { + if ( ! $line || substr($line, 0, 1) == '#') { + continue; + } + list($key, $value) = explode(':', $line, 2); + $response[$key] = $value; + } + break; + case 'ttl': + if($response === -1) { + $response = FALSE; + } + break; + } + + return $response; + } + + /** + * Build the Redis unified protocol command + * + * @param array $args + * @return string + */ + private static function _prepare_command($args) + { + return sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array('self', '_map'), $args), CRLF), CRLF); + } + + private static function _map($arg) + { + return sprintf('$%d%s%s', strlen($arg), CRLF, $arg); + } + + /** + * Flatten arguments + * + * If an argument is an array, the key is inserted as argument followed by the array values + * array('zrangebyscore', '-inf', 123, array('limit' => array('0', '1'))) + * becomes + * array('zrangebyscore', '-inf', 123, 'limit', '0', '1') + * + * @param array $in + * @return array + */ + private static function _flattenArguments(array $arguments, &$out = array()) + { + foreach ($arguments as $key => $arg) { + if (!is_int($key)) { + $out[] = $key; + } + + if (is_array($arg)) { + self::_flattenArguments($arg, $out); + } else { + $out[] = $arg; + } + } + + return $out; + } +} diff --git a/libs/credis/Cluster.php b/libs/credis/Cluster.php new file mode 100755 index 0000000..afc4f02 --- /dev/null +++ b/libs/credis/Cluster.php @@ -0,0 +1,317 @@ + + * @copyright 2009 Justin Poliey + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Credis + */ + +/** + * A generalized Credis_Client interface for a cluster of Redis servers + */ +class Credis_Cluster +{ + /** + * Collection of Credis_Client objects attached to Redis servers + * @var Credis_Client[] + */ + protected $clients; + /** + * If a server is set as master, all write commands go to that one + * @var Credis_Client + */ + protected $masterClient; + /** + * Aliases of Credis_Client objects attached to Redis servers, used to route commands to specific servers + * @see Credis_Cluster::to + * @var array + */ + protected $aliases; + + /** + * Hash ring of Redis server nodes + * @var array + */ + protected $ring; + + /** + * Individual nodes of pointers to Redis servers on the hash ring + * @var array + */ + protected $nodes; + + /** + * The commands that are not subject to hashing + * @var array + * @access protected + */ + protected $dont_hash; + + /** + * Creates an interface to a cluster of Redis servers + * Each server should be in the format: + * array( + * 'host' => hostname, + * 'port' => port, + * 'db' => db, + * 'password' => password, + * 'timeout' => timeout, + * 'alias' => alias, + * 'persistent' => persistence_identifier, + * 'master' => master + * 'write_only'=> true/false + * ) + * + * @param array $servers The Redis servers in the cluster. + * @param int $replicas + * @param bool $standAlone + */ + public function __construct($servers, $replicas = 128, $standAlone = false) + { + $this->clients = array(); + $this->masterClient = null; + $this->aliases = array(); + $this->ring = array(); + $this->replicas = (int)$replicas; + $client = null; + foreach ($servers as $server) + { + if(is_array($server)){ + $client = new Credis_Client( + $server['host'], + $server['port'], + isset($server['timeout']) ? $server['timeout'] : 2.5, + isset($server['persistent']) ? $server['persistent'] : '', + isset($server['db']) ? $server['db'] : 0, + isset($server['password']) ? $server['password'] : null + ); + if (isset($server['alias'])) { + $this->aliases[$server['alias']] = $client; + } + if(isset($server['master']) && $server['master'] === true){ + $this->masterClient = $client; + if(isset($server['write_only']) && $server['write_only'] === true){ + continue; + } + } + } elseif($server instanceof Credis_Client){ + $client = $server; + } else { + throw new CredisException('Server should either be an array or an instance of Credis_Client'); + } + if($standAlone) { + $client->forceStandalone(); + } + $this->clients[] = $client; + for ($replica = 0; $replica <= $this->replicas; $replica++) { + $md5num = hexdec(substr(md5($client->getHost().':'.$client->getPort().'-'.$replica),0,7)); + $this->ring[$md5num] = count($this->clients)-1; + } + } + ksort($this->ring, SORT_NUMERIC); + $this->nodes = array_keys($this->ring); + $this->dont_hash = array_flip(array( + 'RANDOMKEY', 'DBSIZE', 'PIPELINE', 'EXEC', + 'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL', + 'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN', + 'INFO', 'MONITOR', 'SLAVEOF' + )); + if($this->masterClient !== null && count($this->clients()) == 0){ + $this->clients[] = $this->masterClient; + for ($replica = 0; $replica <= $this->replicas; $replica++) { + $md5num = hexdec(substr(md5($this->masterClient->getHost().':'.$this->masterClient->getHost().'-'.$replica),0,7)); + $this->ring[$md5num] = count($this->clients)-1; + } + $this->nodes = array_keys($this->ring); + } + } + + /** + * @param Credis_Client $masterClient + * @param bool $writeOnly + * @return Credis_Cluster + */ + public function setMasterClient(Credis_Client $masterClient, $writeOnly=false) + { + if(!$masterClient instanceof Credis_Client){ + throw new CredisException('Master client should be an instance of Credis_Client'); + } + $this->masterClient = $masterClient; + if (!isset($this->aliases['master'])) { + $this->aliases['master'] = $masterClient; + } + if(!$writeOnly){ + $this->clients[] = $this->masterClient; + for ($replica = 0; $replica <= $this->replicas; $replica++) { + $md5num = hexdec(substr(md5($this->masterClient->getHost().':'.$this->masterClient->getHost().'-'.$replica),0,7)); + $this->ring[$md5num] = count($this->clients)-1; + } + $this->nodes = array_keys($this->ring); + } + return $this; + } + /** + * Get a client by index or alias. + * + * @param string|int $alias + * @throws CredisException + * @return Credis_Client + */ + public function client($alias) + { + if (is_int($alias) && isset($this->clients[$alias])) { + return $this->clients[$alias]; + } + else if (isset($this->aliases[$alias])) { + return $this->aliases[$alias]; + } + throw new CredisException("Client $alias does not exist."); + } + + /** + * Get an array of all clients + * + * @return array|Credis_Client[] + */ + public function clients() + { + return $this->clients; + } + + /** + * Execute a command on all clients + * + * @return array + */ + public function all() + { + $args = func_get_args(); + $name = array_shift($args); + $results = array(); + foreach($this->clients as $client) { + $results[] = call_user_func_array([$client, $name], $args); + } + return $results; + } + + /** + * Get the client that the key would hash to. + * + * @param string $key + * @return \Credis_Client + */ + public function byHash($key) + { + return $this->clients[$this->hash($key)]; + } + + /** + * Execute a Redis command on the cluster with automatic consistent hashing and read/write splitting + * + * @param string $name + * @param array $args + * @return mixed + */ + public function __call($name, $args) + { + if($this->masterClient !== null && !$this->isReadOnlyCommand($name)){ + $client = $this->masterClient; + }elseif (count($this->clients()) == 1 || isset($this->dont_hash[strtoupper($name)]) || !isset($args[0])) { + $client = $this->clients[0]; + } + else { + $client = $this->byHash($args[0]); + } + return call_user_func_array([$client, $name], $args); + } + + /** + * Get client index for a key by searching ring with binary search + * + * @param string $key The key to hash + * @return int The index of the client object associated with the hash of the key + */ + public function hash($key) + { + $needle = hexdec(substr(md5($key),0,7)); + $server = $min = 0; + $max = count($this->nodes) - 1; + while ($max >= $min) { + $position = (int) (($min + $max) / 2); + $server = $this->nodes[$position]; + if ($needle < $server) { + $max = $position - 1; + } + else if ($needle > $server) { + $min = $position + 1; + } + else { + break; + } + } + return $this->ring[$server]; + } + + public function isReadOnlyCommand($command) + { + $readOnlyCommands = array( + 'DBSIZE', + 'INFO', + 'MONITOR', + 'EXISTS', + 'TYPE', + 'KEYS', + 'SCAN', + 'RANDOMKEY', + 'TTL', + 'GET', + 'MGET', + 'SUBSTR', + 'STRLEN', + 'GETRANGE', + 'GETBIT', + 'LLEN', + 'LRANGE', + 'LINDEX', + 'SCARD', + 'SISMEMBER', + 'SINTER', + 'SUNION', + 'SDIFF', + 'SMEMBERS', + 'SSCAN', + 'SRANDMEMBER', + 'ZRANGE', + 'ZREVRANGE', + 'ZRANGEBYSCORE', + 'ZREVRANGEBYSCORE', + 'ZCARD', + 'ZSCORE', + 'ZCOUNT', + 'ZRANK', + 'ZREVRANK', + 'ZSCAN', + 'HGET', + 'HMGET', + 'HEXISTS', + 'HLEN', + 'HKEYS', + 'HVALS', + 'HGETALL', + 'HSCAN', + 'PING', + 'AUTH', + 'SELECT', + 'ECHO', + 'QUIT', + 'OBJECT', + 'BITCOUNT', + 'TIME', + 'SORT' + ); + return in_array(strtoupper($command),$readOnlyCommands); + } +} + diff --git a/libs/credis/LICENSE b/libs/credis/LICENSE new file mode 100755 index 0000000..c0a2f41 --- /dev/null +++ b/libs/credis/LICENSE @@ -0,0 +1,23 @@ +Copyright (c) 2009 Justin Poliey +Copyright (c) 2011 Colin Mollenhour + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, +copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. diff --git a/libs/credis/README.markdown b/libs/credis/README.markdown new file mode 100755 index 0000000..ec911f5 --- /dev/null +++ b/libs/credis/README.markdown @@ -0,0 +1,171 @@ +# Credis + +Credis is a lightweight interface to the [Redis](http://redis.io/) key-value store which wraps the [phpredis](https://github.com/nicolasff/phpredis) +library when available for better performance. This project was forked from one of the many redisent forks. + +## Getting Started + +Credis_Client uses methods named the same as Redis commands, and translates return values to the appropriate +PHP equivalents. + +```php +require 'Credis/Client.php'; +$redis = new Credis_Client('localhost'); +$redis->set('awesome', 'absolutely'); +echo sprintf('Is Credis awesome? %s.\n', $redis->get('awesome')); + +// When arrays are given as arguments they are flattened automatically +$redis->rpush('particles', array('proton','electron','neutron')); +$particles = $redis->lrange('particles', 0, -1); +``` +Redis error responses will be wrapped in a CredisException class and thrown. + +Credis_Client also supports transparent command renaming. Write code using the original command names and the +client will send the aliased commands to the server transparently. Specify the renamed commands using a prefix +for md5, a callable function, individual aliases, or an array map of aliases. See "Redis Security":http://redis.io/topics/security for more info. + +## Clustering your servers + +Credis also includes a way for developers to fully utilize the scalability of Redis with multiple servers and [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing). +Using the [Credis_Cluster](Cluster.php) class, you can use Credis the same way, except that keys will be hashed across multiple servers. +Here is how to set up a cluster: + +### Basic clustering example +```php + '127.0.0.1', 'port' => 6379, 'alias'=>'alpha'), + array('host' => '127.0.0.1', 'port' => 6380, 'alias'=>'beta') +)); +$cluster->set('key','value'); +echo "Alpha: ".$cluster->client('alpha')->get('key').PHP_EOL; +echo "Beta: ".$cluster->client('beta')->get('key').PHP_EOL; +``` + +### Explicit definition of replicas + +The consistent hashing strategy stores keys on a so called "ring". The position of each key is relative to the position of its target node. The target node that has the closest position will be the selected node for that specific key. + +To avoid an uneven distribution of keys (especially on small clusters), it is common to duplicate target nodes. Based on the number of replicas, each target node will exist *n times* on the "ring". + +The following example explicitly sets the number of replicas to 5. Both Redis instances will have 5 copies. The default value is 128. + +```php + '127.0.0.1', 'port' => 6379, 'alias'=>'alpha'), + array('host' => '127.0.0.1', 'port' => 6380, 'alias'=>'beta') + ), 5 +); +$cluster->set('key','value'); +echo "Alpha: ".$cluster->client('alpha')->get('key').PHP_EOL; +echo "Beta: ".$cluster->client('beta')->get('key').PHP_EOL; +``` + +## Master/slave replication + +The [Credis_Cluster](Cluster.php) class can also be used for [master/slave replication](http://redis.io/topics/replication). +Credis_Cluster will automatically perform *read/write splitting* and send the write requests exclusively to the master server. +Read requests will be handled by all servers unless you set the *write_only* flag to true in the connection string of the master server. + +### Redis server settings for master/slave replication + +Setting up master/slave replication is simple and only requires adding the following line to the config of the slave server: + +``` +slaveof 127.0.0.1 6379 +``` + +### Basic master/slave example +```php + '127.0.0.1', 'port' => 6379, 'alias'=>'master', 'master'=>true), + array('host' => '127.0.0.1', 'port' => 6380, 'alias'=>'slave') +)); +$cluster->set('key','value'); +echo $cluster->get('key').PHP_EOL; +echo $cluster->client('slave')->get('key').PHP_EOL; + +$cluster->client('master')->set('key2','value'); +echo $cluster->client('slave')->get('key2').PHP_EOL; +``` + +### No read on master + +The following example illustrates how to disable reading on the master server. This will cause the master server only to be used for writing. +This should only happen when you have enough write calls to create a certain load on the master server. Otherwise this is an inefficient usage of server resources. + +```php + '127.0.0.1', 'port' => 6379, 'alias'=>'master', 'master'=>true, 'write_only'=>true), + array('host' => '127.0.0.1', 'port' => 6380, 'alias'=>'slave') +)); +$cluster->set('key','value'); +echo $cluster->get('key').PHP_EOL; +``` +## Automatic failover with Sentinel + +[Redis Sentinel](http://redis.io/topics/sentinel) is a system that can monitor Redis instances. You register master servers and Sentinel automatically detects its slaves. + +When a master server dies, Sentinel will make sure one of the slaves is promoted to be the new master. This autofailover mechanism will also demote failed masters to avoid data inconsistency. + +The [Credis_Sentinel](Sentinel.php) class interacts with the *Redis Sentinel* instance(s) and acts as a proxy. Sentinel will automatically create [Credis_Cluster](Cluster.php) objects and will set the master and slaves accordingly. + +Sentinel uses the same protocol as Redis. In the example below we register the Sentinel server running on port *26379* and assign it to the [Credis_Sentinel](Sentinel.php) object. +We then ask Sentinel the hostname and port for the master server known as *mymaster*. By calling the *getCluster* method we immediately get a [Credis_Cluster](Cluster.php) object that allows us to perform basic Redis calls. + +```php +getMasterAddressByName('mymaster'); +$cluster = $sentinel->getCluster('mymaster'); + +echo 'Writing to master: '.$masterAddress[0].' on port '.$masterAddress[1].PHP_EOL; +$cluster->set('key','value'); +echo $cluster->get('key').PHP_EOL; +``` +### Additional parameters + +Because [Credis_Sentinel](Sentinel.php) will create [Credis_Cluster](Cluster.php) objects using the *"getCluster"* or *"createCluster"* methods, additional parameters can be passed. + +First of all there's the *"write_only"* flag. You can also define the selected database and the number of replicas. And finally there's a *"selectRandomSlave"* option. + +The *"selectRandomSlave"* flag is used in setups for masters that have multiple slaves. The Credis_Sentinel will either select one random slave to be used when creating the Credis_Cluster object or to pass them all and use the built-in hashing. + +The example below shows how to use these 3 options. It selects database 2, sets the number of replicas to 10, it doesn't select a random slave and doesn't allow reading on the master server. + +```php +getCluster('mymaster',2,10,false,true); +$cluster->set('key','value'); +echo $cluster->get('key').PHP_EOL; +``` + +## About + +© 2011 [Colin Mollenhour](http://colin.mollenhour.com) +© 2009 [Justin Poliey](http://justinpoliey.com) diff --git a/libs/credis/Sentinel.php b/libs/credis/Sentinel.php new file mode 100755 index 0000000..2aa7502 --- /dev/null +++ b/libs/credis/Sentinel.php @@ -0,0 +1,255 @@ + + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Credis_Sentinel + */ +class Credis_Sentinel +{ + /** + * Contains a client that connects to a Sentinel node. + * Sentinel uses the same protocol as Redis which makes using Credis_Client convenient. + * @var Credis_Client + */ + protected $_client; + /** + * Contains an active instance of Credis_Cluster per master pool + * @var Array + */ + protected $_cluster = array(); + /** + * Contains an active instance of Credis_Client representing a master + * @var Array + */ + protected $_master = array(); + /** + * Contains an array Credis_Client objects representing all slaves per master pool + * @var Array + */ + protected $_slaves = array(); + /** + * Use the phpredis extension or the standalone implementation + * @var bool + */ + protected $_standAlone = false; + /** + * Connect with a Sentinel node. Sentinel will do the master and slave discovery + * @param Credis_Client $client + */ + public function __construct(Credis_Client $client) + { + if(!$client instanceof Credis_Client){ + throw new CredisException('Sentinel client should be an instance of Credis_Client'); + } + $client->forceStandalone(); + $this->_client = $client; + } + /** + * @return Credis_Sentinel + */ + public function forceStandalone() + { + $this->_standAlone = true; + return $this; + } + /** + * Discover the master node automatically and return an instance of Credis_Client that connects to the master + * @param string $name + * @return Credis_Client + */ + public function createMasterClient($name) + { + $master = $this->getMasterAddressByName($name); + if(!isset($master[0]) || !isset($master[1])){ + throw new CredisException('Master not found'); + } + return new Credis_Client($master[0],$master[1]); + } + /** + * If a Credis_Client object exists for a master, return it. Otherwise create one and return it + * @param string $name + * @return Credis_Client + */ + public function getMasterClient($name) + { + if(!isset($this->_master[$name])){ + $this->_master[$name] = $this->createMasterClient($name); + } + return $this->_master[$name]; + } + /** + * Discover the slave nodes automatically and return an array of Credis_Client objects + * @param string $name + * @return Credis_Client[] + */ + public function createSlaveClients($name) + { + $slaves = $this->slaves($name); + $workingSlaves = array(); + foreach($slaves as $slave) { + if(!isset($slave[9])){ + throw new CredisException('Can\' retrieve slave status'); + } + if(!strstr($slave[9],'s_down') && !strstr($slave[9],'disconnected')) { + $workingSlaves[] = new Credis_Client($slave[3],$slave[5]); + } + } + return $workingSlaves; + } + /** + * If an array of Credis_Client objects exist for a set of slaves, return them. Otherwise create and return them + * @param string $name + * @return Credis_Client[] + */ + public function getSlaveClients($name) + { + if(!isset($this->_slaves[$name])){ + $this->_slaves[$name] = $this->createSlaveClients($name); + } + return $this->_slaves[$name]; + } + /** + * Returns a Redis cluster object containing a random slave and the master + * When $selectRandomSlave is true, only one random slave is passed. + * When $selectRandomSlave is false, all clients are passed and hashing is applied in Credis_Cluster + * When $writeOnly is false, the master server will also be used for read commands. + * @param string $name + * @param int $db + * @param int $replicas + * @param bool $selectRandomSlave + * @param bool $writeOnly + * @return Credis_Cluster + */ + public function createCluster($name, $db=0, $replicas=128, $selectRandomSlave=true, $writeOnly=false) + { + $clients = array(); + $workingClients = array(); + $master = $this->master($name); + if(strstr($master[9],'s_down') || strstr($master[9],'disconnected')) { + throw new CredisException('The master is down'); + } + $slaves = $this->slaves($name); + foreach($slaves as $slave){ + if(!strstr($slave[9],'s_down') && !strstr($slave[9],'disconnected')) { + $workingClients[] = array('host'=>$slave[3],'port'=>$slave[5],'master'=>false,'db'=>$db); + } + } + if(count($workingClients)>0){ + if($selectRandomSlave){ + if(!$writeOnly){ + $workingClients[] = array('host'=>$master[3],'port'=>$master[5],'master'=>false,'db'=>$db); + } + $clients[] = $workingClients[rand(0,count($workingClients)-1)]; + } else { + $clients = $workingClients; + } + } + $clients[] = array('host'=>$master[3],'port'=>$master[5], 'db'=>$db ,'master'=>true,'write_only'=>$writeOnly); + return new Credis_Cluster($clients,$replicas,$this->_standAlone); + } + /** + * If a Credis_Cluster object exists, return it. Otherwise create one and return it. + * @param string $name + * @param int $db + * @param int $replicas + * @param bool $selectRandomSlave + * @param bool $writeOnly + * @return Credis_Cluster + */ + public function getCluster($name, $db=0, $replicas=128, $selectRandomSlave=true, $writeOnly=false) + { + if(!isset($this->_cluster[$name])){ + $this->_cluster[$name] = $this->createCluster($name, $db, $replicas, $selectRandomSlave, $writeOnly); + } + return $this->_cluster[$name]; + } + /** + * Catch-all method + * @param string $name + * @param array $args + * @return mixed + */ + public function __call($name, $args) + { + array_unshift($args,$name); + return call_user_func(array($this->_client,'sentinel'),$args); + } + /** + * Return information about all registered master servers + * @return mixed + */ + public function masters() + { + return $this->_client->sentinel('masters'); + } + /** + * Return all information for slaves that are associated with a single master + * @param string $name + * @return mixed + */ + public function slaves($name) + { + return $this->_client->sentinel('slaves',$name); + } + /** + * Get the information for a specific master + * @param string $name + * @return mixed + */ + public function master($name) + { + return $this->_client->sentinel('master',$name); + } + /** + * Get the hostname and port for a specific master + * @param string $name + * @return mixed + */ + public function getMasterAddressByName($name) + { + return $this->_client->sentinel('get-master-addr-by-name',$name); + } + /** + * Check if the Sentinel is still responding + * @param string $name + * @return mixed + */ + public function ping() + { + return $this->_client->ping(); + } + /** + * Perform an auto-failover which will re-elect another master and make the current master a slave + * @param string $name + * @return mixed + */ + public function failover($name) + { + return $this->_client->sentinel('failover',$name); + } + + /** + * @return string + */ + public function getHost() + { + return $this->_client->getHost(); + } + + /** + * @return int + */ + public function getPort() + { + return $this->_client->getPort(); + } +} \ No newline at end of file diff --git a/libs/credis/composer.json b/libs/credis/composer.json new file mode 100755 index 0000000..379af5e --- /dev/null +++ b/libs/credis/composer.json @@ -0,0 +1,23 @@ +{ + "name": "colinmollenhour/credis", + "type": "library", + "description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.", + "homepage": "https://github.com/colinmollenhour/credis", + "license": "MIT", + "authors": [ + { + "name": "Colin Mollenhour", + "email": "colin@mollenhour.com" + } + ], + "require": { + "php": ">=5.3.0" + }, + "autoload": { + "classmap": [ + "Client.php", + "Cluster.php", + "Sentinel.php" + ] + } +} diff --git a/plugin.json b/plugin.json index 755885a..8948b26 100644 --- a/plugin.json +++ b/plugin.json @@ -1,6 +1,6 @@ { "name": "QueuedTracking", - "version": "0.2.5", + "version": "0.3.0", "description": "Scale your large traffic Piwik service by queuing tracking requests in Redis for better performance. ", "theme": false, "keywords": ["tracker", "tracking", "queue", "redis"], diff --git a/tests/Integration/Queue/Backend/SentinelTest.php b/tests/Integration/Queue/Backend/SentinelTest.php new file mode 100644 index 0000000..10a306c --- /dev/null +++ b/tests/Integration/Queue/Backend/SentinelTest.php @@ -0,0 +1,35 @@ +QueuedTracking = array('backend' => 'sentinel'); + + $sentinel = Factory::makeBackend(); + + $this->assertTrue($sentinel instanceof Sentinel); + + return $sentinel; + } + +} diff --git a/tests/Integration/Queue/FactoryTest.php b/tests/Integration/Queue/FactoryTest.php index 0211ad0..3ae81a9 100644 --- a/tests/Integration/Queue/FactoryTest.php +++ b/tests/Integration/Queue/FactoryTest.php @@ -8,6 +8,7 @@ namespace Piwik\Plugins\QueuedTracking\tests\Integration\Queue; +use Piwik\Config; use Piwik\Plugins\QueuedTracking\Queue; use Piwik\Plugins\QueuedTracking\Queue\Factory; use Piwik\Plugins\QueuedTracking\Settings; @@ -54,6 +55,15 @@ public function test_makeBackend_shouldReturnARedisInstance() { $backend = Factory::makeBackend(); $this->assertTrue($backend instanceof Queue\Backend\Redis); + $this->assertFalse($backend instanceof Queue\Backend\Sentinel); + } + + public function test_makeBackend_shouldReturnASentinelInstanceIfConfigured() + { + Config::getInstance()->QueuedTracking = array('backend' => 'sentinel'); + $backend = Factory::makeBackend(); + Config::getInstance()->QueuedTracking = array(); + $this->assertTrue($backend instanceof Queue\Backend\Sentinel); } public function test_makeBackend_shouldConfigureRedis()