Skip to content

Commit

Permalink
CB-230: Migrate dump_manager.py off the ORM
Browse files Browse the repository at this point in the history
  • Loading branch information
ferbncode authored and gentlecat committed May 31, 2017
1 parent b41bb04 commit 4092986
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 52 deletions.
170 changes: 118 additions & 52 deletions critiquebrainz/data/dump_manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from flask import current_app, jsonify
from flask.json import JSONEncoder
from critiquebrainz.data.utils import create_path, remove_old_archives, get_columns, slugify, explode_db_uri
from critiquebrainz.data import model
from critiquebrainz.db import license as db_license, review as db_review
from critiquebrainz import frontend
from critiquebrainz.data import db
from critiquebrainz import db
from time import gmtime, strftime
from datetime import datetime
from functools import wraps
import subprocess
import sqlalchemy
import tempfile
import tarfile
import shutil
Expand All @@ -20,6 +21,47 @@
cli = click.Group()


# Column lists for the tables included in the data dumps, replacing the
# ORM model introspection (get_columns) this module previously relied on.
# The tuples are passed as the ``columns`` argument of psycopg2's
# ``cursor.copy_to`` / ``cursor.copy_from``, so the dump and import sides
# must agree on both the column names and their order.
# NOTE(review): the "user" entry lists "clients", "grants" and "tokens" —
# these look like relationship names from the old model rather than real
# columns of the "user" table; confirm against the schema before using
# this entry for a COPY.
_TABLES = {
    "review": (
        "id",
        "entity_id",
        "entity_type",
        "user_id",
        "edits",
        "is_draft",
        "is_hidden",
        "license_id",
        "language",
        "source",
        "source_url",
    ),
    "revision": (
        "id",
        "review_id",
        "timestamp",
        "text",
    ),
    "license": (
        "id",
        "full_name",
        "info_url",
    ),
    "user": (
        "id",
        "display_name",
        "email",
        "created",
        "musicbrainz_id",
        "show_gravatar",
        "is_blocked",
        "spam_reports",
        "clients",
        "grants",
        "tokens"
    ),
}


def with_request_context(f):
@wraps(f)
def decorated(*args, **kwargs):
Expand All @@ -28,6 +70,15 @@ def decorated(*args, **kwargs):
return decorated


def has_data(table_name):
    """Check whether a table contains at least one row.

    Uses ``SELECT EXISTS`` instead of ``COUNT(*)`` so the database can stop
    at the first matching row rather than scanning the whole table.

    Args:
        table_name: Name of the table to check. Must be a trusted
            identifier (it is interpolated directly into the SQL, because
            identifiers cannot be passed as bind parameters).

    Returns:
        True if the table has at least one row, False otherwise.
    """
    with db.engine.connect() as connection:
        result = connection.execute(sqlalchemy.text("""
            SELECT EXISTS (SELECT 1 FROM "{table_name}")
        """.format(table_name=table_name)))
        return result.fetchone()[0]


@cli.command()
@click.option("--location", "-l", default=os.path.join("/", "data", "export", "full"), show_default=True,
help="Directory where dumps need to be created")
Expand All @@ -45,7 +96,8 @@ def full_db(location, rotate=False):
create_path(location)

FILE_PREFIX = "cb-backup-"
db_hostname, db_port, db_name, db_username, db_password = explode_db_uri(current_app.config['SQLALCHEMY_DATABASE_URI'])
db_hostname, db_port, db_name, db_username, db_password = \
explode_db_uri(current_app.config['SQLALCHEMY_DATABASE_URI'])

print('Creating database dump in "%s"...' % location)

Expand Down Expand Up @@ -93,26 +145,26 @@ def json(location, rotate=False):
current_app.json_encoder = DumpJSONEncoder

print("Creating new archives...")
for license in model.License.query.all():
safe_name = slugify(license.id)
for license in db_license.list_licenses():
safe_name = slugify(license["id"])
with tarfile.open(os.path.join(location, "critiquebrainz-%s-%s-json.tar.bz2" %
(datetime.today().strftime('%Y%m%d'), safe_name)), "w:bz2") as tar:
temp_dir = tempfile.mkdtemp()
license_dir = os.path.join(temp_dir, safe_name)
create_path(license_dir)

