Skip to content

Commit

Permalink
Python moderator work and general clean up #203
Browse files Browse the repository at this point in the history
  • Loading branch information
ccotter committed Nov 13, 2012
1 parent 6385951 commit 84568cb
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 101 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -102,18 +102,23 @@ public void configureChannel(ConfigurableServerChannel channel) {
sync = server.getChannel(this.syncEngineChannel); sync = server.getChannel(this.syncEngineChannel);
sync.addListener(this); sync.addListener(this);


this.sessionModerator = SessionModerator.getInstance(this, (String)config.get("sessionModerator"), confKey); this.sessionModerator = SessionModerator.getInstance(this,
(String)config.get("sessionModerator"), confKey);
if (null == this.sessionModerator) { if (null == this.sessionModerator) {
config.put("moderatorIsUpdater", false); config.put("moderatorIsUpdater", false);
/* Perhaps config.get("sessionModerator") had an exception or didn't exist, /* Perhaps config.get("sessionModerator") had an exception or
so either we try to create default implementation of moderator, or throw an exception. */ * didn't exist, so either we try to create default implementation
log.severe("SessionModerator.getInstance(" + config.get("sessionModerator") + * of moderator, or throw an exception. */
") failed, reverting to trying to create default implementation."); log.severe("SessionModerator.getInstance(" +
this.sessionModerator = SessionModerator.getInstance(this, null, confKey); config.get("sessionModerator") + ") failed, reverting to " +
"trying to create default implementation.");
this.sessionModerator = SessionModerator.getInstance(this,
null, confKey);
if (null == this.sessionModerator) { if (null == this.sessionModerator) {
throw new CowebException("Create SessionModerator", ""); throw new CowebException("Create SessionModerator", "");
} }
log.severe("SessionModerator created default implementation, but moderator can no longer be updater."); log.severe("SessionModerator created default implementation, but " +
"moderator can no longer be updater.");
} }


// create the late join handler. clients will be updaters by default. // create the late join handler. clients will be updaters by default.
Expand Down
2 changes: 1 addition & 1 deletion servers/python/coweb/container.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, options):
# allow easy override of default settings without busting manager # allow easy override of default settings without busting manager
# creation # creation
self.on_configure() self.on_configure()

# adjust all paths to make them absolute relative to container loc now # adjust all paths to make them absolute relative to container loc now
self.httpStaticPath = self.get_absolute_path(self.httpStaticPath) self.httpStaticPath = self.get_absolute_path(self.httpStaticPath)
for i, path in enumerate(self.cowebBotLocalPaths): for i, path in enumerate(self.cowebBotLocalPaths):
Expand Down
38 changes: 19 additions & 19 deletions servers/python/coweb/moderator.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@
class SessionModerator: class SessionModerator:
def __init__(self): def __init__(self):
print("hello from moderator") print("hello from moderator")
def canClientJoinSession(self): def canClientJoinSession(self, client, message):
raise NotImplementedError() raise NotImplementedError()
def canClientMakeServiceRequest(self): def canClientMakeServiceRequest(self, svcName, client, botMessage):
raise NotImplementedError() raise NotImplementedError()
def canClientSubscribeService(self): def getLateJoinState(self):
raise NotImplementedError()
def canClientSubscribeService(self, svcName, client, message):
raise NotImplementedError() raise NotImplementedError()
def onClientJoinSession(self): def onClientJoinSession(self, client, message):
raise NotImplementedError() raise NotImplementedError()
def onClientLeaveSession(self): def onClientLeaveSession(self, client):
raise NotImplementedError() raise NotImplementedError()
def onServiceResponse(self): def onServiceResponse(self, svcName, data, error, isPublic):
raise NotImplementedError() raise NotImplementedError()
def onSessionEnd(self): def onSessionEnd(self):
raise NotImplementedError() raise NotImplementedError()
def onSessionReady(self): def onSessionReady(self):
raise NotImplementedError() raise NotImplementedError()
def onSync(self): def onSync(self, client, data):
raise NotImplementedError()
def getLateJoinState(self):
raise NotImplementedError() raise NotImplementedError()


@staticmethod @staticmethod
Expand All @@ -38,25 +38,25 @@ def getInstance(klass, key):
return moderator return moderator


