Skip to content
This repository has been archived by the owner on May 10, 2023. It is now read-only.

Added num_procs option to config file, allowing for parallel processing #56

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions mysql2pgsql/lib/config.py
Expand Up @@ -44,17 +44,17 @@ def reset_configfile(self, file_path):
port: 3306
socket: /tmp/mysql.sock
username: mysql2psql
password:
password:
database: mysql2psql_test
compress: false
destination:
# if file is given, output goes to file, else postgres
file:
file:
postgres:
hostname: localhost
port: 5432
username: mysql2psql
password:
password:
database: mysql2psql_test

# if tables is given, only the listed tables will be converted. leave empty to convert all tables.
Expand All @@ -77,4 +77,7 @@ def reset_configfile(self, file_path):

# if timezone is true, forces to append/convert to UTC tzinfo mysql data
timezone: false

# number of worker processes to use when writing directly to PostgreSQL.
num_procs: 1
"""
63 changes: 57 additions & 6 deletions mysql2pgsql/lib/converter.py
@@ -1,28 +1,47 @@
from __future__ import absolute_import

from multiprocessing import Process
from threading import Thread
from Queue import Queue

from . import print_start_table


class Converter(object):
def __init__(self, reader, writer, file_options, verbose=False):
def __init__(self, reader_class, reader_args, writer_class, writer_args,
file_options, num_procs=1, verbose=False):
# We store the read/writer classes and args so that we can create
# new instances for multiprocessing, via the get_reader and
# get_writer methods.
self.verbose = verbose
self.reader = reader
self.writer = writer
self.reader_class = reader_class
self.reader_args = reader_args
self.reader = self.get_reader()
self.writer_class = writer_class
self.writer_args = writer_args
self.writer = self.get_writer()
self.file_options = file_options
self.num_procs = num_procs
self.exclude_tables = file_options.get('exclude_tables', [])
self.only_tables = file_options.get('only_tables', [])
self.supress_ddl = file_options.get('supress_ddl', None)
self.supress_data = file_options.get('supress_data', None)
self.force_truncate = file_options.get('force_truncate', None)

def get_reader(self):
return self.reader_class(*self.reader_args)

def get_writer(self):
return self.writer_class(*self.writer_args)

def convert(self):
if self.verbose:
print_start_table('>>>>>>>>>> STARTING <<<<<<<<<<\n\n')

tables = [t for t in (t for t in self.reader.tables if t.name not in self.exclude_tables) if not self.only_tables or t.name in self.only_tables]
if self.only_tables:
tables.sort(key=lambda t: self.only_tables.index(t.name))

if not self.supress_ddl:
if self.verbose:
print_start_table('START CREATING TABLES')
Expand All @@ -47,8 +66,40 @@ def convert(self):
if self.verbose:
print_start_table('START WRITING TABLE DATA')

for table in tables:
self.writer.write_contents(table, self.reader)
if self.num_procs == 1:
# No parallel processing - process tables sequentially.
for table in tables:
self.writer.write_contents(table, self.reader)
else:
# Parallel processing. Work is CPU bound so we need to
# use multiprocessing, however the MySQL table objects
# can't be pickled, so we're unable to easily build a
# worker pool using multiprocessing, so we use threads
# to manage the worker pool, with each worker thread
# creating a new process for each table transferred.
queue = Queue()
for table in tables:
queue.put(table)
for _ in range(self.num_procs):
queue.put(None)

def worker():
while True:
writer = self.get_writer()
reader = self.get_reader()
table = queue.get()
if table is None:
return
proc = Process(target=writer.write_contents, args=(table, reader))
proc.start()
proc.join()

threads = []
for _ in range(self.num_procs):
threads.append(Thread(target=worker))
threads[-1].start()
for thread in threads:
thread.join()

if self.verbose:
print_start_table('DONE WRITING TABLE DATA')
Expand Down
13 changes: 9 additions & 4 deletions mysql2pgsql/mysql2pgsql.py
Expand Up @@ -21,14 +21,19 @@ def __init__(self, options):
raise e

def convert(self):
reader = MysqlReader(self.file_options['mysql'])
reader_class = MysqlReader
reader_args = (self.file_options['mysql'],)
num_procs = 1

if self.file_options['destination']['file']:
writer = PostgresFileWriter(self._get_file(self.file_options['destination']['file']), self.run_options.verbose, tz=self.file_options.get('timezone', False))
writer_class = PostgresFileWriter
writer_args = (self._get_file(self.file_options['destination']['file']), self.run_options.verbose, self.file_options.get('timezone', False))
else:
writer = PostgresDbWriter(self.file_options['destination']['postgres'], self.run_options.verbose, tz=self.file_options.get('timezone', False))
writer_class = PostgresDbWriter
writer_args = (self.file_options['destination']['postgres'], self.run_options.verbose, self.file_options.get('timezone', False))
num_procs = self.file_options.get('num_procs', num_procs)

Converter(reader, writer, self.file_options, self.run_options.verbose).convert()
Converter(reader_class, reader_args, writer_class, writer_args, self.file_options, num_procs, self.run_options.verbose).convert()

def _get_file(self, file_path):
return codecs.open(file_path, 'wb', 'utf-8')