Skip to content
Permalink
Browse files

Initial Commit

  • Loading branch information
prologic committed Jul 29, 2015
0 parents commit e46af3fd75e04f5b876cc80f239831266e0e57ed
Showing with 414 additions and 0 deletions.
  1. +6 −0 .gitignore
  2. +6 −0 Dockerfile
  3. +17 −0 README.md
  4. +21 −0 README.rst
  5. +13 −0 broker/__init__.py
  6. +8 −0 broker/events.py
  7. +152 −0 broker/main.py
  8. +36 −0 broker/mqtt.py
  9. +68 −0 broker/plugin.py
  10. +28 −0 broker/utils.py
  11. +11 −0 broker/version.py
  12. +5 −0 requirements.txt
  13. +40 −0 setup.py
  14. +3 −0 testapi.sh
@@ -0,0 +1,6 @@
*~
dist
build
*.bak
*.py[co]
*.egg-info
@@ -0,0 +1,6 @@
FROM crux/python:onbuild

EXPOSE 80

ENTRYPOINT ["broker"]
CMD []
@@ -0,0 +1,17 @@
broker
======

OpenKnot Broker.

Installation
------------

Either pull the automatically updated [Docker](http://docker.com/) image:

$ docker pull openknot/broker

Or install from the development repository:

$ git clone https://github.com/openknot/broker.git
$ cd broker
$ pip install -r requirements.txt
@@ -0,0 +1,21 @@
.. _docker: http://docker.com/
.. _dotCloud: http://dotcloud.com/


broker
======

OpenKnot Broker.

Installation
------------

Either pull the automatically updated `Docker`_ image::
$ docker pull openknot/broker

Or install from the development repository::
$ git clone https://github.com/openknot/broker.git
$ cd broker
$ pip install -r requirements.txt
@@ -0,0 +1,13 @@
"""broker - OpenKnot Broker
OpenKnot Broker
:copyright: CopyRight (C) 2015 by James Mills
"""


__author__ = "James Mills, prologic at shortcircuit dot net dot au"
__date__ = "29th July 2015"


from .version import version as __version__ # noqa
@@ -0,0 +1,8 @@
"""Events"""


from circuits import Event


class message(Event):
"""message Event"""
@@ -0,0 +1,152 @@
#!/usr/bin/env python


"""OpenKnot Broker Daemon"""


from __future__ import print_function


import os
import sys
import logging
from os import environ
from logging import getLogger
from json import dumps, loads
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser


from circuits.web import Controller, Server
from circuits import handler, Component, Debugger


from .events import message
from .mqtt import mqtt, MQTT
from .utils import parse_bind, waitfor


def setup_logging(args):
logstream = sys.stderr if args.logfile is None else open(args.logfile, "a")

logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.DEBUG if args.debug else logging.INFO,
stream=logstream,
)

return getLogger(__name__)


def setup_mqtt(args, logger):
host, port = parse_bind(args.url)

logger.debug("Waiting for MQTT Service on {0:s}:{1:d} ...".format(host, port))

if not waitfor(host, port):
logger.error("Timed out waiting for MQTT Service on {0:s}:{1:d} ...".format(host, port))
raise SystemExit(1)


class JSONSerializer(Component):

channel = "web"

# 1 higher than the default response handler
@handler("response", priority=1.0)
def serialize_response_body(self, response):
if isinstance(response.body, dict):
response.headers["Content-Type"] = "application/json"
response.body = dumps(response.body)


class Dispatcher(Component):

def message(self, payload):
protocol = payload.get("protocol", "unknown")

self.fire(mqtt(protocol, payload))


class API(Controller):

channel = "/message"

def POST(self, event, *args, **kwargs):
req, res = event.args[:2]
payload = loads(req.body.read())

self.fire(message(payload))

return {"success": True}


class App(Component):

def init(self, args):
self.args = args

self.logger = getLogger(__name__)

if self.args.debug:
Debugger().register(self)

bind = parse_bind(self.args.bind)

MQTT(args.url).register(self)

Server(bind).register(self)
JSONSerializer().register(self)

API().register(self)

def signal(self, *args):
raise SystemExit(0)


def parse_args():
parser = ArgumentParser(
description=__doc__,
formatter_class=ArgumentDefaultsHelpFormatter
)

