Merge pull request #727 from mydumper/fix_719
SQL files are added in parallel
davidducos committed Jun 19, 2022
2 parents c3c3059 + d55fadf commit 506ef72
Showing 5 changed files with 146 additions and 86 deletions.
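
What the change does, in short: instead of the scanning thread calling process_data_filename() itself for every data file, load_directory_information() now pushes each filename onto a GAsyncQueue, the restore threads pop and process filenames at the start of process_directory_queue(), the scanner pushes one "END" sentinel per thread to close the stream, and then waits for the same number of acknowledgements on a second queue before sorting and enqueuing the restore jobs. Below is a minimal, self-contained GLib sketch of that fan-out and hand-shake pattern, not mydumper's actual code; worker(), N_WORKERS and the sample file names are made up for illustration.

/* Hypothetical stand-alone example of the pattern this commit introduces:
 * a scanner pushes work and per-worker "END" sentinels into one GAsyncQueue,
 * workers acknowledge on a second queue once they have drained it. */
#include <glib.h>

#define N_WORKERS 4  /* illustrative; myloader uses num_threads */

static GAsyncQueue *filename_queue;
static GAsyncQueue *completed_queue;

static gpointer worker(gpointer data) {
  (void)data;
  gchar *f = (gchar *)g_async_queue_pop(filename_queue);
  while (g_strcmp0(f, "END") != 0) {                 /* sentinel ends the loop */
    g_print("processing %s\n", f);                   /* stand-in for process_data_filename() */
    g_free(f);
    f = (gchar *)g_async_queue_pop(filename_queue);
  }
  g_free(f);
  g_async_queue_push(completed_queue, GINT_TO_POINTER(1));  /* tell the scanner we are done */
  return NULL;
}

int main(void) {
  filename_queue = g_async_queue_new();
  completed_queue = g_async_queue_new();

  GThread *threads[N_WORKERS];
  guint i;
  for (i = 0; i < N_WORKERS; i++)
    threads[i] = g_thread_new("worker", worker, NULL);

  const gchar *files[] = { "db.t.00000.sql", "db.t.00001.sql", "db.t.00002.sql" };
  for (i = 0; i < G_N_ELEMENTS(files); i++)
    g_async_queue_push(filename_queue, g_strdup(files[i]));

  for (i = 0; i < N_WORKERS; i++)                    /* one sentinel per worker */
    g_async_queue_push(filename_queue, g_strdup("END"));
  for (i = 0; i < N_WORKERS; i++)                    /* wait until every worker has drained the queue */
    g_async_queue_pop(completed_queue);

  for (i = 0; i < N_WORKERS; i++)
    g_thread_join(threads[i]);
  return 0;
}

Pushing exactly one sentinel per worker lets every thread leave its pop loop without extra signalling, and the acknowledgement queue acts as a barrier so the scanner only sorts and enqueues restore jobs after all files have been registered.
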
1 change: 1 addition & 0 deletions src/myloader.c
@@ -306,6 +306,7 @@ int main(int argc, char *argv[]) {
g_critical("the specified directory %s is not a mydumper backup",directory);
exit(EXIT_FAILURE);
}
initialize_directory();
}
}
g_free(current_dir);
75 changes: 67 additions & 8 deletions src/myloader_directory.c
@@ -36,6 +36,7 @@
#include "myloader_restore_job.h"
#include "myloader_control_job.h"

extern guint total_data_sql_files;
extern guint num_threads;
extern gboolean innodb_optimize_keys;
extern gboolean innodb_optimize_keys_per_table;
@@ -47,6 +48,13 @@ extern gchar *source_db;
extern gboolean skip_triggers;
extern gboolean no_data;

GAsyncQueue *data_filename_queue;
GAsyncQueue *data_filename_queue_completed;
void initialize_directory(){
  data_filename_queue = g_async_queue_new();
  data_filename_queue_completed = g_async_queue_new();
}

gint compare_by_time(gconstpointer a, gconstpointer b){
  return
    g_date_time_difference(((struct db_table *)a)->finish_time,((struct db_table *)a)->start_time) >
@@ -150,6 +158,9 @@ gboolean append_filename_to_list (
  return TRUE;
}

gint compare_filename_part (gconstpointer a, gconstpointer b){
  return ((struct restore_job *)a)->data.drj->part == ((struct restore_job *)b)->data.drj->part ? ((struct restore_job *)a)->data.drj->sub_part > ((struct restore_job *)b)->data.drj->sub_part : ((struct restore_job *)a)->data.drj->part > ((struct restore_job *)b)->data.drj->part ;
}

void load_directory_information(struct configuration *conf) {
  GError *error = NULL;
@@ -176,53 +187,90 @@ void load_directory_information(struct configuration *conf) {
  g_dir_close(dir);

  gchar *f = NULL;
  g_debug("Processing database files");
  // CREATE DATABASE
  while (schema_create_list){
    f = schema_create_list->data;
    process_database_filename(f, "create database");
    schema_create_list=schema_create_list->next;
  }

  g_debug("Processing table schema files");
  // CREATE TABLE
  conf->table_hash = g_hash_table_new ( g_str_hash, g_str_equal );
  while (create_table_list != NULL){
    f = create_table_list->data;
    process_table_filename(f);
    create_table_list=create_table_list->next;
  }

  g_debug("Processing table data files");
  // DATA FILES
  while (data_files_list != NULL){
    f = data_files_list->data;
    process_data_filename(f);

    g_async_queue_push(data_filename_queue, data_files_list->data);
    // f = data_files_list->data;
    // process_data_filename(f);
    data_files_list=data_files_list->next;
  }
  guint j;
  for(j=0;j< num_threads; j++){
    g_async_queue_push(data_filename_queue, g_strdup("END") );
  }
  for(j=0;j< num_threads; j++){
    g_async_queue_pop(data_filename_queue_completed);
  }
g_debug("Sorting data files");
// SORT DATA FILES TO ENQUEUE
// iterates over the dbt to create the jobs in the dbt->queue
// and sorts the dbt for the conf->table_list
// in stream mode, it is not possible to sort the tables as
// we don't know the amount the rows, .metadata are sent at the end.
GList * table_list=NULL;
GHashTableIter iter;
gchar * lkey;
g_hash_table_iter_init ( &iter, conf->table_hash );
struct db_table *dbt=NULL;
while ( g_hash_table_iter_next ( &iter, (gpointer *) &lkey, (gpointer *) &dbt ) ) {
table_list=g_list_insert_sorted_with_data (table_list,dbt,&compare_dbt,conf->table_hash);
dbt->restore_job_list=g_list_sort(dbt->restore_job_list,&compare_filename_part);
GList *i=dbt->restore_job_list;
while (i) {
g_async_queue_push(dbt->queue, new_job(JOB_RESTORE ,i->data,dbt->real_database));
i=i->next;
}
dbt->count=g_list_length(dbt->restore_job_list);
total_data_sql_files+=dbt->count;
// g_debug("Setting count to: %d", dbt->count);
}
conf->table_list=table_list;
// conf->table needs to be set.

g_debug("Processing table metadata files");
// METADATA FILES
while (metadata_list != NULL){
f = metadata_list->data;
process_metadata_filename(conf->table_hash,f);
metadata_list=metadata_list->next;
}

g_debug("Processing view files");
while (view_list != NULL){
f = view_list->data;
process_schema_filename(f,"view");
view_list=view_list->next;
}

g_debug("Processing trigger files");
while (trigger_list != NULL){
f = trigger_list->data;
process_schema_filename(f, "trigger");
trigger_list=trigger_list->next;
}

g_debug("Processing post files");
while (post_list != NULL){
f = post_list->data;
process_schema_filename(f,"post");
post_list=post_list->next;
}

/*
g_debug("Sorting data files");
// SORT DATA FILES TO ENQUEUE
// iterates over the dbt to create the jobs in the dbt->queue
// and sorts the dbt for the conf->table_list
@@ -245,14 +293,25 @@ void load_directory_information(struct configuration *conf) {
  }
  conf->table_list=table_list;
  // conf->table needs to be set.
*/
  g_debug("Loading file completed");
}

void *process_directory_queue(struct thread_data * td) {
  struct db_table *dbt=NULL;
  struct control_job *job = NULL;
  gboolean cont=TRUE;

  // Step 0: Load data jobs
  // DATA FILES
  gchar * f= (gchar *)g_async_queue_pop(data_filename_queue);
  while (g_strcmp0(f,"END") != 0){
    process_data_filename(f);
    f = (gchar *)g_async_queue_pop(data_filename_queue);
  }
  g_async_queue_push(data_filename_queue_completed, GINT_TO_POINTER(1) );
  // Step 1: creating databases
  cont=TRUE;
  while (cont){
    job = (struct control_job *)g_async_queue_pop(td->conf->database_queue);
    cont=process_job(td, job);
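
Because several threads now register data files concurrently, per-table restore jobs no longer arrive in order. This commit therefore moves compare_filename_part() into myloader_directory.c and sorts each table's restore_job_list once with g_list_sort(); process_data_filename() simply appends (its old g_list_insert_sorted() call is commented out further down). Below is a simplified sketch of that ordering by (part, sub_part); the chunk struct and the values are made up, and the comparator returns the conventional -1/0/1 rather than the exact expression used in the diff.

/* Hypothetical example: append chunks in arbitrary order, then sort once
 * by (part, sub_part), the same ordering compare_filename_part() targets. */
#include <glib.h>

struct chunk { guint part; guint sub_part; };

static gint compare_chunk(gconstpointer a, gconstpointer b) {
  const struct chunk *ca = a, *cb = b;
  if (ca->part != cb->part)
    return ca->part < cb->part ? -1 : 1;
  if (ca->sub_part != cb->sub_part)
    return ca->sub_part < cb->sub_part ? -1 : 1;
  return 0;
}

int main(void) {
  struct chunk c[] = { {2, 0}, {0, 1}, {0, 0}, {1, 0} };
  GList *list = NULL;
  guint i;
  for (i = 0; i < G_N_ELEMENTS(c); i++)       /* append unsorted, as process_data_filename() now does */
    list = g_list_append(list, &c[i]);
  list = g_list_sort(list, compare_chunk);    /* one sort after the scan, as load_directory_information() does */
  for (GList *l = list; l; l = l->next) {
    const struct chunk *cc = l->data;
    g_print("part=%u sub_part=%u\n", cc->part, cc->sub_part);
  }
  g_list_free(list);
  return 0;
}

Appending during the parallel scan and sorting once afterwards avoids a sorted insert on every file while worker threads are still adding jobs.
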
1 change: 1 addition & 0 deletions src/myloader_directory.h
@@ -15,5 +15,6 @@
Authors: David Ducos, Percona (david dot ducos at percona dot com)
*/
void initialize_directory();
void restore_from_directory(struct configuration *conf);
void *process_directory_queue(struct thread_data * td);
75 changes: 40 additions & 35 deletions src/myloader_process.c
@@ -39,7 +39,6 @@ extern gboolean stream;
extern guint max_threads_per_table;
extern gchar *directory;
extern guint errors;
extern guint total_data_sql_files;
extern gboolean no_delete;
extern GHashTable *tbl_hash;
extern gboolean innodb_optimize_keys;
@@ -63,39 +62,49 @@ struct db_table* append_new_db_table(char * filename, gchar * database, gchar *t
exit(EXIT_FAILURE);
}
gchar *lkey=g_strdup_printf("%s_%s",database, table);
g_mutex_lock(table_hash_mutex);
struct db_table * dbt=g_hash_table_lookup(table_hash,lkey);
g_free(lkey);
if (dbt == NULL){
dbt=g_new(struct db_table,1);
g_mutex_lock(table_hash_mutex);
//struct db_table * dbt=g_hash_table_lookup(table_hash,lkey);
dbt=g_hash_table_lookup(table_hash,lkey);
if (dbt == NULL){
dbt=g_new(struct db_table,1);
// dbt->filename=filename;
dbt->database=database;
dbt->database=database;
// This should be the only place where we should use `db ? db : `
dbt->real_database = g_strdup(db ? db : real_db_name);
dbt->table=table;
dbt->real_table=dbt->table;
dbt->rows=number_rows;
dbt->restore_job_list = NULL;
dbt->queue=g_async_queue_new();
dbt->current_threads=0;
dbt->max_threads=max_threads_per_table;
dbt->mutex=g_mutex_new();
dbt->indexes=alter_table_statement;
dbt->start_time=NULL;
dbt->start_index_time=NULL;
dbt->finish_time=NULL;
dbt->schema_created=FALSE;
dbt->constraints=NULL;
dbt->count=0;
g_hash_table_insert(table_hash, g_strdup_printf("%s_%s",dbt->database,dbt->table),dbt);
}else{
g_free(table);
g_free(database);
if (number_rows>0) dbt->rows=number_rows;
if (alter_table_statement != NULL) dbt->indexes=alter_table_statement;
dbt->real_database = g_strdup(db ? db : real_db_name);
dbt->table=table;
dbt->real_table=dbt->table;
dbt->rows=number_rows;
dbt->restore_job_list = NULL;
dbt->queue=g_async_queue_new();
dbt->current_threads=0;
dbt->max_threads=max_threads_per_table;
dbt->mutex=g_mutex_new();
dbt->indexes=alter_table_statement;
dbt->start_time=NULL;
dbt->start_index_time=NULL;
dbt->finish_time=NULL;
dbt->schema_created=FALSE;
dbt->constraints=NULL;
dbt->count=0;
g_hash_table_insert(table_hash, lkey, dbt);
}else{
g_free(table);
g_free(database);
g_free(lkey);
if (number_rows>0) dbt->rows=number_rows;
if (alter_table_statement != NULL) dbt->indexes=alter_table_statement;
// if (real_table != NULL) dbt->real_table=g_strdup(real_table);
}
g_mutex_unlock(table_hash_mutex);
}else{
g_free(table);
g_free(database);
g_free(lkey);
if (number_rows>0) dbt->rows=number_rows;
if (alter_table_statement != NULL) dbt->indexes=alter_table_statement;
}
g_mutex_unlock(table_hash_mutex);
return dbt;
}

@@ -415,13 +424,8 @@ void process_schema_filename(gchar *filename, const char * object) {
g_async_queue_push(conf->post_queue, new_job(JOB_RESTORE,rj,real_db_name));
}

gint compare_filename_part (gconstpointer a, gconstpointer b){
return ((struct restore_job *)a)->data.drj->part == ((struct restore_job *)b)->data.drj->part ? ((struct restore_job *)a)->data.drj->sub_part > ((struct restore_job *)b)->data.drj->sub_part : ((struct restore_job *)a)->data.drj->part > ((struct restore_job *)b)->data.drj->part ;
}

void process_data_filename(char * filename){
gchar *db_name, *table_name;
total_data_sql_files++;
// TODO: check if it is a data file
// TODO: we need to count sections of the data file to determine if it is ok.
guint part=0,sub_part=0;
@@ -436,10 +440,11 @@ void process_data_filename(char * filename){
return;
}
struct db_table *dbt=append_new_db_table(filename, db_name, table_name,0,conf->table_hash,NULL);
struct restore_job *rj = new_data_restore_job( g_strdup(filename), JOB_RESTORE_FILENAME, dbt, part, sub_part);
g_mutex_lock(dbt->mutex);
dbt->count++;
struct restore_job *rj = new_data_restore_job( g_strdup(filename), JOB_RESTORE_FILENAME, dbt, part, sub_part);
dbt->restore_job_list=g_list_insert_sorted(dbt->restore_job_list,rj,&compare_filename_part);
// dbt->restore_job_list=g_list_insert_sorted(dbt->restore_job_list,rj,&compare_filename_part);
dbt->restore_job_list=g_list_append(dbt->restore_job_list,rj);
g_mutex_unlock(dbt->mutex);
}

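
Since process_data_filename() can now run concurrently in several restore threads, append_new_db_table() is restructured above so that both the g_hash_table_lookup() and the g_hash_table_insert() happen while table_hash_mutex is held; otherwise two threads handling chunks of the same table could each create a db_table entry. Below is a stripped-down sketch of that lookup-or-create-under-lock pattern; the table_entry struct and lookup_or_create() are stand-ins for the real struct db_table and append_new_db_table(), not the project's code.

/* Hypothetical example: hold the mutex across lookup and insert so the
 * check-then-create sequence is atomic for concurrent callers. */
#include <glib.h>

static GHashTable *table_hash;
static GMutex table_hash_mutex;

struct table_entry { gchar *key; guint count; };

static struct table_entry *lookup_or_create(const gchar *database, const gchar *table) {
  gchar *key = g_strdup_printf("%s_%s", database, table);
  g_mutex_lock(&table_hash_mutex);               /* lock covers lookup AND insert */
  struct table_entry *e = g_hash_table_lookup(table_hash, key);
  if (e == NULL) {
    e = g_new0(struct table_entry, 1);
    e->key = key;                                /* hash table keeps the freshly built key */
    g_hash_table_insert(table_hash, key, e);
  } else {
    g_free(key);                                 /* already present: drop the duplicate key */
  }
  e->count++;
  g_mutex_unlock(&table_hash_mutex);
  return e;
}

int main(void) {
  table_hash = g_hash_table_new(g_str_hash, g_str_equal);
  g_mutex_init(&table_hash_mutex);
  lookup_or_create("db", "t1");
  lookup_or_create("db", "t1");                  /* second call reuses the existing entry */
  return 0;
}

The key built with g_strdup_printf() is either handed to the hash table or freed, mirroring how the new code inserts lkey on the miss path and frees it when the table is already present.
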
