Browse files

new Pool class

Pool test
Pool example
remove ::run requirement from Stackable
improve logic in thread routine to call ::run
be delicious!
  • Loading branch information...
1 parent e75e7ea commit 39e04525dc9d12208cda5655bcd4a97418a0dcd5 @krakjoe committed Mar 5, 2014
Showing with 591 additions and 112 deletions.
  1. +320 −0 classes/pool.h
  2. +6 −1 classes/stackable.h
  3. +68 −0 examples/NewPool.php
  4. +3 −0 package.xml
  5. +17 −0 php_pthreads.c
  6. +4 −0 php_pthreads.h
  7. +1 −1 src/handlers.c
  8. +63 −110 src/object.c
  9. +109 −0 tests/pools.phpt
View
320 classes/pool.h
@@ -0,0 +1,320 @@
+/*
+ +----------------------------------------------------------------------+
+ | pthreads |
+ +----------------------------------------------------------------------+
+ | Copyright (c) Joe Watkins 2012 |
+ +----------------------------------------------------------------------+
+ | This source file is subject to version 3.01 of the PHP license, |
+ | that is bundled with this package in the file LICENSE, and is |
+ | available through the world-wide-web at the following url: |
+ | http://www.php.net/license/3_01.txt |
+ | If you did not receive a copy of the PHP license and are unable to |
+ | obtain it through the world-wide-web, please send a note to |
+ | license@php.net so we can mail you a copy immediately. |
+ +----------------------------------------------------------------------+
+ | Author: Joe Watkins <joe.watkins@live.co.uk> |
+ +----------------------------------------------------------------------+
+ */
+#ifndef HAVE_PTHREADS_CLASS_POOL_H
+#define HAVE_PTHREADS_CLASS_POOL_H
+PHP_METHOD(Pool, __construct);
+PHP_METHOD(Pool, resize);
+PHP_METHOD(Pool, submit);
+PHP_METHOD(Pool, collect);
+PHP_METHOD(Pool, shutdown);
+PHP_METHOD(Pool, __destruct);
+
+ZEND_BEGIN_ARG_INFO_EX(Pool___construct, 0, 0, 2)
+ ZEND_ARG_INFO(0, size)
+ ZEND_ARG_INFO(0, class)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(Pool_resize, 0, 0, 1)
+ ZEND_ARG_INFO(0, size)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(Pool_submit, 0, 0, 1)
+ ZEND_ARG_INFO(0, task)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(Pool_collect, 0, 0, 1)
+ ZEND_ARG_INFO(0, callable)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(Pool_noargs, 0, 0, 0)
+ZEND_END_ARG_INFO()
+
+extern zend_function_entry pthreads_pool_methods[];
+#else
+# ifndef HAVE_PTHREADS_CLASS_POOL
+# define HAVE_PTHREADS_CLASS_POOL
+zend_function_entry pthreads_pool_methods[] = {
+ PHP_ME(Pool, __construct, Pool___construct, ZEND_ACC_PUBLIC)
+ PHP_ME(Pool, resize, Pool_resize, ZEND_ACC_PUBLIC)
+ PHP_ME(Pool, submit, Pool_submit, ZEND_ACC_PUBLIC)
+ PHP_ME(Pool, collect, Pool_collect, ZEND_ACC_PUBLIC)
+ PHP_ME(Pool, shutdown, Pool_noargs, ZEND_ACC_PUBLIC)
+ PHP_ME(Pool, __destruct, Pool_noargs, ZEND_ACC_PUBLIC)
+ {NULL, NULL, NULL}
+};
+
+/* {{{ proto Pool Pool::__construct(integer size, class worker [, array $ctor])
+ Construct a pool ready to create a maximum of $size workers of class $worker
+ $ctor will be used as arguments to constructor when spawning workers */
+PHP_METHOD(Pool, __construct)
+{
+ long size = 0;
+ zend_class_entry *clazz = NULL;
+ zval *ctor = NULL;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lC|a", &size, &clazz, &ctor) != SUCCESS) {
+ return;
+ }
+
+ zend_update_property_long(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("size"), size TSRMLS_CC);
+ zend_update_property_stringl(
+ Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("class"), clazz->name, clazz->name_length TSRMLS_CC);
+ if (ctor)
+ zend_update_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("ctor"), ctor TSRMLS_CC);
+} /* }}} */
+
+/* {{{ proto void Pool::resize(integer size)
+ Resize the pool to the given number of workers, if the pool size is being reduced
+ then the last workers started will be shutdown until the pool is the requested size */
+PHP_METHOD(Pool, resize) {
+ long newsize = 0;
+ zval *workers = NULL;
+ zval *size = NULL;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &newsize) != SUCCESS) {
+ return;
+ }
+
+ workers = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("workers"), 1 TSRMLS_CC);
+ size = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("size"), 1 TSRMLS_CC);
+
+ if (Z_TYPE_P(workers) == IS_ARRAY &&
+ newsize < zend_hash_num_elements(Z_ARRVAL_P(workers))) {
+ do {
+ zval **worker = NULL;
+ long top = zend_hash_num_elements(Z_ARRVAL_P(workers));
+
+ if (zend_hash_index_find(
+ Z_ARRVAL_P(workers), top-1, (void**)&worker) == SUCCESS) {
+ zend_call_method(
+ worker, Z_OBJCE_PP(worker), NULL, ZEND_STRL("shutdown"), NULL, 0, NULL, NULL TSRMLS_CC);
+
+ }
+
+ zend_hash_index_del(Z_ARRVAL_P(workers), top-1);
+ } while (zend_hash_num_elements(Z_ARRVAL_P(workers)) != newsize);
+ }
+
+ ZVAL_LONG(size, newsize);
+} /* }}} */
+
+/* {{{ proto bool Pool::submit(Stackable task)
+ Will submit the given task to the next worker in the pool, by default workers are selected round robin */
+PHP_METHOD(Pool, submit) {
+ zval *task = NULL;
+ zval *last = NULL;
+ zval *size = NULL;
+ zval *workers = NULL;
+ zval *worker = NULL;
+ zval *clazz = NULL;
+ zval *ctor = NULL;
+ zval *work = NULL;
+ zval **working = NULL;
+ zval **selected = NULL;
+
+ zend_class_entry **ce = NULL;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O", &task, pthreads_stackable_entry) != SUCCESS) {
+ return;
+ }
+
+ last = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("last"), 1 TSRMLS_CC);
+ size = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("size"), 1 TSRMLS_CC);
+ workers = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("workers"), 1 TSRMLS_CC);
+ work = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("work"), 1 TSRMLS_CC);
+
+ if (Z_TYPE_P(workers) != IS_ARRAY)
+ array_init(workers);
+
+ if (Z_TYPE_P(work ) != IS_ARRAY)
+ array_init(work);
+
+ if (Z_LVAL_P(last) >= Z_LVAL_P(size))
+ ZVAL_LONG(last, 0);
+
+ if (zend_hash_index_find(Z_ARRVAL_P(workers), Z_LVAL_P(last), (void**)&selected) != SUCCESS) {
+ MAKE_STD_ZVAL(worker);
+
+ clazz = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("class"), 1 TSRMLS_CC);
+ ctor = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("ctor"), 1 TSRMLS_CC);
+
+ zend_lookup_class(
+ Z_STRVAL_P(clazz), Z_STRLEN_P(clazz), &ce TSRMLS_CC);
+
+ object_init_ex(worker, *ce);
+
+ {
+ zend_class_entry *scope = EG(scope);
+ zend_function *constructor = NULL;
+ zval *retval = NULL;
+
+ EG(scope) = *ce;
+ constructor = Z_OBJ_HT_P(worker)->get_constructor(worker TSRMLS_CC);
+ EG(scope) = scope;
+
+ if (constructor) {
+ zend_fcall_info fci;
+ zend_fcall_info_cache fcc;
+
+ memset(&fci, 0, sizeof(zend_fcall_info));
+ memset(&fcc, 0, sizeof(zend_fcall_info));
+
+ fci.size = sizeof(zend_fcall_info);
+ fci.function_table = EG(function_table);
+ fci.object_ptr = worker;
+ fci.retval_ptr_ptr = &retval;
+ fci.no_separation = 1;
+
+ fcc.initialized = 1;
+ fcc.function_handler = constructor;
+ fcc.calling_scope = EG(scope);
+ fcc.called_scope = Z_OBJCE_P(worker);
+ fcc.object_ptr = worker;
+
+ if (ctor)
+ zend_fcall_info_args(&fci, ctor TSRMLS_CC);
+
+ zend_call_function(&fci, &fcc TSRMLS_CC);
+
+ if (ctor)
+ zend_fcall_info_args_clear(&fci, 1);
+
+ if (retval)
+ zval_ptr_dtor(&retval);
+ }
+
+ zend_call_method(&worker, Z_OBJCE_P(worker), NULL, ZEND_STRL("start"), NULL, 0, NULL, NULL TSRMLS_CC);
+ }
+
+ zend_hash_index_update(
+ Z_ARRVAL_P(workers), Z_LVAL_P(last),
+ (void**)&worker, sizeof(zval*), (void**)&selected);
+ Z_OBJ_HT_P(worker)->add_ref(worker TSRMLS_CC);
+ }
+
+ zend_hash_next_index_insert(
+ Z_ARRVAL_P(work), (void**) &task, sizeof(zval*), (void**)&working);
+ Z_SET_ISREF_P(task);
+ Z_ADDREF_P(task);
+
+ zend_call_method(selected, Z_OBJCE_PP(selected), NULL, ZEND_STRL("stack"), &return_value, 1, task, NULL TSRMLS_CC);
+ Z_LVAL_P(last)++;
+
+} /* }}} */
+
+/* {{{ proto void Pool::collect(Callable collector)
+ Shall execute the collector on each of the tasks in the working set
+ removing the task if the collector returns positively
+ the collector should be a function accepting a single task */
+PHP_METHOD(Pool, collect) {
+ zend_fcall_info fci;
+ zend_fcall_info_cache fcc;
+ zval *work = NULL;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) != SUCCESS) {
+ return;
+ }
+
+ work = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("work"), 1 TSRMLS_CC);
+
+ if (Z_TYPE_P(work) == IS_ARRAY && zend_hash_num_elements(Z_ARRVAL_P(work))) {
+ HashPosition position;
+ zval **task = NULL;
+
+ for (zend_hash_internal_pointer_reset_ex(Z_ARRVAL_P(work), &position);
+ zend_hash_get_current_data_ex(Z_ARRVAL_P(work), (void**)&task, &position) == SUCCESS;
+ zend_hash_move_forward_ex(Z_ARRVAL_P(work), &position)) {
+ zval *remove = NULL;
+ zend_ulong index = 0L;
+
+ fci.retval_ptr_ptr = &remove;
+
+ zend_fcall_info_argn(&fci TSRMLS_CC, 1, task);
+
+ if (zend_call_function(&fci, &fcc TSRMLS_CC) == SUCCESS) {
+
+ if (remove) {
+ convert_to_boolean(remove);
+
+ if (Z_BVAL_P(remove)) {
+ if (zend_hash_get_current_key_ex(Z_ARRVAL_P(work), NULL, NULL, &index, 0, &position) != HASH_KEY_IS_STRING) {
+ zend_hash_index_del(
+ Z_ARRVAL_P(work), index);
+ }
+ }
+ }
+ }
+
+ zend_fcall_info_argn(&fci TSRMLS_CC, 0);
+
+ if (remove)
+ zval_ptr_dtor(&remove);
+ }
+ }
+} /* }}} */
+
+/* {{{ */
+static inline int pthreads_pool_shutdown(void *data TSRMLS_DC) {
+ zval **worker = (zval**) data;
+
+ zend_call_method(
+ worker, Z_OBJCE_PP(worker), NULL, ZEND_STRL("shutdown"), NULL, 0, NULL, NULL TSRMLS_CC);
+
+ return ZEND_HASH_APPLY_REMOVE;
+} /* }}} */
+
+/* {{{ proto void Pool::shutdown(void)
+ Will cause all the workers to finish executing their stacks and shutdown */
+PHP_METHOD(Pool, shutdown) {
+ zval *workers = NULL;
+
+ if (zend_parse_parameters_none() != SUCCESS) {
+ return;
+ }
+
+ workers = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("workers"), 1 TSRMLS_CC);
+
+ if (Z_TYPE_P(workers) == IS_ARRAY && zend_hash_num_elements(Z_ARRVAL_P(workers))) {
+ zend_hash_apply(Z_ARRVAL_P(workers), pthreads_pool_shutdown TSRMLS_CC);
+ }
+
+ zend_hash_clean(Z_ARRVAL_P(workers));
+} /* }}} */
+
+/* {{{ proto void Pool::__destruct()
+ Will shutdown all workers and destroy all references held to work */
+PHP_METHOD(Pool, __destruct) {
+ zval *workers = NULL;
+ zval *work = NULL;
+
+ if (zend_parse_parameters_none() != SUCCESS) {
+ return;
+ }
+
+ workers = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("workers"), 1 TSRMLS_CC);
+ work = zend_read_property(Z_OBJCE_P(getThis()), getThis(), ZEND_STRL("work"), 1 TSRMLS_CC);
+
+ if (Z_TYPE_P(workers) == IS_ARRAY && zend_hash_num_elements(Z_ARRVAL_P(workers))) {
+ zend_hash_apply(Z_ARRVAL_P(workers), pthreads_pool_shutdown TSRMLS_CC);
+ }
+
+ zend_hash_clean(Z_ARRVAL_P(workers));
+ zend_hash_clean(Z_ARRVAL_P(work));
+} /* }}} */
+# endif
+#endif
View
7 classes/stackable.h
@@ -17,6 +17,7 @@
*/
#ifndef HAVE_PTHREADS_CLASS_STACKABLE_H
#define HAVE_PTHREADS_CLASS_STACKABLE_H
+PHP_METHOD(Stackable, run);
PHP_METHOD(Stackable, wait);
PHP_METHOD(Stackable, notify);
PHP_METHOD(Stackable, isRunning);
@@ -78,7 +79,7 @@ extern zend_function_entry pthreads_stackable_methods[];
# ifndef HAVE_PTHREADS_CLASS_STACKABLE
# define HAVE_PTHREADS_CLASS_STACKABLE
zend_function_entry pthreads_stackable_methods[] = {
- PHP_ABSTRACT_ME(Stackable, run, Stackable_run)
+ PHP_ME(Stackable, run, Stackable_run, ZEND_ACC_PUBLIC)
PHP_ME(Stackable, wait, Stackable_wait, ZEND_ACC_PUBLIC|ZEND_ACC_FINAL)
PHP_ME(Stackable, notify, Stackable_notify, ZEND_ACC_PUBLIC|ZEND_ACC_FINAL)
PHP_ME(Stackable, isRunning, Stackable_isRunning, ZEND_ACC_PUBLIC|ZEND_ACC_FINAL)
@@ -94,6 +95,10 @@ zend_function_entry pthreads_stackable_methods[] = {
PHP_ME(Stackable, pop, Stackable_pop, ZEND_ACC_PUBLIC|ZEND_ACC_FINAL)
{NULL, NULL, NULL}
};
+
+/* {{{ */
+PHP_METHOD(Stackable, run) {} /* }}} */
+
/* {{{ proto boolean Stackable::wait([long timeout])
Will cause the calling thread to wait for notification from the referenced object
When a timeout is used and reached boolean false will return
View
68 examples/NewPool.php
@@ -0,0 +1,68 @@
+<?php
+
+class WebWorker extends Worker {
+
+ public function __construct(SafeLog $logger) {
+ $this->logger = $logger;
+ }
+
+ public function run(){}
+
+ protected $loger;
+}
+
+class WebWork extends Stackable {
+
+ public function isComplete() {
+ return $this->complete;
+ }
+
+ public function run() {
+ $this->worker
+ ->logger
+ ->log("%s executing in Thread #%lu",
+ __CLASS__, $this->worker->getThreadId());
+ $this->complete = true;
+ }
+
+ protected $complete;
+}
+
+class SafeLog extends Stackable {
+
+ protected function log($message, $args = []) {
+ $args = func_get_args();
+
+ if (($message = array_shift($args))) {
+ echo vsprintf(
+ "{$message}\n", $args);
+ }
+ }
+
+ public function run(){}
+}
+
+$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
+
+$pool->submit($w=new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->shutdown();
+
+$pool->collect(function($work){
+ return $work->isComplete();
+});
+
+var_dump($pool, $w);
+?>
View
3 package.xml
@@ -36,6 +36,7 @@
<file name="examples/KeepAliveSession.php" role="doc" />
<file name="examples/Mutexes.php" role="doc" />
<file name="examples/NewSynchronization.php" role="doc" />
+ <file name="examples/NewPool.php" role="doc" />
<file name="examples/Notifications.php" role="doc" />
<file name="examples/ObjectsAsParameters.php" role="doc" />
<file name="examples/Objects.php" role="doc" />
@@ -79,6 +80,7 @@
<file name="tests/normal-reads.phpt" role="test" />
<file name="tests/null-member-crash.phpt" role="test" />
<file name="tests/oomethods.phpt" role="test" />
+ <file name="tests/pools.phpt" role="test" />
<file name="tests/selective-inheritance.phpt" role="test" />
<file name="tests/shift-pop.phpt" role="test" />
<file name="tests/shutdown-handler.phpt" role="test" />
@@ -121,6 +123,7 @@
<file name="classes/stackable.h" role="src" />
<file name="classes/mutex.h" role="src" />
<file name="classes/cond.h" role="src" />
+ <file name="classes/pool.h" role="src" />
<file name="iterators/iterator.h" role="src" />
<file name="iterators/default.h" role="src" />
<file name="config.m4" role="src" />
View
17 php_pthreads.c
@@ -76,6 +76,7 @@ zend_class_entry *pthreads_worker_entry;
zend_class_entry *pthreads_stackable_entry;
zend_class_entry *pthreads_mutex_entry;
zend_class_entry *pthreads_condition_entry;
+zend_class_entry *pthreads_pool_entry;
zend_object_handlers pthreads_handlers;
zend_object_handlers *zend_handlers;
@@ -124,6 +125,7 @@ PHP_MINIT_FUNCTION(pthreads)
zend_class_entry ce;
zend_class_entry se;
zend_class_entry we;
+ zend_class_entry pe;
REGISTER_LONG_CONSTANT("PTHREADS_INHERIT_ALL", PTHREADS_INHERIT_ALL, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("PTHREADS_INHERIT_NONE", PTHREADS_INHERIT_NONE, CONST_CS | CONST_PERSISTENT);
@@ -172,6 +174,17 @@ PHP_MINIT_FUNCTION(pthreads)
pthreads_condition_entry=zend_register_internal_class(&ce TSRMLS_CC);
pthreads_condition_entry->ce_flags |= ZEND_ACC_FINAL;
+ INIT_CLASS_ENTRY(pe, "Pool", pthreads_pool_methods);
+ pe.serialize = zend_class_serialize_deny;
+ pe.unserialize = zend_class_unserialize_deny;
+ pthreads_pool_entry=zend_register_internal_class(&pe TSRMLS_CC);
+ zend_declare_property_long(pthreads_pool_entry, ZEND_STRL("size"), 1, ZEND_ACC_PROTECTED TSRMLS_CC);
+ zend_declare_property_null(pthreads_pool_entry, ZEND_STRL("class"), ZEND_ACC_PROTECTED TSRMLS_CC);
+ zend_declare_property_null(pthreads_pool_entry, ZEND_STRL("workers"), ZEND_ACC_PROTECTED TSRMLS_CC);
+ zend_declare_property_null(pthreads_pool_entry, ZEND_STRL("work"), ZEND_ACC_PROTECTED TSRMLS_CC);
+ zend_declare_property_null(pthreads_pool_entry, ZEND_STRL("ctor"), ZEND_ACC_PROTECTED TSRMLS_CC);
+ zend_declare_property_long(pthreads_pool_entry, ZEND_STRL("last"), 0, ZEND_ACC_PROTECTED TSRMLS_CC);
+
/*
* Setup standard and pthreads object handlers
*/
@@ -263,4 +276,8 @@ PHP_MINFO_FUNCTION(pthreads)
# include <classes/cond.h>
#endif
+#ifndef HAVE_PTHREADS_CLASS_POOL
+# include <classes/pool.h>
+#endif
+
#endif
View
4 php_pthreads.h
@@ -44,6 +44,10 @@ PHP_MINFO_FUNCTION(pthreads);
# include <classes/cond.h>
#endif
+#ifndef HAVE_PTHREADS_CLASS_POOL_H
+# include <classes/pool.h>
+#endif
+
extern zend_module_entry pthreads_module_entry;
#define phpext_pthreads_ptr &pthreads_module_entry
View
2 src/handlers.c
@@ -398,7 +398,7 @@ int pthreads_call_method(PTHREADS_CALL_METHOD_PASSTHRU_D) {
{
zend_op_array *ops = (zend_op_array*) call;
- if (ops) {
+ if (ops && ops->type == ZEND_USER_FUNCTION) {
if (ops->run_time_cache) {
efree(ops->run_time_cache);
ops->run_time_cache = NULL;
View
173 src/object.c
@@ -273,34 +273,29 @@ size_t pthreads_stack_next(PTHREAD thread, zval *this_ptr TSRMLS_DC) {
popped = Z_OBJCE_P(that_ptr);
/*
- * Switch scope to the next stackable
+ * Setup Executor
*/
- if (zend_hash_find(&popped->function_table, "run", sizeof("run"), (void**) &run)==SUCCESS) {
- /*
- * Setup Executor
- */
- EG(This) = that_ptr;
- EG(scope) = popped;
- EG(called_scope) = popped;
+ EG(This) = that_ptr;
+ EG(scope) = popped;
+ EG(called_scope) = popped;
+
+ /*
+ * Setup stackable for runtime
+ */
+ {
+ PTHREAD stackable = PTHREADS_FETCH_FROM(that_ptr);
- /*
- * Setup stackable for runtime
- */
- {
- PTHREAD stackable = PTHREADS_FETCH_FROM(that_ptr);
+ if (stackable) {
+ current->tid = thread->tid;
+ current->tls = thread->tls;
- if (stackable) {
- current->tid = thread->tid;
- current->tls = thread->tls;
-
- pthreads_connect(current, stackable TSRMLS_CC);
+ pthreads_connect(current, stackable TSRMLS_CC);
- pthreads_store_write(
- stackable->store, "worker", sizeof("worker")-1, &this_ptr TSRMLS_CC
- );
-
- Z_ADDREF_P(this_ptr);
- }
+ pthreads_store_write(
+ stackable->store, "worker", sizeof("worker")-1, &this_ptr TSRMLS_CC
+ );
+
+ Z_ADDREF_P(this_ptr);
}
}
}
@@ -817,97 +812,55 @@ static void * pthreads_routine(void *arg) {
/* connect $this */
if (pthreads_connect(PTHREADS_ZG(pointer)=thread, PTHREADS_FETCH_FROM(ZEG->This) TSRMLS_CC)==SUCCESS) {
- /* always the same no point recreating this for every execution */
- zval zmethod;
-
- ZVAL_STRINGL(&zmethod, "run", sizeof("run"), 0);
-
/* execute $this */
- do {
- zend_function *zrun;
- /* find zrun method */
- if (zend_hash_find(&(ZEG->scope->function_table), "run", sizeof("run"), (void**) &zrun)==SUCCESS) {
- zval *zresult;
- zend_fcall_info info;
- zend_fcall_info_cache cache;
-
- PTHREAD current = PTHREADS_FETCH_FROM(ZEG->This);
-
- /* populate a cache and call the run method */
- {
- /* initialize info object */
- info.size = sizeof(info);
- info.object_ptr = ZEG->This;
- info.function_name = &zmethod;
- info.retval_ptr_ptr = &zresult;
- info.no_separation = 1;
- info.symbol_table = NULL;
- info.param_count = 0;
- info.params = NULL;
-
- /* initialize cache object */
- cache.initialized = 1;
- cache.function_handler = zrun;
- cache.calling_scope = current->std.ce;
- cache.called_scope = current->std.ce;
- cache.object_ptr = ZEG->This;
-
- /* call the function */
- pthreads_state_set(current->state, PTHREADS_ST_RUNNING TSRMLS_CC);
- {
- zend_bool terminated = 0;
- /* graceful fatalities */
- zend_try {
- /* ::run */
- zend_call_function(&info, &cache TSRMLS_CC);
- } zend_catch {
- /* catches fatal errors and uncaught exceptions */
- terminated = 1;
-
- /* danger lurking ... */
- if (PTHREADS_ZG(signal) == PTHREADS_KILL_SIGNAL) {
- /* like, totally bail man ! */
- zend_bailout();
- }
- } zend_end_try();
-
- if (current) {
- /* set terminated state */
- if (terminated) {
- pthreads_state_set(
- current->state, PTHREADS_ST_ERROR TSRMLS_CC);
- /* save error information */
- pthreads_error_save(current->error TSRMLS_CC);
- }
-
- /* unset running for waiters */
- pthreads_state_unset(current->state, PTHREADS_ST_RUNNING TSRMLS_CC);
- }
-#if PHP_VERSION_ID > 50399
- {
- zend_op_array *ops = &zrun->op_array;
+ do {
+ PTHREAD current = PTHREADS_FETCH_FROM(ZEG->This);
+ zval *zresult = NULL;
+
+ pthreads_state_set(current->state, PTHREADS_ST_RUNNING TSRMLS_CC);
+ {
+ zend_bool terminated = 0;
+ /* graceful fatalities */
+ zend_try {
+ /* ::run */
+ zend_call_method(
+ &ZEG->This, ZEG->scope, NULL,
+ ZEND_STRL("run"),
+ &zresult, 0, NULL, NULL TSRMLS_CC);
+ } zend_catch {
+ /* catches fatal errors and uncaught exceptions */
+ terminated = 1;
- if (ops) {
- if (ops->run_time_cache) {
- efree(ops->run_time_cache);
- ops->run_time_cache = NULL;
- }
- }
- }
-#endif
+ /* danger lurking ... */
+ if (PTHREADS_ZG(signal) == PTHREADS_KILL_SIGNAL) {
+ /* like, totally bail man ! */
+ zend_bailout();
+ }
+ } zend_end_try();
+
+ if (current) {
+ /* set terminated state */
+ if (terminated) {
+ pthreads_state_set(
+ current->state, PTHREADS_ST_ERROR TSRMLS_CC);
+ /* save error information */
+ pthreads_error_save(current->error TSRMLS_CC);
+ }
+
+ /* unset running for waiters */
+ pthreads_state_unset(current->state, PTHREADS_ST_RUNNING TSRMLS_CC);
+ }
- /* deal with references to stackable */
- if (!terminated && inwork) {
- zval_ptr_dtor(&ZEG->This);
- } else inwork = 1;
+ /* deal with references to stackable */
+ if (!terminated && inwork) {
+ zval_ptr_dtor(&ZEG->This);
+ } else inwork = 1;
- /* deal with zresult (ignored) */
- if (zresult) {
- zval_ptr_dtor(&zresult);
- }
- }
+ /* deal with zresult (ignored) */
+ if (zresult) {
+ zval_ptr_dtor(&zresult);
}
- } else zend_error(E_ERROR, "pthreads has experienced an internal error while trying to execute %s::run", ZEG->scope->name);
+ }
} while(worker && pthreads_stack_next(thread, this_ptr TSRMLS_CC));
}
} zend_catch {
View
109 tests/pools.phpt
@@ -0,0 +1,109 @@
+--TEST--
+Test pooling
+--DESCRIPTION--
+This test verifies the functionality of selective inheritance
+--FILE--
+<?php
+
+class WebWorker extends Worker {
+
+ public function __construct(SafeLog $logger) {
+ $this->logger = $logger;
+ }
+
+ public function run(){}
+
+ protected $loger;
+}
+
+class WebWork extends Stackable {
+
+ public function isComplete() {
+ return $this->complete;
+ }
+
+ public function run() {
+ $this->worker
+ ->logger
+ ->log("%s executing in Thread #%lu",
+ __CLASS__, $this->worker->getThreadId());
+ $this->complete = true;
+ }
+
+ protected $complete;
+}
+
+class SafeLog extends Stackable {
+
+ protected function log($message, $args = []) {
+ $args = func_get_args();
+
+ if (($message = array_shift($args))) {
+ echo vsprintf(
+ "{$message}\n", $args);
+ }
+ }
+
+ public function run(){}
+}
+
+$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
+
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->submit(new WebWork());
+$pool->shutdown();
+
+$pool->collect(function($work){
+ return $work->isComplete();
+});
+
+var_dump($pool);
+?>
+--EXPECTF--
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+WebWork executing in Thread #%d
+object(Pool)#%d (6) {
+ ["size":protected]=>
+ int(8)
+ ["class":protected]=>
+ string(9) "WebWorker"
+ ["workers":protected]=>
+ array(0) {
+ }
+ ["work":protected]=>
+ array(0) {
+ }
+ ["ctor":protected]=>
+ array(1) {
+ [0]=>
+ object(SafeLog)#%d (0) {
+ }
+ }
+ ["last":protected]=>
+ int(6)
+}
+

0 comments on commit 39e0452

Please sign in to comment.