class DefaultSessionModerator(SessionModerator): class DefaultSessionModerator(SessionModerator):
def canClientJoinSession(self): def canClientJoinSession(self, client, message):
return True return True
def canClientMakeServiceRequest(self): def canClientMakeServiceRequest(self, svcName, client, botMessage):
return True return True
def canClientSubscribeService(self): def getLateJoinState(self):
print("in late get state")
return {}
def canClientSubscribeService(self, svcName, client, message):
return True return True
def onClientJoinSession(self): def onClientJoinSession(self, client, message):
pass pass
def onClientLeaveSession(self): def onClientLeaveSession(self, client):
pass pass
def onServiceResponse(self): def onServiceResponse(self, svcName, data, error, isPublic):
pass pass
def onSessionEnd(self): def onSessionEnd(self):
pass pass
def onSessionReady(self): def onSessionReady(self):
pass pass
def onSync(self): def onSync(self, client, data):
pass pass
def getLateJoinState(self):
print("in late get state")
return {}


1 change: 1 addition & 0 deletions servers/python/coweb/session/__init__.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ def create_session(collab, *args, **kwargs):
'''Builds a cooperative or non-cooperative session.''' '''Builds a cooperative or non-cooperative session.'''
cls = CollabSession if collab else Session cls = CollabSession if collab else Session
return cls(*args, **kwargs) return cls(*args, **kwargs)

99 changes: 32 additions & 67 deletions servers/python/coweb/session/collab.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
import re import re
# coweb # coweb
from . import session from . import session
from .. import OEHandler
from ..moderator import SessionModerator
from coweb.session.late_join_handler import LateJoinHandler from coweb.session.late_join_handler import LateJoinHandler
from coweb.session.moderator_late_join_handler import ModeratorLateJoinHandler from coweb.session.moderator_late_join_handler import ModeratorLateJoinHandler


OEHandler = OEHandler.OEHandler getServiceNameFromChannel = session.getServiceNameFromChannel
session_sync_regex = re.compile("/session/([A-z0-9]+)/sync(.*)");

log = logging.getLogger('coweb.session') log = logging.getLogger('coweb.session')


class CollabSession(session.Session): class CollabSession(session.Session):
Expand All @@ -31,16 +27,6 @@ def __init__(self, *args, **kwargs):
self.collab = True self.collab = True
self._connectionClass = CollabSessionConnection self._connectionClass = CollabSessionConnection


# operation total order
self._opOrder = -1

# TODO make this optional
self._opengine = OEHandler(self, 0)

# Moderator?
self._moderator = SessionModerator.getInstance(
self._container.moderatorClass, self.key)

if self._container.moderatorIsUpdater: if self._container.moderatorIsUpdater:
self._lateJoinHandler = ModeratorLateJoinHandler(self) self._lateJoinHandler = ModeratorLateJoinHandler(self)
else: else:
Expand All @@ -51,14 +37,6 @@ def get_order(self):
self._opOrder += 1 self._opOrder += 1
return self._opOrder return self._opOrder


def sync_for_service(self, client, req):
'''
Forwards sync events to service bridge if at least one service has
permission for sync events enabled.
'''
if self._services.needsSync:
self._services.on_user_sync(client, req)

def on_purging_client(self, cid, client): def on_purging_client(self, cid, client):
'''Override to remove updater and end a session when none left.''' '''Override to remove updater and end a session when none left.'''
# notify all bots of unsubscribing user # notify all bots of unsubscribing user
Expand All @@ -75,66 +53,53 @@ class CollabSessionConnection(session.SessionConnection):
'''Connection for collaborative sessions.''' '''Connection for collaborative sessions.'''
def on_publish(self, cl, req, res): def on_publish(self, cl, req, res):
'''Override to handle updater and sync logic.''' '''Override to handle updater and sync logic.'''
manager = self._manager
channel = req['channel'] channel = req['channel']
if channel == '/service/session/updater': if channel == '/service/session/updater':
# handle updater response # handle updater response
data = req.get('data', None) data = req.get('data', None)
try: try:
self._manager.onUpdaterSendState(cl, data) manager.onUpdaterSendState(cl, data)
except Exception: except Exception:
# bad updater, disconnect and assign a new one # bad updater, disconnect and assign a new one
self._manager.remove_bad_client(cl) manager.remove_bad_client(cl)
return return
else: elif channel == manager.syncAppChannel:
matches = session_sync_regex.match(channel); manager._lateJoinHandler.clearLastState()
if matches: req['data']['siteId'] = cl.siteId
# handle sync events req['data']['order'] = manager.get_order()
try: # Push sync to op engine.
# put siteId on message sync = manager._opengine.syncInbound(req['data'])
req['data']['siteId'] = cl.siteId if sync:
self._manager._lateJoinHandler.ensureUpdater(cl) manager._moderator.onSync(cl, sync)
except (KeyError, AttributeError): elif channel == manager.engineSyncChannel:
# not allowed to publish syncs, disconnect manager._opengine.engineSyncInbound(req['data'])
self._manager.remove_bad_client(cl)
return
# last state no longer valid
self._manager._lateJoinHandler.clearLastState()
if '/app' == matches.group(2):
# App sync.
# Put total order sequence number on message
req['data']['order'] = self._manager.get_order()
# let manager deal with sync if forwarding it to services
self._manager.sync_for_service(cl, req)
if self._manager._opengine:
# Push sync to op engine.
self._manager._opengine.syncInbound(req['data'])
elif '/engine' == matches.group(2):
# Engine sync.
if self._manager._opengine:
self._manager._opengine.engineSyncInbound(req['data'])
# delegate all other handling to base class # delegate all other handling to base class
super(CollabSessionConnection, self).on_publish(cl, req, res) super(CollabSessionConnection, self).on_publish(cl, req, res)


