Skip to content

Commit

Permalink
Upgrade phpcassa
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastien Giroux committed Dec 26, 2012
1 parent 4a230d0 commit b6bffed
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 78 deletions.
108 changes: 99 additions & 9 deletions include/phpcassa/AbstractColumnFamily.php
Expand Up @@ -9,6 +9,7 @@


use phpcassa\Iterator\IndexedColumnFamilyIterator; use phpcassa\Iterator\IndexedColumnFamilyIterator;
use phpcassa\Iterator\RangeColumnFamilyIterator; use phpcassa\Iterator\RangeColumnFamilyIterator;
use phpcassa\Iterator\RangeTokenColumnFamilyIterator;


use phpcassa\Batch\CfMutator; use phpcassa\Batch\CfMutator;


Expand Down Expand Up @@ -429,7 +430,7 @@ public function get_count($key,


$cp = $this->create_column_parent(); $cp = $this->create_column_parent();
$slice = $this->create_slice_predicate( $slice = $this->create_slice_predicate(
$column_names, $column_slice, ColumnSlice::MAX_COUNT); $column_names, $column_slice, null, ColumnSlice::MAX_COUNT);
return $this->_get_count($key, $cp, $slice, $consistency_level); return $this->_get_count($key, $cp, $slice, $consistency_level);
} }


Expand All @@ -456,7 +457,7 @@ public function multiget_count($keys,


$cp = $this->create_column_parent(); $cp = $this->create_column_parent();
$slice = $this->create_slice_predicate( $slice = $this->create_slice_predicate(
$column_names, $column_slice, ColumnSlice::MAX_COUNT); $column_names, $column_slice, null, ColumnSlice::MAX_COUNT);


return $this->_multiget_count($keys, $cp, $slice, $consistency_level); return $this->_multiget_count($keys, $cp, $slice, $consistency_level);
} }
Expand Down Expand Up @@ -543,6 +544,73 @@ protected function _get_range($start, $finish, $count, $cp, $slice, $cl, $buffsz
$count, $cp, $slice, $this->rcl($cl)); $count, $cp, $slice, $this->rcl($cl));
} }






/**
* Get an iterator over a token range.
*
* Example usages of get_range_by_token() :
* 1. You can iterate part of the ring.
* This helps to start several processes,
* that scans the ring in parallel in fashion similar to Hadoop.
* Then full ring scan will take only 1 / <number of processes>
*
* 2. You can iterate "local" token range for each Cassandra node.
* You can start one process on each cassandra node,
* that iterates only on token range for this node.
* In this case you minimize the network traffic between nodes.
*
* 3. Combinations of the above.
*
* @param string $token_start fetch rows with a token >= this
* @param string $token_finish fetch rows with a token <= this
* @param int $row_count limit the number of rows returned to this amount
* @param \phpcassa\ColumnSlice a slice of columns to fetch, or null
* @param mixed[] $column_names limit the columns or super columns fetched to this list
* @param ConsistencyLevel $consistency_level affects the guaranteed
* number of nodes that must respond before the operation returns
* @param int $buffer_size When calling `get_range`, the intermediate results need
* to be buffered if we are fetching many rows, otherwise the Cassandra
* server will overallocate memory and fail. This is the size of
* that buffer in number of rows.
*
* @return phpcassa\Iterator\RangeColumnFamilyIterator
*/
public function get_range_by_token($token_start="",
$token_finish="",
$row_count=self::DEFAULT_ROW_COUNT,
$column_slice=null,
$column_names=null,
$consistency_level=null,
$buffer_size=null) {

$cp = $this->create_column_parent();
$slice = $this->create_slice_predicate($column_names, $column_slice);

return $this->_get_range_by_token($token_start, $token_finish, $row_count,
$cp, $slice, $consistency_level, $buffer_size);
}

protected function _get_range_by_token($tokenstart, $tokenfinish, $count, $cp, $slice, $cl, $buffsz) {

if ($buffsz == null)
$buffsz = $this->buffer_size;
if ($buffsz < 2) {
$ire = new InvalidRequestException();
$ire->message = 'buffer_size cannot be less than 2';
throw $ire;
}

return new RangeTokenColumnFamilyIterator($this, $buffsz, $tokenstart, $tokenfinish,
$count, $cp, $slice, $this->rcl($cl));
}





