Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Fetching contributors…
Cannot retrieve contributors at this time
402 lines (339 sloc) 11.2 KB
# Python library for Remember The Milk API
__author__ = 'Sridhar Ratnakumar <>'

# Names exported by ``from rtm import *``.
# NOTE(review): the original tuple contents are missing from this copy of
# the file; the entries below are the module's documented entry points --
# confirm against upstream before release.
__all__ = (
    'API',
    'createRTM',
    'set_log_level',
)
import warnings
import urllib
import logging
from hashlib import md5
# ImportWarning is ignored by default; make the fallback notice visible.
warnings.simplefilter('default', ImportWarning)

# Prefer the standalone simplejson package, then the copy bundled with
# Django.  `_use_simplejson` records whether either import succeeded.
_use_simplejson = False
try:
    import simplejson
    _use_simplejson = True
except ImportError:
    try:
        from django.utils import simplejson
        _use_simplejson = True
    except ImportError:
        pass

if not _use_simplejson:
    warnings.warn("simplejson module is not available, "
                  "falling back to the internal JSON parser. "
                  "Please consider installing the simplejson module from ",
                  ImportWarning)

# Module-wide logger.
LOG = logging.getLogger(__name__)

# NOTE(review): RTM.get and RTM.getAuthURL reference these two constants but
# no definition is visible in this file.  The values are the endpoints given
# in the public Remember The Milk API documentation -- confirm.
SERVICE_URL = 'https://api.rememberthemilk.com/services/rest/'
AUTH_SERVICE_URL = 'https://www.rememberthemilk.com/services/auth/'
class RTMError(Exception):
    """Base class for the errors raised by this module."""


class RTMAPIError(RTMError):
    """Raised when an API call comes back with a failure status."""


class AuthStateMachine(object):
    """Hold one datum for each of a fixed set of named states."""

    class NoData(RTMError):
        """Raised by `get` when no datum is stored for the state."""

    def __init__(self, states):
        """Remember the valid state names and start with no data.

        states -- container of valid state names (tested with ``in``)
        """
        self.states = states
        # Stored data lives in its own dict so the list of valid states is
        # never overwritten.
        self.data = {}

    def dataReceived(self, state, datum):
        """Store `datum` for `state`.

        Raises RTMError if `state` is not one of the valid states.
        """
        if state not in self.states:
            raise RTMError("Invalid state <%s>" % state)
        self.data[state] = datum

    def get(self, state):
        """Return the datum stored for `state`.

        Raises AuthStateMachine.NoData if nothing has been stored for it.
        """
        try:
            return self.data[state]
        except KeyError:
            raise AuthStateMachine.NoData('No data for <%s>' % state)