def on_subscribe(self, cl, req, res): def on_subscribe(self, cl, req, res):
'''Override to handle late-joiner logic.''' '''Override to handle late-joiner logic.'''
manager = self._manager
sub = req['subscription'] sub = req['subscription']
didSub = True didSub = True
if sub.startswith('/service/bot'): pub = sub.startswith('/bot')
# handle private subscribe to bot if pub or sub.startswith('/service/bot'):
didSub = self._manager.subscribe_to_service(cl, req, res, False) # public subscribe to bot (/bot)
elif sub.startswith('/bot'): # handle private subscribe to bot (/service/bot)
# public subscribe to bot svcName = getServiceNameFromChannel(sub, pub)
didSub = self._manager.subscribe_to_service(cl, req, res, True) if manager._moderator.canClientSubscribeService(svcName, cl, req):
didSub = manager.subscribe_to_service(cl, req, res, pub)
elif sub == '/service/session/join/*': elif sub == '/service/session/join/*':
ext = req['ext'] if manager._moderator.canClientJoinSession(cl, req):
coweb = ext['coweb'] ext = req['ext']
updaterType = coweb['updaterType'] coweb = ext['coweb']
cl.updaterType = updaterType updaterType = coweb['updaterType']
self._manager._lateJoinHandler.onClientJoin(cl) cl.updaterType = updaterType
didSub = True if manager._lateJoinHandler.onClientJoin(cl):
manager._moderator.onSessionReady()
elif sub == '/service/session/updater': elif sub == '/service/session/updater':
self._manager._lateJoinHandler.addUpdater(cl) manager._lateJoinHandler.addUpdater(cl)
didSub = True manager._moderator.onClientJoinSession(cl, req)
if didSub: if didSub:
# don't run default handling if sub failed # don't run default handling if sub failed
super(CollabSessionConnection, self).on_subscribe(cl, req, res) super(CollabSessionConnection, self).on_subscribe(cl, req, res)
Expand Down
9 changes: 4 additions & 5 deletions servers/python/coweb/session/late_join_handler.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ def addUpdater(self, client, notify=True):
if notify: if notify:
self._sendRosterAvailable(client) self._sendRosterAvailable(client)


def ensureUpdater(self, client):
'''Ensures a client is an updater. Exception if not.'''
# just try to access to ensure
self._updaters[client.clientId]

def removeUpdater(self, client): def removeUpdater(self, client):
'''Removes a client as an updater.''' '''Removes a client as an updater.'''
# remove this client from the assigned siteids # remove this client from the assigned siteids
Expand Down Expand Up @@ -187,6 +182,7 @@ def _sendRosterUnavailable(self, client):
} }
self._session.publish(msg) self._session.publish(msg)


# Returns true iff this is the first client in the session.
def onClientJoin(self, client): def onClientJoin(self, client):
'''Queues a late-joiner to receive full state.''' '''Queues a late-joiner to receive full state.'''
clientId = client.clientId clientId = client.clientId
Expand All @@ -200,12 +196,14 @@ def onClientJoin(self, client):


roster = self._getRosterList(client) roster = self._getRosterList(client)


isFirst = False
sendState = False sendState = False
# First client in? # First client in?
if not len(self._updaters): if not len(self._updaters):
self.addUpdater(client, False) self.addUpdater(client, False)
data = [] data = []
sendState = True sendState = True
isFirst = True
elif self._lastState is None: elif self._lastState is None:
self.assignUpdater(client) self.assignUpdater(client)
else: else:
Expand All @@ -225,6 +223,7 @@ def onClientJoin(self, client):
'channel':'/service/session/join/state', 'channel':'/service/session/join/state',
'data': data 'data': data
}) })
return isFirst


