Skip to content
This repository
Browse code

acoustid replication sync script

  • Loading branch information...
commit 4e4a1f651df0b57910f18fbf064c95ec5096a0a8 1 parent 74724b4
Adamansky Anton adamansky authored
3  acoustid.conf.dist
@@ -20,3 +20,6 @@ level=WARN
20 20 base_url=http://acoustid.org/
21 21 base_https_url=https://acoustid.org/
22 22 secret=XXX
  23 +
  24 +[replication]
  25 +import_acoustid=http://data.acoustid.org/replication/acoustid-update-{seq}.xml.bz2
14 acoustid/config.py
@@ -115,6 +115,18 @@ def read(self, parser, section):
115 115 self.secret = parser.get(section, 'secret')
116 116
117 117
  118 +class ReplicationConfig(object):
  119 + def __init__(self):
  120 + self.import_acoustid = None
  121 + self.import_acoustid_musicbrainz = None
  122 +
  123 + def read(self, parser, section):
  124 + if parser.has_option(section, 'import_acoustid'):
  125 + self.import_acoustid = parser.get(section, 'import_acoustid')
  126 + if parser.has_option(section, 'import_acoustid_musicbrainz'):
  127 + self.import_acoustid_musicbrainz = parser.get(section, 'import_acoustid_musicbrainz')
  128 +
  129 +
118 130 class Config(object):
119 131
120 132 def __init__(self, path):
@@ -129,4 +141,6 @@ def __init__(self, path):
129 141 self.website.read(parser, 'website')
130 142 self.index = IndexConfig()
131 143 self.index.read(parser, 'index')
  144 + self.replication = ReplicationConfig()
  145 + self.replication.read(parser, 'replication')
132 146
0  acoustid/xml/__init__.py
No changes.
121 acoustid/xml/digester.py
... ... @@ -0,0 +1,121 @@
  1 +# -*- coding: utf-8 -*-
  2 +
  3 +import xml.sax
  4 +from xml.sax import ContentHandler
  5 +from xml.sax.handler import ErrorHandler
  6 +
  7 +class Rule(object):
  8 + def __init__(self):
  9 + self._storedAttrs = None
  10 +
  11 + def begin(self, tag, attrs):
  12 + pass
  13 +
  14 + def body(self, tag, attrs, text):
  15 + pass
  16 +
  17 + def end(self, tag):
  18 + pass
  19 +
  20 + def finish(self):
  21 + pass
  22 +
  23 +
  24 +class FuncRule(Rule):
  25 + def __init__(self, bf=None, bbf=None, ef=None, ff=None):
  26 + Rule.__init__(self)
  27 + if bf is not None:
  28 + self.begin = bf
  29 + if bbf is not None:
  30 + self.body = bbf
  31 + if ef is not None:
  32 + self.end = ef
  33 + if ff is not None:
  34 + self.finish = ff
  35 +
  36 +
  37 +class Digester(ContentHandler):
  38 + def __init__(self):
  39 + ContentHandler.__init__(self)
  40 + self.reset()
  41 +
  42 + def addAll(self, path, bf, bbf, ef, ff):
  43 + self.addRule(path, FuncRule(bf, bbf, ef, ff))
  44 +
  45 + def addOnBeginAndEnd(self, path, bf, ef):
  46 + self.addRule(path, FuncRule(bf, None, ef, None))
  47 +
  48 + def addOnBegin(self, path, bf):
  49 + self.addRule(path, FuncRule(bf, None, None, None))
  50 +
  51 + def addOnBody(self, path, bbf):
  52 + self.addRule(path, FuncRule(None, bbf, None, None))
  53 +
  54 + def addOnEnd(self, path, ef):
  55 + self.addRule(path, FuncRule(None, None, ef, None))
  56 +
  57 + def addOnFinish(self, ff):
  58 + self.addRule("/", FuncRule(None, None, None, ff))
  59 +
  60 + def addRule(self, path, rule):
  61 + rslot = self._rules.get(path)
  62 + if rslot is None:
  63 + rslot = self._rules[path] = []
  64 + rslot.append(rule)
  65 +
  66 + def parse(self, input, errorHandler = ErrorHandler()):
  67 + xml.sax.parse(input, self, errorHandler)
  68 +
  69 + def pop(self):
  70 + return self._stack.pop()
  71 +
  72 + def push(self, item):
  73 + return self._stack.append(item)
  74 +
  75 + def reset(self):
  76 + self._rules = {}
  77 + self._stack = []
  78 + self._path = []
  79 + self._cdata = []
  80 + self._active_rules = 0
  81 +
  82 + def peek(self):
  83 + return self._stack[-1:][0] if len(self._stack) > 0 else None
  84 +
  85 + def startDocument(self):
  86 + self._path = []
  87 +
  88 + def endDocument(self):
  89 + assert self._active_rules == 0
  90 + assert len(self._path) == 0
  91 + for a in self._rules.itervalues():
  92 + for r in [x for x in a if x and hasattr(x.finish, '__call__')]:
  93 + r.finish()
  94 +
  95 + def startElement(self, tag, attrs):
  96 + self._path.append(tag)
  97 + if len(self._cdata) > 0:
  98 + self._cdata = []
  99 + rset = self._rules.get('/'.join(self._path))
  100 + for r in rset if rset is not None else []:
  101 + self._active_rules += 1
  102 + r._storedAttrs = attrs.copy()
  103 + r.begin(tag, attrs)
  104 + return
  105 +
  106 + def characters(self, content):
  107 + if content is not None and self._active_rules > 0:
  108 + self._cdata.append(content)
  109 +
  110 + def endElement(self, tag):
  111 + rset = self._rules.get('/'.join(self._path))
  112 + if rset is not None:
  113 + body = ''.join(self._cdata) if len(self._cdata) > 0 else ''
  114 + for r in rset:
  115 + self._active_rules -= 1
  116 + r.body(tag, r._storedAttrs, body)
  117 + r.end(tag)
  118 + self._path.pop()
  119 + if len(self._cdata) > 0:
  120 + self._cdata = []
  121 +
