#!/usr/bin/python -u
# encoding: utf-8
# standard Python library imports
from __future__ import with_statement
import os
import sys
import urllib
import urllib2
from xml.sax.saxutils import escape
from xml.sax import SAXException
import codecs
import imghdr
from collections import defaultdict
import time
import netrc
import locale
import shutil
from glob import glob
# extra required packages
import xmltramp
# shorthand used by path_to(), save_image(), get_theme() and TumblrBackup.backup()
join = os.path.join
# add another JPEG recognizer
# NOTE(review): no registration of this function (e.g. with imghdr.tests) is
# visible in this file -- confirm that it is actually hooked up.
def test_jpg(h, f):
    """Recognize JPEG data by its leading marker bytes.

    Uses the (h, f) signature of the ``imghdr`` test functions: ``h`` is
    the start of the data, ``f`` is not used.

    Returns 'jpg' when ``h`` starts with FF D8 FF followed by one of the
    marker bytes DB, E0, E1, E2 or E3, and None otherwise.
    """
    # the length guard makes a header shorter than four bytes count as
    # "not recognized" instead of raising IndexError on h[3]
    if len(h) > 3 and h[:3] == '\xFF\xD8\xFF' and h[3] in "\xDB\xE0\xE1\xE2\xE3":
        return 'jpg'
    return None
# variable directory names, will be set in TumblrBackup.backup()
save_folder = ''
image_folder = ''
# constant names
# os.getcwdu() is the Python 2 call that returns the working directory as unicode
root_folder = os.getcwdu()
# post_dir is rebound to os.curdir by TumblrBackup.backup() when options.blosxom is set
post_dir = 'posts'
xml_dir = 'xml'
image_dir = 'images'
archive_dir = 'archive'
theme_dir = 'theme'
backup_css = 'backup.css'
custom_css = 'custom.css'
# file name stem of the saved avatar; get_theme() appends the extension of the avatar URL
avatar_base = 'avatar'
# HTML fragments
# post_header is rebound by TumblrBackup.backup() to header(self.title, body_class='post')
post_header = ''
footer = u'</body>\n</html>\n'
# post_ext is rebound to '.txt' by TumblrBackup.backup() when options.blosxom is set
post_ext = '.html'
# rebound by TumblrBackup.backup() to os.access(path_to(custom_css), os.R_OK)
have_custom_css = False
# ensure the right date/time format
try:
    locale.setlocale(locale.LC_TIME, '')
except locale.Error:
    # the environment names a locale this system does not support;
    # carry on with the default locale rather than abort the backup
    pass
def log(s):
if not options.quiet:
print s,
def mkdir(dir, recursive=False):
    """Create the directory ``dir`` unless the path already exists.

    dir        path of the directory to create
    recursive  when true, missing parent directories are created as well;
               otherwise only the last path component is created
    """
    if not os.path.exists(dir):
        if recursive:
            os.makedirs(dir)
        else:
            os.mkdir(dir)
def path_to(*parts):
    """Return the path built from ``parts`` below the current save folder."""
    return join(save_folder, *parts)
def open_text(*parts):
    """Open a UTF-8 encoded text file below the save folder for writing.

    ``parts`` are the path components relative to the save folder, the
    last one being the file name. When directory components are given,
    the containing directory is created first.

    Returns the open file object; the caller is responsible for closing
    it (all callers in this file use a ``with`` statement).
    """
    if len(parts) > 1:
        mkdir(path_to(*parts[:-1]))
    return codecs.open(path_to(*parts), 'w', 'utf-8')
