Skip to content

Commit

Permalink
Merge pull request #16 from swoole/master
Browse files Browse the repository at this point in the history
Sync upstream
  • Loading branch information
Inokinoki committed Aug 23, 2018
2 parents 7610f5d + 01a4e1a commit bb6a4a1
Show file tree
Hide file tree
Showing 25 changed files with 577 additions and 328 deletions.
1 change: 1 addition & 0 deletions config.m4
Expand Up @@ -465,6 +465,7 @@ if test "$PHP_SWOOLE" != "no"; then
src/coroutine/context.cc \
src/coroutine/ucontext.cc \
src/coroutine/socket.cc \
src/coroutine/channel.cc \
src/memory/shared_memory.c \
src/memory/global_memory.c \
src/memory/ring_buffer.c \
Expand Down
22 changes: 22 additions & 0 deletions examples/coroutine/select/test.php
@@ -0,0 +1,22 @@
<?php
use Swoole\Coroutine as co;

$chan = new co\Channel(2);
$n = 10;
for ($i = 0; $i < $n; $i++) {
go(function () use ($i,$chan) {
$ret = $chan->push($i);
echo "push {$i} res:".var_export($ret, 1)."\n";
});
};
go(function ()use ($chan){
$bool = true;
while ($bool){
$data = $chan->pop();
echo "pop res:".var_export($data, 1)."\n";
if($data===false){
$bool = false;
}
//var_dump($data);
}
});
24 changes: 24 additions & 0 deletions examples/coroutine/util/resume003.php
@@ -0,0 +1,24 @@
<?php
go(function () {
$count = 0;
go(function () use (&$count) {
echo "task 1 start\n";
co::sleep(0.2);
echo "task 1 resume count $count\n";
if (++$count === 2) {
co::resume(1);
}
echo "task 1 end\n";
});
go(function () use (&$count) {
echo "task 2 start\n";
co::sleep(0.1);
echo "task 2 resume count $count\n";
if (++$count === 2) {
co::resume(1);
}
echo "task 2 end\n";
});
co::suspend();
});
echo "main \n";
4 changes: 2 additions & 2 deletions examples/http2/server.php
@@ -1,7 +1,7 @@
<?php
$key_dir = dirname(__DIR__) . '/ssl';
//$http = new swoole_http_server("0.0.0.0", 9501, SWOOLE_BASE);
$http = new swoole_http_server("0.0.0.0", 9501, SWOOLE_BASE, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$http = new swoole_http_server("0.0.0.0", 9501, SWOOLE_BASE);
//$http = new swoole_http_server("0.0.0.0", 9501, SWOOLE_BASE, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$http->set([
'open_http2_protocol' => 1,
'ssl_cert_file' => $key_dir.'/ssl.crt',
Expand Down
14 changes: 10 additions & 4 deletions examples/server.php
Expand Up @@ -90,6 +90,13 @@ static function getBuffer($fd, $create = true)
}, false);

//$serv->addprocess($process2);

$serv->addProcess(new swoole_process(function ($worker) use ($serv) {
swoole\async::exec('ls /', function ($retval) {
var_dump($retval);
});
}, false));

$serv->set(G::$config);
$serv->set(['reactor_num' => 4]);

Expand Down Expand Up @@ -122,7 +129,7 @@ function forkChildInWorker() {
$process = new swoole_process( function (swoole_process $worker) use ($serv) {
// $serv = new swoole_server( "0.0.0.0", 9503 );
// $serv->set(array(
// 'worker_num' => 1
// 'worker_num' => 1
// ));
// $serv->on ( 'receive', function (swoole_server $serv, $fd, $from_id, $data) {
// $serv->send ( $fd, "Swoole: " . $data );
Expand All @@ -141,7 +148,7 @@ function forkChildInWorker() {
}

function processRename(swoole_server $serv, $worker_id) {

global $argv;
if ( $serv->taskworker)
{
Expand All @@ -159,7 +166,7 @@ function processRename(swoole_server $serv, $worker_id) {
}

function setTimerInWorker(swoole_server $serv, $worker_id) {

if ($worker_id == 0) {
echo "Start: ".microtime(true)."\n";
//$serv->addtimer(3000);
Expand Down Expand Up @@ -579,4 +586,3 @@ function broadcast(swoole_server $serv, $fd = 0, $data = "hello")
swoole_set_process_name("php {$argv[0]}: manager");
});
$serv->start();

107 changes: 107 additions & 0 deletions include/channel.h
@@ -0,0 +1,107 @@
#pragma once

#include "swoole.h"
#include "context.h"
#include "coroutine.h"
#include <string>
#include <iostream>
#include <list>
#include <queue>
#include <sys/stat.h>

namespace swoole {

enum channel_op
{
PRODUCER = 1,
CONSUMER = 2,
};

class Channel;

struct notify_msg_t
{
Channel *chan;
enum channel_op type;
};

struct timeout_msg_t
{
Channel *chan;
coroutine_t *co;
bool error;
swTimer_node *timer;
};

class Channel
{
private:
std::list<coroutine_t *> producer_queue;
std::list<coroutine_t *> consumer_queue;
std::queue<void *> data_queue;
size_t capacity;
uint32_t notify_producer_count;
uint32_t notify_consumer_count;

public:
bool closed;
inline bool is_empty()
{
return data_queue.size() == 0;
}

inline bool is_full()
{
return data_queue.size() == capacity;
}

inline size_t length()
{
return data_queue.size();
}

inline size_t consumer_num()
{
return consumer_queue.size();
}

inline size_t producer_num()
{
return producer_queue.size();
}

inline void remove(coroutine_t *co)
{
consumer_queue.remove(co);
}

inline coroutine_t* pop_coroutine(enum channel_op type)
{
coroutine_t* co;
if (type == PRODUCER)
{
co = producer_queue.front();
producer_queue.pop_front();
notify_producer_count--;
swDebug("resume producer[%d]", coroutine_get_cid(co));
}
else
{
co = consumer_queue.front();
consumer_queue.pop_front();
notify_consumer_count--;
swDebug("resume consumer[%d]", coroutine_get_cid(co));
}
return co;
}

Channel(size_t _capacity);
void yield(enum channel_op type);
void notify(enum channel_op type);
void* pop(double timeout = 0);
bool push(void *data);
bool close();
};

};

3 changes: 2 additions & 1 deletion include/coroutine.h
Expand Up @@ -41,7 +41,8 @@ void coroutine_release(coroutine_t *co);
void coroutine_set_ptr(coroutine_t *co, void *ptr);
void* coroutine_get_ptr_by_cid(int cid);
coroutine_t *coroutine_get_by_id(int cid);
int coroutine_get_cid();
int coroutine_get_current_cid();
int coroutine_get_cid(coroutine_t *co);
int coroutine_test_alloc_cid();
void coroutine_test_free_cid(int cid);

Expand Down
3 changes: 3 additions & 0 deletions include/swoole.h
Expand Up @@ -249,6 +249,7 @@ enum swFd_type
SW_FD_SIGNAL = 11, //signalfd
SW_FD_DNS_RESOLVER = 12, //dns resolver
SW_FD_INOTIFY = 13, //server socket
SW_FD_CHAN_PIPE = 14, //channel pipe
SW_FD_USER = 15, //SW_FD_USER or SW_FD_USER+n: for custom event
SW_FD_STREAM_CLIENT = 16, //swClient stream
SW_FD_DGRAM_CLIENT = 17, //swClient dgram
Expand Down Expand Up @@ -2153,6 +2154,8 @@ typedef struct
swHashMap *functions;
swLinkedList *hooks[SW_MAX_HOOK_TYPE];

swPipe *chan_pipe;

} swServerG;

extern swServerG SwooleG; //Local Global Variable
Expand Down
1 change: 0 additions & 1 deletion php_swoole.h
Expand Up @@ -233,7 +233,6 @@ enum php_swoole_fd_type
PHP_SWOOLE_FD_REDIS_CORO,
PHP_SWOOLE_FD_POSTGRESQL,
PHP_SWOOLE_FD_SOCKET,
PHP_SWOOLE_FD_CHAN_PIPE,
#endif
/**
* for Co::fread/Co::fwrite
Expand Down
7 changes: 6 additions & 1 deletion src/coroutine/base.cc
Expand Up @@ -214,11 +214,16 @@ coroutine_t* coroutine_get_by_id(int cid)
return swCoroG.coroutines[cid];
}

int coroutine_get_cid()
int coroutine_get_current_cid()
{
return swCoroG.current_cid;
}

int coroutine_get_cid(coroutine_t *co)
{
return co->cid;
}

int coroutine_test_alloc_cid()
{
int cid = alloc_cidmap();
Expand Down

0 comments on commit bb6a4a1

Please sign in to comment.