/** /**
* Fetch a set of rows from this column family based on an index clause. * Fetch a set of rows from this column family based on an index clause.
* *
Expand Down Expand Up @@ -636,19 +704,36 @@ public function batch_insert($rows, $timestamp=null, $ttl=null, $consistency_lev
if ($timestamp === null) if ($timestamp === null)
$timestamp = Clock::get_time(); $timestamp = Clock::get_time();


$arrayTTL = false;
if(is_array($ttl)){
$arrayTTL = true;
if(count($ttl) !== count($rows))
throw new UnexpectedValueException("ttl array size must match rows array size");
}

$cfmap = array(); $cfmap = array();
if ($this->insert_format == self::DICTIONARY_FORMAT) { if ($this->insert_format == self::DICTIONARY_FORMAT) {
foreach($rows as $key => $columns) { foreach($rows as $key => $columns) {
$packed_key = $this->pack_key($key, $handle_serialize=true); $packed_key = $this->pack_key($key, $handle_serialize=true);
if($arrayTTL){
$ttlRow = $ttl[$packed_key];
} else {
$ttlRow = $ttl;
}
$cfmap[$packed_key][$this->column_family] = $cfmap[$packed_key][$this->column_family] =
$this->make_mutation($columns, $timestamp, $ttl); $this->make_mutation($columns, $timestamp, $ttlRow);
} }
} else if ($this->insert_format == self::ARRAY_FORMAT) { } else if ($this->insert_format == self::ARRAY_FORMAT) {
foreach($rows as $row) { foreach($rows as $row) {
list($key, $columns) = $row; list($key, $columns) = $row;
$packed_key = $this->pack_key($key); $packed_key = $this->pack_key($key);
if($arrayTTL){
$ttlRow = $ttl[$packed_key];
} else {
$ttlRow = $ttl;
}
$cfmap[$packed_key][$this->column_family] = $cfmap[$packed_key][$this->column_family] =
$this->make_mutation($columns, $timestamp, $ttl); $this->make_mutation($columns, $timestamp, $ttlRow);
} }
} else { } else {
throw new UnexpectedValueException("Bad insert_format selected"); throw new UnexpectedValueException("Bad insert_format selected");
Expand Down Expand Up @@ -769,15 +854,20 @@ protected function wcl($write_consistency_level) {
return $write_consistency_level; return $write_consistency_level;
} }


protected function create_slice_predicate($column_names, protected function create_slice_predicate(
$column_names,
$column_slice, $column_slice,
$default_count=ColumnSlice::DEFAULT_COLUMN_COUNT) { $is_super=NULL,
$default_count=ColumnSlice::DEFAULT_COLUMN_COUNT)
{
if ($is_super === null)
$is_super = $this->is_super;


$predicate = new SlicePredicate(); $predicate = new SlicePredicate();
if ($column_names !== null) { if ($column_names !== null) {
$packed_cols = array(); $packed_cols = array();
foreach($column_names as $col) foreach($column_names as $col)
$packed_cols[] = $this->pack_name($col, $this->is_super); $packed_cols[] = $this->pack_name($col, $is_super);
$predicate->column_names = $packed_cols; $predicate->column_names = $packed_cols;
} else { } else {
if ($column_slice !== null) { if ($column_slice !== null) {
Expand All @@ -791,7 +881,7 @@ protected function create_slice_predicate($column_names,
$slice_end = self::SLICE_START; $slice_end = self::SLICE_START;


$slice_range->start = $this->pack_name( $slice_range->start = $this->pack_name(
$column_start, $this->is_super, $slice_end); $column_start, $is_super, $slice_end);
} else { } else {
$slice_range->start = ''; $slice_range->start = '';
} }
Expand All @@ -804,7 +894,7 @@ protected function create_slice_predicate($column_names,
$slice_end = self::SLICE_FINISH; $slice_end = self::SLICE_FINISH;


$slice_range->finish = $this->pack_name( $slice_range->finish = $this->pack_name(
$column_finish, $this->is_super, $slice_end); $column_finish, $is_super, $slice_end);
} else { } else {
$slice_range->finish = ''; $slice_range->finish = '';
} }
Expand Down
60 changes: 60 additions & 0 deletions include/phpcassa/Batch/AbstractMutator.php
@@ -0,0 +1,60 @@
<?php

namespace phpcassa\Batch;

/**
* Common methods shared by CfMutator and Mutator classes
*/
abstract class AbstractMutator
{
protected $pool;
protected $buffer = array();
protected $cl;

/**
* Send all buffered mutations.
*
* If an error occurs, the buffer will be preserverd, allowing you to
* attempt to call send() again later or take other recovery actions.
*
* @param cassandra\ConsistencyLevel $consistency_level optional
* override for the mutator's default consistency level
*/
public function send($consistency_level=null) {
if ($consistency_level === null)
$wcl = $this->cl;
else
$wcl = $consistency_level;

$mutations = array();
foreach ($this->buffer as $mut_set) {
list($key, $cf, $cols) = $mut_set;

if (isset($mutations[$key])) {
$key_muts = $mutations[$key];
} else {
$key_muts = array();
}

if (isset($key_muts[$cf])) {
$cf_muts = $key_muts[$cf];
} else {
$cf_muts = array();
}

$cf_muts = array_merge($cf_muts, $cols);
$key_muts[$cf] = $cf_muts;
$mutations[$key] = $key_muts;
}

if (!empty($mutations)) {
$this->pool->call('batch_mutate', $mutations, $wcl);
}
$this->buffer = array();
}

protected function enqueue($key, $cf, $mutations) {
$mut = array($key, $cf->column_family, $mutations);
$this->buffer[] = $mut;
}
}
12 changes: 7 additions & 5 deletions include/phpcassa/Batch/CfMutator.php
@@ -1,4 +1,4 @@
<? <?php
namespace phpcassa\Batch; namespace phpcassa\Batch;


