Skip to content

Commit

Permalink
tornado websocket server prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
nemesisdesign committed Aug 18, 2013
1 parent ca0cbd8 commit 26373a3
Show file tree
Hide file tree
Showing 21 changed files with 550 additions and 5 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@ settings.py
python/
projects/ninux/ninux/media/nodes/
projects/ninux/ninux/media/external/
projects/ninux/ninux/media/status-icons/
projects/georate/georate/media/nodes/
projects/georate/georate/media/external/
projects/georate/georate/media/status-icons/
debug.log
*~
._*
startshell
activate
runserver
TODO.txt
nodeshot.websockets.private
nodeshot.websockets.public
celerybeat.pid
celerybeat-schedule
runtests
coverage
load_fixtures.sh
7 changes: 2 additions & 5 deletions nodeshot/core/nodes/urls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from django.conf.urls import patterns, include, url
from rest_framework.urlpatterns import format_suffix_patterns
from django.conf.urls import patterns, url


urlpatterns = patterns('nodeshot.core.nodes.views',
Expand All @@ -13,6 +12,4 @@

# status
url(r'^/status/$', 'status_list', name='api_status_list'),
)

#urlpatterns = format_suffix_patterns(urlpatterns)
)
7 changes: 7 additions & 0 deletions nodeshot/core/websockets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.conf import settings

PUBLIC_PIPE = settings.NODESHOT['WEBSOCKETS']['PUBLIC_PIPE']
PRIVATE_PIPE = settings.NODESHOT['WEBSOCKETS']['PRIVATE_PIPE']
ADDRESS = settings.NODESHOT['WEBSOCKETS'].get('LISTENING_ADDRESS', 8080)
PORT = settings.NODESHOT['WEBSOCKETS'].get('LISTENING_PORT', 8080)
DOMAIN = settings.NODESHOT['WEBSOCKETS']['DOMAIN']
106 changes: 106 additions & 0 deletions nodeshot/core/websockets/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import uuid
import tornado.websocket

from . import PUBLIC_PIPE # contained in __init__.py


class WebSocketHandler(tornado.websocket.WebSocketHandler):
"""
simple websocket server for bidirectional communication between client and server
"""

# public means non authenticated
# private means authenticated
channels = {
'public': {},
'private': {}
}

def send_message(self, *args):
""" alias to write_message """
self.write_message(*args)

def add_client(self, user_id=None):
"""
Adds current instance to public or private channel.
If user_id is specified it will be added to the private channel,
If user_id is not specified it will be added to the public one instead.
"""
if user_id is None:
# generate a random uuid if it's an unauthenticated client
self.channel = 'public'
user_id = uuid.uuid1().hex
else:
self.channel = 'private'

self.id = user_id
WebSocketHandler.channels[self.channel][self.id] = self
print 'Client connected to the %s channel.' % self.channel

def remove_client(self):
""" removes a client """
del WebSocketHandler.channels[self.channel][self.id]

@classmethod
def broadcast(cls, message):
""" broadcast message to all connected clients """
clients = cls.get_clients()
# loop over every client and send message
for id, client in clients.iteritems():
client.send_message(message)

@classmethod
def send_private_message(self, user_id, message):
"""
Send a message to a specific client.
Returns True if successful, False otherwise
"""
try:
client = self.channels['private'][user_id]
except KeyError:
print 'client with id %s not found' % user_id
return False

client.send_message(message)
print 'message sent to client #%s' % user_id
return True

@classmethod
def get_clients(self):
""" return a merge of public and private clients """
public = self.channels['public']
private = self.channels['private']
return dict(public.items() + private.items())

def open(self):
""" method which is called every time a new client connects """
print 'Connection opened.'

# retrieve user_id if specified
user_id = self.get_argument("user_id", None)
# add client to list of connected clients
self.add_client(user_id)
# welcome message
self.send_message("Welcome to nodeshot websocket server.")
# new client connected message
client_count = len(self.get_clients().keys())
new_client_message = 'New client connected, now we have %d %s!' % (client_count, 'client' if client_count <= 1 else 'clients')
# broadcast new client connected message to all connected clients
self.broadcast(new_client_message)

print WebSocketHandler.channels['private']

def on_message(self, message):
""" method which is called every time the server gets a message from a client """
if message == "help":
self.send_message("Need help, huh?")
print 'Message received: \'%s\'' % message

def on_close(self):
""" method which is called every time a client disconnects """
print 'Connection closed.'
self.remove_client()

client_count = len(self.get_clients().keys())
new_client_message = '1 client disconnected, now we have %d %s!' % (client_count, 'client' if client_count <= 1 else 'clients')
self.broadcast(new_client_message)
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings

from nodeshot.core.websockets.server import start as start_server


class Command(BaseCommand):
help = "Start Tornado WebSocket Server"

def handle(self, *args, **options):
""" Go baby go! """
start_server()
10 changes: 10 additions & 0 deletions nodeshot/core/websockets/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
register websocket signals
"""

from django.conf import settings
from importlib import import_module


for registrar in settings.NODESHOT['WEBSOCKETS']['REGISTRARS']:
import_module(registrar)
7 changes: 7 additions & 0 deletions nodeshot/core/websockets/registrars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Websocket Registrars are a flexible way to broadcast websocket messages.
To customize the behaviour of the websocket server, simply copy this file, edit it
according to your needs, place it on your python path and add its path to
settings.NODESHOT['WEBSOCKETS']['REGISTRARS']
"""
45 changes: 45 additions & 0 deletions nodeshot/core/websockets/registrars/nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver

from nodeshot.core.nodes.signals import node_status_changed
from nodeshot.core.nodes.models import Node

from ..tasks import send_message


# ------ NODE CREATED ------ #

@receiver(post_save, sender=Node)
def node_created_handler(sender, **kwargs):
if kwargs['created']:
obj = kwargs['instance']
message = 'node "%s" has been added' % obj.name
send_message.delay(message)

# ------ NODE STATUS CHANGED ------ #

@receiver(node_status_changed)
def node_status_changed_handler(**kwargs):
obj = kwargs['instance']
obj.old_status = kwargs['old_status'].name
obj.new_status = kwargs['new_status'].name
message = 'node "%s" changed its status from "%s" to "%s"' % (obj.name, obj.old_status, obj.new_status)
send_message.delay(message)


# ------ NODE DELETED ------ #

@receiver(pre_delete, sender=Node)
def node_deleted_handler(sender, **kwargs):
obj = kwargs['instance']
message = 'node "%s" has been deleted' % obj.name
send_message.delay(message)


# ------ DISCONNECT UTILITY ------ #

def disconnect():
""" disconnect signals """
post_save.disconnect(node_created_handler, sender=Node)
node_status_changed.disconnect(node_status_changed_handler)
pre_delete.disconnect(node_deleted_handler, sender=Node)
30 changes: 30 additions & 0 deletions nodeshot/core/websockets/registrars/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import simplejson as json

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.core.urlresolvers import reverse

from nodeshot.community.notifications.models import Notification
from ..tasks import send_message


# ------ NEW NOTIFICATIONS ------ #

@receiver(post_save, sender=Notification)
def new_notification_handler(sender, **kwargs):
if kwargs['created']:
obj = kwargs['instance']
message = {
'user_id': str(obj.to_user.id),
'model': 'notification',
'type': obj.type,
'url': reverse('api_notification_detail', args=[obj.id])
}
send_message(json.dumps(message), pipe='private')


# ------ DISCONNECT UTILITY ------ #

def disconnect():
""" disconnect signals """
post_save.disconnect(new_notification_handler, sender=Notification)
91 changes: 91 additions & 0 deletions nodeshot/core/websockets/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import os
import time
import simplejson as json
from threading import Thread

import tornado.web
import tornado.ioloop

from .handlers import WebSocketHandler
from . import DOMAIN, ADDRESS, PORT, PUBLIC_PIPE, PRIVATE_PIPE # contained in __init__.py


application = tornado.web.Application([
(r'/', WebSocketHandler),
])


def public_broadcaster():
"""
Thread which runs in parallel and constantly checks for new messages
in the public pipe and broadcasts them publicly to all connected clients.
"""
while __websocket_server_running__:
pipein = open(PUBLIC_PIPE, 'r')
line = pipein.readline().replace('\n', '').replace('\r', '')
if line != '':
WebSocketHandler.broadcast(line)
print line

remaining_lines = pipein.read()
pipein.close()
pipeout = open(PUBLIC_PIPE, 'w')
pipeout.write(remaining_lines)
pipeout.close()
else:
pipein.close()

time.sleep(0.05)

public_broadcaster_thread = Thread(target=public_broadcaster, args=[])
public_broadcaster_thread.deamon = True


def private_messenger():
"""
Thread which runs in parallel and constantly checks for new messages
in the private pipe and sends them to the specific client.
If client is not connected the message is discarded.
"""
while __websocket_server_running__:
pipein = open(PRIVATE_PIPE, 'r')
line = pipein.readline().replace('\n', '').replace('\r', '')
if line != '':
message = json.loads(line)
WebSocketHandler.send_private_message(user_id=message['user_id'],
message=message)
print line

remaining_lines = pipein.read()
pipein.close()
pipeout = open(PRIVATE_PIPE, 'w')
pipeout.write(remaining_lines)
pipeout.close()
else:
pipein.close()

time.sleep(0.05)

private_messenger_thread = Thread(target=private_messenger, args=[])
private_messenger_thread.deamon = True


def start():
global __websocket_server_running__
__websocket_server_running__ = True

application.listen(PORT, address=ADDRESS)
websocktserver = tornado.ioloop.IOLoop.instance()

try:
print "\nStarted Tornado Wesocket Server at ws://%s:%s\n" % (ADDRESS, PORT)

public_broadcaster_thread.start()
private_messenger_thread.start()
websocktserver.start()
# on exit
except (KeyboardInterrupt, SystemExit):
__websocket_server_running__ = False
websocktserver.stop()

print "\nStopped Tornado Wesocket Server\n"
22 changes: 22 additions & 0 deletions nodeshot/core/websockets/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
from django.conf import settings
from celery import task


@task
def send_message(message, pipe='public'):
"""
writes message to pipe
"""
if pipe not in ['public', 'private']:
raise ArgumentError('pipe argument can be only "public" or "private"')
else:
pipe = pipe.upper()

pipe_path = settings.NODESHOT['WEBSOCKETS']['%s_PIPE' % pipe]

# create file if it doesn't exist, append contents
pipeout = open(pipe_path, 'a')

pipeout.write('%s\n' % message)
pipeout.close()
Loading

0 comments on commit 26373a3

Please sign in to comment.