Skip to content

Commit

Permalink
Merge pull request #53 from ale10257/master
Browse files Browse the repository at this point in the history
Fix ssl connection bug
  • Loading branch information
Mikhail Bakulin committed Oct 13, 2020
2 parents 457e697 + ac52e08 commit 2c5552a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 32 deletions.
40 changes: 29 additions & 11 deletions Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
use mikemadisonweb\rabbitmq\exceptions\InvalidConfigException;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use Yii;
use yii\base\Component;
use yii\di\NotInstantiableException;
use yii\helpers\ArrayHelper;

class Configuration extends Component
Expand Down Expand Up @@ -145,44 +148,53 @@ public function getConfig() : Configuration
/**
* Get connection service
* @param string $connectionName
* @return AbstractConnection
* @return object|AbstractConnection
* @throws NotInstantiableException
* @throws \yii\base\InvalidConfigException
*/
public function getConnection(string $connectionName = '') : AbstractConnection
{
if ('' === $connectionName) {
$connectionName = self::DEFAULT_CONNECTION_NAME;
}

return \Yii::$container->get(sprintf(self::CONNECTION_SERVICE_NAME, $connectionName));
return Yii::$container->get(sprintf(self::CONNECTION_SERVICE_NAME, $connectionName));
}

/**
* Get producer service
* @param string $producerName
* @return Producer
* @return Producer|object
* @throws \yii\base\InvalidConfigException
* @throws NotInstantiableException
*/
public function getProducer(string $producerName) : Producer
public function getProducer(string $producerName)
{
return \Yii::$container->get(sprintf(self::PRODUCER_SERVICE_NAME, $producerName));
return Yii::$container->get(sprintf(self::PRODUCER_SERVICE_NAME, $producerName));
}

/**
* Get consumer service
* @param string $consumerName
* @return Consumer
* @return Consumer|object
* @throws NotInstantiableException
* @throws \yii\base\InvalidConfigException
*/
public function getConsumer(string $consumerName) : Consumer
public function getConsumer(string $consumerName)
{
return \Yii::$container->get(sprintf(self::CONSUMER_SERVICE_NAME, $consumerName));
return Yii::$container->get(sprintf(self::CONSUMER_SERVICE_NAME, $consumerName));
}

/**
* Get routing service
* @return Routing
* @param AbstractConnection $connection
* @return Routing|object|string
* @throws NotInstantiableException
* @throws \yii\base\InvalidConfigException
*/
public function getRouting(AbstractConnection $connection) : Routing
public function getRouting(AbstractConnection $connection)
{
return \Yii::$container->get(Configuration::ROUTING_SERVICE_NAME, ['conn' => $connection]);
return Yii::$container->get(Configuration::ROUTING_SERVICE_NAME, ['conn' => $connection]);
}

