Permalink
Browse files

automatically sync last_read position between devices

  • Loading branch information...
1 parent 87a60c5 commit c4fee2a4aecff0928d97bceb56e600ca9b500038 @pwr committed May 13, 2012
@@ -1,4 +1,4 @@
-import os.path, logging
+import os.path
import binascii, time
import calibre
@@ -9,20 +9,33 @@
def apnx_path(book):
if book and book.file_path:
a_path = os.path.splitext(book.file_path)[0] + '.apnx'
- # logging.debug("checking for apnx file %s", apnx_path)
if os.path.isfile(a_path):
return a_path
-def get_last_read(asin, device = None):
- llr = _db.list_last_read(asin, 1, device)
+def get_last_read(asin):
+ llr = _db.list_last_read(asin, 1)
return llr[0] if llr else None
-
def has(asin):
return asin and _db.list_last_read(asin, 1)
-list_last_read = _db.list_last_read
get_all = _db.get_all
+list_last_read = _db.list_last_read
+
+def get_last_read_updates(device, furthest = True):
+ """
+ get last_read records from other devices that this device should be notified about
+ if 'furthest' is True, records are picked by the furthest location in the book
+ otherwise, the latest records are picked
+ """
+ lru = _db.get_last_read_updates(device.serial, furthest)
+ # only return last_read updates for the books we know to be on the device
+ return [ lr for lr in lru if device.books.get(lr.asin) > 0 ]
+
+def last_read_updated(device, asin):
+ # a device has applied last_read updates for a book, so we can delete its entry
+ # this way the next time the device asks for last_read updates, this book will be skipped
+ _db.delete_last_read(device.serial, asin)
def _bin(state):
if state:
View
@@ -6,18 +6,21 @@
def _namedtuple_row_factory(cursor, row):
- fields = [ col[0] for col in cursor.description ]
- Row = namedtuple('Row', fields)
- return Row(*row)
+ fields = [ col[0] for col in cursor.description ]
+ Row = namedtuple('Row', fields)
+ return Row(*row)
def _execute(query, parameters = ()):
- with sqlite3(_db_path) as db:
- if query.startswith('INSERT INTO '):
- qmarks = ('?', ) * len(parameters)
- query = query.replace('*', ','.join(qmarks))
- logging.debug("execute %s %s", query, parameters)
- db.execute(query, parameters)
- db.commit()
+ try:
+ with sqlite3(_db_path) as db:
+ if query.startswith('INSERT INTO '):
+ qmarks = ('?', ) * len(parameters)
+ query = query.replace('*', ','.join(qmarks))
+ db.execute(query, parameters)
+ db.commit()
+ logging.debug("execute %s %s", query, parameters)
+ except:
+ logging.exception("execute %s %s", query, parameters)
def get_all(asin):
with sqlite3(_db_path) as db:
@@ -31,8 +34,6 @@ def list_last_read(asin, count = -1):
# lists all last_reads, in reverse timestamp order
with sqlite3(_db_path) as db:
db.row_factory = _namedtuple_row_factory
- # it's enough to check the last_read table
- # if there are no entries there, quite unlikely to have bookmarks/notes/etc
return [ lr for lr in db.execute('SELECT * FROM last_read2 WHERE asin = ? ORDER BY timestamp DESC LIMIT ' + str(count), (asin, )) ]
def set_last_read(device_serial, asin, timestamp, begin, position, state):
@@ -41,6 +42,27 @@ def set_last_read(device_serial, asin, timestamp, begin, position, state):
_execute('INSERT INTO last_read2 (id, asin, device, timestamp, begin, pos, state) VALUES (*)',
(None, asin, device_serial, timestamp, begin, position, state))
+def delete_last_read(device_serial, asin):
+ _execute('DELETE FROM last_read2 WHERE asin = ? AND device = ?', (asin, device_serial))
+
+def get_last_read_updates(device_serial, furthest = True):
+ with sqlite3(_db_path) as db:
+ db.row_factory = _namedtuple_row_factory
+ # get all book ids for which this device has last_read entries
+ device_books = [ r[0] for r in db.execute('SELECT asin FROM last_read2 WHERE device = ?', (device_serial, )) ]
+ # logging.debug("%s has last_read for %s", device_serial, device_books)
+ # get all entries where the latest read was done by some other device
+ if furthest:
+ last_read_query = 'SELECT * FROM last_read2 GROUP BY asin HAVING pos = MAX(pos) AND device != ?'
+ else:
+ last_read_query = 'SELECT * FROM last_read2 GROUP BY asin HAVING timestamp = MAX(timestamp) AND device != ?'
+ latest_lr = [ lr for lr in db.execute(last_read_query, (device_serial, )) ]
+ # only pick the latest entries done by other devices, when this device also has an entry
+ latest_lr = [ lr for lr in latest_lr if lr.asin in device_books ]
+ if latest_lr:
+ logging.debug("%s needs to update last_read from %s", device_serial, [ (lr.asin, lr.device, lr.pos) for lr in latest_lr ])
+ return latest_lr
+
def create(device_serial, asin, kind, timestamp, begin, end, position, state, text):
_execute('INSERT INTO annotations2 VALUES (*)',
(None, asin, device_serial, kind, timestamp, begin, end, position, state, text, None))
@@ -73,6 +95,7 @@ def modify(device_serial, asin, kind, timestamp, begin, end, text):
)''')
_execute('CREATE INDEX IF NOT EXISTS index_last_read2_asin_timestamp_desc ON last_read2 (asin, timestamp DESC)')
_execute('CREATE INDEX IF NOT EXISTS index_last_read2_asin_device ON last_read2 (asin, device)')
+_execute('CREATE INDEX IF NOT EXISTS index_last_read2_device ON last_read2 (device)')
_execute('''
CREATE TABLE IF NOT EXISTS annotations2 (
@@ -7,12 +7,15 @@
def _migrate_2_last_read(db_path, row_factory):
try:
with sqlite3(db_path) as db:
+ try: db.execute('SELECT asin FROM last_read LIMIT 1')
+ except: return
db.row_factory = row_factory
for lr in db.execute('SELECT * FROM last_read'):
# all the timestamp the device sent are local time
timestamp = parse_timestamp(lr.timestamp)
db.execute('INSERT INTO last_read2 VALUES (?, ?, ?, ?, ?, ?, ?)',
(None, lr.asin, 'UNKNOWN', timestamp, lr.begin, lr.pos, lr.state))
+ db.commit()
except:
logging.exception("migrating to last_read2")
finally:
@@ -21,12 +24,15 @@ def _migrate_2_last_read(db_path, row_factory):
def _migrate_2_annotations(db_path, row_factory):
try:
with sqlite3(db_path) as db:
+ try: db.execute('SELECT asin FROM annotations LIMIT 1')
+ except: return
db.row_factory = row_factory
for anot in db.execute('SELECT * FROM annotations'):
# all the timestamp the device sent are local time
timestamp = parse_timestamp(anot.timestamp)
db.execute('INSERT INTO annotations2 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
(None, anot.asin, 'UNKNOWN', anot.kind, timestamp, anot.begin, anot.end, anot.pos, anot.state, anot.text, None))
+ db.commit()
except:
logging.exception("migrating to annotations2")
finally:
View
@@ -63,12 +63,6 @@ def update_all(devices):
)''')
_execute('CREATE INDEX IF NOT EXISTS index_devices_serial ON devices ( serial )')
-# guessed wrong.
-try: _execute('ALTER TABLE devices ADD COLUMN alias TEXT')
-except: pass
-try: _execute('ALTER TABLE devices ADD COLUMN kind TEXT')
-except: pass
-try: _execute('ALTER TABLE devices ADD COLUMN lto INTEGER DEFAULT -1')
-except: pass
-
-_execute('VACUUM')
+from devices.db_migration import migrate_3
+migrate_3(_db_path)
+del migrate_3
@@ -0,0 +1,12 @@
+from sqlite3 import connect as sqlite3
+
+
+def migrate_3(db_path):
+ with sqlite3(db_path) as db:
+ try: db.execute('ALTER TABLE devices ADD COLUMN alias TEXT')
+ except: pass
+ try: db.execute('ALTER TABLE devices ADD COLUMN kind TEXT')
+ except: pass
+ try: db.execute('ALTER TABLE devices ADD COLUMN lto INTEGER DEFAULT -1')
+ except: pass
+ db.execute('VACUUM')
@@ -10,7 +10,7 @@
import calibre, devices
-_LAST_READ = '<last_read annotation_time_utc="%d" country_code="%s" lto="%d" pos="%d" source_device="%s" version="0" />'
+_LAST_READ = '<last_read annotation_time_utc="%d" lto="%d" pos="%d" source_device="%s" method="FRL" version="0"/>'
def _last_read(book, exclude_device = None):
lr_list = []
@@ -21,8 +21,11 @@ def _last_read(book, exclude_device = None):
alias = device.alias if device else lr.device
alias = lr.device if alias is None \
else alias.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').replace('"', '&quot;').replace('\'', '&apos;')
- lr_list.append(_LAST_READ % (lr.timestamp * 1000, 'US', device_lto(device), lr.pos, alias))
- xml = '<?xml version="1.0" encoding="UTF-8"?><book>' + ''.join(lr_list) + '</book>'
+ lr_list.append(_LAST_READ % (lr.timestamp * 1000, device_lto(device), lr.pos, alias))
+ if lr_list:
+ xml = '<?xml version="1.0" encoding="UTF-8"?><book>' + ''.join(lr_list) + '</book>'
+ else:
+ xml = '<?xml version="1.0" encoding="UTF-8"?><book/>'
return DummyResponse(headers = { 'Content-Type': 'text/xml;charset=UTF-8' }, data = bytes(xml, 'UTF-8'))
@@ -5,8 +5,10 @@
from handlers.dummy import DummyResponse
from handlers.ksp import _servers_config, _first_contact
from handlers import is_uuid, TODO, TODO_PATH
-import calibre, qxml
+import devices, calibre, qxml
import config, features
+import annotations
+from annotations.lto import device_lto
def _rewrite_url(url):
@@ -21,22 +23,21 @@ def _rewrite_url(url):
url = url[:m.start()] + m.expand(replacement) + url[m.end():]
return url
-def _add_item(x_items, action, item_type, key = 'NONE', text = None, priority = 600, url = None, forced = False):
+def _add_item(x_items, action, item_type, key = 'NONE', text = None, priority = 600, sequence = 0, url = None, body = None, **kwargs):
item = qxml.add_child(x_items, 'item')
item.setAttribute('action', str(action))
item.setAttribute('is_incremental', 'false')
item.setAttribute('key', str(key))
item.setAttribute('priority', str(priority))
- item.setAttribute('sequence', '0')
+ item.setAttribute('sequence', str(sequence))
item.setAttribute('type', str(item_type))
if url:
item.setAttribute('url', url)
- if text:
- if forced:
- qxml.add_child(item, 'title', text)
- qxml.add_child(item, 'forced', 'true')
- else:
- qxml.set_text(item, text)
+ if body:
+ qxml.set_text(item, body)
+ else:
+ for k, v in kwargs.items():
+ qxml.add_child(item, k, str(v))
return item
def _filter_item(x_items, x_item):
@@ -79,44 +80,18 @@ def _filter_item(x_items, x_item):
x_items.removeChild(x_item)
return True
- # very unlikely for these to change upstream for books not downloaded from Amazon...
- # if action == 'UPD_ANOT' or action == 'UPD_LPRD':
- # # annotations and LPRD (last position read?)
- # item_key = x_item.getAttribute('key')
- # if is_uuid(item_key):
- # x_items.removeChild(x_item)
- # return True
-
return False
-def _process_xml(doc, device, reason):
- x_response = qxml.get_child(doc, 'response')
- x_items = qxml.get_child(x_response, 'items')
- if not x_items:
- return False
-
+def _consume_action_queue(device, x_items):
was_updated = False
-
- # rewrite urls
- for x_item in qxml.list_children(x_items, 'item'):
- was_updated |= _filter_item(x_items, x_item)
-
- if features.download_updated_books:
- for book in calibre.books().values():
- if book.needs_update_on(device) and book.cde_content_type in ('EBOK', ): # PDOC updates are not supported ATM
- logging.warn("book %s updated in library, telling device %s to download it again", book, device)
- # <item action="GET" is_incremental="false" key="asin" priority="600" sequence="0" type="EBOK">title</item>
- _add_item(x_items, 'GET', book.cde_content_type, key = book.asin, text = book.title, forced = True) # book.title)
- was_updated = True
-
while device.actions_queue:
action = device.actions_queue.pop()
# logging.debug("checking action %s", action)
if list(qxml.filter(x_items, 'item', action = action[0], type = action[1])):
# logging.debug("action %s already found in %s, skipping", action, x_items)
continue
if action == 'SET_SCFG':
- _add_item(x_items, 'SET', 'SCFG', text = _servers_config(device), key = 'KSP.set.scfg', priority = 100)
+ _add_item(x_items, 'SET', 'SCFG', key = 'KSP.set.scfg', priority = 100, body = _servers_config(device))
was_updated = True
elif action == 'UPLOAD_SNAP':
_add_item(x_items, 'UPLOAD', 'SNAP', key = 'KSP.upload.snap', priority = 1000, url = config.server_url + 'FionaCDEServiceEngine/UploadSnapshot')
@@ -129,6 +104,42 @@ def _process_xml(doc, device, reason):
was_updated = True
else:
logging.warn("unknown action %s", action)
+ return was_updated
+
+def _update_annotations(device, x_items):
+ was_updated = False
+ lru = annotations.get_last_read_updates(device)
+ # logging.debug("%s has last_read updates: %s", device, lru)
+ for lr in lru:
+ # LPRD is only supported by EBOKs
+ source_device = devices.get(lr.device)
+ source_device_alias = (source_device.alias or source_device.serial) if source_device else lr.device
+ _add_item(x_items, 'UPD_LPRD', 'EBOK', key = lr.asin, priority = 1100, sequence = lr.pos,
+ source_device = source_device_alias, lto = device_lto(source_device), annotation_time_utc = lr.timestamp)
+ was_updated = True
+ return was_updated
+
+def _process_xml(doc, device, reason):
+ x_response = qxml.get_child(doc, 'response')
+ x_items = qxml.get_child(x_response, 'items')
+ if not x_items:
+ return False
+
+ was_updated = False
+
+ # rewrite urls
+ for x_item in qxml.list_children(x_items, 'item'):
+ was_updated |= _filter_item(x_items, x_item)
+
+ was_updated |= _consume_action_queue(device, x_items)
+ was_updated |= _update_annotations(device, x_items)
+
+ if features.download_updated_books:
+ for book in calibre.books().values():
+ if book.needs_update_on(device) and book.cde_content_type in ('EBOK', ): # PDOC updates are not supported ATM
+ logging.warn("book %s updated in library, telling device %s to download it again", book, device)
+ _add_item(x_items, 'GET', book.cde_content_type, key = book.asin, title = book.title, forced = True)
+ was_updated = True
if was_updated:
x_total_count = qxml.get_child(x_response, 'total_count')
@@ -6,6 +6,7 @@
from handlers import is_uuid, TODO, TODO_PATH
import calibre, qxml
import features
+import annotations
_DUMMY_BODY = b'<?xml version="1.0" encoding="UTF-8"?><response><status>SUCCESS</status></response>'
@@ -18,20 +19,27 @@ def _process_item(device, action = None, cde_type = None, key = None, complete_s
if not features.allow_logs_upload and action == 'SND' and cde_type == 'CMND' and key.endswith(':SYSLOG:UPLOAD'):
return True
- if action in ('GET', 'DOWNLOAD') and cde_type in ('EBOK', 'PDOC', 'APNX'):
- if is_uuid(key, cde_type):
- book = calibre.book(key)
- if complete_status == 'COMPLETED':
- if book:
- book.mark_downloaded_by(device)
- else:
- logging.warn("%s successfully downloaded missing book %s", device, book)
- elif complete_status == 'FAILED':
- logging.warn("device failed to download book %s %s", key, book)
+ if action in ('GET', 'DOWNLOAD') and cde_type in ('EBOK', 'PDOC', 'APNX') and is_uuid(key, cde_type):
+ book = calibre.book(key)
+ if complete_status == 'COMPLETED':
+ if book:
+ book.mark_downloaded_by(device)
else:
- logging.warn("%s: unknown downloaded status %s for book %s", device, complete_status, book)
+ logging.warn("%s successfully updated unknown book %s", device, key)
+ elif complete_status == 'FAILED':
+ logging.warn("%s failed to update book %s", device, book or key)
+ else:
+ logging.warn("%s: unknown downloaded status %s for %s", device, complete_status, book or key)
+ return True
- return True
+ if action == 'UPD_LPRD' and is_uuid(key, cde_type):
+ if complete_status == 'COMPLETED':
+ annotations.last_read_updated(device, key)
+ elif complete_status == 'FAILED':
+ logging.warn("%s failed to update last_read for book %s", device, key)
+ else:
+ logging.warn("%s: unknown UPD_LPRD status %s for book %s", device, complete_status, key)
+ return True
return False

0 comments on commit c4fee2a

Please sign in to comment.