Permalink
Browse files

Merge pull request #57 from Erika31/master

New "genericWebSocketServer" dealing with "hixie-76" et "hybi-10" IETF protocols
  • Loading branch information...
kakserpom committed Mar 7, 2012
2 parents 863f604 + a8e4ec0 commit 78cd6db036fa63c5fd02690e6dbfaadeaf9831a1
@@ -0,0 +1,99 @@
+<?php
+
+define("EXTERNAL_APP_CLASSES_DIR", dirname(__FILE__) . "/../../../app/") ;
+
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/Base_Class.class.php") ;
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/DB_Connection.class.php") ;
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/DB_GenericTable.class.php") ;
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/Tools.class.php") ;
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/Crypto.class.php") ;
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/Session.class.php") ;
+require_once(EXTERNAL_APP_CLASSES_DIR . "/modules/app/classes/wsServer.class.php") ;
+
+class ExampleGenericWebSocket extends AppInstance {
+
+ /**
+ * Called when the worker is ready to go.
+ * @return void
+ */
+ public function onReady() {
+ if ($this->WS = Daemon::$appResolver->getInstanceByAppName('genericWebSocketServer')) {
+ $this->WS->addRoute('exampleApp', function ($client) {
+ return new ExampleGenericWebSocketRoute($client);
+ }
+ );
+
+ // If you want to manage timeout events :
+ $appInstance = $this ;
+ $this->WS->registerEventTimeout(function() use ($appInstance) {
+ return $appInstance->onEventTimeout() ;
+ }
+ ) ;
+ }
+ }
+
+ /**
+ * Called when a timeout event is raised
+ * @return void
+ */
+ public function onEventTimeout() {
+
+ }
+
+ /**
+ * Called when application instance is going to shutdown
+ * @todo protected?
+ * @return boolean Ready to shutdown?
+ */
+ public function onShutdown() {
+ return TRUE;
+ }
+
+}
+
+class ExampleGenericWebSocketRoute extends WebSocketRoute {
+
+ /**
+ * Called when new frame received.
+ * @param string Frame's contents.
+ * @param string Frame's type. ("STRING" or "BINARY")
+ * @return void
+ */
+ public function onFrame($data, $type) {
+ if ($data === 'ping') {
+ $this->client->sendFrame('pong', "STRING", function($client) {
+ Daemon::log($client->clientAddr . ' : SEND pong');
+ }
+ );
+ return ;
+ }
+ }
+
+ /**
+ * Called when the connection is handshaked.
+ * @return void
+ */
+ public function onHandshake() {
+
+ Daemon::log($this->client->clientAddr . ' : Handshake success') ;
+ }
+
+ /**
+ * Called when session finished.
+ * @return void
+ */
+ public function onFinish() {
+
+ Daemon::log($this->client->clientAddr . ' : Disconnected');
+ }
+
+ /**
+ * Called when the worker is going to shutdown.
+ * @return boolean Ready to shutdown?
+ */
+ public function gracefulShutdown() {
+
+ Daemon::log($this->client->clientAddr . ' : Gracefully disconnecting (as requested by server)') ;
+ return TRUE ;
+ }
+}
@@ -0,0 +1,267 @@
+<?php
+
+class genericWebSocketServer extends AsyncServer
+{
+ public $sessions = array();
+ public $routes = array();
+
+ protected $timeout_cb;
+
+ const BINARY = 0x80;
+ const STRING = 0x00;
+
+ /**
+ * Registering event timeout callback function
+ * @param Closure Callback function
+ * @return void
+ */
+
+ public function registerEventTimeout($cb)
+ {
+ if ($cb === NULL || is_callable($cb))
+ {
+ $this->timeout_cb = $cb ;
+ }
+ }
+
+ /**
+ * Setting default config options
+ * Overriden from AppInstance::getConfigDefaults
+ * @return array|false
+ */
+
+ protected function getConfigDefaults()
+ {
+ return array(
+ // listen to
+ 'listen' => 'tcp://0.0.0.0',
+ // listen port
+ 'listenport' => 55556,
+ // max allowed packet size
+ 'maxallowedpacket' => new Daemon_ConfigEntrySize('16k'),
+ // disabled by default
+ 'enable' => 0,
+ // no event_timeout by default
+ 'ev_timeout' => -1
+ );
+ }
+
+ /**
+ * Event of appInstance. Adds default settings and binds sockets.
+ * @return void
+ */
+
+ public function init()
+ {
+ $this->update();
+
+ if ($this->config->enable->value)
+ {
+ $this->bindSockets(
+ $this->config->listen->value,
+ $this->config->listenport->value
+ );
+ }
+ }
+
+ /**
+ * Enable all events of sockets
+ * @return void
+ */
+
+ public function enableSocketEvents()
+ {
+ foreach ($this->socketEvents as $ev)
+ {
+ event_base_set($ev, Daemon::$process->eventBase);
+ event_add($ev, $this->config->ev_timeout->value); // With specified timeout
+ }
+ }
+
+ /**
+ * Called when a request to HTTP-server looks like WebSocket handshake query.
+ * @return void
+ */
+/*
+ public function inheritFromRequest($req, $appInstance)
+ {
+ $connId = $req->attrs->connId;
+
+ unset(Daemon::$process->queue[$connId . '-' . $req->attrs->id]);
+
+ $this->buf[$connId] = $appInstance->buf[$connId];
+
+ unset($appInstance->buf[$connId]);
+ unset($appInstance->poolState[$connId]);
+
+ $set = event_buffer_set_callback(
+ $this->buf[$connId],
+ $this->directReads ? NULL : array($this, 'onReadEvent'),
+ array($this, 'onWriteEvent'),
+ array($this, 'onFailureEvent'),
+ array($connId)
+ );
+
+ unset(Daemon::$process->readPoolState[$connId]);
+
+ $this->poolState[$connId] = array();
+
+ $this->sessions[$connId] = new genericWebSocketSession($connId, $this);
+ $this->sessions[$connId]->clientAddr = $req->attrs->server['REMOTE_ADDR'];
+ $this->sessions[$connId]->server = $req->attrs->server;
+ $this->sessions[$connId]->firstline = TRUE;
+ $this->sessions[$connId]->stdin("\r\n" . $req->attrs->inbuf);
+ }
+*/
+ /**
+ * Adds a route if it doesn't exist already.
+ * @param string Route name.
+ * @param mixed Route's callback.
+ * @return boolean Success.
+ */
+
+ public function addRoute($route, $cb)
+ {
+ if (isset($this->routes[$route]))
+ {
+ Daemon::log(__METHOD__ . ' Route \'' . $route . '\' is already taken.');
+ return FALSE;
+ }
+
+ $this->routes[$route] = $cb;
+
+ return TRUE;
+ }
+
+ /**
+ * Force add/replace a route.
+ * @param string Route name.
+ * @param mixed Route's callback.
+ * @return boolean Success.
+ */
+
+ public function setRoute($route, $cb)
+ {
+ $this->routes[$route] = $cb;
+
+ return TRUE;
+ }
+
+ /**
+ * Removes a route.
+ * @param string Route name.
+ * @return boolean Success.
+ */
+
+ public function removeRoute($route)
+ {
+ if (!isset($this->routes[$route]))
+ {
+ return FALSE;
+ }
+
+ unset($this->routes[$route]);
+
+ return TRUE;
+ }
+
+ /**
+ * Event of appInstance.
+ * @return void
+ */
+
+ public function onReady()
+ {
+ if ($this->config->enable->value)
+ {
+ $this->enableSocketEvents();
+ }
+ }
+
+ /**
+ * Called when remote host is trying to establish the connection
+ * @param resource Descriptor
+ * @param integer Events
+ * @param mixed Attached variable
+ * @return boolean If true then we can accept new connections, else we can't
+ */
+
+ public function checkAccept($stream, $events, $arg)
+ {
+ if (!parent::checkAccept($stream, $events, $arg))
+ {
+ return FALSE;
+ }
+
+ $sockId = $arg[0];
+
+ event_add($this->socketEvents[$sockId], $this->config->ev_timeout->value) ; // With specified timeout
+
+ // Always return FALSE to skip adding event without timeout in "parent::onAcceptEvent"...
+ return FALSE ;
+ }
+
+ /**
+ * Called when remote host is trying to establish the connection
+ * @param integer Connection's ID
+ * @param string Address
+ * @return boolean Accept/Drop the connection
+ */
+
+ public function onAccept($connId, $addr)
+ {
+ if (parent::onAccept($connId, $addr))
+ {
+ Daemon::log("New client : " . $addr) ;
+
+ return TRUE ;
+ }
+
+ return FALSE ;
+ }
+
+ /**
+ * Event of asyncServer
+ * @param integer Connection's ID
+ * @param string Peer's address
+ * @return void
+ */
+ protected function onAccepted($connId, $addr)
+ {
+ $this->sessions[$connId] = new genericWebSocketSession($connId, $this);
+ $this->sessions[$connId]->clientAddr = $addr;
+ }
+
+ /**
+ * Called when new connections is waiting for accept
+ * @param resource Descriptor
+ * @param integer Events
+ * @param mixed Attached variable
+ * @return void
+ */
+
+ public function onAcceptEvent($stream, $events, $arg)
+ {
+ if ($events & EV_TIMEOUT)
+ {
+ $sockId = $arg[0];
+
+ if ($this->timeout_cb !== NULL)
+ {
+ call_user_func($this->timeout_cb) ;
+ }
+
+ event_add($this->socketEvents[$sockId], $this->config->ev_timeout->value) ;
+ return ;
+ }
+
+ parent::onAcceptEvent($stream, $events, $arg);
+ }
+/*
+ public function onTimeout()
+ {
+
+ }
+*/
+}
+
View
@@ -329,9 +329,12 @@ public function onAccept($connId, $addr) {
/**
* Called when remote host is trying to establish the connection
+ * @param resource Descriptor
+ * @param integer Events
+ * @param mixed Attached variable
* @return boolean If true then we can accept new connections, else we can't
*/
- public function checkAccept() {
+ public function checkAccept($stream, $events, $arg) {
if (Daemon::$process->reload) {
return FALSE;
}
@@ -505,7 +508,7 @@ public function onAcceptEvent($stream, $events, $arg) {
Daemon::$process->log(get_class($this) . '::' . __METHOD__ . '(' . $sockId . ') invoked.');
}
- if ($this->checkAccept()) {
+ if ($this->checkAccept($stream, $events, $arg)) {
event_add($this->socketEvents[$sockId]);
}
Oops, something went wrong.

0 comments on commit 78cd6db

Please sign in to comment.