Browse files

HTTPRequest: sendfile() syscall support

  • Loading branch information...
1 parent c8e68da commit ae74c479e5f958d6ab2efd0d59c13f2065c09844 @kakserpom committed Jun 25, 2012
Showing with 122 additions and 82 deletions.
  1. +3 −1 app-web/FileReader.php
  2. +22 −16 lib/Daemon.php
  3. +1 −0 lib/Daemon_Bootstrap.php
  4. +2 −1 lib/Daemon_MasterThread.php
  5. +4 −4 lib/Daemon_WorkerThread.php
  6. +28 −12 lib/FS.php
  7. +23 −12 lib/File.php
  8. +36 −33 lib/HTTPRequest.php
  9. +3 −3 lib/HTTPServerConnection.php
View
4 app-web/FileReader.php
@@ -115,7 +115,9 @@ public function file($path) {
$req = $this;
$job = $this->job;
$job('readfile', function ($name, $job) use ($req, $path) {
- $req->sendfile($path, function($file, $success) use ($job) {$job->setResult('readfile');});
+ $req->sendfile($path, function($file, $success) use ($job) {
+ $job->setResult('readfile');
+ });
});
}
public function autoindex($dir) {
View
38 lib/Daemon.php
@@ -274,24 +274,28 @@ public static function loadConfig($paths) {
}
/**
- * Open log descriptors.
+ * Open logs.
* @return void
*/
public static function openLogs() {
+ Daemon::$logpointer = fopen(Daemon::$config->logstorage->value, 'a');
+ return;
if (Daemon::$config->logging->value) {
- if (Daemon::$logpointer) {
- fclose(Daemon::$logpointer);
- Daemon::$logpointer = FALSE;
- }
-
- Daemon::$logpointer = fopen(Daemon::$config->logstorage->value, 'a+');
-
- if (isset(Daemon::$config->group->value)) {
- chgrp(Daemon::$config->logstorage->value, Daemon::$config->group->value);
- }
+ FS::open(Daemon::$config->logstorage->value, 'a', function ($file) {
+ Daemon::$logpointer = $file;
+ if (!$file) {
+ return;
+ }
+ if (isset(Daemon::$config->group->value)) {
+ chgrp(Daemon::$config->logstorage->value, Daemon::$config->group->value); // @TODO: rewrite to async I/O
+ }
- if (isset(Daemon::$config->user->value)) {
- chown(Daemon::$config->logstorage->value, Daemon::$config->user->value);
+ if (isset(Daemon::$config->user->value)) {
+ chown(Daemon::$config->logstorage->value, Daemon::$config->user->value); // @TODO: rewrite to async I/O
+ }
+ });
+ if (Daemon::$logpointer === null) { // block on first call
+ FS::waitAllEvents();
}
}
}
@@ -300,11 +304,11 @@ public static function openLogs() {
* Get state of workers.
* @return array - information.
*/
+ // @TODO: get rid of magic numbers in status (use constants)
public static function getStateOfWorkers($master = NULL) {
static $bufsize = 1024;
-
$offset = 0;
-
+
$stat = array(
'idle' => 0,
'busy' => 0,
@@ -443,7 +447,9 @@ public static function log() {
}
if (Daemon::$logpointer) {
- fwrite(Daemon::$logpointer, '[' . date('D, j M Y H:i:s', $mt[1]) . '.' . sprintf('%06d', $mt[0]*1000000) . ' ' . date('O') . '] ' . $msg . "\n");
+ $msg = '[' . date('D, j M Y H:i:s', $mt[1]) . '.' . sprintf('%06d', $mt[0]*1000000) . ' ' . date('O') . '] ' . $msg . "\n";
+ fwrite(Daemon::$logpointer, $msg);
+ //Daemon::$logpointer->write($msg);
}
}
View
1 lib/Daemon_Bootstrap.php
@@ -66,6 +66,7 @@ class Daemon_Bootstrap {
*/
public static function init() {
Daemon::initSettings();
+ FS::init();
Daemon::$runName = basename($_SERVER['argv'][0]);
$error = FALSE;
View
3 lib/Daemon_MasterThread.php
@@ -28,6 +28,7 @@ protected function run() {
$this->eventBase = event_base_new();
$this->registerEventSignals();
+ //FS::initEvent();
$this->fileWatcher = new FileWatcher;
$this->workers = new ThreadCollection;
@@ -112,7 +113,7 @@ public function updatedWorkers() {
foreach (Daemon::$config as $name => $section)
{
if (
- (!$section instanceof Daemon_ConfigSection)
+ (!$section instanceof Daemon_ConfigSection)
|| !isset($section->limitinstances)) {
continue;
View
8 lib/Daemon_WorkerThread.php
@@ -55,7 +55,7 @@ protected function run() {
$this->eventBase = event_base_new();
$this->registerEventSignals();
- FS::init();
+ FS::initEvent();
$this->fileWatcher = new FileWatcher;
@@ -495,9 +495,9 @@ public function shutdown($hard = FALSE) {
while (!$this->reloadReady) {
event_base_loop($this->eventBase);
}
-
- posix_kill(posix_getppid(), SIGCHLD);
- exit(0);
+ //FS::waitAllEvents(); // ensure that all I/O events completed before suicide
+ posix_kill(posix_getppid(), SIGCHLD); // praying to Master
+ exit(0); // R.I.P.
}
/**
View
40 lib/FS.php
@@ -25,20 +25,34 @@ public static function init() {
Daemon::log('FS: missing pecl-eio, Filesystem I/O performance compromised. Consider installing pecl-eio.');
return;
}
-
+ }
+ public static function initEvent() {
+ if (!self::$supported) {
+ return;
+ }
self::updateConfig();
-
self::$ev = event_new();
self::$fd = eio_get_event_stream();
event_set(self::$ev, self::$fd, EV_READ | EV_PERSIST, function ($fd, $events, $arg) {
if (eio_nreqs()) {
- @eio_poll(); // ignore bogus warnings
+ eio_poll();
}
});
event_base_set(self::$ev, Daemon::$process->eventBase);
event_add(self::$ev);
}
+ public static function waitAllEvents() {
+ if (!self::$supported) {
+ return;
+ }
+ Daemon::log('blocked');
+ while ($n = eio_nreqs()) {
+ eio_poll();
+ }
+ Daemon::log('unblocked');
+ }
+
public static function updateConfig() {
if (Daemon::$config->eiosetmaxidle->value !== null) {
eio_set_max_idle(Daemon::$config->eiosetmaxidle->value);
@@ -133,7 +147,7 @@ public static function syncfs($cb = null, $pri = EIO_PRI_DEFAULT) {
return eio_syncfs($pri, $cb);
}
- public function touch($path, $mtime, $atime = null, $cb = null, $pri = EIO_PRI_DEFAULT) {
+ public static function touch($path, $mtime, $atime = null, $cb = null, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
$r = touch($path, $mtime, $atime);
if ($cb) {
@@ -144,7 +158,7 @@ public function touch($path, $mtime, $atime = null, $cb = null, $pri = EIO_PRI_D
return eio_utime($path, $atime, $mtime, $pri, $cb, $path);
}
- public function rmdir($path, $cb = null, $pri = EIO_PRI_DEFAULT) {
+ public static function rmdir($path, $cb = null, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
$r = rmdir($path);
if ($cb) {
@@ -155,7 +169,7 @@ public function rmdir($path, $cb = null, $pri = EIO_PRI_DEFAULT) {
return eio_rmdir($path, $pri, $cb, $path);
}
- public function mkdir($path, $mode, $cb = null, $pri = EIO_PRI_DEFAULT) {
+ public static function mkdir($path, $mode, $cb = null, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
$r = mkdir($path, $mode);
if ($cb) {
@@ -166,7 +180,7 @@ public function mkdir($path, $mode, $cb = null, $pri = EIO_PRI_DEFAULT) {
return eio_mkdir($path, $mode, $pri, $cb, $path);
}
- public function readdir($path, $cb = null, $flags, $pri = EIO_PRI_DEFAULT) {
+ public static function readdir($path, $cb = null, $flags, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
$r = glob($path);
if ($cb) {
@@ -178,7 +192,7 @@ public function readdir($path, $cb = null, $flags, $pri = EIO_PRI_DEFAULT) {
}
- public function truncate($path, $offset = 0, $cb = null, $pri = EIO_PRI_DEFAULT) {
+ public static function truncate($path, $offset = 0, $cb = null, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
$fp = fopen($path, 'r+');
$r = $fp && ftruncate($fp, $offset);
@@ -190,21 +204,22 @@ public function truncate($path, $offset = 0, $cb = null, $pri = EIO_PRI_DEFAULT)
return eio_truncate($path, $offset, $pri, $cb, $path);
}
- public static function sendfile($outfd, $path, $offset, $length, $cb, $pri = EIO_PRI_DEFAULT) {
+ public static function sendfile($outfd, $path, $cb, $offset = 0, $length = null, $pri = EIO_PRI_DEFAULT) {
if (!self::$supported) {
call_user_func($cb, false);
return;
}
- FS::open($path, 'r', function ($file) use ($cb, $pri, $outfd) {
+ FS::open($path, 'r', function ($file) use ($cb, $pri, $outfd, $offset, $length) {
if (!$file) {
call_user_func($cb, $path, false);
return;
}
- $file->sendfile($outfd, $offset, $length, $cb, $pri);
+ $file->sendfile($outfd, $cb, $offset, $length, $pri);
+
}, $pri);
}
- public function chown($path, $uid, $gid = -1, $pri = EIO_PRI_DEFAULT) {
+ public static function chown($path, $uid, $gid = -1, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
$r = chown($path, $uid);
if ($gid !== -1) {
@@ -274,6 +289,7 @@ public static function open($path, $flags, $cb, $mode = null, $pri = EIO_PRI_DEF
$fd = fopen($path, $mode);
if (!$fd) {
call_user_func($cb, false);
+ return;
}
stream_set_blocking($fd, 0);
$file = new File($fd);
View
35 lib/File.php
@@ -147,21 +147,32 @@ public function read($length, $offset = null, $cb = null, $pri = EIO_PRI_DEFAULT
return true;
}
- public function sendfile($outfd, $length, $offset = null, $cb = null, $pri = EIO_PRI_DEFAULT) {
+ public function sendfile($outfd, $cb, $offset = 0, $length = null, $pri = EIO_PRI_DEFAULT) {
if (!FS::$supported) {
call_user_func($cb, $this, false);
return;
}
- eio_sendfile(
- $outf,
- $this->fd,
- $length,
- $offset !== null ? $offset : $this->pos,
- $pri,
- $cb,
- $this
- );
- return true;
+ static $chunkSize = 1024;
+ $handler = function ($file, $sent) use ($outfd, $cb, &$handler, &$offset, &$length, $pri, $chunkSize) {
+ if ($sent === -1) {
+ $sent = 0;
+ }
+ $offset += $sent;
+ $length -= $sent;
+ if ($length <= 0) {
+ call_user_func($cb, $file, true);
+ return;
+ }
+ eio_sendfile($outfd, $this->fd, $offset, min($chunkSize, $length), $pri, $handler, $this);
@igorw
igorw added a line comment Jun 25, 2012

Needs to have $that = $this.

@kakserpom
Owner
kakserpom added a line comment Jun 25, 2012
@igorw
igorw added a line comment Jun 25, 2012

eio_sendfile is not called from within $handler. That means that the context of $this has changed (at least on PHP 5.3). You need to assign $that = $this outside of the callback, then use ($that) on the callback and use $that within the callback. Just like in JavaScript.

@kakserpom
Owner
kakserpom added a line comment Jun 26, 2012

Fixed. I've replaced $this with $file (problem caused by blind copy-paste from another part of code). I understand what you mean, but the $file is already saved in request and passed to $handler in the first argument.
Thank you a lot!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ };
+ if ($length !== null) {
+ $handler($this, -1);
+ return;
+ }
+ $this->stat(function ($file, $stat) use ($handler, &$length) {
+ $length = $stat['st_size'];
+ $handler($file, -1);
+ }, $pri);
}
public function readahead($length, $offset = null, $cb = null, $pri = EIO_PRI_DEFAULT) {
@@ -216,7 +227,7 @@ public function readAllChunked($cb = null, $chunkcb = null, $pri = EIO_PRI_DEFAU
call_user_func($chunkcb, $file, $data);
$offset += strlen($data);
$len = min($this->chunkSize, $size - $offset);
- if ($offset >= $stat['st_size']) {
+ if ($offset >= $size) {
call_user_func($cb, $file, true);
return;
}
View
69 lib/HTTPRequest.php
@@ -61,7 +61,7 @@ class HTTPRequest extends Request {
public $oldFashionUploadFP = false;
public $answerlen = 0;
public $contentLength;
- private $cookieNUm = 0;
+ private $cookieNum = 0;
public static $hvaltr = array(';' => '&', ' ' => '');
public static $htr = array('-' => '_');
@@ -106,13 +106,20 @@ public function preinit($req) {
public function sendfile($path, $cb, $pri = EIO_PRI_DEFAULT) {
$this->header('Content-Type: ' . MIME::get($path));
- if ($this->upstream->sendfileCap) {
- FS::sendfile($this->fd, $path, $cb, $pri);
+ if ($this->conn->sendfileCap) {
+ $req = $this;
+ $this->ensureSentHeaders();
+ FS::sendfile($this->conn->fd, $path, $cb, 0, null, $pri);
return;
}
$req = $this;
FS::readfileChunked($path, $cb,
function($file, $chunk) use ($req) { // readed chunk
+ static $first = true;
+ if ($first) {
+ $req->header('Content-Length: ' . $file->stat['st_size']);
+ $first = false;
+ }
$req->out($chunk);
}
);
@@ -284,23 +291,7 @@ public function stdin($c) {
$this->parseStdin();
}
- /**
- * Output some data
- * @param string String to out
- * @return boolean Success
- */
- public function out($s, $flush = true) {
- if ($flush) {
- ob_flush();
- }
-
- if ($this->aborted) {
- return false;
- }
-
- $l = strlen($s);
- $this->answerlen += $l;
-
+ public function ensureSentHeaders() {
if (!$this->headers_sent) {
if (isset($this->headers['STATUS'])) {
$h = (isset($this->attrs->noHttpVer) && ($this->attrs->noHttpVer) ? 'Status: ' : $this->attrs->server['SERVER_PROTOCOL']) . ' ' . $this->headers['STATUS'] . "\r\n";
@@ -322,22 +313,34 @@ public function out($s, $flush = true) {
$this->headers_sent_file = __FILE__;
$this->headers_sent_line = __LINE__;
$this->headers_sent = true;
+ $this->conn->requestOut($this, $h);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Output some data
+ * @param string String to out
+ * @return boolean Success
+ */
+ public function out($s, $flush = true) {
+ if ($flush) {
+ ob_flush();
+ }
- if (!Daemon::$compatMode) {
- if (
- !$this->attrs->chunked
- && !$this->sendfp
- ) {
- if (!isset($this->conn)) {
- return false;
- }
- return $this->conn->requestOut($this, $h . $s);
- }
-
- $this->conn->requestOut($this, $h);
- }
+ if ($this->aborted) {
+ return false;
+ }
+ if (!isset($this->conn)) {
+ return false;
}
+
+ $l = strlen($s);
+ $this->answerlen += $l;
+ $this->ensureSentHeaders();
+
if ($this->attrs->chunked) {
for ($o = 0; $o < $l;) {
$c = min($this->upstream->config->chunksize->value, $l - $o);
View
6 lib/HTTPServerConnection.php
@@ -97,7 +97,7 @@ public function onRead() {
$req->attrs->server['PHP_SELF'] = $u['path'];
$req->attrs->server['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$req->attrs->server['SCRIPT_NAME'] = $req->attrs->server['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
- $req->attrs->server['SERVER_PROTOCOL'] = $command[2];
+ $req->attrs->server['SERVER_PROTOCOL'] = isset($command[2]) ? $command[2] : 'HTTP/1.1';
list(
$req->attrs->server['REMOTE_ADDR'],
@@ -228,8 +228,8 @@ public function freeRequest($req) {
unset($this->req);
}
public function badRequest($req) {
- $conn->write('<html><head><title>400 Bad Request</title></head><body bgcolor="white"><center><h1>400 Bad Request</h1></center></body></html>');
- $conn->finish();
+ $this->write('400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor="white"><center><h1>400 Bad Request</h1></center></body></html>');
+ $this->finish();
}
}

0 comments on commit ae74c47

Please sign in to comment.