Skip to content

Commit

Permalink
write 'sync status' doc and send welcome mail
Browse files Browse the repository at this point in the history
--HG--
extra : rebase_source : 7e792689ca89fb10106680d89c99c50f9e160c19
  • Loading branch information
mhammond committed Jan 22, 2010
1 parent 9be58eb commit f360cdc
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 7 deletions.
10 changes: 10 additions & 0 deletions couch_docs/wq/rd.ext.core.send-welcome-message.json
@@ -0,0 +1,10 @@
{
"schemas" : {
"rd.ext.workqueue" : {
"source_schemas" : ["rd.core.sync-status"],
"code" : "RDFILE: *.py",
"content_type" : "application/x-python",
"info": "Sends a 'welcome' email after the first sync completes"
}
}
}
29 changes: 29 additions & 0 deletions couch_docs/wq/rd.ext.core.send-welcome-message.py
@@ -0,0 +1,29 @@
def handler(doc):
if doc['num_syncs'] != 1:
return

key = ['rd.account', 'proto', 'smtp']
result = open_view(key=key, reduce=False, include_docs=True)
rows = result['rows']
if not rows:
logger.warn("can't find an smtp account from which to send welcome email")
return
acct = rows[0]['doc']

# write a simple outgoing schema
addy = acct['username']
body = 'no really - welcome! Raindrop just synchronized %d of your messages' % doc['new_items']
item = {'body' : body,
'from' : ['email', addy],
'from_display': 'raindrop',
'to' : [
['email', addy],
],
'to_display' : ['you'],
'subject': "Welcome to raindrop",
# The 'state' bit...
'sent_state': None,
'outgoing_state': 'outgoing',
}
emit_schema('rd.msg.outgoing.simple', item)
logger.info("queueing welcome mail to '%s'", addy)
2 changes: 1 addition & 1 deletion server/python/raindrop/proto/imap.py
Expand Up @@ -205,7 +205,7 @@ def __init__(self, account, conductor, options):
def write_items(self, items):
try:
if items:
_ = yield self.conductor.pipeline.provide_schema_items(items)
_ = yield self.conductor.provide_schema_items(items)
except DocumentSaveError, exc:
# So - conflicts are a fact of life in this 'queue' model: we check
# if a record exists and it doesn't, so we queue the write. By the
Expand Down
2 changes: 1 addition & 1 deletion server/python/raindrop/proto/rss.py
Expand Up @@ -81,7 +81,7 @@ def maybe_update_doc(conductor, doc_model, doc, options):
si['items'] = items
si['attachments'] = a
si['_rev'] = doc['_rev']
_ = yield conductor.pipeline.provide_schema_items([si])
_ = yield conductor.provide_schema_items([si])
logger.info('updated feed %r', uri)


Expand Down
4 changes: 2 additions & 2 deletions server/python/raindrop/proto/skype.py
Expand Up @@ -242,7 +242,7 @@ def gen_items(self, chat_props, todo, msgs_by_id, need_chat):
).addCallback(self._cb_got_msg_props, chat_props, msg, tow)

if tow:
yield self.conductor.pipeline.provide_schema_items(tow)
yield self.conductor.provide_schema_items(tow)

logger.debug("finished processing chat %(skype_chatname)r", chat_props)

Expand Down Expand Up @@ -283,7 +283,7 @@ def _cb_got_friends(self, friends):
'rd_schema_id' : 'rd.identity.exists',
'items' : None,
'rd_ext_id': self.rd_extension_id})
return self.conductor.pipeline.provide_schema_items(schemas)
return self.conductor.provide_schema_items(schemas)


class SkypeAccount(base.AccountBase):
Expand Down
1 change: 1 addition & 0 deletions server/python/raindrop/proto/smtp.py
Expand Up @@ -103,6 +103,7 @@ def do_couchy():
logger.error("Failed to talk to couch\n%s",
Failure().getTraceback())
self._disconnectFromServer()
raise

def do_base(result):
if isinstance(result, Failure):
Expand Down
4 changes: 2 additions & 2 deletions server/python/raindrop/proto/test/__init__.py
Expand Up @@ -70,8 +70,8 @@ def sync_generator(self):
for i in xrange(num_docs):
yield self.check_test_message(i)
if self.bulk_docs:
pipeline = self.conductor.pipeline
yield pipeline.provide_schema_items(self.bulk_docs
conductor = self.conductor
yield conductor.provide_schema_items(self.bulk_docs
).addCallback(self.saved_bulk_messages, len(self.bulk_docs),
)

Expand Down
2 changes: 1 addition & 1 deletion server/python/raindrop/proto/twitter.py
Expand Up @@ -188,7 +188,7 @@ def attached(self, twit):
'items': items})

