Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

- batching flags for column update/deletes, columns can also be saved…

…/deleted individually based on parent ColumnFamily container

- new validator type definitions (bool, float, enum).
- round robin and random connection calls in getClient. added round-robin apc stub
- read/write mode config (active, random, round)
- refactored ColumnFamily towards polymorphic SuperColun container (can wrap ColumnFamilies and Columns congruent to Cassandra spec)
	towards data structures :
				  Keyspace > Key > ColumnFamily > Column(s)
				  Keyspace > Key > ColumnFamily > SuperColumn(s) [Column(s)]  and/or > Column(s)
				  Keyspace > Key > SuperColumn(s) > ColumnFamily > Column(s)
  • Loading branch information...
commit 1ad0ba1b8a4cd59bb7f861b8541148e1570f4e31 1 parent 930c671
@mjpearson authored
View
13 config.php
@@ -18,7 +18,10 @@
The pandra homepage is :
http://www.phpgrease.net/projects/pandra
*/
-
+/**
+ *
+ * @package Pandra
+ */
$GLOBALS['THRIFT_ROOT'] = dirname(__FILE__).'/../thrift-php/';
require_once $GLOBALS['THRIFT_ROOT'].'/packages/cassandra/Cassandra.php';
@@ -27,10 +30,16 @@
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TFramedTransport.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php';
+// Default Thrift port
define('PANDRA_PORT_DEFAULT', 9160);
+// read/write modes (can be configured independently)
+define('PANDRA_MODE_ACTIVE', 0); // Active client only
+define('PANDRA_MODE_ROUND', 1); // sequentially select configured clients
+define('PANDRA_MODE_ROUND_APC', 1); // sequentially select between interpreter instances w/APC
+define('PANDRA_MODE_RANDOM', 2); // select random node
+
function pandraAutoLoad($className) {
-// $className = strtolower($className);
if (!preg_match("/^pandra/i", $className)) return;
View
87 lib/Column.class.php
@@ -1,18 +1,46 @@
<?php
-
+/**
+ *
+ * @package Pandra
+ */
class PandraColumn {
-
+
+ /* @var string column name */
+ public $name = NULL;
+
+ /* @var string column value */
+ public $value = NULL;
+
+ /* @var int last changed timestamp */
+ public $timestamp = NULL;
+
+ /* @var array validator type definitions for this colun */
public $typeDef = array();
- public $lastError = array();
+ /* @var string last processing error */
+ public $lastError = NULL;
+ /* @var string callback function for this column pre-save */
public $callback = NULL;
- public $modified = FALSE;
+ /* @var bool column value has been modified since load() or init */
+ private $_modified = FALSE;
- public $name = NULL;
+ /* @var $delete column is marked for deletion */
+ private $_delete = FALSE;
- public $value = NULL;
+ /* @var PandraColumnFamily column family parent reference */
+ private $_parentCF = NULL;
+
+ public function __construct($name, &$parentCF) {
+ $this->name = $name;
+ $this->_parentCF = $parentCF;
+ }
+
+ public function bindTime($timeOverride = NULL) {
+ $this->timestamp = ($timeOverride === NULL) ? time() : $timeOverride;
+ return $timestamp;
+ }
public function setValue($value, $validate = TRUE) {
if ($validate && !empty($this->typeDef)) {
@@ -22,13 +50,58 @@ public function setValue($value, $validate = TRUE) {
}
$this->value = $value;
- $this->modified = TRUE;
+ $this->_modified = TRUE;
return TRUE;
}
+ public function reset() {
+ $this->_modified = FALSE;
+ $this->_delete = FALSE;
+ }
+
+ public function markDelete() {
+ $this->_delete = TRUE;
+ $this->_modified = TRUE;
+ }
+
public function callbackValue() {
$value = '';
eval('$value = '.$cObj->callback.'("'.$cObj->value.'");');
return $value;
}
+
+ public function save() {
+
+ if (!$this->_modified) return FALSE;
+
+ // @todo configurable consistency
+ $consistencyLevel = cassandra_ConsistencyLevel::ONE;
+
+ $this->_parentCF->checkCFState();
+
+ $client = Pandra::getClient(TRUE);
+
+ // build the column path
+ $columnPath = new cassandra_ColumnPath();
+ $columnPath->column_family = $this->_parentCF->columnFamily;
+ $columnPath->column = $this->name;
+
+ // @todo super writes
+ if ($this->_parentSuper !== NULL) {
+// $columnPath->super_column = $this->_parentSuper->name;
+ }
+
+ if ($this->_delete) {
+ $client->remove($this->_parentCF->keySpace, $this->_parentCF->keyID, $columnPath, $this->bindTime(), $consistencyLevel);
+ } else {
+ $client->insert($this->_parentCF->keySpace, $this->_parentCF->keyID, $columnPath, $this->value, $this->bindTime(), $consistencyLevel);
+ }
+
+ return TRUE;
+ }
+
+ public function delete() {
+ $this->markDelete();
+ $this->save();
+ }
}
View
171 lib/ColumnFamily.class.php
@@ -1,29 +1,43 @@
-<?
+<?php
+/**
+ *
+ * @package Pandra
+ * @abstract
+ */
abstract class PandraColumnFamily {
- /* @var string magic set/get prefixes */
- private $_fieldPrefix = 'col'; // magic __get/__set column prefix
-
- /* @var bool flag indicates whether object was 'loaded' */
- private $_loaded = FALSE;
-
- /* @var array container for ptkField objects, indexed to field name */
- protected $columns = array();
-
/* @var string keyspace for this column family */
public $keySpace = NULL;
/* @var string child table name */
public $columnFamily = NULL;
+ /* @var string super column name for this columnfamily (supers may encapsulate columns and column families) */
+ public $superColumn = NULL;
+
/* @var mixed keyID key for the working row */
public $keyID = NULL;
- /* */
+ /* @var string magic set/get prefixes */
+ private $_cFieldPrefix = 'column_'; // magic __get/__set column prefix in column famliy
+
+ /* @var string magic set/get prefixes */
+ private $_sFieldPrefix = 'super_'; // magic __get/__set super prefix in column family
+
+ /* @var array container for column objects, indexed to field name */
+ protected $columns = array();
+
+ /* @var array container for supers (column container objects), indexed to supercolumn name */
+ protected $supers = array();
+
+ /* @var string last error encountered */
public $lastError = NULL;
- /* */
- public $errors = NULL;
+ /* @var array complete list of errors for this object instance */
+ public $errors = array();
+
+ /* var bool columnfamily marked for deletion */
+ private $_delete = FALSE;
/**
* Constructor, builds column structures
@@ -35,44 +49,37 @@ public function __construct($keyID = NULL) {
public function addColumn($colName, $typeDef = array(), $callbackOnSave = NULL) {
if (!array_key_exists($colName, $this->columns)) {
- $this->columns[$colName] = new PandraColumn();
- $this->columns[$colName]->name = $colName;
+ $this->columns[$colName] = new PandraColumn($colName, $this);
}
// array of validation functions
if (!empty($typeDef)) $this->columns[$colName]->typeDef = $typeDef;
- // function callback pre-save
+ // pre-save callback
if (!empty($callbackOnSave)) $this->columns[$colName]->callback = $callbackOnSave;
- return TRUE;
+ return $this->getColumn($colName);
}
- public function removeColumn($colName) {
+ public function getColumn($colName) {
if (array_key_exists($colName, $this->columns)) {
- unset ($this->columns[$colName]);
+ return $this->columns[$colName];
}
+ return NULL;
}
- public function addSuper($superName, $columnsIn = array()) {
- if (!empty($columnsIn)) {
- foreach ($columnsIn as $colName) {
- if (!array_key_exists($colName, $this->columns)) {
- // throw columnfamily exception ??
- }
- $this->columns[$colName]['super'] = $superName;
- }
+ public function addSuper($superName) {
+ if (!array_key_exists($superName, $this->supers)) {
+ $this->supers[$superName] = new PandraSuperColumn($superName);
}
+ return $this->getSuper($superName);
}
- /**
- * returns array of matching rows by $search on this table
- * @param array $search
- * @param int $limit
- * @return array
- */
- static protected function findByKey($value) {
- // @todo create a clone of this object, populate it with load data and return
+ public function getSuper($superName) {
+ if (array_key_exists($superName, $this->supers)) {
+ return $this->supers[$superName];
+ }
+ return NULL;
}
/**
@@ -83,12 +90,14 @@ static protected function findByKey($value) {
public function getRawSlice($keyID, $consistencyLevel = cassandra_ConsistencyLevel::ONE) {
$this->keyID = $keyID;
- $client = $this->_getClient();
+ $this->checkCFState();
+
+ $client = Pandra::getClient();
// build the column path
$columnParent = new cassandra_ColumnParent();
$columnParent->column_family = $this->columnFamily;
- $columnParent->super_column = NULL;
+ $columnParent->super_column = $this->superColumn;
$predicate = new cassandra_SlicePredicate();
$predicate->slice_range = new cassandra_SliceRange();
@@ -100,6 +109,7 @@ public function getRawSlice($keyID, $consistencyLevel = cassandra_ConsistencyLev
/**
* Loads a row by it's keyID (all supercolumns and columns)
+ * @todo super columns and slice loads
* @param mixed $value value of this rows primary key to load from
* @return bool this object has loaded its fields
*/
@@ -116,7 +126,11 @@ public function load($keyID, $colAutoCreate = FALSE, $consistencyLevel = cassand
return FALSE;
}
- private function _getClient() {
+ /**
+ * Check column family object state looks OK for writes
+ * @todo: make more dev friendly
+ */
+ public function checkCFState() {
// check a keyID is defined
if ($this->keyID === NULL) throw new RuntimeException('NULL keyID defined, cannot insert');
@@ -125,72 +139,49 @@ private function _getClient() {
// check a column family is defined
if ($this->columnFamily === NULL) throw new RuntimeException('NULL columnFamliy defined, cannot insert');
-
- return Pandra::getClient();
}
/**
- * Save a record, based on this objects field values
+ * Save all columns in this loaded columnfamily
* @return void
*/
public function save() {
- $client = $this->_getClient();
+ $this->checkCFState();
- // @todo configurable consistency level
- $consistencyLevel = cassandra_ConsistencyLevel::ONE;
+ if ($this->_delete === TRUE) {
+ $client = Pandra::getClient(TRUE);
- // build the column path
- $columnPath = new cassandra_ColumnPath();
- $columnPath->column_family = $this->columnFamily;
-
- foreach ($this->columns as $colName => $cObj) {
- if (!$cStruct['modified']) continue;
+ if ($columnPath === NULL) {
+ $columnPath = new cassandra_ColumnPath();
+ $columnPath->column_family = $this->columnFamily;
+ }
- $timestamp = time();
- $columnPath->column = $colName;
- $columnPath->super_column = null;
+ $client->remove($this->keySpace, $this->keyID, $columnPath, time(), $consistencyLevel);
+ }
- // handle saving callback
- $value = $cObj->value;
- if (!empty($cObj->callback)) {
- $value = $cObj->callbackValue();
- }
-
- // @todo supercolumn container
- $client->insert($this->keySpace, $this->keyID, $columnPath, $cStruct['value'], $timestamp, $consistencyLevel);
+ foreach ($this->columns as &$cObj) {
+ $cObj->save();
}
}
/**
* deletes the loaded object from the keyspace, or optionally the supplied columns for the key
*/
- public function delete(cassandra_ColumnPath $columnPath = NULL, $consistencyLevel = cassandra_ConsistencyLevel::ONE) {
-
- $client = $this->_getClient();
-
- if ($columnPath === NULL) {
- $columnPath = new cassandra_ColumnPath();
- $columnPath->column_family = $this->columnFamily;
- }
-
- $client->remove($this->keySpace, $this->keyID, $columnPath, time(), $consistencyLevel);
-
- return FALSE;
+ public function markDelete() {
+ $this->_delete = TRUE;
+ return;
}
- public function deleteColumn($column) {
- throw new RuntimeException('Not Implemented');
-
- // build column path
- $columnPath = new cassandra_ColumnPath();
- $columnPath->column_family = $this->columnFamily;
- $columnPath->column = $column;
+ public function unDelete() {
+ $this->_delete = FALSE;
+ return;
}
- public function deleteSuperColumn() {
- // delete columns in super column
- throw new RuntimeException('Not Implemented');
+ public function reset() {
+ foreach ($this->columns as &$cObj) {
+ $cObj->reset();
+ }
}
/*
@@ -204,7 +195,7 @@ protected function populate($data) {
foreach ($data as $key => $value) {
if (array_key_exists($key, $this->columns)) {
if (!$this->columns[$key]->setValue($value)) {
- $this->errors .= $this->columns[$key]->lastError;
+ $this->errors[] = $this->columns[$key]->lastError;
}
}
}
@@ -218,7 +209,7 @@ protected function populate($data) {
* @return bool field exists
*/
private function gsMutable(&$colName) {
- $colName = preg_replace("/^".$this->_fieldPrefix."/", "", strtolower($colName));
+ $colName = preg_replace("/^".$this->_cFieldPrefix."/", "", strtolower($colName));
return array_key_exists($colName, $this->columns);
}
@@ -257,18 +248,14 @@ protected function __set($colName, $value) {
*/
public function setColumn($colName, $value, $validate = TRUE) {
if ($this->gsMutable($colName)) {
- if (!$this->columns[$colName]->setValue($value, $validate)) {
- $this->errors .= $this->lastError = $this->columns[$colName]->lastError;
- return FALSE;
- }
- return TRUE;
+ if ($this->columns[$colName]->setValue($value, $validate)) return TRUE;
+ $this->errors[] = $this->lastError = $this->columns[$colName]->lastError;
}
return FALSE;
}
/**
- * constructFields builds ptkField objects
+ * constructFields builds column objects via addColumn/addSuper methods
*/
abstract public function constructColumns();
}
-?>
View
81 lib/Pandra.class.php
@@ -1,32 +1,52 @@
<?php
/**
- * @todo read/write switching, memcached hooks, logging hooks
+ * @package Pandra
*/
-
-// constants for determining results format
class Pandra {
- const PANDRA_OBJ = 1;
- const PANDRA_ASSOC = 2;
- const PANDRA_XML = 3;
- const PANDRA_JSON = 4;
+ const FORMAT_OBJ = 1;
+ const FORMAT_ASSOC = 2;
+ const FORMAT_XML = 3;
+ const FORMAT_JSON = 4;
+
+ //const APC_EXPIRE_SECONDS = 60;
static public $lastError = '';
+ static public $consistencyLevel = cassandra_ConsistencyLevel::ZERO;
static private $_nodeConns = array();
static private $_activeNode = NULL;
- static private $consistencyLevel = cassandra_ConsistencyLevel::ZERO;
- public function Query($keySpace, $columnFamily, $args, $format = PANDRA_OBJ) {
+ static private $readMode = PANDRA_MODE_ACTIVE;
+ static private $writeMode = PANDRA_MODE_ACTIVE;
+
+ static private $supportedModes = array(
+ PANDRA_MODE_ACTIVE,
+ PANDRA_MODE_ROUND,
+ //PANDRA_MODE_ROUND_APC,
+ PANDRA_MODE_RANDOM,
+ );
+
+ static public function setReadMode($newMode) {
+ if (!array_key_exists($newMode, self::$supportedModes)) throw new RuntimeExcpetion("Unsupported Read Mode");
+ self::$readMode = $newMode;
}
+ static public function setWriteMode($newMode) {
+ if (!array_key_exists($newMode, self::$supportedModes)) throw new RuntimeExcpetion("Unsupported Write Mode");
+ self::$writeMode = $newMode;
+ }
+
+
/**
*
*/
- public function setactiveNode($connectionID) {
- if (array_key_exists($connectionID, self::$_nodeConns)) {
+ public function setActiveNode($connectionID) {
+ if (array_key_exists($connectionID, self::$_nodeConns) && self::$_nodeConns[$connectionID]['transport']->isOpen()) {
self::$_activeNode = $connectionID;
+ return TRUE;
}
+ return FALSE;
}
/**
@@ -56,7 +76,7 @@ static public function disconnectAll() {
* @param int $port TCP port of connecting node
* @return bool connected ok
*/
- static public function connect($connectionID, $host, $port = PANDRA_PORT_DEFAULT) {
+ static public function connect($connectionID, $host = NULL, $port = PANDRA_PORT_DEFAULT) {
try {
// if the connection exists but it is closed, then re-open
if (array_key_exists($connectionID, self::$_nodeConns)) {
@@ -85,15 +105,46 @@ static public function connect($connectionID, $host, $port = PANDRA_PORT_DEFAULT
}
/**
- * get current working node
+ * get current working node, recursive, trims disconnected clients
*/
- static public function getClient() {
+ static public function getClient($writeMode = FALSE) {
if (empty(self::$_activeNode)) throw new Exception('Not Connected');
- return self::$_nodeConns[self::$_activeNode]['client'];
+ $useMode = ($writeMode) ? self::$writeMode : self::$readMode;
+ switch ($useMode) {
+ case PANDRA_MODE_ROUND_APC :
+ // @todo, APC store of activeNode
+ case PANDRA_MODE_ROUND :
+ if (!current(self::$_nodeConns)) reset(self::$_nodeConns);
+ $curConn = each(self::$_nodeConns);
+ self::$_activeNode = $curConn['key']; // store current working node
+ return self::$_nodeConns[self::$_activeNode]['client'];
+ break;
+ case PANDRA_MODE_RANDOM :
+ $randConn =& array_rand(self::$_nodeConns);
+ return $randConn['client'];
+ break;
+ case PANDRA_MODE_ACTIVE :
+ default :
+ return self::$_nodeConns[self::$_activeNode]['client'];
+ break;
+ }
}
static public function getKeyspace($keySpace) {
$client = self::getClient();
return $client->describe_keyspace($keySpace);
}
+
+ static public function toJSON(&$results) {
+ }
+
+ static public function toXML(&$results) {
+ }
+
+ static public function toSerialised(&$results) {
+ }
+
+ static public function toArray(&$results) {
+ }
+
}
View
7 lib/Slice.class.php
@@ -1,9 +1,13 @@
<?
/**
* Wrapper for a non-schema and non-validated slice (simple factory)
+ * Loads all columns/supers in a columnfamily for given $keyID
+ * @package Pandra
*/
class PandraSlice extends PandraColumnFamily {
+
+
public function constructColumns() { }
public function __construct($keySpace, $columnFamily, $keyID = NULL) {
@@ -15,6 +19,9 @@ public function __construct($keySpace, $columnFamily, $keyID = NULL) {
}
}
+ public function setSliceColumns($columns = array()) {
+ }
+
public function load($keyID, $consistencyLevel = cassandra_ConsistencyLevel::ONE) {
parent::load($keyID, TRUE, $consistencyLevel);
}
View
11 lib/SuperColumn.class.php
@@ -0,0 +1,11 @@
+<?php
+/**
+ * Container stub for SuperColumn
+ * @package pandra
+ */
+class PandraSuperColumn extends PandraColumnFamily {
+
+ public function constructColumns() {
+ }
+
+}
View
41 lib/Validator.class.php
@@ -1,14 +1,21 @@
-<?
+<?php
+/**
+ *
+ * @package Pandra
+ */
class PandraValidator {
- // primitives for which there is $this->validate() logic
+ // primitives for which there is self::check() logic
static public $_primitive = array(
'notempty',
'int',
+ 'float',
'numeric',
'string',
- 'maxlength',
- 'minlength',
+ 'bool',
+ 'maxlength', // =[length]
+ 'minlength', // =[length]
+ 'enum', // =[comma,delimitered,enumerates]
'email',
'url'
);
@@ -79,6 +86,11 @@ static public function check($value, $label, $typeDefs = array(), &$errorMsg = "
list($type, $args) = explode("=", $type);
}
+ if (!in_array($type, self::$_primitive)) {
+ throw new RuntimeException("undefined type definition ($type)\n");
+ }
+
+
// check for basic validator types
switch ($type) {
case 'notempty' :
@@ -94,21 +106,34 @@ static public function check($value, $label, $typeDefs = array(), &$errorMsg = "
if ($error && empty($errorMsg)) $errorMsg = "Invalid URL\n";
break;
case 'int' :
+ case 'float' :
case 'numeric' :
case 'string' :
- eval('$error != is_'.$type.'('.$value.')');
+ case 'bool' :
+ eval('$error != is_'.$type.'("'.$value.'");');
if ($error && empty($errorMsg)) $errorMsg = "Field error, expected ".$type."\n";
break;
case 'maxlength' :
if (empty($args)) throw new RuntimeException("type $type requires argument\n");
$error = (strlen($value) > $args);
- if ($error) $errorMsg .= "Maximum length exceeded ($label)";
+ if ($error) $errorMsg .= "Maximum length $args exceeded ($label)";
+ break;
+ case 'minlength' :
+ if (empty($args)) throw new RuntimeException("type $type requires argument\n");
+ $error = (strlen($value) < $args);
+ if ($error) $errorMsg .= "Minimum length $args unmet ($label)\n";
+ break;
+ case 'enum' :
+ if (empty($args)) throw new RuntimeException("type $type requires argument\n");
+ $enums = explode(",", $args);
+ $error = (!in_array($value, $enums));
+ if ($error) $errorMsg .= "Invalid Argument\n";
default :
- throw new RuntimeException("undefined type definition ($type)\n");
+ throw new RuntimeException("unhandled type definition ($type)\n");
break;
}
}
return !$error;
}
-}
+}
Please sign in to comment.
Something went wrong with that request. Please try again.