class RTM(object):
    """Client object for the Remember The Milk web API.

    Every key of the module-level `API` mapping becomes an attribute holding
    an `RTMAPICategory`, so remote methods read as ``rtm.tasks.getList()``.
    """

    def __init__(self, apiKey, secret, token=None):
        """Store the credentials and create one attribute per API category.

        apiKey -- API key sent with every request
        secret -- shared secret used by `_sign`
        token  -- optional authentication token; when truthy it is stored
                  in `authInfo` under the 'token' state
        """
        self.apiKey = apiKey
        self.secret = secret
        self.authInfo = AuthStateMachine(['frob', 'token'])

        # this enables one to do 'rtm.tasks.getList()', for example
        for prefix, methods in API.items():
            setattr(self, prefix, RTMAPICategory(self, prefix, methods))

        if token:
            self.authInfo.dataReceived('token', token)

    def _sign(self, params):
        """Return the hex MD5 digest that signs `params`.

        The digest covers the secret followed by every key and value of
        `params`, concatenated in ascending key order.
        """
        pairs = ''.join(['%s%s' % (k, v) for k, v in sorted(params.items())])
        payload = self.secret + pairs
        # md5() hashes bytes; text (unicode on Python 2, str on Python 3)
        # is encoded as UTF-8 first so non-ASCII values do not fail.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        return md5(payload).hexdigest()

    def get(self, **params):
        """Send `params` to the service and return the parsed ``rsp`` object.

        The API key, the 'json' format selector and the request signature
        are added to `params` before the request is made.

        Raises RTMAPIError when the response status is 'fail'.
        """
        params['api_key'] = self.apiKey
        params['format'] = 'json'
        params['api_sig'] = self._sign(params)

        response = openURL(SERVICE_URL, params)
        try:
            body = response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()

        LOG.debug("JSON response: \n%s", body)

        if _use_simplejson:
            data = dottedDict('ROOT', simplejson.loads(body))
        else:
            # Fallback parser, used only when simplejson is unavailable.
            data = dottedJSON(body)
        rsp = data.rsp

        if rsp.stat == 'fail':
            raise RTMAPIError('API call failed - %s (%s)' % (
                rsp.err.msg, rsp.err.code))
        return rsp

    def getNewFrob(self):
        """Request a frob, store it in `authInfo` and return it."""
        rsp = self.get(method='rtm.auth.getFrob')
        self.authInfo.dataReceived('frob', rsp.frob)
        return rsp.frob

    def getAuthURL(self):
        """Return the signed authorization URL for the current frob.

        A new frob is requested first when none has been stored yet.
        """
        try:
            frob = self.authInfo.get('frob')
        except AuthStateMachine.NoData:
            frob = self.getNewFrob()

        params = {
            'api_key': self.apiKey,
            'perms': 'delete',
            'frob': frob,
        }
        params['api_sig'] = self._sign(params)
        return AUTH_SERVICE_URL + '?' + urllib.urlencode(params)

    def getToken(self):
        """Exchange the stored frob for a token, store it and return it.

        Raises AuthStateMachine.NoData if no frob has been stored.
        """
        frob = self.authInfo.get('frob')
        rsp = self.get(method='rtm.auth.getToken', frob=frob)
        self.authInfo.dataReceived('token', rsp.auth.token)
        return rsp.auth.token
class RTMAPICategory(object):
    """Expose the methods of one API category as callable attributes.

    See the `API` structure and `RTM.__init__`.
    """

    def __init__(self, rtm, prefix, methods):
        """Bind the category to its client.

        rtm     -- object whose `get` performs the request
        prefix  -- category name; a key of `API`
        methods -- mapping of method name to [required args, optional args]
        """
        self.rtm = rtm
        self.prefix = prefix
        self.methods = methods

    def __getattr__(self, attr):
        """Return a callable for the remote method called `attr`.

        Raises AttributeError if the category has no such method.
        """
        # Read through __dict__ so a lookup on an instance without
        # `methods` raises AttributeError instead of recursing forever.
        methods = self.__dict__.get('methods', ())
        if attr not in methods:
            raise AttributeError('No such attribute: %s' % attr)

        rargs, oargs = methods[attr]
        if self.prefix == 'tasksNotes':
            aname = 'rtm.tasks.notes.%s' % attr
        else:
            aname = 'rtm.%s.%s' % (self.prefix, attr)
        return lambda **params: self.callMethod(aname, rargs, oargs, **params)

    def callMethod(self, aname, rargs, oargs, **params):
        """Validate `params` against the spec and perform the call.

        aname -- full remote method name
        rargs -- names of the required parameters
        oargs -- names of the optional parameters

        Raises TypeError if a required parameter is missing; a parameter
        found in neither list only produces a warning.
        """
        # Sanity checks
        for requiredArg in rargs:
            if requiredArg not in params:
                raise TypeError(
                    'Required parameter (%s) missing' % requiredArg)

        known = tuple(rargs) + tuple(oargs)
        for param in params:
            if param not in known:
                warnings.warn('Invalid parameter (%s)' % param)

        # NOTE(review): the tail of the original call is missing from this
        # copy of the file.  The stored token is forwarded as `auth_token`
        # because nothing else in the module sends it -- confirm.  An
        # explicitly supplied auth_token takes precedence.
        if 'auth_token' not in params:
            params['auth_token'] = self.rtm.authInfo.get('token')
        return self.rtm.get(method=aname, **params)
# Utility functions
def sortedItems(dictionary):
    """Yield the (key, value) pairs of `dictionary` in ascending key order."""
    for key in sorted(dictionary):
        yield key, dictionary[key]