# Finding release groups that have reviews with current license
query = db.session.query(model.Review.entity_id).group_by(model.Review.entity_id)
for entity in query.all():
entity = entity[0]
# Finding entities that have reviews with current license
entities = db_review.distinct_entities()
for entity in entities:
entity = str(entity)
# Creating directory structure and dumping reviews
dir_part = os.path.join(entity[0:1], entity[0:2])
reviews = model.Review.list(entity_id=entity, license_id=license.id)[0]
reviews = db_review.list_reviews(entity_id=entity, license_id=license["id"], limit=None)[0]
if len(reviews) > 0:
rg_dir = '%s/%s' % (license_dir, dir_part)
create_path(rg_dir)
f = open('%s/%s.json' % (rg_dir, entity), 'w+')
f.write(jsonify(reviews=[r.to_dict() for r in reviews]).data.decode("utf-8"))
f.write(jsonify(reviews=[db_review.to_dict(r) for r in reviews]).data.decode("utf-8"))
f.close()

tar.add(license_dir, arcname='reviews')
Expand Down Expand Up @@ -147,7 +199,8 @@ def public(location, rotate=False):
print("Creating public database dump...")
time_now = datetime.today()

cursor = db.session.connection().connection.cursor()
connection = db.engine.raw_connection()
cursor = connection.cursor()

# Creating a directory where all dumps will go
dump_dir = os.path.join(location, time_now.strftime('%Y%m%d-%H%M%S'))
Expand All @@ -159,7 +212,7 @@ def public(location, rotate=False):
with open(os.path.join(temp_dir, 'TIMESTAMP'), 'w') as f:
f.write(time_now.isoformat(' '))
with open(os.path.join(temp_dir, 'SCHEMA_SEQUENCE'), 'w') as f:
f.write(str(model.__version__))
f.write(str(db.SCHEMA_VERSION))

# BASE ARCHIVE
# Archiving stuff that is independent from licenses (users, licenses)
Expand All @@ -173,7 +226,7 @@ def public(location, rotate=False):
with open(os.path.join(base_archive_tables_dir, 'user_sanitised'), 'w') as f:
cursor.copy_to(f, '"user"', columns=('id', 'created', 'display_name', 'musicbrainz_id'))
with open(os.path.join(base_archive_tables_dir, 'license'), 'w') as f:
cursor.copy_to(f, 'license', columns=get_columns(model.License))
cursor.copy_to(f, 'license', columns=_TABLES["license"])
tar.add(base_archive_tables_dir, arcname='cbdump')

# Including additional information about this archive
Expand All @@ -189,19 +242,20 @@ def public(location, rotate=False):

# 1. COMBINED
# Archiving all reviews (any license)
REVISION_COMBINED_SQL = "SELECT %s FROM revision JOIN review " \
"ON review.id = revision.review_id " \
"WHERE review.is_hidden = false AND review.is_draft = false" \
% ', '.join(['revision.' + col for col in get_columns(model.Revision)])
REVISION_COMBINED_SQL = """
SELECT {columns} FROM revision JOIN review
ON review.id = revision.review_id
WHERE review.is_hidden = false AND review.is_draft = false
""".format(columns=', '.join(['revision.' + col for col in _TABLES["revision"]]))
with tarfile.open(os.path.join(dump_dir, "cbdump-reviews-all.tar.bz2"), "w:bz2") as tar:
# Dumping tables
reviews_combined_tables_dir = os.path.join(temp_dir, 'cbdump-reviews-all')
create_path(reviews_combined_tables_dir)
with open(os.path.join(reviews_combined_tables_dir, 'review'), 'w') as f:
cursor.copy_to(f, "(SELECT %s FROM review WHERE is_hidden = false AND is_draft = false)" %
(', '.join(get_columns(model.Review))))
cursor.copy_to(f, "(SELECT {columns} FROM review WHERE is_hidden = false AND is_draft = false)"
.format(columns=', '.join(_TABLES["review"])))
with open(os.path.join(reviews_combined_tables_dir, 'revision'), 'w') as f:
cursor.copy_to(f, "(%s)" % REVISION_COMBINED_SQL)
cursor.copy_to(f, "({sql})".format(sql=REVISION_COMBINED_SQL))
tar.add(reviews_combined_tables_dir, arcname='cbdump')

