Skip to content

Commit

Permalink
added longtype + example, fixed a unit test fail on core during connect
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpearson committed Jul 13, 2010
1 parent b11c07a commit 1bc509b
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 53 deletions.
59 changes: 59 additions & 0 deletions examples/long_columnfamily.php
@@ -0,0 +1,59 @@
<?php
/**
* Example LongType ColumnFamily vs default Cassandra storage-conf.xml
*
* <ColumnFamily Name="StandardByLong1" CompareWith="LongType" />
*
* Script performs a save, plus two cross checks - loading directly from an
* anonymous columnfamily model, and populating a model from a slice.
*
*/

require_once('../config.php');

PandraCore::connectSeededKeyspace('localhost');

$ks = 'Keyspace1';
$cfName = 'StandardByLong1';
$keyID = 'PandraTestLong1';

$cf = new PandraColumnFamily($keyID, $ks, $cfName, PandraColumnFamily::TYPE_LONG);

$cf->addColumn(PandraCore::getTime())->setValue('numericly indexed!');
echo 'Saving...<br>';
print_r($cf->toJSON());
$cf->save();

// load from model
echo '<br><br>Loading via CF container...<br>';
$cfNew = new PandraColumnFamily($keyID, $ks, $cfName, PandraColumnFamily::TYPE_LONG);

$cfNew->limit(5)->load();
echo '<br>Loaded...<br>';
print_r($cfNew->toJSON());

// get slice
echo '<br><br>Loading Slice...<br>';
$result = PandraCore::getCFSlice($ks,
$keyID,
new cassandra_ColumnParent(array(
'column_family' => $cfName
)),
new PandraSlicePredicate(
PandraSlicePredicate::TYPE_RANGE,
array('start' => '',
'finish' => '',
'count' => 5,
'reversed' => true))
);

var_dump($result);

$cfNew = new PandraColumnFamily($keyID, $ks, $cfName, PandraColumnFamily::TYPE_LONG);

$cfNew->populate($result);

echo '<br>Imported...<br>';
print_r($cfNew->toJSON());

?>
77 changes: 58 additions & 19 deletions lib/ColumnContainer.class.php
Expand Up @@ -31,6 +31,12 @@ abstract class PandraColumnContainer implements ArrayAccess, Iterator, Countable
/* @var int 'LONG' container type @todo - not implemented! */ /* @var int 'LONG' container type @todo - not implemented! */
const TYPE_LONG = 3; const TYPE_LONG = 3;


/* @var int 'string' flag for type conversion */
const CONTEXT_STRING = 0;

/* @var int 'binary' flag for type conversion */
const CONTEXT_BIN = 1;

/* @var array complete list of errors for this object instance */ /* @var array complete list of errors for this object instance */
public $errors = array(); public $errors = array();


Expand Down Expand Up @@ -430,8 +436,18 @@ public function getReversed() {
return $this->_reversed; return $this->_reversed;
} }


private function _setStartFinish($value, $attrib = '_start') {
if (($this->_containerType == self::TYPE_UUID) ) {
$this->$attrib = $this->typeConvert($value, self::CONTEXT_BIN);
} elseif (($this->_containerType == self::TYPE_LONG)) {
$this->$attrib = $this->typeConvert($value, self::CONTEXT_BIN);
} else {
$this->$attrib = $value;
}
}

public function setStart($start) { public function setStart($start) {
$this->_start = $this->typeConvert($start, UUID::UUID_BIN); $this->_setStartFinish($start);
} }


