Skip to content

Commit

Permalink
Merge pull request #460 from andig/yield
Browse files Browse the repository at this point in the history
Implement php generators
  • Loading branch information
andig committed Sep 17, 2016
2 parents c816985 + 89c2531 commit 7917a90
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 285 deletions.
20 changes: 10 additions & 10 deletions .travis.yml
@@ -1,8 +1,6 @@
language: php

php:
- 5.4
- 5.5
- 5.6
- 7.0
- nightly
Expand All @@ -29,21 +27,21 @@ env:
matrix:
include:
# httpd-based
- php: 5.4
- php: 5.6
env: DB=mysql TEST_COMPONENT=HTTPD
# push-server
- php: 5.4
- php: 5.6
env: DB=mysql TEST_COMPONENT=PUSH_SERVER
# from..to
- php: 5.4
- php: 5.6
env: DB=mysql DEPENDENCIES=lowest
- php: 5.4
- php: 5.6
env: DB=mysql DEPENDENCIES=highest
# jslint-only
- php: 5.4
- php: 5.6
env: DB= DEPENDENCIES= JSLINT=true
# security
- php: 5.4
- php: 5.6
env: DB= DEPENDENCIES= SECURITY=true
# remove hhvm/pgsql
exclude:
Expand All @@ -54,6 +52,8 @@ matrix:
allow_failures:
- php: nightly
- php: hhvm
- php: 5.6
env: DB=mysql TEST_COMPONENT=HTTPD