# Including additional information about this archive
Expand All @@ -214,18 +268,23 @@ def public(location, rotate=False):

# 2. SEPARATE
# Creating separate archives for each license
REVISION_SEPARATE_SQL = REVISION_COMBINED_SQL + " AND review.license_id ='%s'"
for license in model.License.query.all():
safe_name = slugify(license.id)
for license in db_license.list_licenses():
safe_name = slugify(license["id"])
with tarfile.open(os.path.join(dump_dir, "cbdump-reviews-%s.tar.bz2" % safe_name), "w:bz2") as tar:
# Dumping tables
tables_dir = os.path.join(temp_dir, safe_name)
create_path(tables_dir)
with open(os.path.join(tables_dir, 'review'), 'w') as f:
cursor.copy_to(f, "(SELECT %s FROM review WHERE is_hidden = false AND is_draft = false " \
"AND license_id = '%s')" % (', '.join(get_columns(model.Review)), license.id))
cursor.copy_to(f, """(
SELECT {columns}
FROM review
WHERE is_hidden = false
AND is_draft = false
AND license_id = '{license_id}'
)""".format(columns=', '.join(_TABLES["review"]), license_id=license["id"]))
with open(os.path.join(tables_dir, 'revision'), 'w') as f:
cursor.copy_to(f, "(%s)" % (REVISION_SEPARATE_SQL % license.id))
cursor.copy_to(f, """({REVISION_COMBINED_SQL} AND review.license_id='{license_id}')"""
.format(REVISION_COMBINED_SQL=REVISION_COMBINED_SQL, license_id=license["id"]))
tar.add(tables_dir, arcname='cbdump')

# Including additional information about this archive
Expand All @@ -236,6 +295,7 @@ def public(location, rotate=False):
print(" + %s/cbdump-reviews-%s.tar.bz2" % (dump_dir, safe_name))

shutil.rmtree(temp_dir) # Cleanup
connection.close()

if rotate:
print("Removing old dumps (except two latest)...")
Expand Down Expand Up @@ -269,47 +329,53 @@ def importer(archive):
try:
with open(os.path.join(temp_dir, 'SCHEMA_SEQUENCE')) as f:
archive_version = f.readline()
if archive_version != str(model.__version__):
if archive_version != str(db.SCHEMA_VERSION):
sys.exit("Incorrect schema version! Expected: %d, got: %c."
"Please, get the latest version of the dump."
% (model.__version__, archive_version))
% (db.SCHEMA_VERSION, archive_version))
except IOError as exception:
if exception.errno == errno.ENOENT:
print("Can't find SCHEMA_SEQUENCE in the specified archive. Importing might fail.")
else:
sys.exit("Failed to open SCHEMA_SEQUENCE file. Error: %s" % exception)

# Importing data
import_data(os.path.join(temp_dir, 'cbdump', 'user_sanitised'), model.User,
('id', 'created', 'display_name', 'musicbrainz_id'))
import_data(os.path.join(temp_dir, 'cbdump', 'license'), model.License)
import_data(os.path.join(temp_dir, 'cbdump', 'review'), model.Review)
import_data(os.path.join(temp_dir, 'cbdump', 'revision'), model.Revision)
import_data(os.path.join(temp_dir, 'cbdump', 'user_sanitised'), 'user',
columns=('id', 'created', 'display_name', 'musicbrainz_id'))
import_data(os.path.join(temp_dir, 'cbdump', 'license'), 'license')
import_data(os.path.join(temp_dir, 'cbdump', 'review'), 'review')
import_data(os.path.join(temp_dir, 'cbdump', 'revision'), 'revision')

shutil.rmtree(temp_dir) # Cleanup
print("Done!")