use phpcassa\Batch\Mutator; use phpcassa\Batch\Mutator;
Expand All @@ -9,9 +9,10 @@
* *
* @package phpcassa\Batch * @package phpcassa\Batch
*/ */
class CfMutator extends Mutator { class CfMutator extends AbstractMutator {


protected $cf; protected $cf;
protected $mutatorInstance;


/** /**
* Initialize a mutator for a given column family. * Initialize a mutator for a given column family.
Expand All @@ -29,7 +30,8 @@ public function __construct($column_family, $write_consistency_level=null) {
$wcl = $column_family->write_consistency_level; $wcl = $column_family->write_consistency_level;
else else
$wcl = $write_consistency_level; $wcl = $write_consistency_level;
parent::__construct($column_family->pool, $wcl);
$this->mutatorInstance = new Mutator($column_family->pool, $wcl);
} }


/** /**
Expand All @@ -43,7 +45,7 @@ public function __construct($column_family, $write_consistency_level=null) {
* @param int $ttl a TTL to apply to all columns inserted here * @param int $ttl a TTL to apply to all columns inserted here
*/ */
public function insert($key, $columns, $timestamp=null, $ttl=null) { public function insert($key, $columns, $timestamp=null, $ttl=null) {
return parent::insert($this->cf, $key, $columns, $timestamp, $ttl); return $this->mutatorInstance->insert($this->cf, $key, $columns, $timestamp, $ttl);
} }


/** /**
Expand All @@ -57,6 +59,6 @@ public function insert($key, $columns, $timestamp=null, $ttl=null) {
* this function is called, not when send() is called) * this function is called, not when send() is called)
*/ */
public function remove($key, $columns=null, $super_column=null, $timestamp=null) { public function remove($key, $columns=null, $super_column=null, $timestamp=null) {
return parent::remove($this->cf, $key, $columns, $super_column, $timestamp); return $this->mutatorInstance->remove($this->cf, $key, $columns, $super_column, $timestamp);
} }
} }
55 changes: 2 additions & 53 deletions include/phpcassa/Batch/Mutator.php
@@ -1,4 +1,4 @@
<? <?php
namespace phpcassa\Batch; namespace phpcassa\Batch;


use phpcassa\Util\Clock; use phpcassa\Util\Clock;
Expand All @@ -14,12 +14,8 @@
* *
* @package phpcassa\Batch * @package phpcassa\Batch
*/ */
class Mutator class Mutator extends AbstractMutator
{ {
protected $pool;
protected $buffer;
protected $cl;

/** /**
* Intialize a mutator with a connection pool and consistency level. * Intialize a mutator with a connection pool and consistency level.
* *
Expand All @@ -36,53 +32,6 @@ public function __construct($pool,
$this->cl = $consistency_level; $this->cl = $consistency_level;
} }


protected function enqueue($key, $cf, $mutations) {
$mut = array($key, $cf->column_family, $mutations);
$this->buffer[] = $mut;
}

/**
* Send all buffered mutations.
*
* If an error occurs, the buffer will be preserverd, allowing you to
* attempt to call send() again later or take other recovery actions.
*
* @param cassandra\ConsistencyLevel $consistency_level optional
* override for the mutator's default consistency level
*/
public function send($consistency_level=null) {
if ($consistency_level === null)
$wcl = $this->cl;
else
$wcl = $consistency_level;

$mutations = array();
foreach ($this->buffer as $mut_set) {
list($key, $cf, $cols) = $mut_set;

if (isset($mutations[$key])) {
$key_muts = $mutations[$key];
} else {
$key_muts = array();
}

if (isset($key_muts[$cf])) {
$cf_muts = $key_muts[$cf];
} else {
$cf_muts = array();
}

$cf_muts = array_merge($cf_muts, $cols);
$key_muts[$cf] = $cf_muts;
$mutations[$key] = $key_muts;
}

if (!empty($mutations)) {
$this->pool->call('batch_mutate', $mutations, $wcl);
}
$this->buffer = array();
}

/** /**
* Add an insertion to the buffer. * Add an insertion to the buffer.
* *
Expand Down
11 changes: 9 additions & 2 deletions include/phpcassa/Connection/ConnectionPool.php
Expand Up @@ -292,9 +292,16 @@ protected function handle_conn_failure($conn, $f, $exc, $retry_count) {
} }


/** /**
* This method called every time an error is logged. By default, it will
* call the PHP builtin function error_log() with a messageType of 0. To
* change this behavior, you can create a subclass and override this
* method.
*
* Note that PHP has strange logging behavior. In particular, if you are
* running the PHP cli and you haven't set a directive for error_log in
* your php.ini, this will log to stdout even if you've called
* error_reporting(0), which is supposed to suppress all logging.
* *
* Extracing error log function call so that writing to the error log
* can be over written.
* @param string $errorMsg * @param string $errorMsg
* @param int $messageType * @param int $messageType
*/ */
Expand Down

0 comments on commit b6bffed

Please sign in to comment.