if infos:
_ = yield self.conductor.pipeline.provide_schema_items(infos)
_ = yield self.conductor.provide_schema_items(infos)


class TwitterAccount(base.AccountBase):
Expand Down
32 changes: 32 additions & 0 deletions server/python/raindrop/sync.py
Expand Up @@ -22,6 +22,7 @@
#

import logging
import time

from twisted.internet import reactor, defer
from twisted.python.failure import Failure
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(self, pipeline):
self.all_accounts = None
self.calllaters_waiting = {} # keyed by ID, value is a IDelayedCall
self.deferred = None
self.num_new_items = None

def _ohNoes(self, failure, *args, **kwargs):
logger.error('OH NOES! failure! %s', failure)
Expand Down Expand Up @@ -232,15 +234,45 @@ def sync(self, options, incoming=True, outgoing=True):
dl.append(self.sync_incoming(options))
return defer.DeferredList(dl)

@defer.inlineCallbacks
def provide_schema_items(self, items):
_ = yield self.pipeline.provide_schema_items(items)
self.num_new_items += len(items)

@defer.inlineCallbacks
def sync_outgoing(self, options):
# start looking for outgoing schemas to sync...
dl = (yield self._do_sync_outgoing())
_ = yield defer.DeferredList(dl)

@defer.inlineCallbacks
def _record_sync_status(self, result):
rd_key = ["raindrop", "sync-status"]
schema_id = 'rd.core.sync-status'
# see if an existing schema exists to get the existing number.
si = (yield self.doc_model.open_schemas([(rd_key, schema_id)]))[0]
num_syncs = 0 if si is None else si['num_syncs']

# a timestamp in UTC
items = {'timestamp': time.mktime(time.gmtime()),
'new_items': self.num_new_items,
'num_syncs': num_syncs + 1,
}
si = {'rd_key': rd_key,
'rd_schema_id': schema_id,
'rd_source': None,
'rd_ext_id': 'rd.core',
'items': items,
}
_ = yield self.pipeline.provide_schema_items([si])
self.num_new_items = None

def sync_incoming(self, options):
assert self.num_new_items is None # eek - we didn't reset correctly...
self.num_new_items = 0
if self.deferred is None:
self.deferred = defer.Deferred()
self.deferred.addCallback(self._record_sync_status)
# start synching all 'incoming' accounts.
accts = self._get_specified_accounts(options)
for account in accts:
Expand Down
36 changes: 36 additions & 0 deletions server/python/raindrop/tests/test_sync.py
@@ -0,0 +1,36 @@
from twisted.internet import defer

from raindrop.tests import TestCaseWithTestDB, FakeOptions
from raindrop.model import get_doc_model
from raindrop.proto import test as test_proto

import logging
logger = logging.getLogger(__name__)

class TestSyncing(TestCaseWithTestDB):
def make_config(self):
config = TestCaseWithTestDB.make_config(self)
# now add our smtp account
acct = config.accounts['test_smtp'] = {}
acct['proto'] = 'smtp'
acct['username'] = 'test_raindrop@test.mozillamessaging.com'
acct['id'] = 'smtp_test'
acct['ssl'] = False
return config

@defer.inlineCallbacks
def test_sync_state_doc(self, expected_num_syncs=1):
_ = yield self.deferMakeAnotherTestMessage(None)
_ = yield self.ensure_pipeline_complete()
# open the document with the sync state.
wanted = ["raindrop", "sync-status"], 'rd.core.sync-status'
si = (yield self.doc_model.open_schemas([wanted]))[0]
self.failUnless(si)
self.failUnlessEqual(si.get('new_items'), 1)
self.failUnlessEqual(si.get('num_syncs'), expected_num_syncs)

@defer.inlineCallbacks
def test_sync_state_doc_twice(self):
# make sure it works twice with the same conductor instance/db
_ = yield self.test_sync_state_doc()
_ = yield self.test_sync_state_doc(2)

0 comments on commit f360cdc

Please sign in to comment.