# Opens ``url`` with urllib2 (passing ``data`` along) and parses the reply with
# xmltramp. Returns the parsed document when its root element is named
# 'tumblr'; returns None otherwise and after a SAX parse error.
# NOTE(review): this function is truncated in this copy. The ``try:`` lines
# that both ``except`` clauses belong to, the left-hand side of the
# 'text/xml' comparison, the right-hand side of ``xml =`` and the first
# argument of the second stderr message are missing; restore them before use.
def xmlparse(url, data=None):
# presumably a retry loop: at most 10 attempts to open the URL -- confirm
for _ in range(10):
resp = urllib2.urlopen(url, data)
except (urllib2.URLError, urllib2.HTTPError) as e:
# a failed request is reported on stderr together with the URL
sys.stderr.write('%r getting %s\n' % (e, url))
# presumably compares the content type of the response -- confirm
if == 'text/xml':
return None
xml =
doc = xmltramp.parse(xml)
except SAXException as e:
# dump the response status, the parser error and the raw XML for diagnosis
sys.stderr.write('%s %r\n\n%r\n\n%s\n' % (, resp.msg, e, xml))
return None
# anything that is not a <tumblr> document counts as a failure
return doc if doc._name == 'tumblr' else None
def save_image(image_url):
    """Save the image at ``image_url`` in the image folder if not saved yet.

    The local file name is the last path component of the URL. When that
    name has no extension, any already saved file with the same stem is
    reused; otherwise the extension is derived from the downloaded data.

    Returns the local file name without its directory.
    Raises whatever urllib2.urlopen() raises when the download fails.
    """
    image_filename = image_url.split('/')[-1]
    glob_filter = '' if '.' in image_filename else '.*'
    # check if a file with this name already exists
    image_glob = glob(join(image_folder, image_filename + glob_filter))
    if image_glob:
        return os.path.split(image_glob[0])[1]
    # download the image data; close the connection even if reading fails
    image_response = urllib2.urlopen(image_url)
    try:
        image_data = image_response.read()
    finally:
        image_response.close()
    # determine the file type if it's unknown
    if '.' not in image_filename:
        image_type = imghdr.what(None, image_data[:32])
        if image_type:
            image_filename += '.' + image_type.replace('jpeg', 'jpg')
    # save the image
    with open(join(image_folder, image_filename), 'wb') as image_file:
        image_file.write(image_data)
    return image_filename
def save_style():
    """Write the built-in stylesheet to the backup_css file in the save folder."""
    with open_text(backup_css) as css:
        # the backslash after the opening quotes keeps the file from
        # starting with an empty line
        css.write('''\
body { width: 720px; margin: 0 auto; }
img { max-width: 720px; }
blockquote { margin-left: 0; border-left: 8px #999 solid; padding: 0 24px; }
.archive h1, .subtitle, article { padding-bottom: 0.75em; border-bottom: 1px #ccc dotted; }
.post a.llink, .archive a.tlink { display: none; }
.meta a { text-decoration: none; }
.avatar { float: right; }
''')
def header(heading, title='', body_class='', subtitle='', avatar=''):
    """Return the opening HTML of a page, up to and including its headings.

    heading     text of the <title> element
    title       optional text of the <h1> heading
    body_class  optional class of the <body> element; 'index' marks the
                page stored in the save folder itself, every other page
                gets its stylesheet and avatar links prefixed with '../'
    subtitle    optional text of the subtitle paragraph
    avatar      optional file name of the avatar image in the theme folder

    The document is left open; the module-level ``footer`` closes it.
    """
    root_rel = '' if body_class == 'index' else '../'
    css_rel = root_rel + (custom_css if have_custom_css else backup_css)
    if body_class:
        body_class = ' class=' + body_class
    # one placeholder per argument: <body%s> receives the class attribute
    # and opens the element that the footer closes again
    h = u'''<!DOCTYPE html>
<html>
<head><meta charset=utf-8><title>%s</title>
<link rel=stylesheet type=text/css href=%s>
</head>
<body%s>
''' % (heading, css_rel, body_class)
    if avatar:
        h += '<img src=%s%s/%s alt=Avatar class=avatar>\n' % (root_rel, theme_dir, avatar)
    if title:
        h += u'<h1>%s</h1>\n' % title
    if subtitle:
        h += u'<p class=subtitle>%s</p>\n' % subtitle
    return h
