Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
  • 2 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
Showing with 113 additions and 10 deletions.
  1. +1 −1 shaveet/api.py
  2. +3 −0 shaveet/config.py
  3. +28 −8 shaveet/lookup.py
  4. +81 −1 shaveet/utils.py
View
2 shaveet/api.py
@@ -127,7 +127,7 @@ def get_client_channels(client_id):
return get_client(client_id).get_subscribed_channels()
def kill_client(client_id):
- return discard_client(get_client(client_id))
+ return discard_client(get_client(client_id),True)
#this is the wsgi application entry point
api_wsgi = wsgi_jsonrpc.WSGIJSONRPCApplication(methods=[subscribe,subscribe_many,unsubscribe,unsubscribe_many,unsubscribe_all_channel,new_message,
View
3 shaveet/config.py
@@ -6,6 +6,9 @@
MAX_IDLE_CLIENT = 10#Seconds
MIN_ALIVE_TIME = 60
MAX_CLIENTS_GC = 1000
+MAX_CLIENTS_RECONNECT = 100000#Amount of clients to keep for reconnection
+MAX_CLIENT_RECONNECT_IDLE = 300#Seconds
CLIENT_GC_INTERVAL = 10
+
IPS = ['127.0.0.1']
LOG_PATH = os.path.expanduser('~/shaveet.log')
View
36 shaveet/lookup.py
@@ -1,13 +1,16 @@
#std
+import logging
from time import time
#gevent
from gevent.event import Event
#shaveet
-from shaveet.utils import guid
+from shaveet.utils import guid,LRU
from shaveet.consts import SYSTEM_ID,ADMIN_CHANNEL
-from shaveet.config import MAX_MESSAGES_PER_CHANNEL,MAX_IDLE_CLIENT,COMET_TIMEOUT,MIN_ALIVE_TIME
+from shaveet.config import MAX_MESSAGES_PER_CHANNEL,MAX_IDLE_CLIENT,COMET_TIMEOUT,MIN_ALIVE_TIME,MAX_CLIENTS_RECONNECT
import shaveet#for api
+logger = logging.getLogger("shaveet.gc")
+
class Client(object):
"""
Represents a client in shaveet,a client is identified by string called id and is unique per client.
@@ -31,11 +34,14 @@ def touch(self):
def get_subscribed_channels(self):
return self.channels.keys()
- def add_channel(self,channel_name):
+ def add_channel(self,channel_name,cursor = False):
+ existed = channel_exist(channel_name)
chn = get_channel(channel_name)
if not channel_name in self.channels:
- #set to chn.id so we don't get old messages
- self.channels[channel_name] = chn.id - 1
+ if cursor and existed:#in case of reconnection set to old position
+ self.channels[channel_name] = cursor
+ else:#set to chn.id so we don't get old messages
+ self.channels[channel_name] = chn.id - 1
chn.add_client(self.id)
#notifies that this client has a new channel
self.channels_event.set()
@@ -74,6 +80,8 @@ def remove_from_channels(self):
_channels = {}
#client id -> Client
_clients = {}
+#client id -> Client
+_dead_clients = LRU(MAX_CLIENTS_RECONNECT)
###############
# client #
@@ -96,15 +104,27 @@ def client_exists(client_id):
def all_clients():
return _clients.copy()
-def discard_client(client):
+def discard_client(client,kill=False):
if client_exists(client.id):
+ old_channels = client.channels.copy()
client.remove_from_channels()
del _clients[client.id]
- del client
+ if kill:
+ del client
+ else:
+ _dead_clients[client.id] = (client,old_channels)
+
def get_client_with_key(client_id_key):
- print client_id_key
client_id,key = client_id_key.split("_!!_")
+ if client_id in _dead_clients:
+ #resurrect the client :)
+ client,old_channels = _dead_clients[client_id]
+ _clients[client_id] = client
+ for channel_name,cursor in old_channels.iteritems():
+ client.add_channel(channel_name,cursor)
+ del _dead_clients[client_id]
+ logger.info("%s came back to life!",client_id)
client = _clients[client_id]
if client.key != key:
raise KeyError("Wrong key for client")
View
82 shaveet/utils.py
@@ -40,4 +40,84 @@ def __call__(self,env,start_response):
return self.app(env,start_response)
else:
start_response('404 Not Found',[('Content-Type', 'text/html')])
- return ['Nothing here buddy']
+ return ['Nothing here buddy']
+
+
+class Node(object):
+ __slots__ = ['prev', 'next', 'me']
+ def __init__(self, prev, me):
+ self.prev = prev
+ self.me = me
+ self.next = None
+
+class LRU:
+ """
+ Implementation of a length-limited O(1) LRU queue.
+ Built for and used by PyPE:
+ http://pype.sourceforge.net
+ Copyright 2003 Josiah Carlson.
+ """
+ def __init__(self, count, pairs=[]):
+ self.count = max(count, 1)
+ self.d = {}
+ self.first = None
+ self.last = None
+ for key, value in pairs:
+ self[key] = value
+ def __contains__(self, obj):
+ return obj in self.d
+ def __getitem__(self, obj):
+ a = self.d[obj].me
+ self[a[0]] = a[1]
+ return a[1]
+ def __setitem__(self, obj, val):
+ if obj in self.d:
+ del self[obj]
+ nobj = Node(self.last, (obj, val))
+ if self.first is None:
+ self.first = nobj
+ if self.last:
+ self.last.next = nobj
+ self.last = nobj
+ self.d[obj] = nobj
+ if len(self.d) > self.count:
+ if self.first == self.last:
+ self.first = None
+ self.last = None
+ return
+ a = self.first
+ a.next.prev = None
+ self.first = a.next
+ a.next = None
+ del self.d[a.me[0]]
+ del a
+ def __delitem__(self, obj):
+ nobj = self.d[obj]
+ if nobj.prev:
+ nobj.prev.next = nobj.next
+ else:
+ self.first = nobj.next
+ if nobj.next:
+ nobj.next.prev = nobj.prev
+ else:
+ self.last = nobj.prev
+ del self.d[obj]
+ def __iter__(self):
+ cur = self.first
+ while cur != None:
+ cur2 = cur.next
+ yield cur.me[1]
+ cur = cur2
+ def iteritems(self):
+ cur = self.first
+ while cur != None:
+ cur2 = cur.next
+ yield cur.me
+ cur = cur2
+ def iterkeys(self):
+ return iter(self.d)
+ def itervalues(self):
+ for i,j in self.iteritems():
+ yield j
+ def keys(self):
+ return self.d.keys()

No commit comments for this range

Something went wrong with that request. Please try again.