Skip to content

Commit

Permalink
channels
Browse files Browse the repository at this point in the history
  • Loading branch information
krakjoe committed Apr 5, 2019
1 parent b9f49d7 commit cf2c909
Show file tree
Hide file tree
Showing 20 changed files with 986 additions and 20 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,50 @@ final class parallel\Future {
*/
public static function select(array &$resolving, array &$resolved, array &$errored, array &$timedout, int $timeout) : int;
}

final class Channel {

/*
* Shall make an unbuffered channel with the given name
* @param string name
* @throws \parallel\Exception if channel already exists
*/
public static function make(string $name) : Channel;

/*
* Shall make a buffered channel with the given name and capacity
* @param string name
* @param int capacity may be -1 for unlimited, or a positive integer
* @throws \parallel\Exception if arguments are invalid
* @throws \parallel\Exception if channel already exists
*/
public static function make(string $name, int $capacity) : Channel;

/*
* Shall open the channel with the given name
* @param string name a previously made channel
* @throws \parallel\Exception if the channel cannot be found
*/
public static function open(string $name) : Channel;

/*
* Shall send the given value on this channel
* @param mixed value any non-object, non-resource, non-null value
*/
public function send(mixed $value) : bool;

/*
* Shall recv a value from this channel
* @returns mixed
*/
public function recv() : mixed;

/*
* Shall close the channel
* @throws \parallel\Exception if this channel was already closed
*/
public function close() : void;
}
```

Implementation
Expand Down
2 changes: 1 addition & 1 deletion config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ if test "$PHP_PARALLEL" != "no"; then
AX_CHECK_COMPILE_FLAG(-fstack-protector-strong, _MAINTAINER_CFLAGS="$_MAINTAINER_CFLAGS -fstack-protector-strong")
fi

PHP_NEW_EXTENSION(parallel, php_parallel.c src/monitor.c src/parallel.c src/future.c src/copy.c, $ext_shared,, "-Wall -DZEND_ENABLE_STATIC_TSRMLS_CACHE=1 $_MAINTAINER_CFLAGS")
PHP_NEW_EXTENSION(parallel, php_parallel.c src/monitor.c src/parallel.c src/future.c src/copy.c src/channel.c src/link.c, $ext_shared,, "-Wall -DZEND_ENABLE_STATIC_TSRMLS_CACHE=1 $_MAINTAINER_CFLAGS")

PHP_ADD_BUILD_DIR($ext_builddir/src, 1)
PHP_ADD_INCLUDE($ext_srcdir)
Expand Down
2 changes: 1 addition & 1 deletion config.w32
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ if (PHP_PARALLEL != 'no') {
EXTENSION("parallel", "php_parallel.c", PHP_PARALLEL_SHARED, "/DZEND_ENABLE_STATIC_TSRMLS_CACHE=1 /I" + configure_module_dirname);
ADD_SOURCES(
configure_module_dirname + "/src",
"copy.c monitor.c parallel.c future.c",
"copy.c monitor.c parallel.c future.c channel.c link.c",
"parallel"
);
} else {
Expand Down
3 changes: 3 additions & 0 deletions php_parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@

#include "src/parallel.h"
#include "src/future.h"
#include "src/channel.h"

/* {{{ PHP_MINIT_FUNCTION */
PHP_MINIT_FUNCTION(parallel)
{
php_parallel_startup();
php_parallel_future_startup();
php_parallel_channel_startup();

return SUCCESS;
} /* }}} */

/* {{{ PHP_MSHUTDOWN_FUNCTION */
PHP_MSHUTDOWN_FUNCTION(parallel)
{
php_parallel_channel_shutdown();
php_parallel_shutdown();

return SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion php_parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
extern zend_module_entry parallel_module_entry;
# define phpext_parallel_ptr &parallel_module_entry

# define PHP_PARALLEL_VERSION "0.8.4-dev"
# define PHP_PARALLEL_VERSION "0.9.0-dev"

# if defined(ZTS) && defined(COMPILE_DL_PARALLEL)
ZEND_TSRMLS_CACHE_EXTERN()
Expand Down
262 changes: 262 additions & 0 deletions src/channel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
+----------------------------------------------------------------------+
| parallel |
+----------------------------------------------------------------------+
| Copyright (c) Joe Watkins 2019 |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: krakjoe |
+----------------------------------------------------------------------+
*/
#ifndef HAVE_PARALLEL_CHANNEL
#define HAVE_PARALLEL_CHANNEL

#include "parallel.h"
#include "copy.h"
#include "channel.h"

#include "zend_exceptions.h"

zend_class_entry *php_parallel_channel_ce;
zend_object_handlers php_parallel_channel_handlers;

static zend_always_inline void php_parallel_channels_make(zval *return_value, zend_string *name, zend_bool buffered, zend_long capacity) {
php_parallel_channel_t *channel;

object_init_ex(return_value, php_parallel_channel_ce);

channel = php_parallel_channel_from(return_value);
channel->link = php_parallel_link_init(name, buffered, capacity);

zend_hash_add_ptr(
&php_parallel_channels.links,
php_parallel_link_name(channel->link),
php_parallel_link_copy(channel->link));
}

static zend_always_inline void php_parallel_channels_open(zval *return_value, php_parallel_link_t *link) {
php_parallel_channel_t *channel;

object_init_ex(return_value, php_parallel_channel_ce);

channel = php_parallel_channel_from(return_value);
channel->link = php_parallel_link_copy(link);
}

PHP_METHOD(Channel, make)
{
zend_string *name = NULL;
zend_bool buffered = 0;
zend_long capacity = -1;

if (ZEND_NUM_ARGS() == 1) {
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_QUIET, 1, 1)
Z_PARAM_STR(name)
ZEND_PARSE_PARAMETERS_END_EX(
php_parallel_exception("expected channel name");
return;
);
} else {
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_QUIET, 2, 2)
Z_PARAM_STR(name)
Z_PARAM_LONG(capacity)
ZEND_PARSE_PARAMETERS_END_EX(
php_parallel_exception("expected channel name and capacity");
return;
);

if (capacity < -1 || capacity == 0) {
php_parallel_exception(
"capacity may be -1 for unlimited, or a positive integer");
return;
}

buffered = 1;
}

php_parallel_monitor_lock(php_parallel_channels.monitor);

if (zend_hash_exists(&php_parallel_channels.links, name)) {
php_parallel_exception(
"channel named %s already exists, did you mean open?",
ZSTR_VAL(name));
} else {
php_parallel_channels_make(return_value, name, buffered, capacity);
}

php_parallel_monitor_unlock(php_parallel_channels.monitor);
}

PHP_METHOD(Channel, open)
{
zend_string *name = NULL;
php_parallel_link_t *link;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_QUIET, 1, 1)
Z_PARAM_STR(name)
ZEND_PARSE_PARAMETERS_END_EX(
php_parallel_exception("expected channel name");
return;
);

php_parallel_monitor_lock(php_parallel_channels.monitor);

if (!(link = zend_hash_find_ptr(&php_parallel_channels.links, name))) {
php_parallel_exception("channel named %s is not available, was it closed ?", ZSTR_VAL(name));
} else {
php_parallel_channels_open(return_value, link);
}

php_parallel_monitor_unlock(php_parallel_channels.monitor);
}

PHP_METHOD(Channel, send)
{
php_parallel_channel_t *channel = php_parallel_channel_from(getThis());
zval *value;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_QUIET, 1, 1)
Z_PARAM_ZVAL(value)
ZEND_PARSE_PARAMETERS_END_EX(
php_parallel_exception("expected value");
return;
);

if (Z_TYPE_P(value) == IS_OBJECT || Z_TYPE_P(value) == IS_NULL) {
php_parallel_exception(
"value %s is illegal",
zend_get_type_by_const(Z_TYPE_P(value)));
return;
}

if (php_parallel_link_closed(channel->link)) {
php_parallel_exception("channel(%s) closed",
ZSTR_VAL(php_parallel_link_name(channel->link)));
return;
}

RETURN_BOOL(php_parallel_link_send(channel->link, value));
}

PHP_METHOD(Channel, recv)
{
php_parallel_channel_t *channel = php_parallel_channel_from(getThis());

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_QUIET, 0, 0)
ZEND_PARSE_PARAMETERS_END_EX(
php_parallel_exception("expected no arguments");
return;
);

if (php_parallel_link_closed(channel->link)) {
php_parallel_exception("channel(%s) closed",
ZSTR_VAL(php_parallel_link_name(channel->link)));
return;
}

php_parallel_link_recv(channel->link, return_value);
}

PHP_METHOD(Channel, close)
{
php_parallel_channel_t *channel = php_parallel_channel_from(getThis());

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_QUIET, 0, 0)
ZEND_PARSE_PARAMETERS_END_EX(
php_parallel_exception("expected no arguments");
return;
);

if (!php_parallel_link_close(channel->link)) {
php_parallel_exception("channel(%s) already closed",
ZSTR_VAL(php_parallel_link_name(channel->link)));
}

php_parallel_monitor_lock(php_parallel_channels.monitor);
zend_hash_del(
&php_parallel_channels.links,
php_parallel_link_name(channel->link));
php_parallel_monitor_unlock(php_parallel_channels.monitor);
}

PHP_METHOD(Channel, __toString)
{
php_parallel_channel_t *channel =
php_parallel_channel_from(getThis());

RETURN_STR_COPY(php_parallel_link_name(channel->link));
}

zend_function_entry php_parallel_channel_methods[] = {
PHP_ME(Channel, make, NULL, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
PHP_ME(Channel, open, NULL, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
PHP_ME(Channel, send, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Channel, recv, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Channel, close, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Channel, __toString, NULL, ZEND_ACC_PUBLIC)
PHP_FE_END
};

zend_object* php_parallel_channel_create(zend_class_entry *type) {
php_parallel_channel_t *channel = ecalloc(1,
sizeof(php_parallel_channel_t) + zend_object_properties_size(type));

zend_object_std_init(&channel->std, type);

channel->std.handlers = &php_parallel_channel_handlers;

return &channel->std;
}

void php_parallel_channel_destroy(zend_object *o) {
php_parallel_channel_t *channel =
php_parallel_channel_fetch(o);

php_parallel_link_destroy(channel->link);

zend_object_std_dtor(o);
}

void php_parallel_channels_link_destroy(zval *zv) {
php_parallel_link_t *link = Z_PTR_P(zv);

php_parallel_link_destroy(link);
}

void php_parallel_channel_startup() {
zend_class_entry ce;

memcpy(&php_parallel_channel_handlers, zend_get_std_object_handlers(), sizeof(zend_object_handlers));

php_parallel_channel_handlers.offset = XtOffsetOf(php_parallel_channel_t, std);
php_parallel_channel_handlers.free_obj = php_parallel_channel_destroy;

INIT_NS_CLASS_ENTRY(ce, "parallel", "Channel", php_parallel_channel_methods);

php_parallel_channel_ce = zend_register_internal_class(&ce);
php_parallel_channel_ce->create_object = php_parallel_channel_create;
php_parallel_channel_ce->ce_flags |= ZEND_ACC_FINAL;

php_parallel_channels.monitor = php_parallel_monitor_create();

zend_hash_init(
&php_parallel_channels.links,
32,
NULL,
php_parallel_channels_link_destroy, 1);
}

void php_parallel_channel_shutdown() {
php_parallel_monitor_destroy(
php_parallel_channels.monitor);
zend_hash_destroy(&php_parallel_channels.links);
}

#endif

0 comments on commit cf2c909

Please sign in to comment.