/**
Expand Down Expand Up @@ -261,6 +273,12 @@ protected function validateRequired()
if (isset($connection['type']) && !is_subclass_of($connection['type'], AbstractConnection::class)) {
throw new InvalidConfigException('Connection type should be a subclass of PhpAmqpLib\Connection\AbstractConnection.');
}
if (!empty($connection['ssl_context']) && empty($connection['type'])) {
throw new InvalidConfigException('If you are using a ssl connection, the connection type must be AMQPSSLConnection::class');
}
if (!empty($connection['ssl_context']) && $connection['type'] !== AMQPSSLConnection::class) {
throw new InvalidConfigException('If you are using a ssl connection, the connection type must be AMQPSSLConnection::class');
}
}

foreach ($this->exchanges as $exchange) {
Expand Down
17 changes: 17 additions & 0 deletions components/AbstractConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ public function __construct($class, array $parameters)
*/
public function createConnection() : AbstractConnection
{
if ($this->_parameters['ssl_context'] !== null) {
return new $this->_class(
$this->_parameters['host'],
$this->_parameters['port'],
$this->_parameters['user'],
$this->_parameters['password'],
$this->_parameters['vhost'],
$this->_parameters['ssl_context'],
[
'connection_timeout' => $this->_parameters['connection_timeout'],
'read_write_timeout' => $this->_parameters['read_write_timeout'],
'keepalive' => $this->_parameters['keepalive'],
'heartbeat' => $this->_parameters['heartbeat'],
'channel_rpc_timeout' => $this->_parameters['channel_rpc_timeout'],
]
);
}
return new $this->_class(
$this->_parameters['host'],
$this->_parameters['port'],
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
],
"require": {
"php": "^7.0",
"yiisoft/yii2": "^2.0",
"yiisoft/yii2": "^2.0.13",
"php-amqplib/php-amqplib": "^2.9"
},
"require-dev": {
Expand Down
41 changes: 21 additions & 20 deletions controllers/RabbitMQController.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use ReflectionException;
use yii\base\Action;
use yii\console\Controller;
use yii\console\ExitCode;
use yii\helpers\Console;

/**
Expand Down Expand Up @@ -101,7 +102,7 @@ public function actionConsume(string $name): int
{
$this->stderr(Console::ansiFormat("Consumer `{$name}` doesn't exist: {$e->getMessage()}\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}

$this->validateConsumerOptions($consumer);
Expand All @@ -111,7 +112,7 @@ public function actionConsume(string $name): int
}
$consumer->consume($this->messagesLimit);

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -136,15 +137,15 @@ public function actionPublish(string $producerName, string $exchangeName, string
{
$this->stderr(Console::ansiFormat("Producer `{$producerName}` doesn't exist: {$e->getMessage()}\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}

$data = '';
if (posix_isatty(STDIN))
{
$this->stderr(Console::ansiFormat("Please pipe in some data in order to send it.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
while (!feof(STDIN))
{
Expand All @@ -153,7 +154,7 @@ public function actionPublish(string $producerName, string $exchangeName, string
$producer->publish($data, $exchangeName, $routingKey);
$this->stdout("Message was successfully published.\n", Console::FG_GREEN);

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -175,11 +176,11 @@ public function actionDeclareAll(string $connectionName = Configuration::DEFAULT
Console::ansiFormat("All configured entries was successfully declared.\n", [Console::FG_GREEN])
);

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}
$this->stderr(Console::ansiFormat("No queues, exchanges or bindings configured.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}

/**
Expand All @@ -201,12 +202,12 @@ public function actionDeclareExchange(
{
$this->stderr(Console::ansiFormat("Exchange `{$exchangeName}` is already exists.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
$routing->declareExchange($exchangeName);
$this->stdout(Console::ansiFormat("Exchange `{$exchangeName}` was declared.\n", [Console::FG_GREEN]));

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -228,12 +229,12 @@ public function actionDeclareQueue(
{
$this->stderr(Console::ansiFormat("Queue `{$queueName}` is already exists.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
$routing->declareQueue($queueName);
$this->stdout(Console::ansiFormat("Queue `{$queueName}` was declared.\n", [Console::FG_GREEN]));

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -249,19 +250,19 @@ public function actionDeleteAll(string $connection = Configuration::DEFAULT_CONN
if ($this->interactive)
{
$input = Console::prompt('Are you sure you want to delete all queues and exchanges?', ['default' => 'yes']);
if ($input !== 'yes')
if ($input !== 'yes' && $input !== 'y')
{
$this->stderr(Console::ansiFormat("Aborted.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
}
$conn = \Yii::$app->rabbitmq->getConnection($connection);
$routing = \Yii::$app->rabbitmq->getRouting($conn);
$routing->deleteAll();
$this->stdout(Console::ansiFormat("All configured entries was deleted.\n", [Console::FG_GREEN]));

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -284,15 +285,15 @@ public function actionDeleteExchange(
{
$this->stderr(Console::ansiFormat("Aborted.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
}
$conn = \Yii::$app->rabbitmq->getConnection($connectionName);
$routing = \Yii::$app->rabbitmq->getRouting($conn);
$routing->deleteExchange($exchangeName);
$this->stdout(Console::ansiFormat("Exchange `{$exchangeName}` was deleted.\n", [Console::FG_GREEN]));

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -315,7 +316,7 @@ public function actionDeleteQueue(
{
$this->stderr(Console::ansiFormat("Aborted.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
}

Expand All @@ -324,7 +325,7 @@ public function actionDeleteQueue(
$routing->deleteQueue($queueName);
$this->stdout(Console::ansiFormat("Queue `{$queueName}` was deleted.\n", [Console::FG_GREEN]));

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand All @@ -350,7 +351,7 @@ public function actionPurgeQueue(
{
$this->stderr(Console::ansiFormat("Aborted.\n", [Console::FG_RED]));

return self::EXIT_CODE_ERROR;
return ExitCode::UNSPECIFIED_ERROR;
}
}

Expand All @@ -359,7 +360,7 @@ public function actionPurgeQueue(
$routing->purgeQueue($queueName);
$this->stdout(Console::ansiFormat("Queue `{$queueName}` was purged.\n", [Console::FG_GREEN]));

return self::EXIT_CODE_NORMAL;
return ExitCode::OK;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions events/RabbitMQConsumerEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace mikemadisonweb\rabbitmq\events;

use mikemadisonweb\rabbitmq\components\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\Event;

class RabbitMQConsumerEvent extends Event
Expand Down
2 changes: 2 additions & 0 deletions events/RabbitMQPublisherEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace mikemadisonweb\rabbitmq\events;

use mikemadisonweb\rabbitmq\components\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\Event;

class RabbitMQPublisherEvent extends Event
Expand Down

0 comments on commit 2c5552a

Please sign in to comment.