7 admin/run-sync-acoustid.sh
... ... @@ -0,0 +1,7 @@
  1 +#!/bin/sh
  2 +
  3 +DIR=`dirname $0`/..
  4 +export PYTHONPATH=${DIR}
  5 +${DIR}/scripts/acoustid_sync.py -c ${DIR}/acoustid.conf
  6 +exit $?
  7 +
235 scripts/acoustid_sync.py
... ... @@ -0,0 +1,235 @@
  1 +#!/usr/bin/env python
  2 +# -*- coding: utf-8 -*-
  3 +from collections import OrderedDict
  4 +import os
  5 +
  6 +import urllib2
  7 +import logging
  8 +import tempfile
  9 +import shutil
  10 +import subprocess
  11 +import psycopg2.extensions
  12 +
  13 +from contextlib import closing
  14 +from bz2 import BZ2File
  15 +from xml.sax._exceptions import SAXParseException
  16 +from xml.sax.xmlreader import InputSource
  17 +
  18 +from acoustid.xml.digester import Digester
  19 +from acoustid.script import run_script
  20 +
  21 +
  22 +logger = logging.getLogger(__name__)
  23 +
  24 +class DataImporter(Digester):
  25 + def __init__(self, ictx, file):
  26 + Digester.__init__(self)
  27 + self._ictx = ictx
  28 + self._file = file
  29 + self._input = InputSource(file.name)
  30 + self._input.setByteStream(BZ2File(file.name, 'r'))
  31 + self._conn = ictx['conn'].connection
  32 + self._cursor = self._conn.cursor()
  33 + self.success = self._closed = False
  34 + self._add_rules()
  35 +
  36 + def _add_rules(self):
  37 + self.addOnBegin('packet', self._check_packet)
  38 + self.addOnBeginAndEnd('packet/transaction/event', self._on_event, self._on_event_end)
  39 + self.addOnBody('packet/transaction/event/keys/column', self._on_key_column)
  40 + self.addOnBody('packet/transaction/event/values/column', self._on_value_column)
  41 + self.addOnFinish(self._on_finish)
  42 +
  43 + def _check_packet(self, tag, attrs):
  44 + if self._ictx['schema_seq'] != int(attrs.getValue('schema_seq')):
  45 + raise Exception('<packet> schema_seq: {0} not matched the expected seq number {1}',
  46 + attrs.getValue('schema_seq'), self._ictx['replication_seq'])
  47 +
  48 + if self._ictx['replication_seq'] != int(attrs.getValue('replication_seq')):
  49 + raise Exception('<packet> replication_seq: {0} not matched the expected seq number {1}',
  50 + attrs.getValue('replication_seq'), self._ictx['replication_seq'])
  51 +
  52 + def _on_key_column(self, tag, attrs, val):
  53 + event = self.peek()
  54 + event['keys'][attrs.getValue('name')] = val
  55 +
  56 + def _on_value_column(self, tag, attrs, val):
  57 + event = self.peek()
  58 + isNull = attrs.getValue("null") if attrs.has_key('null') else None
  59 + event['values'][attrs.getValue('name')] = val if isNull != "yes" else None
  60 +
  61 + def _on_event(self, tag, attrs):
  62 + event = {
  63 + 'op': attrs.getValue('op'),
  64 + 'table': attrs.getValue('table'),
  65 + 'keys': OrderedDict(), #array of tuples column name -> column val
  66 + 'values': OrderedDict() #array of tuples column name -> column val
  67 + }
  68 + self.push(event)
  69 +
  70 + def _on_event_end(self, tag):
  71 + event = self.pop()
  72 + type = event['op']
  73 + table = event['table']
  74 + keys = event['keys']
  75 + values = event['values']
  76 + params = []
  77 + if type == 'I':
  78 + sql_columns = ', '.join(values.keys())
  79 + sql_values = ', '.join(['%s'] * len(values))
  80 + sql = 'INSERT INTO %s (%s) VALUES (%s)' % (table, sql_columns, sql_values)
  81 + params = values.values()
  82 + elif type == 'U':
  83 + sql_values = ', '.join('%s=%%s' % i for i in values)
  84 + sql = 'UPDATE %s SET %s' % (table, sql_values)
  85 + params = values.values()
  86 + elif type == 'D':
  87 + sql = 'DELETE FROM %s' % table
  88 + else:
  89 + raise Exception('Invalid <event> op: %s' % type)
  90 +
  91 + if type == 'D' or type == 'U':
  92 + sql += ' WHERE ' + ' AND '.join('%s%s%%s' % (i, ' IS ' if keys[i] is None else '=') for i in keys.keys())
  93 + params.extend(keys.values())
  94 +
  95 + #print '%s %s' % (sql, params)
  96 + self._cursor.execute(sql, params)
  97 +
  98 + def _on_finish(self):
  99 + pass
  100 +
  101 + def load(self):
  102 + logger.warning('Saving dataset....')
  103 + self.parse(self._input)
  104 + self.success = True
  105 +
  106 + def recover(self):
  107 + """ This is duty hack to remove weird characters presented in some replications files.
  108 + Using the tidy tool.
  109 + """
  110 + logger.warning('Trying to recover invalid XML...')
  111 + originalXML = None
  112 + fixedXML = None
  113 + try:
  114 + originalXML = tempfile.NamedTemporaryFile(suffix='.xml', delete=False) #bunzipped tmp
  115 + fixedXML = tempfile.NamedTemporaryFile(suffix='.xml', delete=False) #fixed tmp
  116 + fixedXML.close()
  117 +
  118 + #Fetch uncompressed file data to recover
  119 + bzf = self._input.getByteStream()
  120 + bzf.seek(0)
  121 + shutil.copyfileobj(bzf, originalXML)
  122 + originalXML.close()
  123 +
  124 + cmd = ['tidy', '-xml', '-o', fixedXML.name, originalXML.name]
  125 + logger.warning('Running: %s', ' '.join(cmd))
  126 + ret = subprocess.call(cmd)
  127 + if ret:
  128 + #raise Exception('Failed to fix XML data, ret=%s' % ret)
  129 + pass
  130 +
  131 + #ready to load
  132 + self.close()
  133 + self._file = file(fixedXML.name, 'r')
  134 + self._input = InputSource(fixedXML.name)
  135 + self._input.setByteStream(self._file)
  136 + self._cursor = self._conn.cursor()
  137 + self.success = self._closed = False
  138 + self.reset()
  139 + self._add_rules()
  140 + self.load()
  141 + finally:
  142 + for f in [originalXML, fixedXML]:
  143 + if f and not f.closed:
  144 + f.close()
  145 + if f and os.path.exists(f.name):
  146 + os.unlink(f.name)
  147 +
  148 +
  149 + def close(self):
  150 + if self._closed:
  151 + return
  152 + try:
  153 + if self.success:
  154 + self._conn.commit()
  155 + logger.warning('Done')
  156 + else:
  157 + logger.warning('Rolling back transaction. Seq number: {0}'.format(self._ictx['replication_seq']))
  158 + self._conn.rollback()
  159 + self._cursor.close()
  160 + finally:
  161 + self._closed = True
  162 + self._file.close()
  163 +
  164 +
  165 +def download_arch(url):
  166 + logger.warning('Downloading: %s', url)
  167 + try:
  168 + data = urllib2.urlopen(url)
  169 + except urllib2.HTTPError, e:
  170 + if e.code == 404:
  171 + logger.warning('Resource %s not found', url)
  172 + return None
  173 + raise
  174 + except urllib2.URLError, e:
  175 + if '[Errno 2]' in str(e.reason):
  176 + logger.warning('Resource %s not found', url)
  177 + return None
  178 + raise
  179 + tmp = tempfile.NamedTemporaryFile(suffix='.xml.bz2')
  180 + shutil.copyfileobj(data, tmp)
  181 + data.close()
  182 + tmp.seek(0)
  183 + logger.debug('Stored in %s', tmp.name)
  184 + return tmp
  185 +
  186 +
  187 +def sync(script, ictx):
  188 + if script.config.replication.import_acoustid is None:
  189 + err = 'Missing required \'import_acoustid\' configuration parameter in [replication]'
  190 + logger.error(err)
  191 + exit(1)
  192 +
  193 + rseq = ictx['replication_seq']
  194 + while True:
  195 + rseq += 1
  196 + ictx['replication_seq'] = rseq
  197 + url = script.config.replication.import_acoustid.format(seq=rseq)
  198 + arch = download_arch(url)
  199 + if arch is None:
  200 + logger.warning('Stopped on seq %d', rseq)
  201 + break
  202 + di = DataImporter(ictx, arch)
  203 + with closing(di):
  204 + try:
  205 + di.load()
  206 + except SAXParseException, spe:
  207 + logger.error('XML data parsing error. %s URL: %s', spe, url)
  208 + di.recover()
  209 +
  210 +
  211 +def main(script, opts, args):
  212 + conn = script.engine.connect()
  213 + conn.detach()
  214 + with closing(conn):
  215 + conn.connection.rollback()
  216 + conn.connection.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
  217 + cursor = conn.connection.cursor()
  218 + with closing(cursor):
  219 + cursor.execute('''SELECT current_schema_sequence,
  220 + current_replication_sequence FROM replication_control''')
  221 + schema_seq, replication_seq = cursor.fetchone()
  222 + conn.connection.commit()
  223 +
  224 + ictx = {
  225 + 'schema_seq': schema_seq,
  226 + 'replication_seq': replication_seq,
  227 + 'script': script,
  228 + 'conn': conn,
  229 + }
  230 + sync(script, ictx)
  231 +
  232 +
  233 +if __name__ == '__main__':
  234 + run_script(main)
  235 +
8 sql/CreateTables.sql
... ... @@ -1,5 +1,13 @@
1 1 BEGIN;
2 2
  3 +
  4 +CREATE TABLE replication_control (
  5 + id SERIAL,
  6 + current_schema_sequence INTEGER NOT NULL,
  7 + current_replication_sequence INTEGER,
  8 + last_replication_date TIMESTAMP WITH TIME ZONE
  9 +);
  10 +
3 11 CREATE TABLE account (
4 12 id serial NOT NULL,
5 13 name varchar NOT NULL,

0 comments on commit 4e4a1f6

Please sign in to comment.
Something went wrong with that request. Please try again.