Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Core uses keyspace as pool name by default, auto now an alias for con…

…nectSeededKeyspace($host, $keySpace, $port);

added keyspace authenticator - to be setup prior to api calls.

Eg : PandraCore::authKeyspace('Keyspace1', 'jsmith', 'havebadpass');
  • Loading branch information...
commit b11c07a458d07844c8304f94c13012d1f681589c 1 parent c71e678
@mjpearson authored
Showing with 83 additions and 49 deletions.
  1. +81 −47 lib/Core.class.php
  2. +2 −2 lib/uuid/UUIDPluginZNative.class.php
View
128 lib/Core.class.php
@@ -19,7 +19,7 @@ class PandraCore {
const MODE_RANDOM = 2; // select random node
- const DEFAULT_POOL_NAME = 'default';
+ const DEFAULT_POOL_NAME = 'Keyspace1';
/* @var string Last internal error */
static public $lastError = '';
@@ -78,6 +78,9 @@ class PandraCore {
/* @var PandraCore instance of self */
static private $_instance = NULL;
+ /* @var array keyspace => array(username, password) struct for authentication */
+ static private $_ksAuth = array();
+
/**
* dummy constructor
*/
@@ -138,14 +141,13 @@ static public function getWriteMode() {
return self::$writeMode;
}
-
/**
* Sets connectionid as active node
* @param string $connectionID named connection
* @return bool connection id exists and has been set
*/
static public function setActiveNode($connectionID) {
- if (array_key_exists($connectionID, self::$_socketPool[self::$_activePool]) && self::$_socketPool[self::$_activePool][$connectionID]['transport']->isOpen()) {
+ if (array_key_exists($connectionID, self::$_socketPool[self::$_activePool])) {
self::$_activeNode = $connectionID;
return TRUE;
}
@@ -153,16 +155,16 @@ static public function setActiveNode($connectionID) {
}
/**
- * Sets connectionid as active node
- * @param string $connectionID named connection
- * @return bool connection id exists and has been set
+ * Sets keyspace as active 'pool'
+ * @param string $keySpace named connection
+ * @return bool pool exists and has been set
*/
- static public function setActivePool($poolName) {
- if (array_key_exists($poolName, self::$_socketPool)) {
- self::$_activePool = $poolName;
+ static public function setActivePool($keySpace) {
+ if (array_key_exists($keySpace, self::$_socketPool)) {
+ self::$_activePool = $keySpace;
// grab last node in the pool to set active
- $poolNames = array_keys(self::$_socketPool[$poolName]);
- $connectionID = array_pop($poolNames);
+ $keySpaces = array_keys(self::$_socketPool[$keySpace]);
+ $connectionID = array_pop($keySpaces);
self::setActiveNode($connectionID);
return TRUE;
}
@@ -174,7 +176,7 @@ static public function setActivePool($poolName) {
* @param string $connectionID named connection
* @return bool disconnected OK
*/
- static public function disconnect($connectionID, $poolName = self::DEFAULT_POOL_NAME) {
+ static public function disconnect($connectionID) {
if (array_key_exists($connectionID, self::$_socketPool[self::$_activePool])) {
if (self::$_socketPool[self::$_activePool][$connectionID]['transport']->isOpen()) {
self::$_socketPool[self::$_activePool][$connectionID]['transport']->close();
@@ -190,11 +192,11 @@ static public function disconnect($connectionID, $poolName = self::DEFAULT_POOL_
*/
static public function disconnectAll() {
- foreach (self::$_socketPool as $poolName => $socketPool) {
+ foreach (self::$_socketPool as $keySpace => $socketPool) {
$connections = array_keys($socketPool);
foreach ($connections as $connectionID) {
- if (!self::disconnect($connectionID, $poolName)) throw new RuntimeException($connectionID.' could not be closed');
+ if (!self::disconnect($connectionID, $keySpace)) throw new RuntimeException($connectionID.' could not be closed');
}
}
return TRUE;
@@ -204,32 +206,28 @@ static public function disconnectAll() {
* Connects to given Cassandra node and makes it available in the static connection pool
* @param string $connectionID named node within connection pool
* @param string $host host name or IP of connecting node
- * @param string $poolName name of the connection pool (cluster name)
+ * @param string $keySpace name of the keyspace (connection pool)
* @param int $port TCP port of connecting node
* @return bool connected ok
*/
- static public function connect($connectionID, $host, $poolName = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
+ static public function connect($connectionID, $host, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
try {
// check connectionid hasn't been marked as down
if (self::priorFail($connectionID)) {
self::registerError($host.'/'.$port.' is marked DOWN', PandraLog::LOG_CRIT);
} else {
- // if the connection exists but it is closed, then re-open
- if (array_key_exists($poolName, self::$_socketPool) && array_key_exists($connectionID, self::$_socketPool[$poolName])) {
- if (!self::$_socketPool[$poolName][$connectionID]['transport']->isOpen()) {
- self::$_socketPool[$poolName][$connectionID]['transport']->open();
- }
+ // if the connection exists but it is closed, return (getClient will open)
+ if (array_key_exists($keySpace, self::$_socketPool) && array_key_exists($connectionID, self::$_socketPool[$keySpace])) {
return TRUE;
}
- if (!array_key_exists($poolName, self::$_socketPool)) self::$_socketPool[$poolName] = array();
+ if (!array_key_exists($keySpace, self::$_socketPool)) self::$_socketPool[$keySpace] = array();
// Create Thrift transport and binary protocol cassandra client
$transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024);
- $transport->open();
- self::$_socketPool[$poolName][$connectionID] = array(
+ self::$_socketPool[$keySpace][$connectionID] = array(
'retries' => 0,
'transport' => $transport,
'client' => new CassandraClient(
@@ -240,7 +238,7 @@ function_exists("thrift_protocol_write_binary") ?
);
// set new connection the active, working master
- self::setActivePool($poolName);
+ self::setActivePool($keySpace);
self::setActiveNode($connectionID);
return TRUE;
}
@@ -309,10 +307,26 @@ static public function registerError($errorMsg, $priority = PandraLog::LOG_WARNI
}
/**
+ * Creates an authentication token to be used whenever pandra needs to open
+ * a thrift transport (via getClient).
+ * @param string $keySpace key space
+ * @param string $username auth username
+ * @param string $password auth password
+ */
+ static public function authKeyspace($keySpace, $username, $password) {
+ $auth = new cassandra_AuthenticationRequest();
+ $auth->credentials = array (
+ "username" => $username,
+ "password" => $password
+ );
+ self::$_ksAuth[$keySpace] = $auth;
+ }
+
+ /**
* Alias for connectBySeed (deprecated)
*/
- static public function auto($host, $poolName = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
- return self::connectBySeed($host, $poolName, $port);
+ static public function auto($host, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
+ return self::connectSeededKeyspace($host, $keySpace, $port);
}
/**
@@ -320,11 +334,11 @@ static public function auto($host, $poolName = self::DEFAULT_POOL_NAME, $port =
* to the pool
* @todo build connections from token map
* @param string $host host name or IP of connecting node
- * @param string $poolName name of the connection pool (cluster name)
+ * @param string $keySpace name of the connection pool (cluster name - usually keyspace)
* @param int $port TCP port of connecting node
* @return bool connected ok
*/
- static public function connectBySeed($host, $poolName = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
+ static public function connectSeededKeyspace($host, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
try {
// Create Thrift transport and binary protocol cassandra client
@@ -341,7 +355,7 @@ static public function connectBySeed($host, $poolName = self::DEFAULT_POOL_NAME,
unset($transport); unset($client);
$tokens = json_decode($tokenMap);
foreach ($tokens as $token => $host) {
- if (!self::connect($token, $host, $poolName)) {
+ if (!self::connect($token, $host, $keySpace)) {
return FALSE;
}
}
@@ -355,12 +369,12 @@ static public function connectBySeed($host, $poolName = self::DEFAULT_POOL_NAME,
/**
* Gets a list of connnection id's for a given pool
- * @param <type> $poolName
+ * @param <type> $keySpace
* @return <type>
*/
- static public function getPoolTokens($poolName = self::DEFAULT_POOL_NAME) {
- if (!empty(self::$_socketPool[$poolName])) {
- return array_keys(self::$_socketPool[$poolName]);
+ static public function getPoolTokens($keySpace = self::DEFAULT_POOL_NAME) {
+ if (!empty(self::$_socketPool[$keySpace])) {
+ return array_keys(self::$_socketPool[$keySpace]);
}
return array();
}
@@ -408,8 +422,10 @@ static public function getAPCAvailable() {
/**
* get current working node, recursive, trims disconnected clients
+ * @param bool $writeMode optional get the write mode client
+ * @param string $keySpace optional keyspace where auth has been defined
*/
- static public function getClient($writeMode = FALSE) {
+ static public function getClient($writeMode = FALSE, $keySpace = NULL) {
// Catch trimmed nodes or a completely trimmed pool
if (empty(self::$_activeNode) || empty(self::$_socketPool[self::$_activePool])) {
@@ -448,7 +464,7 @@ static public function getClient($writeMode = FALSE) {
// check connection is open
try {
if (!self::$_socketPool[self::$_activePool][self::$_activeNode]['transport']->isOpen()) {
- self::$_socketPool[self::$_activePool][self::$_activeNode]['transport']->open();
+ self::_authOpen(self::$_socketPool[self::$_activePool][self::$_activeNode]['transport'], $keySpace);
}
return $conn;
} catch (TException $te) {
@@ -459,7 +475,25 @@ static public function getClient($writeMode = FALSE) {
}
self::registerError(self::$_activePool.':'.self::$_activeNode.': Marked as DOWN, trying next in pool');
- return self::getClient($writeMode);
+ return self::getClient($writeMode, $keySpace);
+ }
+ }
+
+ /**
+ * Autenticates an opening transport against the api
+ * @param <type> $transport
+ * @param <type> $keySpace
+ */
+ static private function _authOpen(TBufferedTransport &$transport, $keySpace) {
+ $transport->open();
+ if (array_key_exists($keySpace, self::$_ksAuth)) {
+ try {
+ $client = self::$_socketPool[self::$_activePool][self::$_activeNode]['client'];
+ $client->login($keySpace, self::$_ksAuth[$keySpace]);
+ } catch (cassandra_AuthenticationException $e) {
+ self::registerError($e->why, PandraLog::LOG_CRIT);
+ throw new TException($e->why);
+ }
}
}
@@ -501,7 +535,7 @@ static private function priorFail($connectionID = NULL) {
* @return array keyspace structure
*/
static public function describeKeyspace($keySpace) {
- $client = self::getClient();
+ $client = self::getClient(FALSE, $keySpace);
return $client->describe_keyspace($keySpace);
}
@@ -555,7 +589,7 @@ static public function loadConfigXML() {
* Generates current time, or microtime for 64-bit systems
* @return int timestamp
*/
- static public function getTime() {
+ static public function getTime() {
// @todo patch thrift .so
if ((PANDRA_64) || (!PANDRA_64 && !function_exists("thrift_protocol_write_binary"))) {
return round(microtime(true) * 1000000, 3);
@@ -581,7 +615,7 @@ static public function deleteColumnPath($keySpace,
$time = NULL,
$consistencyLevel = NULL) {
try {
- $client = self::getClient(TRUE);
+ $client = self::getClient(TRUE, $keySpace);
if ($time === NULL) {
$time = self::getTime();
}
@@ -609,7 +643,7 @@ static public function saveColumnPath($keySpace,
$time = NULL,
$consistencyLevel = NULL) {
try {
- $client = self::getClient(TRUE);
+ $client = self::getClient(TRUE, $keySpace);
if ($time === NULL) {
$time = self::getTime();
}
@@ -636,7 +670,7 @@ static public function saveSuperColumn($keySpace,
array $superColumnMap,
$consistencyLevel = NULL) {
try {
- $client = self::getClient(TRUE);
+ $client = self::getClient(TRUE, $keySpace);
$mutations = array();
@@ -679,7 +713,7 @@ static public function getCFSlice($keySpace,
cassandra_SlicePredicate $predicate,
$consistencyLevel = NULL) {
- $client = self::getClient();
+ $client = self::getClient(FALSE, $keySpace);
try {
return $client->get_slice($keySpace, $keyID, $columnParent, $predicate, self::getConsistency($consistencyLevel));
@@ -705,7 +739,7 @@ static public function getCFSliceMulti($keySpace,
cassandra_SlicePredicate $predicate,
$consistencyLevel = NULL) {
- $client = self::getClient();
+ $client = self::getClient(FALSE, $keySpace);
try {
return $client->multiget_slice($keySpace, $keyIDs, $columnParent, $predicate, self::getConsistency($consistencyLevel));
@@ -729,7 +763,7 @@ static public function getCFColumnCount($keySpace,
$columnParent,
$consistencyLevel = NULL) {
- $client = self::getClient();
+ $client = self::getClient(FALSE, $keySpace);
try {
return $client->get_count($keySpace, $keyID, $columnParent, self::getConsistency($consistencyLevel));
@@ -777,7 +811,7 @@ static public function getColumnPath($keySpace,
cassandra_ColumnPath
$columnPath,
$consistencyLevel = NULL) {
- $client = self::getClient();
+ $client = self::getClient(FALSE, $keySpace);
@@ -805,7 +839,7 @@ static public function getRangeKeys($keySpace,
$numRows = DEFAULT_ROW_LIMIT,
$consistencyLevel = NULL) {
- $client = self::getClient();
+ $client = self::getClient(FALSE, $keySpace);
try {
return $client->get_range_slice($keySpace,
View
4 lib/uuid/UUIDPluginZNative.class.php
@@ -66,8 +66,8 @@ public static function generate() {
* returns a type 1 (MAC address and time based) uuid
* @return string
*/
- public static function v1() {
- return self::_generate(self::UUID_TIME, self::FMT_STRING, function_exists('gethostname') ? gethostname() : 'cassclient');
+ public static function v1($namespace = 'cassclient') {
+ return self::_generate(self::UUID_TIME, self::FMT_STRING, function_exists('gethostname') ? gethostname() : $namespace);
}
/**
Please sign in to comment.
Something went wrong with that request. Please try again.