Skip to content

Commit

Permalink
deprecated batch_insert helper
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpearson committed Jul 27, 2010
1 parent c043cb1 commit 044daf1
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 83 deletions.
3 changes: 1 addition & 2 deletions lib/Column.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,7 @@ public function reset() {
* mutator, marks this column for deletion and sets modified
*/
public function delete() {
$this->_delete = TRUE;
$this->_modified = TRUE;
$this->_delete = $this->_modified = TRUE;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions lib/ColumnContainer.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,14 @@ protected function setModified($modified = TRUE) {
* @return void
*/
public function delete() {
$this->setDelete(TRUE);
$this->_delete = $this->_modified = TRUE;
foreach ($this->_columns as &$column) {
$column->delete();
}
}

/**
* mutator, marks this column for deletion and sets modified
* mutator, marks this column for deletion
* @param bool $delete
*/
protected function setDelete($delete) {
Expand Down
45 changes: 26 additions & 19 deletions lib/ColumnFamily.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ public function load($keyID = NULL, $consistencyLevel = NULL) {
array(
'column_family' => $this->getName())),
$predicate,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);

// otherwise by defined columns (slice query)
} else {

$predicate->column_names = $this->getColumnNames();

$result = PandraCore::getCFSliceMulti(
$this->getKeySpace(),
array($keyID),
new cassandra_ColumnParent(
array(
'column_family' => $this->getName())),
$predicate,
PandraCore::getConsistency($consistencyLevel));
$this->getKeySpace(),
array($keyID),
new cassandra_ColumnParent(
array(
'column_family' => $this->getName())),
$predicate,
$consistencyLevel);

$result = $result[$keyID];
}
Expand Down Expand Up @@ -114,34 +114,41 @@ public function save($consistencyLevel = NULL) {
$this->getKeyID(),
$columnPath,
NULL,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);

if (!$ok) $this->registerError(PandraCore::$lastError);

} else {

$modifiedColumns = $this->getModifiedColumns();
$deletions = array();
$selfKey = $this->getKeyID();
$selfName = $this->getName();

// build mutation
$map = array($this->getKeyID() => array($this->getName() => array()));
$map = array($selfKey => array($selfName => array()));

$ptr = &$map[$selfKey][$selfName];

$modifiedColumns = $this->getModifiedColumns();

// @todo - test delete mutate
foreach ($modifiedColumns as &$cObj) {
$cObj->bindTime();
$timestamp = $cObj->bindTime();

if ($cObj->isDeleted()) {
$p = new PandraSlicePredicate(PandraSlicePredicate::TYPE_COLUMNS, array($cObj->getName()));
$sd = new cassandra_Deletion(array('timestamp' => PandraCore::getTime(), 'predicate' => $p));
$m = new cassandra_Mutation(array('deletion' => $sc));
$deletions[] = $cObj->getName();
} else {
$sc = new cassandra_ColumnOrSuperColumn(array('column' => $cObj));
$m = new cassandra_Mutation(array('column_or_supercolumn' => $sc));
$ptr[] = new cassandra_Mutation(array('column_or_supercolumn' => $sc));
}
}

$map[$this->getKeyID()][$this->getName()][] = $m;
if (!empty($deletions)) {
$p = new PandraSlicePredicate(PandraSlicePredicate::TYPE_COLUMNS, $deletions);
$sd = new cassandra_Deletion(array('timestamp' => PandraCore::getTime(), 'predicate' => $p));
$ptr[] = new cassandra_Mutation(array('deletion' => $sd));
}

$ok = PandraCore::saveMutation($this->getKeySpace(), $map, $consistencyLevel);
$ok = PandraCore::batchMutate($this->getKeySpace(), $map, $consistencyLevel);
}
if ($ok) $this->reset();
}
Expand Down
53 changes: 8 additions & 45 deletions lib/Core.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,10 @@ static public function getConsistency($consistencyLevel = NULL, $readMode = FALS
// ** experimental!
if (self::$_autoDowngrades) {
$quorums = array(
cassandra_ConsistencyLevel::QUORUM,
cassandra_ConsistencyLevel::DCQUORUM,
cassandra_ConsistencyLevel::DCQUORUMSYNC
);
cassandra_ConsistencyLevel::QUORUM,
cassandra_ConsistencyLevel::DCQUORUM,
cassandra_ConsistencyLevel::DCQUORUMSYNC
);

if (in_array($consistency, $quorums) && count(self::$_socketPool[self::$_activePool]) < 2) {
$consistencyLevel == cassandra_ConsistencyLevel::ONE;
Expand Down Expand Up @@ -703,50 +703,13 @@ static public function saveColumnPath($keySpace,
}

/**
* Batch saves a SuperColumn and its modified Columns to Cassandra
*
* @param string $keySpace keyspace of key
* @param string $keyID row key id
* @param string $superCFName Super Column Family name
* @param array $superColumnMap array of Super Column name (string) keyed to array of modified cassandra_Columns
* @param array $mutation cassandra_Mutation struct
* @param int $consistencyLevel response consistency level
* @return bool Super Column saved OK
* @return bool mutate operation completed OK
*/
static public function saveSuperColumn($keySpace,
$keyID,
array $superCFName,
array $superColumnMap,
$consistencyLevel = NULL) {
try {
$client = self::getClient(TRUE, $keySpace);

$mutations = array();

foreach ($superCFName as $superColumnFamilyName) {

// Thrift won't batch insert multiple supercolumns?
$scContainer = new cassandra_ColumnOrSuperColumn();

foreach ($superColumnMap as $superColumnName => $columns) {
$scContainer->super_column = new cassandra_SuperColumn();
$scContainer->super_column->name = $superColumnName;
$scContainer->super_column->columns = $columns;
}

$mutations[$superColumnFamilyName] = array($scContainer);
}

// batch_insert inserts a supercolumn across multiple CF's for key
// @todo batch mutate
$client->batch_insert($keySpace, $keyID, $mutations, self::getConsistency($consistencyLevel));

} catch (TException $te) {
self::registerError( 'TException: '.$te->getMessage().' '.(isset($te->why) ? $te->why : ''));
return FALSE;
}
return TRUE;
}

public function saveMutation($keySpace, $mutation, $consistencyLevel = NULL) {
public function batchMutate($keySpace, $mutation, $consistencyLevel = NULL) {
try {
$client = self::getClient(TRUE, $keySpace);
$client->batch_mutate($keySpace, $mutation, self::getConsistency($consistencyLevel));
Expand Down
52 changes: 41 additions & 11 deletions lib/SuperColumn.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public function save($consistencyLevel = NULL) {
$ok = $this->pathOK();

if ($ok) {
if ($this->getDelete()) {
if ($this->isDeleted()) {

$columnPath = new cassandra_ColumnPath();
$columnPath->column_family = $this->getColumnFamilyName();
Expand All @@ -81,16 +81,47 @@ public function save($consistencyLevel = NULL) {
$this->getKeyID(),
$columnPath,
NULL,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);
} else {

$deletions = array();
$insertions = array();

$selfKey = $this->getKeyID();
$selfName = $this->getName();
$selfCFName = $this->getColumnFamilyName();

// build mutation
$map = array($selfKey => array($selfCFName => array()));

$ptr = &$map[$selfKey][$selfCFName];

$this->bindTimeModifiedColumns();
$ok = PandraCore::saveSuperColumn(
$this->getKeySpace(),
$this->getKeyID(),
array($this->getColumnFamilyName()),
array($this->getName() => $this->getModifiedColumns()),
PandraCore::getConsistency($consistencyLevel));
$modifiedColumns = $this->getModifiedColumns();

if (!empty($modifiedColumns)) {
foreach ($modifiedColumns as &$cObj) {
if ($cObj->isDeleted()) {
$deletions[] = $cObj->getName();
} else {
$insertions[] = $cObj;
}
}

if (!empty($deletions)) {
$p = new PandraSlicePredicate(PandraSlicePredicate::TYPE_COLUMNS, $deletions);
$sd = new cassandra_Deletion(array('timestamp' => PandraCore::getTime(), 'predicate' => $p));
$ptr[] = new cassandra_Mutation(array('deletion' => $sd));
}

if (!empty($insertions)) {
$sc = new cassandra_SuperColumn(array('name' => $selfName, 'columns' => $insertions));
$csc = new cassandra_ColumnOrSuperColumn(array('super_column' => $sc));
$ptr[] = new cassandra_Mutation(array('column_or_supercolumn' => $csc));
}

$ok = PandraCore::batchMutate($this->getKeySpace(), $map, $consistencyLevel);
}
}

if ($ok) {
Expand Down Expand Up @@ -140,7 +171,7 @@ public function load($keyID = NULL, $consistencyLevel = NULL) {
'column_family' => $this->getColumnFamilyName(),
'super_column' => $this->getName())),
$predicate,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);

// otherwise by defined columns (slice query)
} else {
Expand All @@ -155,7 +186,7 @@ public function load($keyID = NULL, $consistencyLevel = NULL) {
array(
'column_family' => $this->getColumnFamilyName(),
'super_column' => $this->getName())),
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);

$result = $result[$keyID];
}
Expand Down Expand Up @@ -278,6 +309,5 @@ public function _getName() {
}
return parent::getName();
}

}
?>
6 changes: 3 additions & 3 deletions lib/SuperColumnFamily.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public function save($consistencyLevel = NULL) {
$this->getKeyID(),
$columnPath,
NULL,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);
if (!$ok) $this->registerError(PandraCore::$lastError);

} else {
Expand Down Expand Up @@ -171,7 +171,7 @@ public function load($keyID = NULL, $consistencyLevel = NULL) {
array(
'column_family' => $this->getName())),
$predicate,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);

// otherwise by defined columns (slice query)
} else {
Expand All @@ -185,7 +185,7 @@ public function load($keyID = NULL, $consistencyLevel = NULL) {
array(
'column_family' => $this->getName())),
$predicate,
PandraCore::getConsistency($consistencyLevel));
$consistencyLevel);

$result = $result[$keyID];
}
Expand Down
4 changes: 3 additions & 1 deletion tests/lib/PandraSuperColumnTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class PandraSuperColumnTest extends PHPUnit_Framework_TestCase {
* @access protected
*/
protected function setUp() {
PandraCore::addLogger('STDOUT');

$this->superColumnFamily = new PandraSuperColumnFamily();
$this->superColumnFamily->setKeySpace('Keyspace1');
$this->superColumnFamily->setName('Super1');
Expand All @@ -42,7 +44,7 @@ protected function setUp() {
$this->obj->addColumn('street', 'string');
$this->obj->addColumn('zip', 'int');

PandraCore::connect('default', 'localhost');
PandraCore::connectSeededKeyspace(array('localhost'));
}

/**
Expand Down

0 comments on commit 044daf1

Please sign in to comment.