def openURL(url, queryArgs=None):
    """Open `url` and return the object produced by `urllib.urlopen`.

    url       -- base URL
    queryArgs -- optional query parameters; when truthy they are URL-encoded
                 and appended to `url` after a '?'
    """
    if queryArgs:
        url = url + '?' + urllib.urlencode(queryArgs)
    LOG.debug("URL> %s", url)
    return urllib.urlopen(url)
class dottedDict(object):
    """Make dictionary items accessible via the object-dot notation."""

    def __init__(self, name, dictionary):
        """Copy the items of `dictionary` onto this object as attributes.

        name       -- label shown by `__repr__`
        dictionary -- the dict to wrap

        Nested dicts become `dottedDict` objects named after their key.
        Lists and tuples (except under the key 'tag', which is stored
        unchanged) become lists in which every dict item is wrapped and
        named '<key>_<index>'; non-dict items are kept as they are.

        Raises ValueError if `dictionary` is not a dict.
        """
        if not isinstance(dictionary, dict):
            # One-element tuple so a tuple argument formats correctly.
            raise ValueError('not a dict: %s' % (dictionary,))

        self._name = name
        for key, value in dictionary.items():
            if isinstance(value, dict):
                value = dottedDict(key, value)
            elif isinstance(value, (list, tuple)) and key != 'tag':
                value = [
                    dottedDict('%s_%d' % (key, i), item)
                    if isinstance(item, dict) else item
                    for i, item in enumerate(value)
                ]
            setattr(self, key, value)

    def __repr__(self):
        """Return the label followed by the public attribute names."""
        children = [c for c in dir(self) if not c.startswith('_')]
        return 'dotted <%s> : %s' % (self._name, ', '.join(children))
def safeEval(string):
    """Evaluate `string` as an expression and return its value.

    The expression sees no builtins; the JSON literals ``true``, ``false``
    and ``null`` evaluate to True, False and None.
    """
    # SECURITY: this still calls eval() on data that reaches it from the
    # network via dottedJSON.  Removing the builtins blocks direct calls
    # such as __import__(), but eval() is not a sandbox; prefer a real JSON
    # parser whenever one is available.
    names = {
        '__builtins__': {},
        'true': True,
        'false': False,
        'null': None,
    }
    return eval(string, names, {})
def dottedJSON(json):
    """Parse the text `json` with `safeEval` and wrap it as 'ROOT'.

    Returns a `dottedDict`; this is the parser `RTM.get` falls back to when
    simplejson is not available.
    """
    parsed = safeEval(json)
    return dottedDict('ROOT', parsed)
def indexed(seq):
    """Yield (index, item) pairs for `seq`, counting from zero."""
    for pair in enumerate(seq):
        yield pair