def onUpdaterSendState(self, updater, data): def onUpdaterSendState(self, updater, data):
'''Sends full state to a late-joiner waiting for it.''' '''Sends full state to a late-joiner waiting for it.'''
Expand Down
3 changes: 3 additions & 0 deletions servers/python/coweb/session/moderator_late_join_handler.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, session):
super(ModeratorLateJoinHandler, self).__init__(session) super(ModeratorLateJoinHandler, self).__init__(session)
self._moderator = session._moderator self._moderator = session._moderator


# Returns true iff this is the first client in the session.
def onClientJoin(self, client): def onClientJoin(self, client):
'''Overrides to have the moderator immediatelty send late join state''' '''Overrides to have the moderator immediatelty send late join state'''
clientId = client.clientId clientId = client.clientId
Expand Down Expand Up @@ -41,6 +42,8 @@ def onClientJoin(self, client):


if 0 == self.getUpdaterCount(): if 0 == self.getUpdaterCount():
self.addUpdater(client, False) self.addUpdater(client, False)
return True
else: else:
self.addUpdater(client, True) self.addUpdater(client, True)
return False


33 changes: 31 additions & 2 deletions servers/python/coweb/session/session.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,9 +16,25 @@
# coweb # coweb
from .. import bayeux from .. import bayeux
from .. import service from .. import service
from .. import OEHandler
from ..moderator import SessionModerator


OEHandler = OEHandler.OEHandler
log = logging.getLogger('coweb.session') log = logging.getLogger('coweb.session')


def getServiceNameFromChannel(channel, pub):
# Public channels are of form /bot/<NAME>
# Private channels are of form /service/bot/<NAME>/(request|response)
parts = channel.split("/")
if pub:
if 3 == len(parts):
return parts[2]
else:
if 5 == len(parts) and \
("request" == parts[4] or "response" == parts[4]):
return parts[3];
return ""

class Session(bayeux.BayeuxManager): class Session(bayeux.BayeuxManager):
''' '''
Manages a session instance that supports services but no user cooperative Manages a session instance that supports services but no user cooperative
Expand All @@ -36,13 +52,23 @@ def __init__(self, container, key, cacheState, *args, **kwargs):
self.sessionId self.sessionId
self.rosterUnavailableChannel = '/session/%s/roster/unavailable' % \ self.rosterUnavailableChannel = '/session/%s/roster/unavailable' % \
self.sessionId self.sessionId
self.syncAppChannel = '/session/%s/sync/app' % self.sessionId
self.syncEngineChannel = '/session/%s/sync/engine' % self.sessionId


self._connectionClass = SessionConnection self._connectionClass = SessionConnection
self._container = container self._container = container
self._application = container.webApp self._application = container.webApp
# bridge between users and service bots # bridge between users and service bots
self._services = service.ServiceSessionBridge(container, self) self._services = service.ServiceSessionBridge(container, self)


# Operation total order, setup OP engine.
self._opOrder = -1
self._opengine = OEHandler(self, 0)

# Use moderator?
self._moderator = SessionModerator.getInstance(
self._container.moderatorClass, self.key)

def build_connection(self, handler): def build_connection(self, handler):
'''Override to build proper connection.''' '''Override to build proper connection.'''
return self._connectionClass(handler, self) return self._connectionClass(handler, self)
Expand Down Expand Up @@ -168,9 +194,12 @@ def on_publish(self, cl, req, res):
'''Overrides to handle bot requests.''' '''Overrides to handle bot requests.'''
ch = req['channel'] ch = req['channel']
if ch.startswith('/service/bot'): if ch.startswith('/service/bot'):
svcName = getServiceNameFromChannel(ch, False)
# private bot message # private bot message
if not self._manager.request_for_service(cl, req, res): if self._manager._moderator.canClientMakeServiceRequest(svcName,
return cl, req):
if not self._manager.request_for_service(cl, req, res):
return
elif not self._manager.collab: elif not self._manager.collab:
# disallow posting to any other channel by clients # disallow posting to any other channel by clients
res['error'] = '402:%s:not-allowed' % client.clientId res['error'] = '402:%s:not-allowed' % client.clientId
Expand Down

0 comments on commit 84568cb

Please sign in to comment.