Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra 0 7 #45

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 50 additions & 27 deletions lib/Core.class.php
Expand Up @@ -214,7 +214,7 @@ static public function disconnectAll() {
static public function connect($connectionID, $host, $keySpace = DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
try {

// check connectionid hasn't been marked as down
// check connectionId hasn't been marked as down
if (self::_priorFail($connectionID)) {
self::registerError($host.'/'.$port.' is marked DOWN', PandraLog::LOG_CRIT);
} else {
Expand All @@ -226,18 +226,23 @@ static public function connect($connectionID, $host, $keySpace = DEFAULT_POOL_NA
if (!array_key_exists($keySpace, self::$_socketPool)) self::$_socketPool[$keySpace] = array();

// Create Thrift transport and binary protocol cassandra client
$transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024);
$transport = new TFramedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), true, true);

self::_authOpen($transport, $keySpace);

$client = new CassandraClient(
(PANDRA_64 &&
function_exists("thrift_protocol_write_binary") ?
new TBinaryProtocolAccelerated($transport) :
new TBinaryProtocol($transport)));

// Cassandra 0.7 won't let you do anything without an active keyspace
$client->set_keyspace($keySpace);

self::$_socketPool[$keySpace][$connectionID] = array(
'retries' => 0,
'transport' => $transport,
'client' => new CassandraClient(
(PANDRA_64 &&
function_exists("thrift_protocol_write_binary") ?
new TBinaryProtocolAccelerated($transport) :
new TBinaryProtocol($transport)))
'client' => $client
);

// set new connection the active, working master
Expand Down Expand Up @@ -325,15 +330,16 @@ static public function authKeyspace($keySpace, $username, $password) {
}

/**
* Alias for connectBySeed (deprecated)
* Alias for connectSeededKeypsace.
* @deprecated use connectSeededKeypsace instead.
*/
static public function auto($hosts, $keySpace = DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
return self::connectSeededKeyspace($hosts, $keySpace, $port);
}

/**
* Given a single host, attempts to find other nodes in the cluster and attaches them
* to the pool
* Given a single host, attempts to find other nodes in the cluster and
* attach them to the pool
* @todo build connections from token map
* @param mixed $hosts host name or IP of connecting node (or array thereof)
* @param string $keySpace name of the connection pool (cluster name - usually keyspace)
Expand All @@ -349,7 +355,7 @@ static public function connectSeededKeyspace($hosts, $keySpace = DEFAULT_POOL_NA
foreach ($hosts as $host) {
try {
// Create Thrift transport and binary protocol cassandra client
$transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024);
$transport = new TFramedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), true, true);
$transport->open();
$client = new CassandraClient(
(function_exists("thrift_protocol_write_binary") ?
Expand Down Expand Up @@ -492,12 +498,12 @@ static public function getClient($writeMode = FALSE, $keySpace = DEFAULT_POOL_NA
* @param <type> $transport
* @param <type> $keySpace
*/
static private function _authOpen(TBufferedTransport &$transport, $keySpace) {
static private function _authOpen(TFramedTransport &$transport, $keySpace) {
$transport->open();
if (array_key_exists($keySpace, self::$_ksAuth)) {
try {
$client = self::$_socketPool[self::$_activePool][self::$_activeNode]['client'];
$client->login($keySpace, self::$_ksAuth[$keySpace]);
$client->login(self::$_ksAuth[$keySpace]);
} catch (cassandra_AuthenticationException $e) {
self::registerError($e->why, PandraLog::LOG_CRIT);
throw new TException($e->why);
Expand Down Expand Up @@ -666,7 +672,7 @@ static public function deleteColumnPath($keySpace,
if ($time === NULL) {
$time = self::getTime();
}
$client->remove($keySpace, $keyID, $columnPath, $time, self::getConsistency($consistencyLevel));
$client->remove($keyID, $columnPath, $time, self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return FALSE;
Expand All @@ -685,7 +691,7 @@ static public function deleteColumnPath($keySpace,
*/
static public function saveColumnPath($keySpace,
$keyID,
cassandra_ColumnPath $columnPath,
cassandra_ColumnPath $path,
$value,
$time = NULL,
$consistencyLevel = NULL) {
Expand All @@ -694,7 +700,23 @@ static public function saveColumnPath($keySpace,
if ($time === NULL) {
$time = self::getTime();
}
$client->insert($keySpace, $keyID, $columnPath, $value, $time, self::getConsistency($consistencyLevel));

xdebug_break();

// Pretty sure this isn't the "Pandra Way". Feel free to fix it.
$col = new cassandra_Column(
array(
"name" => $path->column,
"value" => $value,
'timestamp' => $time
));
$parent = new cassandra_ColumnParent(
array(
'column_family' => $path->column_family,
'super_column' => $path->super_column,
));

$client->insert($keyID, $parent, $col, self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return FALSE;
Expand All @@ -709,10 +731,10 @@ static public function saveColumnPath($keySpace,
* @param int $consistencyLevel response consistency level
* @return bool mutate operation completed OK
*/
public function batchMutate($keySpace, $mutation, $consistencyLevel = NULL) {
static public function batchMutate($keySpace, $mutation, $consistencyLevel = NULL) {
try {
$client = self::getClient(TRUE, $keySpace);
$client->batch_mutate($keySpace, $mutation, self::getConsistency($consistencyLevel));
$client->batch_mutate($mutation, self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return FALSE;
Expand All @@ -738,7 +760,7 @@ static public function getCFSlice($keySpace,
$client = self::getClient(FALSE, $keySpace);

try {
return $client->get_slice($keySpace, $keyID, $columnParent, $predicate, self::getConsistency($consistencyLevel));
return $client->get_slice($keyID, $columnParent, $predicate, self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return NULL;
Expand All @@ -764,7 +786,7 @@ static public function getCFSliceMulti($keySpace,
$client = self::getClient(FALSE, $keySpace);

try {
return $client->multiget_slice($keySpace, $keyIDs, $columnParent, $predicate, self::getConsistency($consistencyLevel));
return $client->multiget_slice($keyIDs, $columnParent, $predicate, self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return NULL;
Expand All @@ -781,14 +803,15 @@ static public function getCFSliceMulti($keySpace,
*/
static public function getCFColumnCount($keySpace,
$keyID,
cassandra_ColumnParent
$columnParent,
cassandra_ColumnParent $columnParent,
cassandra_SlicePredicate $predicate,
$consistencyLevel = NULL) {

$client = self::getClient(FALSE, $keySpace);

try {
return $client->get_count($keySpace, $keyID, $columnParent, self::getConsistency($consistencyLevel));
return $client->get_count($keyID, $columnParent, $predicate,
self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return NULL;
Expand Down Expand Up @@ -836,19 +859,19 @@ static public function getColumnPath($keySpace,
$client = self::getClient(FALSE, $keySpace);

try {
return $client->get($keySpace, $keyID, $columnPath, self::getConsistency($consistencyLevel));
return $client->get($keyID, $columnPath, self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return NULL;
}
}

/**
* numRows is no longer supported; that option has been folded into the slice predicate.
* @param string $keySpace keyspace of key
* @param cassandra_KeyRange $keyRange
* @param cassandra_ColumnParent $columnParent
* @param cassandra_SlicePredicate $predicate column names or range predicate
* @param int number of rows to return
* @param int $consistencyLevel response consistency level
* @return <type>
*/
Expand All @@ -862,11 +885,11 @@ static public function getRangeSlices($keySpace,
$client = self::getClient(FALSE, $keySpace);

try {
return $client->get_range_slices($keySpace,
return $client->get_range_slices(
$columnParent,
$predicate,
$keyRange,
$numRows,
// $numRows, // DEPRECATED, see above
self::getConsistency($consistencyLevel));
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
Expand Down