Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a HAPI2.0 transporter implementation RabbitMQConnector.
- Loading branch information
Showing
8 changed files
with
277 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
#!/usr/bin/env python | ||
""" | ||
Copyright (C) 2015 Project Hatohol | ||
This file is part of Hatohol. | ||
Hatohol is free software: you can redistribute it and/or modify | ||
it under the terms of the GNU Lesser General Public License, version 3 | ||
as published by the Free Software Foundation. | ||
Hatohol is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU Lesser General Public License for more details. | ||
You should have received a copy of the GNU Lesser General Public | ||
License along with Hatohol. If not, see | ||
<http://www.gnu.org/licenses/>. | ||
""" | ||
|
||
import logging | ||
import pika | ||
from transporter import Transporter | ||
|
||
class RabbitMQConnector(Transporter): | ||
|
||
def __init__(self): | ||
Transporter.__init__(self) | ||
self._channel = None | ||
|
||
def setup(self, transporter_args): | ||
""" | ||
@param transporter_args | ||
The following keys shall be included. | ||
- amqp_broker A broker IP or FQDN. | ||
- amqp_port A broker port. | ||
- amqp_vhost A virtual host. | ||
- amqp_queue A queue name. | ||
- amqp_user A user name. | ||
- amqp_password A password. | ||
""" | ||
|
||
def set_if_not_none(kwargs, key, val): | ||
if val is not None: | ||
kwargs[key] = val | ||
|
||
broker = transporter_args["amqp_broker"] | ||
port = transporter_args["amqp_port"] | ||
vhost = transporter_args["amqp_vhost"] | ||
queue_name = transporter_args["amqp_queue"] | ||
user_name = transporter_args["amqp_user"] | ||
password = transporter_args["amqp_password"] | ||
|
||
logging.debug("Called stub method: call()."); | ||
self._queue_name = queue_name | ||
credentials = pika.credentials.PlainCredentials(user_name, password) | ||
|
||
conn_args = {} | ||
set_if_not_none(conn_args, "host", broker) | ||
set_if_not_none(conn_args, "port", port) | ||
set_if_not_none(conn_args, "virtual_host", vhost) | ||
set_if_not_none(conn_args, "credentials", credentials) | ||
param = pika.connection.ConnectionParameters(**conn_args) | ||
connection = pika.adapters.blocking_connection.BlockingConnection(param) | ||
self._channel = connection.channel() | ||
self._channel.queue_declare(queue=queue_name) | ||
|
||
def call(self, msg): | ||
self.__publish(msg) | ||
|
||
def reply(self, msg): | ||
self.__publish(msg) | ||
|
||
def run_receive_loop(self): | ||
assert self._channel != None | ||
|
||
self._channel.basic_consume(self.__consume_handler, | ||
queue=self._queue_name, no_ack=True) | ||
self._channel.start_consuming() | ||
|
||
def __consume_handler(self, ch, method, properties, body): | ||
receiver = self.get_receiver() | ||
if receiver is None: | ||
logging.warning("Receiver is not registered.") | ||
return | ||
receiver(self._channel, body) | ||
|
||
def __publish(self, msg): | ||
self._channel.basic_publish(exchange="", routing_key=self._queue_name, | ||
body=msg) | ||
|
||
@classmethod | ||
def define_arguments(cls, parser): | ||
parser.add_argument("--amqp-broker", type=str, default="localhost") | ||
parser.add_argument("--amqp-port", type=int, default=None) | ||
parser.add_argument("--amqp-vhost", type=str, default=None) | ||
parser.add_argument("--amqp-queue", type=str, default="hap2-queue") | ||
parser.add_argument("--amqp-user", type=str, default="hatohol") | ||
parser.add_argument("--amqp-password", type=str, default="hatohol") | ||
|
||
@classmethod | ||
def parse_arguments(cls, args): | ||
return {"amqp_broker": args.amqp_broker, | ||
"amqp_port": args.amqp_port, | ||
"amqp_vhost": args.amqp_vhost, | ||
"amqp_queue": args.amqp_queue, | ||
"amqp_user": args.amqp_user, | ||
"amqp_password": args.amqp_password} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
#!/usr/bin/env python | ||
""" | ||
Copyright (C) 2015 Project Hatohol | ||
This file is part of Hatohol. | ||
Hatohol is free software: you can redistribute it and/or modify | ||
it under the terms of the GNU Lesser General Public License, version 3 | ||
as published by the Free Software Foundation. | ||
Hatohol is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU Lesser General Public License for more details. | ||
You should have received a copy of the GNU Lesser General Public | ||
License along with Hatohol. If not, see | ||
<http://www.gnu.org/licenses/>. | ||
""" | ||
import unittest | ||
import os | ||
import subprocess | ||
from rabbitmqconnector import RabbitMQConnector | ||
|
||
class TestRabbitMQConnector(unittest.TestCase): | ||
""" | ||
Before executing this test, some setting for RabbitMQ is needed. | ||
See README in the same directory. | ||
""" | ||
@classmethod | ||
def setUpClass(cls): | ||
cls.__broker = "localhost" | ||
port = os.getenv("RABBITMQ_NODE_PORT") | ||
cls.__port = None | ||
if port is not None: | ||
cls.__port = int(port) | ||
cls.__vhost = "test" | ||
cls.__queue_name = "test_queue" | ||
cls.__user_name = "test_user" | ||
cls.__password = "test_password" | ||
|
||
def test_setup(self): | ||
conn = RabbitMQConnector() | ||
conn.setup(self.__get_default_transporter_args()) | ||
|
||
def test_call(self): | ||
TEST_BODY = "CALL TEST" | ||
self.__delete_test_queue() | ||
conn = self.__create_connected_connector() | ||
conn.call(TEST_BODY) | ||
self.assertEqual(self.__get_from_test_queue(), TEST_BODY) | ||
|
||
def test_reply(self): | ||
TEST_BODY = "REPLY TEST" | ||
self.__delete_test_queue() | ||
conn = self.__create_connected_connector() | ||
conn.reply(TEST_BODY) | ||
self.assertEqual(self.__get_from_test_queue(), TEST_BODY) | ||
|
||
def test_run_receive_loop_without_connect(self): | ||
conn = RabbitMQConnector() | ||
self.assertRaises(AssertionError, conn.run_receive_loop) | ||
|
||
def test_run_receive_loop(self): | ||
class Receiver(): | ||
def __call__(self, channel, msg): | ||
self.msg = msg | ||
channel.stop_consuming() | ||
|
||
TEST_BODY = "FOO" | ||
self.__delete_test_queue() | ||
conn = RabbitMQConnector() | ||
conn.setup(self.__get_default_transporter_args()) | ||
receiver = Receiver() | ||
conn.set_receiver(receiver) | ||
self.__publish(TEST_BODY) | ||
conn.run_receive_loop() | ||
self.assertEquals(receiver.msg, TEST_BODY) | ||
|
||
def __get_default_transporter_args(self): | ||
args = {"amqp_broker": self.__broker, "amqp_port": self.__port, | ||
"amqp_vhost": self.__vhost, "amqp_queue": self.__queue_name, | ||
"amqp_user": self.__user_name, "amqp_password": self.__password} | ||
return args | ||
|
||
def __create_connected_connector(self): | ||
conn = RabbitMQConnector() | ||
conn.setup(self.__get_default_transporter_args()) | ||
return conn | ||
|
||
def __execute(self, args): | ||
subproc = subprocess.Popen(args, stdout=subprocess.PIPE) | ||
output = subproc.communicate()[0] | ||
self.assertEquals(subproc.returncode, 0) | ||
return output | ||
|
||
|
||
def __build_broker_url(self): | ||
port = "" | ||
if self.__port is not None: | ||
port = ":%d" % self.__port | ||
return "amqp://%s:%s@%s%s/%s" % (self.__user_name, self.__password, | ||
self.__broker, port, self.__vhost) | ||
|
||
def __build_broker_options(self): | ||
if os.getenv("RABBITMQ_CONNECTOR_TEST_WORKAROUND") is not None: | ||
return ["-u", self.__build_broker_url()] | ||
|
||
def append_if_not_none(li, key, val): | ||
if val is not None: | ||
li.append(key) | ||
li.append(val) | ||
|
||
opt = [] | ||
server = self.__broker | ||
if server is not None and self.__port is not None: | ||
server += ":%d" % self.__port | ||
append_if_not_none(opt, "--server", server) | ||
append_if_not_none(opt, "--vhost", self.__vhost) | ||
append_if_not_none(opt, "--username", self.__user_name) | ||
append_if_not_none(opt, "--password", self.__password) | ||
return opt | ||
|
||
def __publish(self, body): | ||
args = ["amqp-publish"] | ||
args.extend(self.__build_broker_options()) | ||
args.extend(["-r", self.__queue_name, "-b", body]) | ||
subprocess.Popen(args).communicate() | ||
|
||
def __get_from_test_queue(self): | ||
args = ["amqp-get"] | ||
args.extend(self.__build_broker_options()) | ||
args.extend(["-q", self.__queue_name]) | ||
return self.__execute(args) | ||
|
||
def __delete_test_queue(self): | ||
args = ["amqp-delete-queue"] | ||
args.extend(self.__build_broker_options()) | ||
args.extend(["-q", self.__queue_name]) | ||
subprocess.Popen(args).communicate() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#!/bin/sh | ||
|
||
sudo rabbitmqctl add_vhost test | ||
sudo rabbitmqctl add_user test_user test_password | ||
sudo rabbitmqctl set_permissions -p test test_user ".*" ".*" ".*" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
#!/bin/sh | ||
mkdir -p /etc/rabbitmq/rabbitmq.conf.d | ||
echo RABBITMQ_NODE_PORT=5673 > /etc/rabbitmq/rabbitmq.conf.d/port | ||
echo "export RABBITMQ_NODE_PORT=5673" >> /etc/default/rabbitmq-server |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters