AB-367: Create dump archive in specific order #305

Merged (5 commits), Oct 24, 2018
@@ -154,7 +154,7 @@
         "classes",
         "concluded",
     ),
-    "dataset_eval_challenge":(
+    "dataset_eval_challenge": (
         "dataset_eval_job",
         "challenge_id",
         "result",
@@ -178,7 +178,7 @@ def dump_db(location, threads=None, incremental=False, dump_id=None):
     Args:
         location: Directory where archive will be created.
-        threads: Maximal number of threads to run during compression.
+        threads: Maximum number of threads to run during compression.
         incremental: False if resulting data dump should be complete, True if
             it needs to be incremental.
         dump_id: If you need to reproduce previously created incremental dump,
@@ -260,43 +260,59 @@ def _copy_table(cursor, location, table_name, query):
         cursor.copy_expert(copy_query, f)


-def _copy_dataset_tables(location, start_time=None, end_time=None):
-    """ Copies datasets tables into seperate files withing a specified location (directory).
+def _add_file_to_tar_and_delete(location, archive_name, tar, filename):
+    """Add a file in `location` to an open TarFile at location `archive_name` and then
+    delete the file from disk."""
+    tar.add(os.path.join(location, filename),
+            arcname=os.path.join(archive_name, "abdump", filename))
+    os.remove(os.path.join(location, filename))
+
+
+def _copy_dataset_tables(location, tar, archive_name, start_time=None, end_time=None):
+    """ Copy datasets tables into separate files within a specified location (directory).
     """
     connection = db.engine.raw_connection()
     try:
         cursor = connection.cursor()

         # dataset
         _copy_table(cursor, location, "dataset", "SELECT %s FROM dataset" %
                     (", ".join(_DATASET_TABLES["dataset"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset")

         # dataset_class
         _copy_table(cursor, location, "dataset_class", "SELECT %s FROM dataset_class" %
                     (", ".join(_DATASET_TABLES["dataset_class"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset_class")

         # dataset_class_member
         _copy_table(cursor, location, "dataset_class_member", "SELECT %s FROM dataset_class_member" %
                     (", ".join(_DATASET_TABLES["dataset_class_member"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset_class_member")

         # dataset_snapshot
         _copy_table(cursor, location, "dataset_snapshot", "SELECT %s FROM dataset_snapshot" %
                     (", ".join(_DATASET_TABLES["dataset_snapshot"])))
-
-        # dataset_eval_jobs
-        _copy_table(cursor, location, "dataset_eval_jobs", "SELECT %s FROM dataset_eval_jobs" %
-                    (", ".join(_DATASET_TABLES["dataset_eval_jobs"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset_snapshot")

         # dataset_eval_sets
         _copy_table(cursor, location, "dataset_eval_sets", "SELECT %s FROM dataset_eval_sets" %
                     (", ".join(_DATASET_TABLES["dataset_eval_sets"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset_eval_sets")
+
+        # dataset_eval_jobs
+        _copy_table(cursor, location, "dataset_eval_jobs", "SELECT %s FROM dataset_eval_jobs" %
+                    (", ".join(_DATASET_TABLES["dataset_eval_jobs"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset_eval_jobs")

         # challenge
         _copy_table(cursor, location, "challenge", "SELECT %s FROM challenge" %
                     (", ".join(_DATASET_TABLES["challenge"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "challenge")

         # dataset_eval_challenge
         _copy_table(cursor, location, "dataset_eval_challenge", "SELECT %s FROM dataset_eval_challenge" %
                     (", ".join(_DATASET_TABLES["dataset_eval_challenge"])))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "dataset_eval_challenge")
     finally:
         connection.close()
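Side note on the approach: `tarfile` stores members in exactly the order `add()` is called, so archiving each table file immediately after it is exported both fixes the member order and keeps disk usage flat. A minimal standalone sketch of the pattern (table names and paths here are illustrative, not the project's API):

```python
import os
import tarfile
import tempfile

# Parent tables first, children after; an importer that replays members
# sequentially then never hits a missing foreign-key target.
tables_in_order = ["dataset", "dataset_class", "dataset_class_member"]

workdir = tempfile.mkdtemp()
for name in tables_in_order:
    with open(os.path.join(workdir, name), "w") as f:
        f.write("...exported rows...\n")

archive_path = os.path.join(workdir, "abdump.tar")
with tarfile.open(archive_path, "w") as tar:
    for name in tables_in_order:
        path = os.path.join(workdir, name)
        tar.add(path, arcname=os.path.join("abdump", name))
        os.remove(path)  # delete each file as soon as it is archived

with tarfile.open(archive_path) as tar:
    print([m.name for m in tar.getmembers()])
    # ['abdump/dataset', 'abdump/dataset_class', 'abdump/dataset_class_member']
```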
@@ -331,10 +347,12 @@ def generate_where(row_name, start_t=start_time, end_t=end_time):
         # version
         _copy_table(cursor, location, "version", "SELECT %s FROM version %s" %
                     (", ".join(_TABLES["version"]), generate_where("created")))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "version")

         # lowlevel
         _copy_table(cursor, location, "lowlevel", "SELECT %s FROM lowlevel %s" %
                     (", ".join(_TABLES["lowlevel"]), generate_where("submitted")))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "lowlevel")

         # lowlevel_json
         query = "SELECT %s FROM lowlevel_json WHERE id IN (SELECT id FROM lowlevel %s) ORDER BY id LIMIT {limit} OFFSET {offset}" \
@@ -345,29 +363,33 @@ def generate_where(row_name, start_t=start_time, end_t=end_time):
         query = "SELECT %s FROM model %s" \
                 % (", ".join(_TABLES["model"]), generate_where("date"))
         _copy_table(cursor, location, "model", query)
+        _add_file_to_tar_and_delete(location, archive_name, tar, "model")

         # highlevel
         _copy_table(cursor, location, "highlevel", "SELECT %s FROM highlevel %s" %
                     (", ".join(_TABLES["highlevel"]), generate_where("submitted")))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "highlevel")

         # highlevel_meta
         query = "SELECT %s FROM highlevel_meta WHERE id IN (SELECT id FROM highlevel %s)" \
                 % (", ".join(_TABLES["highlevel_meta"]), generate_where("submitted"))
         _copy_table(cursor, location, "highlevel_meta", query)
+        _add_file_to_tar_and_delete(location, archive_name, tar, "highlevel_meta")

         # highlevel_model
-        query = "SELECT %s FROM highlevel_model WHERE id IN (SELECT id FROM highlevel %s) ORDER BY id LIMIT {limit} OFFSET {offset}" \
+        query = "SELECT %s FROM highlevel_model WHERE highlevel IN (SELECT id FROM highlevel %s) ORDER BY id LIMIT {limit} OFFSET {offset}" \
                 % (", ".join(_TABLES["highlevel_model"]), generate_where("submitted"))
         _copy_table_into_multiple_files(cursor, "highlevel_model", query, tar, archive_name)

         # statistics
         _copy_table(cursor, location, "statistics", "SELECT %s FROM statistics %s" %
                     (", ".join(_TABLES["statistics"]), generate_where("collected")))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "statistics")

         # incremental_dumps
         _copy_table(cursor, location, "incremental_dumps", "SELECT %s FROM incremental_dumps %s" %
                     (", ".join(_TABLES["incremental_dumps"]), generate_where("created")))
+        _add_file_to_tar_and_delete(location, archive_name, tar, "incremental_dumps")
     finally:
         connection.close()
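The `LIMIT {limit} OFFSET {offset}` placeholders in the `lowlevel_json` and `highlevel_model` queries are what `_copy_table_into_multiple_files` fills in, so the largest tables land in the archive as a series of fixed-size chunk files. A rough sketch of such a pagination loop, assuming a psycopg2 cursor (recent psycopg2 reports the copied row count in `cursor.rowcount`) and a hypothetical chunk size; the real helper lives in `db/dump.py`:

```python
import io

CHUNK_SIZE = 100000  # hypothetical; the real constant is defined in db/dump.py

def copy_table_in_chunks(cursor, query_template, handle_chunk):
    """Run query_template with a growing OFFSET until a chunk comes back
    empty, passing each chunk's COPY output to handle_chunk(file_num, data)."""
    offset = 0
    file_num = 0
    while True:
        query = query_template.format(limit=CHUNK_SIZE, offset=offset)
        buf = io.StringIO()
        cursor.copy_expert("COPY (%s) TO STDOUT" % query, buf)
        if cursor.rowcount == 0:  # COPY reports rows written; none left
            break
        handle_chunk(file_num, buf.getvalue())
        offset += CHUNK_SIZE
        file_num += 1
```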
@@ -465,7 +487,7 @@ def import_db_dump(archive_path, tables):
                     latest_file_num_imported[table_name] = file_num
                     logging.info(" - Importing data from file %s into %s table..." % (file_name, table_name))
                     cursor.copy_from(tar.extractfile(member), '"%s"' % table_name,
-                            columns=_TABLES[table_name])
+                                     columns=_TABLES[table_name])

                 elif file_name in table_names:
                     logging.info(" - Importing data into %s table..." % file_name)
@@ -851,10 +873,9 @@ def _dump_tables(archive_path, threads, dataset_dump, time_now, start_t=None, end_t=None):
         archive_tables_dir = os.path.join(temp_dir, "abdump", "abdump")
         utils.path.create_path(archive_tables_dir)

         if dataset_dump:
-            _copy_dataset_tables(archive_tables_dir, start_t, end_t)
+            _copy_dataset_tables(archive_tables_dir, tar, archive_name, start_t, end_t)
         else:
-            _copy_tables(archive_tables_dir,tar, archive_name, start_t, end_t)
-        tar.add(archive_tables_dir, arcname=os.path.join(archive_name, "abdump"))
+            _copy_tables(archive_tables_dir, tar, archive_name, start_t, end_t)

         shutil.rmtree(temp_dir)
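For context on the removed `tar.add(archive_tables_dir, ...)`: adding a whole directory in one call stores members in directory-listing order (typically alphabetical), not in the order the files were written, which is the behavior this PR corrects. A small illustration with hypothetical file names:

```python
import os
import tarfile
import tempfile

workdir = tempfile.mkdtemp()
for name in ("version", "lowlevel", "highlevel"):  # created in dependency order
    open(os.path.join(workdir, name), "w").close()

# Directory-level add: member order follows the directory listing,
# e.g. ['abdump', 'abdump/highlevel', 'abdump/lowlevel', 'abdump/version'],
# so a child table can appear in the archive before its parent.
with tarfile.open(os.path.join(workdir, "old.tar"), "w") as tar:
    tar.add(workdir, arcname="abdump")

with tarfile.open(os.path.join(workdir, "old.tar")) as tar:
    print([m.name for m in tar.getmembers()])
```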
@@ -867,6 +888,7 @@ def dump_dataset_tables(location, threads=None):
     Args:
         location: Directory where archive will be created.
         threads: Maximum number of threads to run during compression
+    Returns:
+        Path to created dump.
     """
@@ -109,15 +109,26 @@ def import_data(archive, drop_constraints=False):
         db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'drop_primary_keys.sql'))

     print('Importing data...')
-    db.dump.import_db_dump(archive)
+    db.dump.import_dump(archive)
     print('Done!')

     if drop_constraints:
         print('Creating primary key and foreign key constraints...')
         db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_primary_keys.sql'))
         db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'create_foreign_keys.sql'))


 @cli.command()
+@click.option("--drop-constraints", "-d", is_flag=True, help="Drop primary and foreign keys before importing.")
 @click.argument("archive", type=click.Path(exists=True))
-def import_dataset_data(archive):
+def import_dataset_data(archive, drop_constraints=False):
     """Imports dataset dump into the database."""
+    if drop_constraints:
+        print('Dropping primary key and foreign key constraints...')
+        db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'drop_foreign_keys.sql'))
+        db.run_sql_script(os.path.join(ADMIN_SQL_DIR, 'drop_primary_keys.sql'))
+
     print('Importing dataset data...')
     db.dump.import_datasets_dump(archive)
     print('Done!')
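The new flag mirrors the `--drop-constraints` handling in `import_data` above. For reference, a minimal self-contained click command wired the same way (the SQL-script and import calls are stubbed out here):

```python
import click

@click.command()
@click.option("--drop-constraints", "-d", is_flag=True,
              help="Drop primary and foreign keys before importing.")
@click.argument("archive", type=click.Path(exists=True))
def import_dataset_data(archive, drop_constraints):
    """Imports dataset dump into the database."""
    if drop_constraints:
        click.echo("Dropping primary key and foreign key constraints...")
        # db.run_sql_script(.../drop_foreign_keys.sql) etc. would go here
    click.echo("Importing dataset data from %s..." % archive)
    # db.dump.import_datasets_dump(archive) would go here
    click.echo("Done!")

if __name__ == "__main__":
    import_dataset_data()
```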