parser.add_argument(
"-b", "--bind", action="store", dest="bind", metavar="INT", type=str,
default=environ.get("BIND", "0.0.0.0:80"),
help="Interface and Port to Bind to"
)

parser.add_argument(
"-d", "--debug", action="store_true", dest="debug",
default=environ.get("DEBUG", False),
help="Enable Debug Mode"
)

parser.add_argument(
"-l", "--logfile", action="store", default=None,
dest="logfile", metavar="FILE", type=str,
help="Log file to store logs in"
)

parser.add_argument(
"-u", "--url", action="store", dest="url", metavar="URL", type=str,
default=environ.get("MQTT_PORT", environ.get("URL", None)), required=True,
help="MQTT URL"
)

return parser.parse_args()


def main():
sys.stdout = os.fdopen(sys.stdout.fileno(), "w", 0)

args = parse_args()

logger = setup_logging(args)

setup_mqtt(args, logger)

App(args).run()


if __name__ == "__main__":
main()
@@ -0,0 +1,36 @@
"""MQTT"""


from __future__ import print_function


from paho.mqtt.client import Client

from circuits import Event, Component


from .utils import parse_bind


class mqtt(Event):
"""mqtt Event"""


class MQTT(Component):

def init(self, url):
self.url = url

host, port = parse_bind(self.url)

self.client = Client()
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message

self.client.connect(host, port)

def _on_connect(self, client, userdata, flags, rc):
print("Connected with result code {0}".format(rc))

def _on_message(self, client, userdata, msg):
print("{0} {1}".format(msg.topic, msg.payload))
@@ -0,0 +1,68 @@
"""Plugin
Subclass :class:`Plugin` to create broker plugins with standarized CLI Options and API.
"""


from __future__ import print_function


from os import environ
from logging import getLogger
from inspect import getmodule
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser


from circuits import Component, Debugger


from .utils import parse_bind


def parse_args(parse=True, description=None):
parser = ArgumentParser(
description=(description or ""),
formatter_class=ArgumentDefaultsHelpFormatter
)

parser.add_argument(
"-b", "--bind", action="store", dest="bind", metavar="INT", type=str,
default=environ.get("BIND", "0.0.0.0:1338"),
help="Interface and Port to Bind to"
)

parser.add_argument(
"-d", "--debug", action="store_true", dest="debug",
default=environ.get("DEBUG", False),
help="Enable Debug Mode"
)

parser.add_argument(
"-u", "--url", action="store", dest="url", metavar="URL", type=str,
default=environ.get("URL", environ.get("BROKER_PORT", "udp://127.0.0.1:1338")),
help="broker Daemon URL"
)

return parser.parse_args() if parse else parser


class Plugin(Component):

def init(self, parse_args_cb=None):
# Get description from the first line of the plugin's __doc__
description = getattr(getmodule(self), "__doc__", "")

# Allow ArgumentsParser to be extended.
if parse_args_cb is not None:
self.args = parse_args_cb(parse_args(False, description)).parse_args()
else:
self.args = parse_args(description=description)

self.bind = parse_bind(self.args.bind)
self.url = parse_bind(self.args.url)

self.logger = getLogger(__name__)

def started(self, *args):
if self.args.debug:
Debugger().register(self)
@@ -0,0 +1,28 @@
"""Utilities"""


from time import sleep
from socket import AF_INET, SOCK_STREAM, socket


def parse_bind(s, default_port=1338):
# XXX: We ignore the protocol for now
if "://" in s:
_, s = s.split("://", 1)

if ":" in s:
address, port = s.split(":", 1)
port = int(port)
else:
address, port = s, default_port

return address, port


def waitfor(address, port, timeout=10):
sock = socket(AF_INET, SOCK_STREAM)
counter = timeout
while not sock.connect_ex((address, port)) == 0 and counter:
sleep(1)
counter -= 1
return counter
@@ -0,0 +1,11 @@
"""Version Module
So we only have to maintain version information in one place!
"""

version_info = (0, 0, 1, "dev") # (major, minor, patch, dev?)
version = (
".".join(map(str, version_info))
if version_info[-1] != "dev"
else "dev"
)
@@ -0,0 +1,5 @@
paho-mqtt

# circuits==3.1.0
# Development version of circuits
-e git+https://github.com/circuits/circuits.git#egg=circuits

0 comments on commit e46af3f

Please sign in to comment.
You can’t perform that action at this time.