def import_data(file_name, model, columns=None):
db_connection = db.session.connection().connection
cursor = db_connection.cursor()
def import_data(file_name, table_name, columns=None):
    """Load a table dump (produced by ``cursor.copy_to``) into the database.

    The import is skipped when the target table already contains rows, or
    when the dump file is missing or empty.

    Args:
        file_name: Path to the dump file for this table.
        table_name: Name of the target table.
        columns: Optional column tuple; defaults to the entry in _TABLES.
    """
    conn = db.engine.raw_connection()
    try:
        cur = conn.cursor()

        # Never import on top of existing rows — that would duplicate data.
        if has_data(table_name):
            print("Table %s already contains data. Skipping." % table_name)
            return

        # Nothing to load if the dump file is absent or zero-length.
        if not os.path.exists(file_name) or os.stat(file_name).st_size == 0:
            print("Can't find data file for %s table. Skipping." % table_name)
            return

        print("Importing data into %s table." % table_name)
        with open(file_name, 'r') as dump_file:
            if columns is None:
                columns = _TABLES[table_name]
            cur.copy_from(dump_file, '"{table_name}"'.format(table_name=table_name), columns=columns)
        conn.commit()
    finally:
        # Raw connections are not pooled away automatically; always close.
        conn.close()


class DumpJSONEncoder(JSONEncoder):
Expand Down
21 changes: 21 additions & 0 deletions critiquebrainz/db/license.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,24 @@ def delete(*, id):
"""), {
"id": id,
})


def list_licenses():
    """Fetch all licenses from the database.

    Returns:
        List of dictionaries, one per license, each with the keys
        "id" (str), "info_url" (str) and "full_name" (str).
    """
    query = sqlalchemy.text("""
        SELECT id,
               info_url,
               full_name
          FROM license
    """)
    with db.engine.connect() as connection:
        rows = connection.execute(query).fetchall()
    return [dict(row) for row in rows]
13 changes: 13 additions & 0 deletions critiquebrainz/db/license_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,16 @@ def test_delete_license(self):
info_url="www.example.com",
)
db_license.delete(id=license["id"])

def test_list_licenses(self):
    """list_licenses should return the license created in the database."""
    db_license.create(
        id="test",
        full_name="Test license",
        info_url="www.example.com",
    )
    expected = {
        "id": "test",
        "full_name": "Test license",
        "info_url": "www.example.com",
    }
    fetched = db_license.list_licenses()
    self.assertDictEqual(expected, fetched[0])
14 changes: 14 additions & 0 deletions critiquebrainz/db/review.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,17 @@ def delete(review_id):
"""), {
"review_id": review_id,
})


def distinct_entities():
    """Fetch the IDs of all entities that have been reviewed.

    Returns:
        Set of distinct entity IDs appearing in the review table.
    """
    query = sqlalchemy.text("""
        SELECT DISTINCT entity_id
          FROM review
    """)
    with db.engine.connect() as connection:
        rows = connection.execute(query).fetchall()
    return {row[0] for row in rows}
24 changes: 24 additions & 0 deletions critiquebrainz/db/review_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def setUp(self):
self.user = User(db_users.get_or_create("Tester", new_user_data={
"display_name": "test user",
}))
self.user_2 = User(db_users.get_or_create("Tester 2", new_user_data={
"display_name": "test user 2",
}))

# And license
self.license = db_license.create(
id=u'Test',
Expand Down Expand Up @@ -212,3 +216,23 @@ def test_get_count(self):
)
count = db_review.get_count(is_draft=False, is_hidden=False)
self.assertEqual(count, 1)

def test_distinct_entities(self):
    """Two reviews of the same entity must yield one distinct entity ID."""
    shared = dict(
        entity_id="e7aad618-fa86-3983-9e77-405e21796eca",
        entity_type="release_group",
        is_draft=False,
        license_id=self.license["id"],
    )
    db_review.create(user_id=self.user.id, text="Awesome", **shared)
    db_review.create(user_id=self.user_2.id, text="Awesome Album", **shared)
    entities = db_review.distinct_entities()
    self.assertEqual(len(entities), 1)

0 comments on commit 4092986

Please sign in to comment.