Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

initial commit

  • Loading branch information...
commit c80337171d4175482c1542fb59bffa9a0c2ec4d9 0 parents
@arnehilmann arnehilmann authored
0  README.md
No changes.
59 build.py
@@ -0,0 +1,59 @@
+from pythonbuilder.core import use_plugin, init, Author
+
+use_plugin("python.core")
+#use_plugin("python.unittest")
+#use_plugin("python.integrationtest")
+#use_plugin("python.coverage")
+#use_plugin("python.pychecker")
+#use_plugin("python.pymetrics")
+use_plugin("python.pylint")
+use_plugin("python.distutils")
+use_plugin("python.pydev")
+
+use_plugin("copy_resources")
+#use_plugin("filter_resources")
+
+default_task = ["analyze", "publish"]
+
+version = "1.0.3"
+summary = "Yet Another Deployment Tool - The BroadcastServer Part"
+description = '''Yet Another Deployment Tool - The BroadcastServer Part
+- provides channels for publish/subscribe
+- handles messages form yadtbroadcast-client
+- caches state information for newly connecting clients
+
+for more documentation, visit http://code.google.com/p/yadt/
+'''
+authors = [Author("Arne Hilmann", "arne.hilmann@gmail.com")]
+
+requires = "python >= 2.5 python-twisted >= 11.0.0 autobahn >= 0.4.10"
+
+url = "http://code.google.com/p/yadt"
+license = "GNU GPL v3"
+
+@init
+def set_properties (project):
+ project.depends_on("Twisted")
+ project.depends_on("autobahn")
+
+ project.set_property("coverage_break_build", False)
+ project.set_property("pychecker_break_build", False)
+
+ project.get_property("distutils_commands").append("bdist_rpm")
+ project.set_property("copy_resources_target", "$dir_dist")
+ project.get_property("copy_resources_glob").append("setup.cfg")
+ project.set_property('dir_dist_scripts', 'scripts')
+
+ #project.get_property("distutils_commands").append("bdist_egg")
+ project.set_property("distutils_classifiers", [
+ 'Development Status :: 4 - Beta',
+ 'Environment :: Console',
+ 'Intended Audience :: Developers',
+ 'Intended Audience :: System Administrators',
+ 'License :: OSI Approved :: GNU General Public License (GPL)',
+ 'Programming Language :: Python',
+ 'Topic :: System :: Networking',
+ 'Topic :: System :: Software Distribution',
+ 'Topic :: System :: Systems Administration'
+ ])
+
4 setup.cfg
@@ -0,0 +1,4 @@
+[bdist_rpm]
+packager = Arne Hilmann <arne.hilmann@gmail.com>
+requires = python >= 2.6 python-twisted >= 12 python-autobahn >= 0.4.10 yadtbroadcast-server-config yadtbroadcast-server-docroot
+release = 1%{?dist}
97 src/main/python/yadtbroadcastserver/__init__.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+import sys
+import re
+import json
+import yaml
+import os
+import exceptions
+import socket
+
+from twisted.internet import reactor
+from twisted.python import log
+from twisted.web import static, server
+from twisted.application import internet, service
+
+from autobahn.wamp import WampServerFactory, WampProtocol, WampServerProtocol
+
+from broadcastserverconfig import *
+
+class BroadcastServerProtocol(WampServerProtocol):
+ cache = {}
+ cache_dirty = False
+
+ def onSessionOpen(self):
+ #for v in sorted(vars(self)):
+ #print '%s\t%s' % (v, getattr(self, v))
+ log.msg('new session from %s:%s' % (self.peer.host, self.peer.port))
+ self.registerForPubSub('', True)
+
+ def connectionLost(self, reason):
+ text = getattr(reason, 'value', reason)
+ log.msg('lost session from %s:%s: %s' % (self.peer.host, self.peer.port, text))
+ WampServerProtocol.connectionLost(self, reason)
+
+ def onMessage(self, msg, binary):
+ on_subscribe_for_topic = None
+ if not binary:
+ try:
+ obj = json.loads(msg)
+ if type(obj) == list:
+ if obj[0] == WampProtocol.MESSAGE_TYPEID_SUBSCRIBE:
+ topicUri = self.prefixes.resolveOrPass(obj[1])
+ on_subscribe_for_topic = topicUri
+ elif obj[0] == WampProtocol.MESSAGE_TYPEID_PUBLISH:
+ topicUri = self.prefixes.resolveOrPass(obj[1])
+ payload = obj[2]
+ self.update_cache(topicUri, payload, self.cache)
+ except Exception, e:
+ log.msg(e)
+ pass
+ result = WampServerProtocol.onMessage(self, msg, binary)
+ if self.cache.get(on_subscribe_for_topic):
+ log.msg("sending initial full_update for %s" % on_subscribe_for_topic)
+ self.dispatch(on_subscribe_for_topic, self.cache[on_subscribe_for_topic], eligible=[self])
+ return result
+
+
+ def update_cache(self, topicUri, payload, cache):
+ if payload['id'] == "full-update":
+ log.msg("caching full update for %s" % topicUri)
+ cache[topicUri] = payload
+ BroadcastServerProtocol.cache_dirty = True
+
+ if payload['id'] == "service-change":
+ cached_target = cache.get(topicUri)
+ if cached_target:
+ for changed in payload['payload']:
+ for hosts in cached_target['payload']:
+ for host in hosts:
+ for service in host['services']:
+ if service['uri'] == changed['uri']:
+ service['state'] = changed['state']
+ log.msg('new state of %(uri)s: %(state)s' % changed)
+ BroadcastServerProtocol.cache_dirty = True
+ break
+
+ @classmethod
+ def store_cache(cls):
+ #log.msg('checking cache dirty? %s' % str(cls.cache_dirty))
+ if cls.cache_dirty:
+ log.msg('saving cache on disk')
+ f = open(CACHE_FILE, 'w')
+ json.dump(cls.cache, f)
+ f.close()
+ cls.cache_dirty = False
+ reactor.callLater(STORE_CACHE_AFTER_SECONDS, cls.store_cache)
+
+ @classmethod
+ def init_cache(cls):
+ try:
+ f = open(CACHE_FILE)
+ cls.cache = json.load(f)
+ f.close()
+ except Exception, e:
+ log.msg(e)
+ cls.cache = {}
+ reactor.callLater(STORE_CACHE_AFTER_SECONDS, cls.store_cache)
+
63 src/main/scripts/yadtbroadcast-server.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+import sys
+import re
+import json
+import yaml
+import os
+import exceptions
+import socket
+
+from twisted.internet import reactor
+from twisted.python import log
+from twisted.web import static, server
+from twisted.python.logfile import LogFile
+
+from autobahn.wamp import WampServerFactory
+
+
+sys.path.append('/etc/yadtbroadcast-server')
+from broadcastserverconfig import *
+
+log.startLogging(LogFile.fromFullPath(LOG_FILE))
+
+import yadtbroadcastserver
+try:
+ os.makedirs(os.path.dirname(CACHE_FILE))
+except exceptions.OSError, e:
+ if e.errno != 17:
+ log.err()
+try:
+ os.makedirs(os.path.dirname(LOG_FILE))
+except exceptions.OSError, e:
+ if e.errno != 17:
+ log.err()
+
+# TODO refactor: use util method in ws lib for url creation
+host = socket.gethostbyaddr(socket.gethostname())[0]
+uri = 'ws://%s:%s/' % (host, WS_PORT)
+factory = WampServerFactory(uri, debugWamp=False)
+factory.protocol = yadtbroadcastserver.BroadcastServerProtocol
+yadtbroadcastserver.BroadcastServerProtocol.init_cache()
+reactor.listenTCP(WS_PORT, factory)
+log.msg('ws listens on port %s' % WS_PORT)
+
+docroot = static.File(DOCROOT_DIR)
+try:
+ for name, path in LOGS.iteritems():
+ if os.path.exists(path):
+ log.msg('adding path %s under /%s' % (path, name))
+ docroot.putChild(name, static.File(path, 'text/plain'))
+ else:
+ log.msg('ignoring path %s, because it does not exist.' % path)
+except:
+ pass
+reactor.listenTCP(HTTP_PORT, server.Site(docroot))
+log.msg('http listens on port %s' % HTTP_PORT)
+
+
+reactor.run()
+
+print 'shutting down server'
+yadtbroadcastserver.BroadcastServerProtocol.store_cache()
+
+print 'done.'
Please sign in to comment.
Something went wrong with that request. Please try again.