notifications:
mail: "volkszaehler-dev@lists.volkszaehler.org"
Expand Down Expand Up @@ -85,8 +85,8 @@ install:
- |
if [ "$APC" = true ]; then
phpenv config-add ./test/bin/apc.ini
if php -r 'exit(version_compare(PHP_VERSION, "5.5", ">=")?0:1);'; then echo "no" | pecl install apcu-beta; fi
if php -r 'exit(function_exists("apc_store")?0:1);'; then echo "APC enabled"; else echo "APC disabled"; fi
if php -r 'exit(version_compare(PHP_VERSION, "5.5", ">=")?0:1);'; then echo "no" | pecl install apcu; fi
if php -r 'exit(function_exists("apcu_store")?0:1);'; then echo "APCu enabled"; else echo "APCu disabled"; fi
fi
before_script:
Expand Down
10 changes: 5 additions & 5 deletions composer.json
Expand Up @@ -11,13 +11,13 @@
"minimum-stability": "dev",
"prefer-stable": true,
"require": {
"php": ">=5.4.0",
"doctrine/orm": "^2.4",
"php": ">=5.6.0",
"doctrine/orm": "^2.5",
"doctrine/dbal": "^2.5",
"andig/dbcopy": "dev-master",
"symfony/console": "^2.6",
"symfony/http-kernel": "^2.6",
"symfony/http-foundation": "^2.6",
"symfony/console": "^2.6|^3.0",
"symfony/http-kernel": "^2.6|^3.0",
"symfony/http-foundation": "^2.6|^3.0",
"symfony/routing": "^2.6|^3.0",
"cboden/ratchet": "0.3.*",
"react/socket": "0.4.*",
Expand Down
4 changes: 4 additions & 0 deletions lib/Controller/DataController.php
Expand Up @@ -82,6 +82,10 @@ public function get($uuid) {
public function add($uuid) {
$channel = EntityController::factory($this->em, $uuid, true);

if (!$channel instanceof Model\Channel) {
throw new \Exception('Adding data is only supported for channels');
}

try { /* to parse new submission protocol */
$rawPost = $this->request->getContent(); // file_get_contents('php://input')

Expand Down
51 changes: 23 additions & 28 deletions lib/Interpreter/AccumulatorInterpreter.php
Expand Up @@ -39,45 +39,40 @@ class AccumulatorInterpreter extends Interpreter {
protected $valsum; // sum of delta values

/**
* Initialize data iterator
* Generate database tuples
*
* @return \Generator
*/
public function rewind() {
$this->key = 0;
public function getIterator() {
$this->rows = $this->getData();
$this->rows->rewind();

$this->valsum = 0;

// get starting value from skipped first row
if ($this->rowCount > 0) {
$this->ts_last = $this->getFrom();
$this->last_val = $this->rows->firstValue();
}
}

/**
* Iterate over result set
*/
public function current() {
$row = $this->rows->current();

// raw database values
if ($this->raw) {
return(array_slice($row, 0, 3));
foreach ($this->rows as $row) {
if ($this->raw) {
// raw database values
yield array_slice($row, 0, 3);
}
else {
$tuple = $this->convertRawTuple($row);
$this->valsum += $this->delta_val;

if (is_null($this->max) || $tuple[1] > $this->max[1]) {
$this->max = $tuple;
}

if (is_null($this->min) || $tuple[1] < $this->min[1]) {
$this->min = $tuple;
}

yield $tuple;
}
}

$tuple = $this->convertRawTuple($row);
$this->valsum += $this->delta_val;

if (is_null($this->max) || $tuple[1] > $this->max[1]) {
$this->max = $tuple;
}

if (is_null($this->min) || $tuple[1] < $this->min[1]) {
$this->min = $tuple;
}

return $tuple;
}

/**
Expand Down
94 changes: 6 additions & 88 deletions lib/Interpreter/AggregatorInterpreter.php
Expand Up @@ -30,20 +30,13 @@
/**
* Interpreter to aggregate child Channels or Aggregators
*
* The AggregatorInterpreter is used to aggregate multiple channels with the same
* indicator
* Placeholder only- aggregating child data is not implemented
*
* @author Steffen Vogel <info@steffenvogel.de>
* @author Andreas Goetz <cpuidle@gmx.de>
* @package default
*/
class AggregatorInterpreter extends Interpreter {
/**
* @var array of Interpreter
*/
protected $childrenInterpreter = array();

protected $aggregator;

/**
* Constructor
Expand All @@ -52,95 +45,20 @@ class AggregatorInterpreter extends Interpreter {
* @param ORM\EntityManager $em
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* @todo handle channels in nested aggregators
* @todo handle child entities of different units
*/
public function __construct(Model\Aggregator $aggregator, ORM\EntityManager $em, $from, $to, $tupleCount = null, $groupBy = null) {
$this->aggregator = $aggregator;

foreach ($aggregator->getChildren() as $child) {
if ($child instanceof Model\Channel) {
$class = $child->getDefinition()->getInterpreter();
$this->childrenInterpreter[] = new $class($child, $em, $from, $to, $tupleCount, $groupBy);
}
}
}

/**
* Convert raw meter readings
*/
public function convertRawTuple($row) {
return null;
}

/*
* Iterator methods - not implemented
*/
public function rewind() {
}

public function current() {
}

public function valid() {
return false;
throw new \Exception('Getting data is not supported for groups');
}

/**
* Get total consumption of all channels - not implemented
* Generate database tuples
*/
public function getConsumption() {
public function getIterator() {
}

/**
* Just a passthrough to the channel interpreters
*
* @return array with the smallest value
*/
public function getMin() {
$min = null;
foreach ($this->childrenInterpreter as $interpreter) {
$arr = $interpreter->getMax();
if (! $min or $arr[1] < $min[1]) {
$min = $arr;
}
}
return $min;
}

/**
* Just a passthrough to the channel interpreters
*
* @return array with the biggest value
*/
public function getMax() {
$max = null;
foreach ($this->childrenInterpreter as $interpreter) {
$arr = $interpreter->getMax();
if (! $max or $arr[1] > $max[1]) {
$max = $arr;
}
}
return $max;
}

/**
* Just a passthrough to the channel interpreters
*
* @return float average value
* Convert raw meter readings
*/
public function getAverage() {
$sum = 0;
foreach ($this->childrenInterpreter as $interpreter) {
$sum += $interpreter->getAverage();
}
return (count($this->childrenInterpreter)) ? $sum / count($this->childrenInterpreter) : null;
public function convertRawTuple($row) {
}

/*
* Getter & setter
*/

public function getEntity() { return $this->aggregator; }
public function getChildrenInterpreter() { return $this->childrenInterpreter; }
}
68 changes: 23 additions & 45 deletions lib/Interpreter/DataIterator.php
Expand Up @@ -30,13 +30,9 @@
* @author Steffen Vogel <info@steffenvogel.de>
* @package default
*/
class DataIterator implements \Iterator, \Countable {
class DataIterator implements \IteratorAggregate, \Countable {
protected $stmt; // PDO statement

protected $current; // the current data
protected $key; // key
protected $rowKey; // internal key for PDO statement

protected $rowCount; // num of readings in PDOStatement
protected $tupleCount; // num of requested tuples
protected $packageSize; // num of rows we aggregate in each tuple
Expand Down Expand Up @@ -81,66 +77,48 @@ public function __construct(\PDOStatement $stmt, $rowCount, $tupleCount) {
}

/**
* Aggregate data
* @return next aggregated tuple
* @return \Generator
*/
public function next() {
$firstTimestamp = $this->lastTimestamp; // SensorInterpreter
public function getIterator() {
$this->lastTimestamp = $this->from;

$package = array(0, 0, 0, 0, 0);
for ($i = 0; $i < $this->packageSize && $tuple = $this->stmt->fetch(); $i++) {
$package[0] = $tuple[0]; // last timestamp of package will be used
$package[1] += $tuple[1];
$package[2] += $tuple[2];
do {
$firstTimestamp = $this->lastTimestamp; // SensorInterpreter

// special cases - auxilary information for specific interpreters
$package[3] = max($package[3], $tuple[1]); // AccumulatorInterpreter
$package[4] += $tuple[1] * ($tuple[0] - $this->lastTimestamp); // SensorInterpreter
$package = array(0, 0, 0, 0, 0);
for ($i = 0; $i < $this->packageSize && $tuple = $this->stmt->fetch(); $i++) {
$package[0] = $tuple[0]; // use timestamp of last package tuple as package timestamp
$package[1] += $tuple[1]; // ImpulseInterpreter
$package[2] += $tuple[2]; // number of rows in package
$package[3] = max($package[3], $tuple[1]); // AccumulatorInterpreter
$package[4] += $tuple[1] * ($tuple[0] - $this->lastTimestamp); // SensorInterpreter

$this->lastTimestamp = $tuple[0];
$this->rowKey++;
}
$this->lastTimestamp = $tuple[0];
}

$this->key++;
if ($package[2]) {
$this->to = $package[0];
$package[4] /= $this->lastTimestamp - $firstTimestamp; // weighed average for SensorInterpreter

if ($package[2]) {
$this->to = $package[0];
$package[4] /= $this->lastTimestamp - $firstTimestamp; // weighed average for SensorInterpreter
}
yield $package;
}
} while ($tuple !== false);

$this->current = $package;
$this->stmt->closeCursor();
}

/**
* Return first tuple's data
*/
public function firstValue() { return $this->firstValue; }

/**
* Rewind the iterator
*
* @internal Should only be called once: PDOStatement hasn't a rewind()
*/
public function rewind() {
$this->key = $this->rowKey = 0;
$this->lastTimestamp = $this->from;
$this->next(); // fetch first tuple
}

public function valid() {
return ($this->current[2] > 0); // current package contains at least 1 tuple
}

/**
/*
* Getter & setter
*/
public function getPackageSize() { return $this->packageSize; }
public function getFrom() { return $this->from; }
public function getTo() { return $this->to; }

public function count() { return $this->tupleCount; }
public function key() { return $this->key; }
public function current() { return $this->current; }
}

?>

0 comments on commit 7917a90

Please sign in to comment.