Commit
Merge pull request #8 from zblesk/reduce-memory-usage
Reduce memory usage
zblesk committed Apr 14, 2019
2 parents 6cacda6 + b06c297 commit 1391cd0
Showing 1 changed file with 83 additions and 39 deletions.
122 changes: 83 additions & 39 deletions csv_to_sqlite.py
@@ -7,17 +7,17 @@
import time


__version__ = '1.1.1'
__version__ = '2.0.0'

def write_out(msg):
if write_out.verbose:
print(msg)


class CsvOptions:
def __init__(self, determine_column_types=True,
def __init__(self, typing_style=True,
drop_tables=False, delimiter=","):
self.determine_column_types = determine_column_types
self.typing_style = typing_style
self.drop_tables = drop_tables
self.delimiter = delimiter

@@ -26,7 +26,8 @@ def __init__(self, path, options = None):
self.path = path
self.columnNames = None
self.columnTypes = None
self.data = []
self.csvfile = None
self.reader = None
self.options = options
if not options:
self.options = CsvOptions()
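Note: a minimal sketch of using the reworked API programmatically, mirroring what start() does further down in this diff. The module name csv_to_sqlite, the file names, and the option values here are placeholders/assumptions, not part of the commit.

import sqlite3
import csv_to_sqlite  # assumes the module file csv_to_sqlite.py is importable

# write_out() reads a function attribute that the CLI normally sets; set it
# explicitly when bypassing the CLI so the helper does not raise AttributeError.
csv_to_sqlite.write_out.verbose = False

options = csv_to_sqlite.CsvOptions(typing_style='full', drop_tables=True, delimiter=',')
conn = sqlite3.connect('example.db')  # placeholder output database
with csv_to_sqlite.CsvFileInfo('example.csv', options) as info:  # placeholder input file
    info.determine_types()                 # first pass over the file: infer column types
    rows_written = info.save_to_db(conn)   # second pass: batched inserts
conn.commit()
conn.close()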
@@ -47,22 +48,39 @@ def get_minimal_type(self, value):
pass
return "text"

def process_file(self):
with open(self.path, encoding="utf8") as csvfile:
rdr = csv.reader(csvfile, delimiter=self.options.delimiter)
self.columnNames = [name for name in next(rdr)]
cols = len(self.columnNames)
self.columnTypes = ["string"] * cols if not self.options.determine_column_types else ["integer"] * cols
for row in rdr:
self.data.append(row)
for col in range(cols):
if self.columnTypes[col] == "text":
continue
col_type = self.get_minimal_type(row[col])
if self.columnTypes[col] != col_type:
if col_type == "text" or \
(col_type == "real" and self.columnTypes[col] == "integer"):
self.columnTypes[col] = col_type
def __enter__(self):
self.csvfile = open(self.path, encoding="utf8")
self.reader = csv.reader(self.csvfile, delimiter=self.options.delimiter)
return self

def __exit__(self, *args):
if self.csvfile:
self.csvfile.close()

def get_restarted_reader(self):
self.csvfile.seek(0)
return self.reader
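Note: the single shared reader works because csv.reader pulls rows lazily from the underlying file object, so seeking that file back to the start restarts the same reader. A tiny self-contained illustration (using io.StringIO in place of the real file):

import csv
import io

stream = io.StringIO("a,b\n1,2\n3,4\n")
reader = csv.reader(stream)
print(list(reader))   # [['a', 'b'], ['1', '2'], ['3', '4']]
stream.seek(0)        # rewind the underlying stream ...
print(list(reader))   # ... and the same reader yields all rows again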

def determine_types(self):
write_out("Determining types")
rdr = self.get_restarted_reader()
self.columnNames = [name for name in next(rdr)]
cols = len(self.columnNames)
if self.options.typing_style == 'none':
self.columnTypes = ["text"] * cols
return
self.columnTypes = ["integer"] * cols
for row in rdr:
for col in range(cols):
if self.columnTypes[col] == "text":
continue
col_type = self.get_minimal_type(row[col])
if self.columnTypes[col] != col_type:
if col_type == "text" or \
(col_type == "real" and self.columnTypes[col] == "integer"):
self.columnTypes[col] = col_type
if self.options.typing_style == 'quick':
break
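Note: the body of get_minimal_type() is collapsed in this hunk; judging by its "text" fallback above and the int/float/string note in the CLI help, it presumably tries int, then float. A hedged stand-alone sketch of the per-column widening rule that determine_types() applies (integer -> real -> text, never narrowing):

def minimal_type(value):
    # stand-in for get_minimal_type(); the real body is collapsed in this diff
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    try:
        float(value)
        return "real"
    except ValueError:
        pass
    return "text"

column = ["1", "2", "3.5", "n/a"]
current = "integer"
for value in column:
    t = minimal_type(value)
    if t == "text" or (t == "real" and current == "integer"):
        current = t   # only ever widen the type
print(current)  # "text" -- a single non-numeric value widens the whole column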

def save_to_db(self, connection):
write_out("Writing table " + self.get_table_name())
@@ -73,14 +91,35 @@ def save_to_db(self, connection):
connection.execute('drop table [{tableName}]'.format(tableName=self.get_table_name()))
except:
pass
connection.execute('create table [{tableName}] (\n'.format(tableName=self.get_table_name()) +
',\n'.join("\t[%s] %s" % (i[0], i[1]) for i in zip(self.columnNames, self.columnTypes)) +
'\n);')
write_out("Inserting {0} records into {1}".format(len(self.data), self.get_table_name()))
connection.executemany('insert into [{tableName}] values ({cols})'
.format(tableName=self.get_table_name(), cols=','.join(['?'] * cols)),
self.data)
return len(self.data)
createQuery = 'create table [{tableName}] (\n'.format(tableName=self.get_table_name()) \
+ ',\n'.join("\t[%s] %s" % (i[0], i[1]) for i in zip(self.columnNames, self.columnTypes)) \
+ '\n);'
write_out(createQuery)
connection.execute(createQuery)
linesTotal = 0
currentBatch = 0
reader = self.get_restarted_reader()
buf = []
maxL = 10000
next(reader) #skip headers
for line in reader:
buf.append(line)
currentBatch += 1
if currentBatch == maxL:
write_out("Inserting {0} records into {1}".format(maxL, self.get_table_name()))
connection.executemany('insert into [{tableName}] values ({cols})'
.format(tableName=self.get_table_name(), cols=','.join(['?'] * cols)),
buf)
linesTotal += currentBatch
currentBatch = 0
buf = []
if len(buf) > 0:
write_out("Flushing the remaining {0} records into {1}".format(len(buf), self.get_table_name()))
connection.executemany('insert into [{tableName}] values ({cols})'
.format(tableName=self.get_table_name(), cols=','.join(['?'] * cols)),
buf)
linesTotal += len(buf)
return linesTotal
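Note: the loop above is the heart of the memory reduction. At most maxL (10,000) rows are buffered before each executemany call, instead of the whole file, at the cost of reading the CSV twice when types are determined. For illustration only (not part of the commit), the same batching pattern written with itertools.islice and a hypothetical helper:

import itertools

def insert_in_batches(connection, table_name, column_count, rows, batch_size=10000):
    """Consume any row iterator in fixed-size chunks so that at most
    batch_size rows are held in memory at once (hypothetical helper)."""
    sql = 'insert into [{0}] values ({1})'.format(table_name, ','.join(['?'] * column_count))
    total = 0
    while True:
        batch = list(itertools.islice(rows, batch_size))
        if not batch:
            break
        connection.executemany(sql, batch)
        total += len(batch)
    return total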


@click.command()
@@ -92,12 +131,16 @@ def save_to_db(self, connection):
@click.option("--output", "-o", help="The output database path",
type=click.Path(),
default=os.path.basename(os.getcwd()) + ".db")
@click.option("--find-types/--no-types",
help="Determines whether the script should guess the column type (int/float/string supported)",
default=True)
@click.option('--typing', "-t",
type=click.Choice(['full', 'quick', 'none']),
help="""Determines whether the script should guess the column type (int/float/string supported).
quick: only base the types on the first line
full: read the entire file
none: no typing, every column is string""",
default='quick')
@click.option("--drop-tables/--no-drop-tables", "-D",
help="Determines whether the tables should be dropped before creation, if they already exist"
"(BEWARE OF DATA LOSS)",
" (BEWARE OF DATA LOSS)",
default=False)
@click.option("--verbose", "-v",
is_flag=True,
Expand All @@ -106,7 +149,7 @@ def save_to_db(self, connection):
@click.option("--delimiter", "-x",
help="Choose the CSV delimiter. Defaults to comma. Hint: for tabs, in Bash use $'\\t'.",
default=",")
def start(file, output, find_types, drop_tables, verbose, delimiter):
def start(file, output, typing, drop_tables, verbose, delimiter):
"""A script that processes the input CSV files and copies them into a SQLite database.
Each file is copied into a separate table. Column names are taken from the headers (first row) in the csv file.
@@ -126,21 +169,22 @@ def start(file, output, find_types, drop_tables, verbose, delimiter):
return
write_out("Output file: " + output)
conn = sqlite3.connect(output)
defaults = CsvOptions(determine_column_types=find_types, drop_tables=drop_tables, delimiter=delimiter)
defaults = CsvOptions(typing_style=typing, drop_tables=drop_tables, delimiter=delimiter)
write_out("Typing style: " + typing)
totalRowsInserted = 0
startTime = time.clock()
startTime = time.perf_counter()
with click.progressbar(files) as _files:
actual = files if verbose else _files
for file in actual:
try:
file = file.strip()
write_out("Processing " + file)
info = CsvFileInfo(file, defaults)
info.process_file()
totalRowsInserted += info.save_to_db(conn)
with CsvFileInfo(file, defaults) as info:
info.determine_types()
totalRowsInserted += info.save_to_db(conn)
except Exception as exc:
print("Error on table {0}: \n {1}".format(info.get_table_name(), exc))
print("Written {0} rows into {1} tables in {2:.3f} seconds".format(totalRowsInserted, len(files), time.clock() - startTime))
print("Written {0} rows into {1} tables in {2:.3f} seconds".format(totalRowsInserted, len(files), time.perf_counter() - startTime))
conn.commit()

if __name__ == "__main__":
