Skip to content

Commit

Permalink
allow to pool servers together when accepting incoming connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Baptouuuu committed Mar 9, 2024
1 parent 547e810 commit 4a8d9a2
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- `Innmind\IO\Sockets::servers()`
- `Innmind\IO\Sockets\Servers`
- `Innmind\IO\Sockets\Server\Pool`

## 2.6.0 - 2024-03-09

Expand Down
10 changes: 10 additions & 0 deletions src/Sockets/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public static function of(
);
}

/**
* @param self<T> $socket
*
* @return Server\Pool<T>
*/
public function with(self $socket): Server\Pool
{
return Server\Pool::of($this->watch, $this->socket, $socket->unwrap());
}

/**
* @return T
*/
Expand Down
174 changes: 174 additions & 0 deletions src/Sockets/Server/Pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
<?php
declare(strict_types = 1);

namespace Innmind\IO\Sockets\Server;

use Innmind\IO\Sockets\{
Server,
Client,
};
use Innmind\TimeContinuum\ElapsedPeriod;
use Innmind\Socket\{
Server as Socket,
Server\Connection,
};
use Innmind\Stream\Watch;
use Innmind\Immutable\{
Sequence,
Predicate\Instance,
};

/**
* @template-covariant T of Socket
*/
final class Pool
{
/** @var non-empty-list<T> */
private array $sockets;
/** @var callable(?ElapsedPeriod): Watch */
private $watch;
/** @var callable(T): Sequence<T> */
private $wait;

/**
* @psalm-mutation-free
*
* @param callable(?ElapsedPeriod): Watch $watch
* @param non-empty-list<T> $sockets
* @param callable(T): Sequence<T> $wait
*/
private function __construct(
callable $watch,
array $sockets,
callable $wait,
) {
$this->watch = $watch;
$this->sockets = $sockets;
$this->wait = $wait;
}

/**
* @psalm-mutation-free
* @internal
* @template A of Socket
*
* @param callable(?ElapsedPeriod): Watch $watch
* @param A $first
* @param A $second
*
* @return self<A>
*/
public static function of(
callable $watch,
Socket $first,
Socket $second,
): self {
/** @var self<A> */
return new self(
$watch,
[$first, $second],
static fn(Socket $socket) => Sequence::of($socket),
);
}

/**
* @param Server<T> $server
*
* @return self<T>
*/
public function with(Server $server): self
{
return new self(
$this->watch,
[...$this->sockets, $server->unwrap()],
$this->wait,
);
}

/**
* @return Sequence<T>
*/
public function unwrap(): Sequence
{
return Sequence::of(...$this->sockets);
}

/**
* Wait forever for the socket to be ready to read before tryin to use it
*
* @psalm-mutation-free
*
* @return self<T>
*/
public function watch(): self
{
/** @var self<T> */
return new self(
$this->watch,
$this->sockets,
fn(Socket $socket, Socket ...$sockets) => ($this->watch)(null)
->forRead($socket, ...$sockets)()
->map(
static fn($ready) => $ready
->toRead()
->filter(static fn($ready) => \in_array(
$ready,
[$socket, ...$sockets],
true,
))
->keep(Instance::of(Socket::class)),
)
->toSequence()
->flatMap(
static fn($toRead) => Sequence::of(...$toRead->toList()),
),
);
}

/**
* @psalm-mutation-free
*
* @return self<T>
*/
public function timeoutAfter(ElapsedPeriod $timeout): self
{
/** @var self<T> */
return new self(
$this->watch,
$this->sockets,
fn(Socket $socket, Socket ...$sockets) => ($this->watch)($timeout)
->forRead($socket, ...$sockets)()
->map(
static fn($ready) => $ready
->toRead()
->filter(static fn($ready) => \in_array(
$ready,
[$socket, ...$sockets],
true,
))
->keep(Instance::of(Socket::class)),
)
->toSequence()
->flatMap(
static fn($toRead) => Sequence::of(...$toRead->toList()),
),
);
}

/**
* @return Sequence<Client<Connection>>
*/
public function accept(): Sequence
{
return ($this->wait)(...$this->sockets)
->flatMap(
static fn($socket) => $socket
->accept()
->toSequence(),
)
->map(fn($client) => Client::of(
$this->watch,
$client,
));
}
}
107 changes: 107 additions & 0 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,111 @@ public function testServerAcceptConnection()
$client->close();
$server->close();
}

public function testServerPool()
{
@\unlink('/tmp/foo.sock');
@\unlink('/tmp/bar.sock');
@\unlink('/tmp/baz.sock');
$addressFoo = Address\Unix::of('/tmp/foo');
$addressBar = Address\Unix::of('/tmp/bar');
$addressBaz = Address\Unix::of('/tmp/baz');
$serverFoo = Server\Unix::recoverable($addressFoo)->match(
static fn($server) => $server,
static fn() => null,
);

$this->assertNotNull($serverFoo);

$serverBar = Server\Unix::recoverable($addressBar)->match(
static fn($server) => $server,
static fn() => null,
);

$this->assertNotNull($serverBar);

$serverBaz = Server\Unix::recoverable($addressBaz)->match(
static fn($server) => $server,
static fn() => null,
);

$this->assertNotNull($serverBaz);

$clientFoo = Client\Unix::of($addressFoo)->match(
static fn($socket) => $socket,
static fn() => null,
);

$this->assertNotNull($clientFoo);

$clientBar = Client\Unix::of($addressBar)->match(
static fn($socket) => $socket,
static fn() => null,
);

$this->assertNotNull($clientBar);

$clientBaz = Client\Unix::of($addressBaz)->match(
static fn($socket) => $socket,
static fn() => null,
);

$this->assertNotNull($clientBaz);

$_ = IO::of(Select::timeoutAfter(...))
->sockets()
->clients()
->wrap($clientFoo)
->send(Sequence::of(Str::of('foo')))
->match(
static fn() => null,
static fn() => null,
);
$_ = IO::of(Select::timeoutAfter(...))
->sockets()
->clients()
->wrap($clientBar)
->send(Sequence::of(Str::of('bar')))
->match(
static fn() => null,
static fn() => null,
);
$_ = IO::of(Select::timeoutAfter(...))
->sockets()
->clients()
->wrap($clientBaz)
->send(Sequence::of(Str::of('baz')))
->match(
static fn() => null,
static fn() => null,
);

$servers = IO::of(Select::timeoutAfter(...))
->sockets()
->servers();
$result = $servers
->wrap($serverFoo)
->with($servers->wrap($serverBar))
->with($servers->wrap($serverBaz))
->timeoutAfter(ElapsedPeriod::of(1_000))
->accept()
->flatMap(
static fn($client) => $client
->frames(Frame\Chunk::of(3))
->one()
->toSequence(),
)
->map(static fn($data) => $data->toString());

$this->assertCount(3, $result);
$this->assertTrue($result->contains('foo'));
$this->assertTrue($result->contains('bar'));
$this->assertTrue($result->contains('baz'));
$clientFoo->close();
$clientBar->close();
$clientBaz->close();
$serverFoo->close();
$serverBar->close();
$serverBaz->close();
}
}

0 comments on commit 4a8d9a2

Please sign in to comment.