
First revision of greader plugin

- Implement a system for plugins to issue internal commands (see the sketch below)
- Rework feed adding to use it instead of shelling out to canto-remote
- Some minor improvements

The feed sync is pretty functional; however, even without debug logging the
plugin still crushes start-up performance, and I'm not happy with that.

Signed-off-by: Jack Miller <jack@codezen.org>
1 parent 431c5e7 commit dfba58c4a2b93382ae188f442d2e67fe8e91d3dd @themoken committed Sep 28, 2012
Showing with 90 additions and 60 deletions.
  1. +44 −4 canto_next/canto_backend.py
  2. +11 −7 canto_next/hooks.py
  3. +2 −0 canto_next/server.py
  4. +33 −49 plugins/greader.py
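
For context (not part of the commit): a minimal sketch of how a plugin can drive the new internal command system. The hook names and the [ callback, args ] argument shape come from the canto_backend.py hunks below, and the set_configs payload mirrors what plugins/greader.py sends; the plugin logger name and the example feed URL/name are invented for illustration.

    import logging

    from canto_next.hooks import call_hook

    log = logging.getLogger("MYPLUGIN")

    def log_feeds(configs):
        # Called back on the daemon's main loop once in_configs() has been
        # run off the queue; configs is { "feeds" : [ ... ] } (or {} if unset).
        log.debug("current feeds: %s" % configs.get("feeds", []))

    # Queue an internal GET: [ callback, list of top-level config sections ]
    call_hook("get_configs", [ log_feeds, [ "feeds" ] ])

    # Queue an internal SET: [ callback or None, dict to merge into the config ]
    call_hook("set_configs",
        [ None, { "feeds" : [ { "url" : "http://example.com/rss",
                                "name" : "Example Feed" } ] } ])

Because these requests go through queue_internal() onto the daemon's own queue, the work and any callback run on the main loop between protocol commands rather than on the plugin's thread.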
canto_next/canto_backend.py
@@ -178,6 +178,9 @@ def _reparse_config(self, originating_socket):
# Propagate config changes to watching sockets.
+ # on_config_change must be prepared for originating_socket = None on internal requests that
+ # nonetheless must be propagated.
+
def on_config_change(self, change, originating_socket):
self._reparse_config(originating_socket)
@@ -237,6 +240,9 @@ def on_kill_socket(self, socket):
protection.unprotect((socket, "auto"))
self.check_dead_feeds()
+ def queue_internal(self, cb, func, args):
+ self.queue.put((cb, func, args))
+
# We need to be alerted on certain events, ensure
# we get notified about them.
@@ -247,6 +253,11 @@ def setup_hooks(self):
on_hook("tag_change", self.on_tag_change)
on_hook("kill_socket", self.on_kill_socket)
+ # For plugins
+ on_hook("set_configs", lambda x, y : self.queue_internal(x, self.in_setconfigs, y))
+ on_hook("del_configs", lambda x, y : self.queue_internal(x, self.in_delconfigs, y))
+ on_hook("get_configs", lambda x, y : self.queue_internal(x, self.in_configs, y))
+
# Return list of item tuples after global transforms have
# been performed on them.
@@ -415,15 +426,22 @@ def cmd_setattributes(self, socket, args):
# CONFIGS [ "top_sec", ... ] -> { "top_sec" : full_value }
- def cmd_configs(self, socket, args):
+ # Internal
+
+ def in_configs(self, args):
if args:
ret = {}
for topsec in args:
if topsec in self.conf.json:
ret[topsec] = self.conf.json[topsec]
else:
ret = self.conf.json
+ return ret
+
+ # External
+ def cmd_configs(self, socket, args):
+ ret = self.in_configs(args)
self.write(socket, "CONFIGS", ret)
# Please NOTE that SET and DEL do no locking, no revision tracking
@@ -448,15 +466,22 @@ def cmd_configs(self, socket, args):
# SETCONFIGS { "key" : "value", ...}
+ def in_setconfigs(self, args):
+ self.cmd_setconfigs(None, args)
+ return self.conf.json
+
def cmd_setconfigs(self, socket, args):
self.conf.merge(args.copy())
call_hook("config_change", [args, socket])
-
# DELCONFIGS { "key" : "DELETE", ...}
+ def in_delconfigs(self, args):
+ self.cmd_delconfigs(None, args)
+ return self.conf.json
+
def cmd_delconfigs(self, socket, args):
self.conf.delete(args.copy())
@@ -535,9 +560,24 @@ def run(self):
# =(
if self.alarmed:
- (socket, (cmd, args)) = self.queue.get(True, 0.1)
+ r = self.queue.get(True, 0.1)
+ else:
+ r = self.queue.get(True, 1)
+
+ log.debug("!!! %s" % (r,))
+
+ # 3-tuple = internal command
+ if len(r) == 3:
+ cb, func, args = r
+ r = func(args)
+ if cb:
+ cb(r)
+ continue
+
+ # 2-tuple = external command
else:
- (socket, (cmd, args)) = self.queue.get(True, 1)
+ socket, (cmd, args) = r
+
except queue.Empty:
pass
else:
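
Since this rendering drops indentation, here is a standalone toy illustration of the 3-tuple / 2-tuple dispatch that run() now performs on its queue (not canto code; the queued values and names are hypothetical):

    import queue

    q = queue.Queue()

    def in_configs(args):
        # Stand-in for the daemon's in_configs(); returns a section -> value map.
        return { sec : {} for sec in args }

    # Internal command, as queued by queue_internal(cb, func, args).
    q.put((print, in_configs, [ "feeds" ]))
    # External command, as queued for a client connection.
    q.put(("fake-socket", ("PING", [])))

    while True:
        try:
            r = q.get(True, 0.1)
        except queue.Empty:
            break
        if len(r) == 3:
            # Internal: run the function, hand the result to the callback.
            cb, func, args = r
            ret = func(args)
            if cb:
                cb(ret)
        else:
            # External: unpack and dispatch to the cmd_* handlers as before.
            socket, (cmd, args) = r
            print("would dispatch %s for %s" % (cmd, socket))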
canto_next/hooks.py
@@ -7,12 +7,12 @@
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
-hooks = {}
+import traceback
+import logging
+
+log = logging.getLogger("HOOKS")
-# Since the inception of the work_done hook that's called from the main loop
-# (but doesn't usually have a lot of work to do), these hooks are logged at
-# level 8, which is sub-debug level. They can be seen if the daemon is run
-# with -vv, but most of the time will be extraneous.
+hooks = {}
def on_hook(hook, func):
if hook in hooks:
@@ -30,5 +30,9 @@ def call_hook(hook, args):
# List copy here so hooks can remove themselves
# without affecting our iteration.
- for func in hooks[hook][:]:
- func(*args)
+ try:
+ for func in hooks[hook][:]:
+ func(*args)
+ except:
+ log.error("Error calling hook %s (func: %s args: %s)" % (hook, func, args))
+ log.error(traceback.format_exc())
canto_next/server.py
@@ -101,6 +101,8 @@ def accept_conn(self, conn):
# Write a (cmd, args) to a single connection.
def write(self, conn, cmd, args):
+ if not conn:
+ return None
return self.do_write(conn, cmd, args)
# Write a (cmd, args) to every connection.
plugins/greader.py
@@ -1,6 +1,6 @@
# Google Reader Sync Plugin
# by Jack Miller
-# v1.0
+# v0.2
#
# If this is placed in the .canto-ng/plugins directory, along with a copy of
# the libgreader source ported to py3k and you set the USERNAME and PASSWORD
@@ -26,78 +26,60 @@
# about the daemon not taking a connection fast enough.
#
# TODO
-#
-# - Get subscription synchronization working (infra works when run outside of
-# plugin interface, haven't quite gotten the internal implementation right.
-# - Should probably stop using canto-remote
# - Speed
USERNAME="user@gmail.com"
PASSWORD="password"
# You shouldn't have to change anything beyond this line.
-from canto_next.feed import DaemonFeedPlugin
-from canto_next.hooks import on_hook, remove_hook
+from canto_next.feed import DaemonFeedPlugin, allfeeds
+from canto_next.hooks import call_hook, on_hook
from plugins.libgreader import GoogleReader, ClientAuthMethod
from threading import Lock
+import traceback
import subprocess
import logging
import sys
-import os
log = logging.getLogger("GREADER")
-sub_synced = False
-
-def get_reader_urls(reader):
- reader.buildSubscriptionList()
- return [ f.feedUrl for f in reader.getSubscriptionList() ]
-
-def get_canto_urls():
- listfeeds = subprocess.check_output(['canto-remote', 'listfeeds'])
- listfeeds = listfeeds.decode("UTF-8")
- return [ l for l in listfeeds.split('\n') if l.startswith('http') ]
-
-def add_reader_urls(reader, new_urls):
- for url in new_urls:
- log.info("Adding %s to Google Reader" % url)
- r = reader.subscribe("feed/" + url)
- if r:
- log.info("...OK")
- else:
- log.info("...FAILED!")
-
-def add_canto_urls(new_urls):
- for url in new_urls:
- subprocess.check_output(['canto-remote', 'addfeed', url])
-
def sync_subscriptions():
log.info("Syncing subscriptions with Google")
auth = ClientAuthMethod(USERNAME, PASSWORD)
reader = GoogleReader(auth)
+ reader.buildSubscriptionList()
+
+ gurls = [ (f.title, f.feedUrl) for f in reader.getSubscriptionList() ]
+ curls = [ f.URL for f in allfeeds.get_feeds() ]
+ names = [ f.name for f in allfeeds.get_feeds() ]
+
+ new_feeds = []
+ for gtitle, gurl in gurls[:]:
+ if gurl not in curls:
- if os.fork():
- gurls = get_reader_urls(reader)
- curls = get_canto_urls()
+ # Handle name collisions because we're not prepared to handle ERROR
+ # responses from config
- for gurl in gurls[:]:
- if gurl in curls:
- gurls.remove(gurl)
+ if gtitle in names:
+ offset = 2
+ while (gtitle + " (%d)" % offset) in names:
+ offset += 1
+ gtitle = gtitle + " (%d)" % offset
- for curl in curls[:]:
- if curl in gurls:
- curls.remove(curl)
+ attrs = { "url" : gurl, "name" : gtitle }
+ new_feeds.append(attrs)
+ names.append(gtitle)
- self.add_reader_urls(reader, curls)
- self.add_canto_urls(gurls)
- sys.exit(0)
+ call_hook("set_configs", [ None, { "feeds" : new_feeds }])
- remove_hook("serving", sync_subscriptions)
+ for curl in curls[:]:
+ if curl not in [ u for t, u in gurls ]:
+ reader.subscribe('feed/' + curl)
-#on_hook("serving", sync_subscriptions)
+on_hook("serving", sync_subscriptions)
auth = ClientAuthMethod(USERNAME, PASSWORD)
@@ -107,10 +89,12 @@ def sync_subscriptions():
def lock_reader(fn):
def lock_wrap(*args, **kwargs):
reader_lock.acquire()
- log.debug("got reader_lock")
- r = fn(*args, **kwargs)
+ r = None
+ try:
+ r = fn(*args, **kwargs)
+ except:
+ log.error("FAILURE")
+ log.error(traceback.format_exc())
reader_lock.release()
- log.debug("released reader_lock")
return r
return lock_wrap
