Skip to content
Permalink
Browse files

Attempt to buffer unsent messages

  • Loading branch information...
muglug committed May 30, 2019
1 parent 8fd5967 commit 9d7cf662792fa9c18305cb4514d5cbe8343f1ca0
Showing with 54 additions and 25 deletions.
  1. +54 −25 src/Psalm/Internal/Fork/Pool.php
@@ -129,17 +129,40 @@ public function __construct(
// Get the work for this process
$task_data_iterator = array_values($process_task_data_iterator)[$proc_id];
$task_done_buffer = '';
foreach ($task_data_iterator as $i => $task_data) {
$task_closure($i, $task_data);
$task_result = $task_closure($i, $task_data);
$task_done_message = new ForkTaskDoneMessage($task_result);
$serialized_message = $task_done_buffer . base64_encode(serialize($task_done_message)) . PHP_EOL;
$bytes_written = @fwrite($write_stream, $serialized_message);
if (strlen($serialized_message) !== $bytes_written) {
$task_done_buffer = substr($serialized_message, $bytes_written);

This comment has been minimized.

Copy link
@weirdan

weirdan May 30, 2019

Contributor

You need to flush the remaining $task_done_buffer after this loop, as it's not necessarily empty at that point (if last fwrite() did not write all that remained there).

This comment has been minimized.

Copy link
@muglug

muglug May 30, 2019

Author Member

It's prepended to the message below

This comment has been minimized.

Copy link
@weirdan

weirdan May 30, 2019

Contributor

Ah, right, missed that bit.

}
}
// Execute each child's shutdown closure before
// exiting the process
$results = $shutdown_closure();
// Serialize this child's produced results and send them to the parent.
$serialized_message = serialize($results ?: []);
fwrite($write_stream, $serialized_message);
$process_done_message = new ForkProcessDoneMessage($results ?: []);
$serialized_message = $task_done_buffer . base64_encode(serialize($process_done_message)) . PHP_EOL;
$bytes_to_write = strlen($serialized_message);
$bytes_written = 0;
while ($bytes_written < $bytes_to_write) {
// attemt to write the remaining unsent part
$bytes_written += @fwrite($write_stream, substr($serialized_message, $bytes_written));
if ($bytes_written < $bytes_to_write) {
// wait a bit
usleep(500000);
}
}
fclose($write_stream);
@@ -216,6 +239,8 @@ private function readResultsFromChildren()
/** @var array<int, string> $content */
$content = array_fill_keys(array_keys($streams), '');
$terminationMessages = [];
// Read the data off of all the stream.
while (count($streams) > 0) {
$needs_read = array_values($streams);
@@ -236,36 +261,40 @@ private function readResultsFromChildren()
$content[intval($file)] .= $buffer;
}
if (strpos($buffer, PHP_EOL) !== false) {
$serialized_messages = explode(PHP_EOL, $content[intval($file)]);
$content[intval($file)] = array_pop($serialized_messages);
foreach ($serialized_messages as $serialized_message) {
$message = unserialize(base64_decode($serialized_message));
if ($message instanceof ForkProcessDoneMessage) {
$terminationMessages[] = $message->data;
} elseif ($message instanceof ForkTaskDoneMessage) {
if ($this->task_done_closure !== null) {
($this->task_done_closure)($message->data);
}
} else {
error_log('Child should return ForkMessage - response type=' . gettype($message));
$this->did_have_error = true;
}
}
}
// If the stream has closed, stop trying to select on it.
if (feof($file)) {
if ($content[intval($file)] !== '') {
error_log('Child did not send full message before closing the connection');
$this->did_have_error = true;
}
fclose($file);
unset($streams[intval($file)]);
}
}
}
return array_values(
array_map(
/**
* @param string $data
*
* @return array
*/
function ($data) {
/** @var array */
$result = unserialize($data);
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_array($result)) {
error_log(
'Child terminated without returning a serialized array - response type=' . gettype($result)
);
$this->did_have_error = true;
}
return $result;
},
$content
)
);
return array_values($terminationMessages);
}
/**

0 comments on commit 9d7cf66

Please sign in to comment.
You can’t perform that action at this time.