public function start($start) { public function start($start) {
Expand All @@ -444,7 +460,7 @@ public function getStart() {
} }


public function setFinish($finish) { public function setFinish($finish) {
$this->_finish = $this->typeConvert($finish, UUID::UUID_BIN); $this->_setStartFinish($finish, '_finish');
} }


public function finish($finish) { public function finish($finish) {
Expand All @@ -462,33 +478,42 @@ public function getFinish() {
* This stub can also potentially handle long and utf8 cf types * This stub can also potentially handle long and utf8 cf types
* *
* @param string $columnName column name * @param string $columnName column name
* @param int $toFmt convert to type (UUID::UUID_BIN, UUID::UUID_STR) * @param int $toFmt convert to type CONTEXT_BIN OR CONTEXT_STRING
* @return mixed converted column name * @return mixed converted column name
*/ */
protected function typeConvert($columnName, $toFmt) { protected function typeConvert($columnName, $toFmt) {
if (($this->_containerType == self::TYPE_UUID) ) { $bin = UUID::isBinary($columnName);

$bin = UUID::isBinary($columnName);


if (($this->_containerType == self::TYPE_UUID) ) {
// Save accidental double-conversions on binaries // Save accidental double-conversions on binaries
if (($bin && $toFmt == UUID::UUID_BIN) || if (($bin && $toFmt == self::CONTEXT_BIN) ||
(!$bin && $toFmt == UUID::UUID_STR)) { (!$bin && $toFmt == self::CONTEXT_STRING)) {
return $columnName; return $columnName;
} elseif (!$bin && !UUID::validUUID($columnName)) { } elseif (!$bin && !UUID::validUUID($columnName)) {
throw new RuntimeException('Column Name ('.$columnName.') cannot be converted'); throw new RuntimeException('Column Name ('.$columnName.') cannot be converted');
} }


if ($toFmt == UUID::UUID_BIN) { if ($toFmt == self::CONTEXT_BIN) {
return UUID::toBin($columnName); return UUID::toBin($columnName);
} elseif ($toFmt == UUID::UUID_STR) { } elseif ($toFmt == self::CONTEXT_STRING) {
return UUID::toStr($columnName); return UUID::toStr($columnName);
} }

} else if ($this->_containerType == self::TYPE_LONG) { } else if ($this->_containerType == self::TYPE_LONG) {
$columnName = UUID::isBinary($columnName) ? // Save accidental double-conversions on binaries
unpack('NN', $columnName) : if (($bin && $toFmt == self::CONTEXT_BIN) ||
pack('NN', $columnName); (!$bin && $toFmt == self::CONTEXT_STRING)) {
} return $columnName;

// unpack the long
} elseif ($bin && $toFmt == self::CONTEXT_STRING) {
$columnName = array_pop(unpack('N', $columnName));


// pack the long
} elseif (!$bin && $toFmt == self::CONTEXT_BIN) {
$columnName = pack('NN', $columnName, 0);
}
}
return $columnName; return $columnName;
} }


Expand All @@ -500,15 +525,29 @@ protected function typeConvert($columnName, $toFmt) {
* @return PandraColumn reference to created column * @return PandraColumn reference to created column
*/ */
public function addColumn($columnName, $typeDef = array(), $callbackOnSave = NULL) { public function addColumn($columnName, $typeDef = array(), $callbackOnSave = NULL) {
if (!array_key_exists($columnName, $this->_columns)) {
// can't use array_key_exists - it truncates floats on 32 bit systems
$foundKey = FALSE;
foreach ($this->_columns as $key => $value) {
$foundKey = ($key == $columnName);
if ($foundKey) break;
}

if (!$foundKey) {
$this->_columns[$columnName] = $this->_columns[$columnName] =
new PandraColumn($this->typeConvert($columnName, UUID::UUID_BIN), $typeDef); new PandraColumn($this->typeConvert($columnName, self::CONTEXT_BIN, $typeDef));


$this->_columns[$columnName]->setParent($this, FALSE); $this->_columns[$columnName]->setParent($this, FALSE);
} }


// pre-save callback // pre-save callback
if (!empty($callbackOnSave)) $this->getColumn($columnName)->setCallback($callbackOnSave); if (!empty($callbackOnSave)) $this->getColumn($columnName)->setCallback($callbackOnSave);

// php sucks balls, lets lose our precision.
if (!PANDRA_64 && $this->_containerType == self::TYPE_LONG) {
$columnName = (int) $columnName;
}

return $this->getColumn($columnName); return $this->getColumn($columnName);
} }


Expand All @@ -518,7 +557,7 @@ public function addColumn($columnName, $typeDef = array(), $callbackOnSave = NUL
*/ */
public function addColumnObj(PandraColumn $columnObj) { public function addColumnObj(PandraColumn $columnObj) {
if ($columnObj->getName() === NULL) throw new RuntimeException('Column has no name'); if ($columnObj->getName() === NULL) throw new RuntimeException('Column has no name');
$this->_columns[$this->typeConvert($columnObj->name, UUID::UUID_STR)] = $columnObj; $this->_columns[$this->typeConvert($columnObj->name, self::CONTEXT_STRING)] = $columnObj;
} }


/** /**
Expand Down Expand Up @@ -682,7 +721,7 @@ public function populate($data, $colAutoCreate = NULL) {
// Check depth, take first few keys as keyspace/columnfamily/key // Check depth, take first few keys as keyspace/columnfamily/key
foreach ($data as $idx => $column) { foreach ($data as $idx => $column) {
if ($column instanceof cassandra_Column) { if ($column instanceof cassandra_Column) {
$columnName = $this->typeConvert($column->name, UUID::UUID_STR); $columnName = $this->typeConvert($column->name, self::CONTEXT_STRING);


if ($this->getAutoCreate($colAutoCreate) ) { if ($this->getAutoCreate($colAutoCreate) ) {
$this->_columns[$columnName] = PandraColumn::cast($column, $this); $this->_columns[$columnName] = PandraColumn::cast($column, $this);
Expand All @@ -692,7 +731,7 @@ public function populate($data, $colAutoCreate = NULL) {


// circular dependency? // circular dependency?
} elseif ($column instanceof cassandra_ColumnOrSuperColumn && !empty($column->column)) { } elseif ($column instanceof cassandra_ColumnOrSuperColumn && !empty($column->column)) {
$columnName = $this->typeConvert($column->column->name, UUID::UUID_STR); $columnName = $this->typeConvert($column->column->name, self::CONTEXT_STRING);


if ($this->getAutoCreate($colAutoCreate) ) { if ($this->getAutoCreate($colAutoCreate) ) {
$this->_columns[$columnName] = PandraColumn::cast($column->column, $this); $this->_columns[$columnName] = PandraColumn::cast($column->column, $this);
Expand Down
72 changes: 39 additions & 33 deletions lib/Core.class.php
Expand Up @@ -52,9 +52,9 @@ class PandraCore {
static private $writeMode = self::MODE_RANDOM; static private $writeMode = self::MODE_RANDOM;


static private $_supportedReadConsistency = array( static private $_supportedReadConsistency = array(
cassandra_ConsistencyLevel::ONE, cassandra_ConsistencyLevel::ONE,
cassandra_ConsistencyLevel::QUORUM, cassandra_ConsistencyLevel::QUORUM,
cassandra_ConsistencyLevel::ALL cassandra_ConsistencyLevel::ALL
); );


/* @var supported modes for this core version */ /* @var supported modes for this core version */
Expand Down Expand Up @@ -227,6 +227,8 @@ static public function connect($connectionID, $host, $keySpace = self::DEFAULT_P
// Create Thrift transport and binary protocol cassandra client // Create Thrift transport and binary protocol cassandra client
$transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024); $transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024);


self::_authOpen($transport, $keySpace);

self::$_socketPool[$keySpace][$connectionID] = array( self::$_socketPool[$keySpace][$connectionID] = array(
'retries' => 0, 'retries' => 0,
'transport' => $transport, 'transport' => $transport,
Expand All @@ -244,7 +246,6 @@ function_exists("thrift_protocol_write_binary") ?
} }
} catch (TException $te) { } catch (TException $te) {
self::registerError('TException: '.$te->getMessage(), PandraLog::LOG_CRIT); self::registerError('TException: '.$te->getMessage(), PandraLog::LOG_CRIT);

} }


return FALSE; return FALSE;
Expand Down Expand Up @@ -316,53 +317,58 @@ static public function registerError($errorMsg, $priority = PandraLog::LOG_WARNI
static public function authKeyspace($keySpace, $username, $password) { static public function authKeyspace($keySpace, $username, $password) {
$auth = new cassandra_AuthenticationRequest(); $auth = new cassandra_AuthenticationRequest();
$auth->credentials = array ( $auth->credentials = array (
"username" => $username, "username" => $username,
"password" => $password "password" => $password
); );
self::$_ksAuth[$keySpace] = $auth; self::$_ksAuth[$keySpace] = $auth;
} }


/** /**
* Alias for connectBySeed (deprecated) * Alias for connectBySeed (deprecated)
*/ */
static public function auto($host, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) { static public function auto($hosts, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {
return self::connectSeededKeyspace($host, $keySpace, $port); return self::connectSeededKeyspace($hosts, $keySpace, $port);
} }


/** /**
* Given a single host, attempts to find other nodes in the cluster and attaches them * Given a single host, attempts to find other nodes in the cluster and attaches them
* to the pool * to the pool
* @todo build connections from token map * @todo build connections from token map
* @param string $host host name or IP of connecting node * @param mixed $hosts host name or IP of connecting node (or array thereof)
* @param string $keySpace name of the connection pool (cluster name - usually keyspace) * @param string $keySpace name of the connection pool (cluster name - usually keyspace)
* @param int $port TCP port of connecting node * @param int $port TCP port of connecting node
* @return bool connected ok * @return bool connected ok
*/ */
static public function connectSeededKeyspace($host, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) { static public function connectSeededKeyspace($hosts, $keySpace = self::DEFAULT_POOL_NAME, $port = THRIFT_PORT_DEFAULT) {


try { if (!is_array($hosts)) {
// Create Thrift transport and binary protocol cassandra client $hosts = (array) $hosts;
$transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024); }
$transport->open();
$client = new CassandraClient( foreach ($hosts as $host) {
(function_exists("thrift_protocol_write_binary") ? try {
new TBinaryProtocolAccelerated($transport) : // Create Thrift transport and binary protocol cassandra client
new TBinaryProtocol($transport))); $transport = new TBufferedTransport(new TSocket($host, $port, PERSIST_CONNECTIONS, 'PandraCore::registerError'), 1024, 1024);

$transport->open();
// @todo this has been deprecated by 'describe_ring' 0.6.3 $client = new CassandraClient(
$tokenMap = $client->get_string_property('token map'); (function_exists("thrift_protocol_write_binary") ?
$transport->close(); new TBinaryProtocolAccelerated($transport) :
unset($transport); unset($client); new TBinaryProtocol($transport)));
$tokens = json_decode($tokenMap);
foreach ($tokens as $token => $host) { // @todo this has been deprecated by 'describe_ring' 0.6.3
if (!self::connect($token, $host, $keySpace)) { $tokenMap = $client->get_string_property('token map');
return FALSE; $transport->close();
unset($transport); unset($client);
$tokens = json_decode($tokenMap);
foreach ($tokens as $token => $host) {
if (!self::connect($token, $host, $keySpace)) {
return FALSE;
}
} }
return TRUE;
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
} }

return TRUE;
} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
} }
return FALSE; return FALSE;
} }
Expand Down Expand Up @@ -425,7 +431,7 @@ static public function getAPCAvailable() {
* @param bool $writeMode optional get the write mode client * @param bool $writeMode optional get the write mode client
* @param string $keySpace optional keyspace where auth has been defined * @param string $keySpace optional keyspace where auth has been defined
*/ */
static public function getClient($writeMode = FALSE, $keySpace = NULL) { static public function getClient($writeMode = FALSE, $keySpace = self::DEFAULT_POOL_NAME) {


// Catch trimmed nodes or a completely trimmed pool // Catch trimmed nodes or a completely trimmed pool
if (empty(self::$_activeNode) || empty(self::$_socketPool[self::$_activePool])) { if (empty(self::$_activeNode) || empty(self::$_socketPool[self::$_activePool])) {
Expand Down Expand Up @@ -463,7 +469,7 @@ static public function getClient($writeMode = FALSE, $keySpace = NULL) {


// check connection is open // check connection is open
try { try {
if (!self::$_socketPool[self::$_activePool][self::$_activeNode]['transport']->isOpen()) { if (!self::$_socketPool[self::$_activePool][self::$_activeNode]['transport']->isOpen()) {
self::_authOpen(self::$_socketPool[self::$_activePool][self::$_activeNode]['transport'], $keySpace); self::_authOpen(self::$_socketPool[self::$_activePool][self::$_activeNode]['transport'], $keySpace);
} }
return $conn; return $conn;
Expand Down
2 changes: 1 addition & 1 deletion lib/SuperColumnFamily.class.php
Expand Up @@ -64,7 +64,7 @@ public function addSuper(PandraSuperColumn $scObj) {
public function addColumn($superName, $containerType = NULL) { public function addColumn($superName, $containerType = NULL) {
if (!array_key_exists($superName, $this->_columns)) { if (!array_key_exists($superName, $this->_columns)) {
$this->_columns[$superName] = new PandraSuperColumn( $this->_columns[$superName] = new PandraSuperColumn(
$this->typeConvert($superName, UUID::UUID_BIN), $this->typeConvert($superName, self::CONTEXT_BIN),
$this->getKeyID(), $this->getKeyID(),
$this->getKeySpace(), $this->getKeySpace(),
$this, $this,
Expand Down

0 comments on commit 1bc509b

Please sign in to comment.