# Fetches the theme data of ``account`` through the authenticate API on
# ``host`` (login ``user``/``password``) and stores it in the theme folder.
# NOTE(review): this function is truncated in this copy. The wrapper around the
# request parameters, the bodies of the ``if not tumblr:`` and name checks, and
# every statement inside the ``with`` blocks are missing; restore before use.
def get_theme(account, host, user, password):
theme_folder = path_to(theme_dir)
# start from a clean theme folder; the True argument ignores removal errors
shutil.rmtree(theme_folder, True)
tumblr = xmlparse('http://%s/api/authenticate' % host,
'email': user, 'password': password, 'include-theme': '1'
if not tumblr:
# note: the loop variable shadows the module-level log() function in this scope
for log in tumblr['tumblelog':]:
attrs = log()
# presumably skips tumblelogs that belong to other accounts -- confirm
if attrs.get('name') != account:
if hasattr(log, 'custom-css') and len(log['custom-css']):
with open_text(theme_dir, 'custom.css') as f:
if hasattr(log, 'theme-source') and len(log['theme-source']):
with open_text(theme_dir, 'theme.html') as f:
avatar_url = attrs.get('avatar-url')
if avatar_url:
avatar = urllib2.urlopen(avatar_url)
# the avatar keeps the extension found at the end of its URL
avatar_file = avatar_base + '.' + avatar_url.split('.')[-1]
with open(join(theme_folder, avatar_file), 'wb') as f:
# Drives a backup run: fetches the posts of an account, saves them through the
# selected post class and writes the index and monthly archive pages.
# NOTE(review): this class is truncated in this copy. Many statements are
# missing (closing parentheses, try:/else: lines, loop and branch bodies, and
# several attribute names); the notes below mark the gaps that are visible.
class TumblrBackup:
# Wraps every saved '*.html' post file in a LocalPost.
# NOTE(review): the statement that files the post into self.index is missing.
def build_index(self):
for f in glob(path_to(post_dir, '*.html')):
post = LocalPost(f)
# Writes index.html: page header, one section per year, generation timestamp.
def save_index(self):
# use the saved avatar, whatever its extension, when there is one
f = glob(path_to(theme_dir, avatar_base + '.*'))
avatar = os.path.split(f[0])[1] if f else None
with open_text('index.html') as idx:
idx.write(header(self.title, self.title, body_class='index',
subtitle=self.subtitle, avatar=avatar
for year in sorted(self.index.keys(), reverse=options.reverse_index):
self.save_year(idx, year)
idx.write('<p>Generated on %s.</p>\n' % time.strftime('%x %X'))
# Writes the index section of ``year`` to ``idx`` and saves each month's archive page.
def save_year(self, idx, year):
idx.write('<h3>%s</h3>\n<ul>\n' % year)
for month in sorted(self.index[year].keys(), reverse=options.reverse_index):
# a struct_time inside the month, used for formatting the month name
tm = time.localtime(time.mktime([year, month, 3, 0, 0, 0, 0, 0, -1]))
month_name = self.save_month(year, month, tm)
idx.write(' <li><a href=%s/%s title="%d post(s)">%s</a></li>\n' % (
archive_dir, month_name, len(self.index[year][month]),
time.strftime('%B', tm).decode('utf-8')
# Writes the archive page 'YYYY-MM.html' of one month and returns its file name.
# NOTE(review): the write call around the three page parts and the sort key
# expression of the lambda are missing.
def save_month(self, year, month, tm):
file_name = '%d-%02d.html' % (year, month)
with open_text(archive_dir, file_name) as arch:
header(self.title, time.strftime('%B %Y', tm).decode('utf-8'), body_class='archive'),
'\n\n'.join(p.get_post() for p in sorted(
self.index[year][month], key=lambda x:, reverse=options.reverse_month
'<p><a href=../>Index</a></p>',
return file_name
def backup(self, account):
"""makes single files and an index for every post on a public Tumblr blog account"""
# construct the tumblr API URL
base = 'http://' + account
if '.' not in account:
# NOTE(review): the appended string is empty here; presumably a host name suffix was lost
base += ''
base += '/api/read'
# make sure there are folders to save in
global save_folder, image_folder, post_ext, post_dir, have_custom_css
# NOTE(review): the else: separating the blosxom settings from the default ones is missing
if options.blosxom:
save_folder = root_folder
post_ext = '.txt'
post_dir = os.curdir
post_class = BlosxomPost
save_folder = join(root_folder, account)
image_folder = path_to(image_dir)
post_class = TumblrPost
have_custom_css = os.access(path_to(custom_css), os.R_OK)
mkdir(save_folder, True)
self.post_count = 0
# prepare the period start and end timestamps
if options.period:
# i is the index of the finest date field given (0 year, 1 month, 2 day);
# adding 1 to that field below yields the end of the period
i = 0; tm = [int(options.period[:4]), 1, 1, 0, 0, 0, 0, 0, -1]
if len(options.period) >= 6:
i = 1; tm[1] = int(options.period[4:6])
if len(options.period) == 8:
i = 2; tm[2] = int(options.period[6:8])
p_start = time.mktime(tm)
tm[i] += 1
p_stop = time.mktime(tm)
if options.theme:
# if .netrc contains the login, get the style info
# NOTE(review): the host name is an empty string here; presumably it was lost
host = ''
auth = netrc.netrc().authenticators(host)
if auth:
log("Getting the theme\r")
# auth is the (login, account, password) tuple returned by netrc
get_theme(account, host, auth[0], auth[2])
# get the highest post id already saved
ident_max = None
if options.incremental:
# NOTE(review): the try: line and the expression fed to max() are missing
ident_max = max(
for f in glob(path_to(post_dir, '*' + post_ext))
log('Backing up posts after %d\n' % ident_max)
except ValueError: # max() arg is an empty sequence
# start by calling the API with just a single post
log("Getting basic information\r")
soup = xmlparse(base + '?num=1')
# NOTE(review): the body of this check is missing
if not soup:
# collect all the meta information
tumblelog = soup.tumblelog
# NOTE(review): the try: line belonging to the except KeyError below is missing
self.title = escape(tumblelog('title'))
except KeyError:
self.title = account
self.subtitle = unicode(tumblelog)
# use the meta information to create a HTML header
global post_header
post_header = header(self.title, body_class='post')
# find the total number of posts
total_posts = options.count or int(soup.posts('total'))
last_post = options.skip + total_posts
# Handles one batch of posts, newest id first. Returns False as soon as a
# post is reached that ends the run, True when the whole batch was handled.
# NOTE(review): both period comparisons lack their left-hand side and the
# statement that saves the post is missing.
def _backup(posts):
for p in sorted(posts, key=lambda x: long(x('id')), reverse=True):
post = post_class(p)
if ident_max and long(post.ident) <= ident_max:
return False
if options.period:
if >= p_stop:
if < p_start:
return False
if post.error:
# the trailing blanks wipe what is left of the progress line
sys.stderr.write('%r in post #%s%s\n' % (post.error, post.ident, 50 * ' '))
self.post_count += 1
return True
# Get the XML entries from the API, which we can only do for max 50 posts at once.
# Posts "arrive" in reverse chronological order. Post #0 is the most recent one.
MAX = 50
for i in range(options.skip, last_post, MAX):
# find the upper bound
j = min(i + MAX, last_post)
log("Getting posts %d to %d of %d...\r" % (i, j - 1, total_posts))
soup = xmlparse('%s?num=%d&start=%d' % (base, j - i, i))
# NOTE(review): the bodies of the next two checks are missing
if soup is None:
if not _backup(soup.posts['post':]):
# NOTE(review): the index building and saving calls of this branch are missing
if not options.blosxom and self.post_count:
if not have_custom_css:
# maps year -> month -> list of posts
self.index = defaultdict(lambda: defaultdict(list))
log("%d posts backed up" % self.post_count + 50 * ' ' + '\n')
# One post taken from the API's XML, rendered to HTML and saved locally.
# NOTE(review): this class is truncated in this copy. try:/else: lines, several
# attribute names and a number of statements are missing; the notes below mark
# the gaps that are visible.
class TumblrPost:
def __init__(self, post):
self.content = ''
self.xml_content = post.__repr__(1, 1)
self.ident = post('id')
self.url = post('url')
# NOTE(review): three assignments are fused on the next line; the targets that
# receive the unix timestamp and its time.localtime() value are missing
self.typ = post('type') = int(post('unix-timestamp')) = time.localtime(
self.title = ''
self.tags = []
self.file_name = self.ident + post_ext
self.error = None
# NOTE(review): the try: line and the guarded statement are missing
except Exception, e:
# keep the exception and show it together with the raw XML in place of the content
self.error = e
self.content = u'<p class=error>%r</p>\n<pre>%s</pre>' % (e, escape(self.xml_content))
def generate_content(self, post):
"""generates the content for this post"""
content = []
def append(s, fmt=u'%s'):
# the %s conversion calls unicode() on the xmltramp element
content.append(fmt % s)
# returns the text of the child element ``elt``, or '' when the post has none
# NOTE(review): the try: line belonging to the except KeyError is missing
def get_try(elt):
return unicode(post[elt])
except KeyError:
return ''
# appends the child element ``elt`` formatted with ``fmt`` when it has content
def append_try(elt, fmt=u'%s'):
elt = get_try(elt)
if elt:
append(elt, fmt)
# NOTE(review): several branches below have lost some or all of their statements
if self.typ == 'regular':
self.title = get_try('regular-title')
elif self.typ == 'photo':
url = get_try('photo-link-url')
# a photoset holds several <photo> elements; a single photo lives on the post itself
for p in post.photoset['photo':] if hasattr(post, 'photoset') else [post]:
append(self.get_image_url(p['photo-url']), u'<img alt="" src="%s">')
if url:
content[-1] = '<a href="%s">%s</a>' % (url, content[-1])
content[-1] = '<p>' + content[-1] + '</p>'
if p._name == 'photo' and p('caption'):
append(p('caption'), u'<p>%s</p>')
elif self.typ == 'link':
self.title = u'<a href="%s">%s</a>' % (post['link-url'], post['link-text'])
elif self.typ == 'quote':
append(post['quote-text'], u'<blockquote>%s</blockquote>')
append_try('quote-source', u'<p>%s</p>')
elif self.typ == 'video':
source = unicode(post['video-source']).strip()
# NOTE(review): the else: between the embed case and the player/link case is missing
if source.startswith('<iframe') or source.startswith('<object'):
append(source, u'<p>%s</p>')
append(post['video-player'], u'<p>%s</p>')
append(source, u'<p><a href="%s">Original</a></p>')
elif self.typ == 'audio':
elif self.typ == 'answer':
self.title = post.question
elif self.typ == 'conversation':
self.title = get_try('conversation-title')
'<br>\n'.join(escape(unicode(l)) for l in post.conversation['line':]),
raise ValueError('Unknown post type: ' + self.typ)
self.tags = [u'%s' % t for t in post['tag':]]
self.content = '\n'.join(content)
# saves the image locally and returns its URL relative to a post page
def get_image_url(self, url):
return u'../%s/%s' % (image_dir, save_image(unicode(url)))
def get_post(self):
"""returns this post in HTML"""
post = post_header + '<article class=%s id=p-%s>\n' % (self.typ, self.ident)
# NOTE(review): the time argument and closing parenthesis of strftime() are missing
post += '<p class=meta><span class=date>%s</span>\n' % time.strftime('%x %X',
post += u'<a class=llink href=../%s/%s>¶</a>\n' % (post_dir, self.file_name)
post += u'<a class=tlink href=%s>●</a></p>\n' % self.url
if self.title:
post += '<h2>%s</h2>\n' % self.title
post += self.content
if self.tags:
post += u'\n<p class=tags>%s</p>' % u' '.join(u'#' + t for t in self.tags)
post += '\n</article>\n\n' + footer
return post
def save_post(self):
"""saves this post locally"""
# NOTE(review): the write calls inside both with blocks and the times
# argument of os.utime() are missing
with open_text(post_dir, self.file_name) as f:
os.utime(path_to(post_dir, self.file_name),
if options.xml:
with open_text(xml_dir, self.ident + '.xml') as f:
class BlosxomPost(TumblrPost):
    """A post that is written as a Blosxom entry instead of a HTML page."""

    def get_image_url(self, url):
        """Return ``url`` unchanged, so the entry links to the original image."""
        return url

    def get_post(self):
        """returns this post as a Blosxom post"""
        # first line is the title, followed by the meta-* header lines
        post = self.title + '\nmeta-id: _' + self.ident + '\nmeta-url: ' + self.url
        if self.tags:
            # tags are separated by blanks, so blanks inside a tag become '+'
            post += '\nmeta-tags: ' + ' '.join(t.replace(' ', '+') for t in self.tags)
        # an empty line separates the headers from the content
        post += '\n\n' + self.content
        return post
# A post read back from a saved HTML file, reduced to its <article> element.
# NOTE(review): this class is truncated in this copy; see the notes below.
class LocalPost:
def __init__(self, post_file):
# NOTE(review): the call that opens ``post_file`` is missing before ", 'r', 'utf-8')"
with, 'r', 'utf-8') as f:
self.lines = f.readlines()
# remove header and footer
while self.lines and '<article ' not in self.lines[0]:
del self.lines[0]
while self.lines and '</article>' not in self.lines[-1]:
del self.lines[-1]
self.file_name = os.path.split(post_file)[1]
# NOTE(review): three assignments are fused on the next line; the targets that
# receive the file's modification time and its time.localtime() value are missing
self.ident = os.path.splitext(self.file_name)[0] = os.stat(post_file).st_mtime = time.localtime(
# returns the kept lines of the file as one string
def get_post(self):
return u''.join(self.lines)
if __name__ == '__main__':
    # command line entry point: parse the options, then back up every blog named
    import optparse
    parser = optparse.OptionParser("Usage: %prog [options] blog-name ...",
        description="Makes a local backup of Tumblr blogs."
    )
    parser.add_option('-q', '--quiet', action='store_true',
        help="suppress progress messages"
    )
    parser.add_option('-i', '--incremental', action='store_true',
        help="incremental backup mode"
    )
    parser.add_option('-x', '--xml', action='store_true',
        help="save the original XML source"
    )
    parser.add_option('-t', '--theme', action='store_true',
        help="save the blog's theme (needs a ~/.netrc entry)"
    )
    parser.add_option('-b', '--blosxom', action='store_true',
        help="save the posts in blosxom format"
    )
    # both reverse options default to True and are switched off by their flag
    parser.add_option('-r', '--reverse-month', action='store_false', default=True,
        help="reverse the post order in the monthly archives"
    )
    parser.add_option('-R', '--reverse-index', action='store_false', default=True,
        help="reverse the index file order"
    )
    parser.add_option('-a', '--auto', type='int', metavar="HOUR",
        help="do a full backup at HOUR hours, otherwise do an incremental backup"
        " (useful for cron jobs)"
    )
    parser.add_option('-n', '--count', type='int', help="save only COUNT posts")
    parser.add_option('-s', '--skip', type='int', default=0,
        help="skip the first SKIP posts"
    )
    parser.add_option('-p', '--period', help="limit the backup to PERIOD"
        " ('y', 'm', 'd' or YYYY[MM[DD]])"
    )
    options, args = parser.parse_args()
    # --auto overrides --incremental and --theme depending on the current hour;
    # "is not None" keeps hour 0 (midnight) usable
    if options.auto is not None:
        if options.auto == time.localtime().tm_hour:
            options.incremental = False
            options.theme = True
        else:
            options.incremental = True
            options.theme = False
    if options.period:
        try:
            # 'y', 'm' and 'd' stand for the current year, month and day
            options.period = time.strftime(
                {'y': '%Y', 'm': '%Y%m', 'd': '%Y%m%d'}[options.period]
            )
        except KeyError:
            # an explicit date may be written with dashes, e.g. 2012-03
            options.period = options.period.replace('-', '')
        if len(options.period) not in (4, 6, 8):
            parser.error("Period must be 'y', 'm', 'd' or YYYY[MM[DD]]")
    if not args:
        # default blog when none is named on the command line
        args = ['bbolli']
    tb = TumblrBackup()
    for account in args:
        tb.backup(account)
