Skip to content

Commit

Permalink
Upgrade to react/socket 0.8.*
Browse files Browse the repository at this point in the history
  • Loading branch information
jjok committed Aug 10, 2017
1 parent 41afe92 commit 2f89a4a
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 99 deletions.
6 changes: 1 addition & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
}
},
"require": {
"react/socket-client": "0.5.*",
"react/stream": "0.4.*",
"react/event-loop": "0.4.*",
"react/dns": "0.4.*",
"react/promise": "^2.4.1"
"react/socket": "^0.8.0"
},
"require-dev": {
"phpunit/phpunit": "^5.0.0",
Expand Down
3 changes: 1 addition & 2 deletions examples/config.php.example
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
<?php

$config = array(
'server' => 'yourMqttBroker.tld',
'port' => 1883,
'broker' => 'yourMqttBroker.tld:1883',
'options' => new \oliverlorenz\reactphpmqtt\packet\ConnectionOptions(array(
'keepAlive' => 120,
)),
Expand Down
14 changes: 8 additions & 6 deletions examples/connectPublish.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

use oliverlorenz\reactphpmqtt\ClientFactory;
use oliverlorenz\reactphpmqtt\protocol\Version4;
use React\Stream\Stream;
use React\Socket\ConnectionInterface as Connection;

require __DIR__ . '/../vendor/autoload.php';

$config = require 'config.php';

$connector = ClientFactory::createClient(new Version4(), '8.8.8.8');
$client = ClientFactory::createClient(new Version4(), '8.8.8.8');

$p = $connector->create($config['server'], $config['port'], $config['options']);
$p->then(function(Stream $stream) use ($connector) {
return $connector->publish($stream, 'hello/world', 'example message');
$p = $client->connect($config['broker'], $config['options']);
$client->getLoop()->addPeriodicTimer(10, function () use ($p, $client) {
$p->then(function(Connection $stream) use ($client) {
return $client->publish($stream, 'hello/world', 'example message');
});
});

$connector->getLoop()->run();
$client->getLoop()->run();
12 changes: 6 additions & 6 deletions examples/connectSubscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
use oliverlorenz\reactphpmqtt\ClientFactory;
use oliverlorenz\reactphpmqtt\packet\Publish;
use oliverlorenz\reactphpmqtt\protocol\Version4;
use React\Stream\Stream;
use React\Socket\ConnectionInterface as Stream;

require __DIR__ . '/../vendor/autoload.php';

$config = require 'config.php';

$connector = ClientFactory::createClient(new Version4(), '8.8.8.8');
$client = ClientFactory::createClient(new Version4(), '8.8.8.8');

$p = $connector->create($config['server'], $config['port'], $config['options']);
$p->then(function(Stream $stream) use ($connector) {
$p = $client->connect($config['broker'], $config['options']);
$p->then(function(Stream $stream) use ($client) {
$stream->on(Publish::EVENT, function(Publish $message) {
printf(
'Received payload "%s" for topic "%s"%s',
Expand All @@ -22,7 +22,7 @@
);
});

return $connector->subscribe($stream, 'hello/world', 0);
$client->subscribe($stream, 'hello/world', 0);
});

$connector->getLoop()->run();
$client->getLoop()->run();
17 changes: 11 additions & 6 deletions src/ClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,34 @@
use oliverlorenz\reactphpmqtt\protocol\Version;
use React\Dns\Resolver\Factory as DnsResolverFactory;
use React\EventLoop\Factory as EventLoopFactory;
use React\Socket\DnsConnector;
use React\Socket\SecureConnector;
use React\Socket\TcpConnector;

class ClientFactory
{
public static function createClient(Version $version, $resolverIp = '8.8.8.8')
{
$loop = EventLoopFactory::create();
$resolver = self::createDnsResolver($resolverIp, $loop);
$connector = self::createDnsConnector($resolverIp, $loop);

return new Connector($loop, $resolver, $version);
return new Connector($loop, $connector, $version);
}

public static function createSecureClient(Version $version, $resolverIp = '8.8.8.8')
{
$loop = EventLoopFactory::create();
$resolver = self::createDnsResolver($resolverIp, $loop);
$connector = self::createDnsConnector($resolverIp, $loop);
$secureConnector = new SecureConnector($connector, $loop);

return new SecureConnector($loop, $resolver, $version);
return new Connector($loop, $secureConnector, $version);
}

private static function createDnsResolver($resolverIp, $loop)
private static function createDnsConnector($resolverIp, $loop)
{
$dnsResolverFactory = new DnsResolverFactory();
$resolver = $dnsResolverFactory->createCached($resolverIp, $loop);

return $dnsResolverFactory->createCached($resolverIp, $loop);
return new DnsConnector(new TcpConnector($loop), $resolver);
}
}
102 changes: 54 additions & 48 deletions src/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,66 +22,66 @@
use oliverlorenz\reactphpmqtt\packet\UnsubscribeAck;
use oliverlorenz\reactphpmqtt\protocol\Version;
use oliverlorenz\reactphpmqtt\protocol\Violation as ProtocolViolation;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\EventLoop\LoopInterface as Loop;
use React\EventLoop\Timer\Timer;
use React\Promise\Deferred;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use React\SocketClient\ConnectorInterface;
use React\Stream\Stream;

class Connector implements ConnectorInterface {
use React\Socket\ConnectionInterface as Connection;
use React\Socket\ConnectorInterface as ReactConnector;

class Connector
{
/**
* @var $loop LoopInterface
* @var $loop Loop
*/
private $loop;
protected $socketConnector;
private $socketConnector;
private $version;

private $messageCounter = 1;

public function __construct(LoopInterface $loop, Resolver $resolver, Version $version)
public function __construct(Loop $loop, ReactConnector $connector, Version $version)
{
$this->version = $version;
$this->socketConnector = new \React\SocketClient\Connector($loop, $resolver);
$this->socketConnector = $connector;
$this->loop = $loop;
}

/**
* Creates a new connection
*
* @param string $host
* @param int $port [optional]
* @param string $uri
* @param ConnectionOptions|null $options [optional]
*
* @return PromiseInterface Resolves to a \React\Stream\Stream once a connection has been established
*/
public function create(
$host,
$port = 1883,
public function connect(
$uri,
ConnectionOptions $options = null
) {
// Set default connection options, if none provided
if($options == null) {
$options = $this->getDefaultConnectionOptions();
}

return $this->socketConnector->create($host, $port)
->then(function (Stream $stream) use ($options) {
return $this->connect($stream, $options);
})
->then(function (Stream $stream) {
return $this->listenForPackets($stream);
})
->then(function(Stream $stream) use ($options) {
return $this->keepAlive($stream, $options->keepAlive);
});
$promise = $this->socketConnector->connect($uri);
$promise->then(function(Connection $stream) {
$this->listenForPackets($stream);
});
$connection = $promise->then(function(Connection $stream) use ($options) {
return $this->sendConnectPacket($stream, $options);
});
$connection->then(function(Connection $stream) use ($options) {
return $this->keepAlive($stream, $options->keepAlive);
});

return $connection;
}

private function listenForPackets(Stream $stream)
private function listenForPackets(Connection $stream)
{
$stream->on('data', function ($rawData) use ($stream) {
$stream->on('data', function($rawData) use ($stream) {
try {
foreach (Factory::getNextPacket($this->version, $rawData) as $packet) {
$stream->emit($packet::EVENT, [$packet]);
Expand All @@ -93,16 +93,16 @@ private function listenForPackets(Stream $stream)
$stream->emit('INVALID', [$e]);
}
});

$deferred = new Deferred();
$stream->on(ConnectionAck::EVENT, function($message) use ($stream, $deferred) {
$deferred->resolve($stream);
});

return $deferred->promise();
//
// $deferred = new Deferred();
// $stream->on(ConnectionAck::EVENT, function($message) use ($stream, $deferred) {
// $deferred->resolve($stream);
// });
//
// return $deferred->promise();
}

private function keepAlive(Stream $stream, $keepAlive)
private function keepAlive(Connection $stream, $keepAlive)
{
if($keepAlive > 0) {
$interval = $keepAlive / 2;
Expand All @@ -119,7 +119,7 @@ private function keepAlive(Stream $stream, $keepAlive)
/**
* @return \React\Promise\Promise
*/
public function connect(Stream $stream, ConnectionOptions $options) {
public function sendConnectPacket(Connection $stream, ConnectionOptions $options) {
$packet = new Connect(
$this->version,
$options->username,
Expand All @@ -136,16 +136,22 @@ public function connect(Stream $stream, ConnectionOptions $options) {
echo MessageHelper::getReadableByRawString($message);

$deferred = new Deferred();
if ($stream->write($message)) {
$stream->on(ConnectionAck::EVENT, function($message) use ($stream, $deferred) {
$deferred->resolve($stream);
} else {
$deferred->reject();
}
});

$stream->write($message);
// $deferred = new Deferred();
// if ($stream->write($message)) {
// $deferred->resolve($stream);
// } else {
// $deferred->reject();
// }

return $deferred->promise();
}

private function sendPacketToStream(Stream $stream, ControlPacket $controlPacket)
private function sendPacketToStream(Connection $stream, ControlPacket $controlPacket)
{
echo "send:\t\t" . get_class($controlPacket) . "\n";
$message = $controlPacket->get();
Expand All @@ -154,12 +160,12 @@ private function sendPacketToStream(Stream $stream, ControlPacket $controlPacket
}

/**
* @param Stream $stream
* @param Connection $stream
* @param string $topic
* @param int $qos
* @return \React\Promise\Promise
*/
public function subscribe(Stream $stream, $topic, $qos = 0)
public function subscribe(Connection $stream, $topic, $qos = 0)
{
$packet = new Subscribe($this->version);
$packet->addSubscription($topic, $qos);
Expand All @@ -174,11 +180,11 @@ public function subscribe(Stream $stream, $topic, $qos = 0)
}

/**
* @param Stream $stream
* @param Connection $stream
* @param string $topic
* @return \React\Promise\Promise
*/
public function unsubscribe(Stream $stream, $topic)
public function unsubscribe(Connection $stream, $topic)
{
$packet = new Unsubscribe($this->version);
$packet->removeSubscription($topic);
Expand All @@ -192,7 +198,7 @@ public function unsubscribe(Stream $stream, $topic)
return $deferred->promise();
}

public function disconnect(Stream $stream)
public function disconnect(Connection $stream)
{
$packet = new Disconnect($this->version);
$this->sendPacketToStream($stream, $packet);
Expand All @@ -204,7 +210,7 @@ public function disconnect(Stream $stream)
/**
* @return \React\Promise\Promise
*/
public function publish(Stream $stream, $topic, $message, $qos = 0, $dup = false, $retain = false)
public function publish(Connection $stream, $topic, $message, $qos = 0, $dup = false, $retain = false)
{
$packet = new Publish($this->version);
$packet->setTopic($topic);
Expand All @@ -227,7 +233,7 @@ public function publish(Stream $stream, $topic, $message, $qos = 0, $dup = false
}

/**
* @return LoopInterface
* @return Loop
*/
public function getLoop()
{
Expand Down
26 changes: 0 additions & 26 deletions src/SecureConnector.php

This file was deleted.

0 comments on commit 2f89a4a

Please sign in to comment.