# API spec
#
# Maps category prefix -> method name -> [required args, optional args].
# `RTM.__init__` turns every prefix into an attribute, and `RTMAPICategory`
# builds the remote name 'rtm.<prefix>.<method>' from it ('tasksNotes' maps
# to 'rtm.tasks.notes.<method>').  Both argument lists must be tuples:
# `RTMAPICategory.callMethod` iterates and concatenates them.
#
# NOTE(review): the method-name keys, and the optional-argument tuples of
# several 'tasks' entries, are missing from this copy of the file.  They
# were restored from the public Remember The Milk API method list, matched
# to the surviving argument lists in order -- confirm against upstream.
API = {
    'auth': {
        'checkToken': [('auth_token',), ()],
        'getFrob': [(), ()],
        'getToken': [('frob',), ()],
    },
    'contacts': {
        'add': [('timeline', 'contact'), ()],
        'delete': [('timeline', 'contact_id'), ()],
        'getList': [(), ()],
    },
    'groups': {
        'add': [('timeline', 'group'), ()],
        'addContact': [('timeline', 'group_id', 'contact_id'), ()],
        'delete': [('timeline', 'group_id'), ()],
        'getList': [(), ()],
        'removeContact': [('timeline', 'group_id', 'contact_id'), ()],
    },
    'lists': {
        'add': [('timeline', 'name',), ('filter',)],
        'archive': [('timeline', 'list_id'), ()],
        'delete': [('timeline', 'list_id'), ()],
        'getList': [(), ()],
        # Was [('timeline'), ('list_id')]: plain strings, not tuples, so the
        # required-argument check iterated over single characters.
        'setDefaultList': [('timeline',), ('list_id',)],
        'setName': [('timeline', 'list_id', 'name'), ()],
        'unarchive': [('timeline',), ('list_id',)],
    },
    'locations': {
        'getList': [(), ()],
    },
    'reflection': {
        'getMethodInfo': [('methodName',), ()],
        'getMethods': [(), ()],
    },
    'settings': {
        'getList': [(), ()],
    },
    'tasks': {
        'add': [('timeline', 'name',), ('list_id', 'parse',)],
        'addTags':
            [('timeline', 'list_id', 'taskseries_id', 'task_id', 'tags'),
             ()],
        'complete':
            [('timeline', 'list_id', 'taskseries_id', 'task_id',), ()],
        'delete':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'), ()],
        'getList':
            [(),
             ('list_id', 'filter', 'last_sync')],
        'movePriority':
            [('timeline', 'list_id', 'taskseries_id', 'task_id', 'direction'),
             ()],
        'moveTo':
            [('timeline', 'from_list_id', 'to_list_id', 'taskseries_id', 'task_id'),
             ()],
        'postpone':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ()],
        'removeTags':
            [('timeline', 'list_id', 'taskseries_id', 'task_id', 'tags'),
             ()],
        'setDueDate':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('due', 'has_due_time', 'parse')],
        'setEstimate':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('estimate',)],
        'setLocation':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('location_id',)],
        'setName':
            [('timeline', 'list_id', 'taskseries_id', 'task_id', 'name'),
             ()],
        'setPriority':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('priority',)],
        'setRecurrence':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('repeat',)],
        'setTags':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('tags',)],
        'setURL':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ('url',)],
        'uncomplete':
            [('timeline', 'list_id', 'taskseries_id', 'task_id'),
             ()],
    },
    'tasksNotes': {
        'add': [('timeline', 'list_id', 'taskseries_id', 'task_id', 'note_title', 'note_text'), ()],
        'delete': [('timeline', 'note_id'), ()],
        'edit': [('timeline', 'note_id', 'note_title', 'note_text'), ()],
    },
    'test': {
        'echo': [(), ()],
        'login': [(), ()],
    },
    'time': {
        'convert': [('to_timezone',), ('from_timezone', 'to_timezone', 'time')],
        'parse': [('text',), ('timezone', 'dateformat')],
    },
    'timelines': {
        'create': [(), ()],
    },
    'timezones': {
        'getList': [(), ()],
    },
    'transactions': {
        'undo': [('timeline', 'transaction_id'), ()],
    },
}
def createRTM(apiKey, secret, token=None):
    """Create an `RTM` client, walking the user through authorization.

    When `token` is None the authorization URL is printed, the function
    waits for the user to press enter, and the newly obtained token is
    printed so it can be reused later.

    Returns the `RTM` instance.
    """
    rtm = RTM(apiKey, secret, token)

    if token is None:
        # Single-argument print(...) produces the same output whether it is
        # parsed as a Python 2 statement or a Python 3 function call.
        print('No token found')
        print('Give me access here: %s' % rtm.getAuthURL())
        raw_input('Press enter once you gave access')
        print('Note down this token for future use: %s' % rtm.getToken())

    return rtm
def test(apiKey, secret, token=None):
    """Smoke test: print the tasks due within a week, then the lists.

    Performs live API calls through `createRTM`.
    """
    rtm = createRTM(apiKey, secret, token)

    rspTasks = rtm.tasks.getList(filter='dueWithin:"1 week of today"')
    # NOTE(review): the expressions printed below are missing from this copy
    # of the file; `t.name` and `(x.id, x.name)` are the presumed originals
    # -- confirm.
    print([t.name for t in rspTasks.tasks.list.taskseries])

    rspLists = rtm.lists.getList()
    # print rspLists.lists.list
    print([(x.id, x.name) for x in rspLists.lists.list])
def set_log_level(level):
    '''Sets the log level of the logger used by the module.

    >>> import rtm
    >>> import logging
    >>> rtm.set_log_level(logging.INFO)
    '''
    LOG.setLevel(level)
Jump to Line
Something went wrong with that request. Please try again.