Permalink
Browse files

make --quiet option work during read

  • Loading branch information...
1 parent 89353e2 commit 7e3dad20040a6d306a6b9ea6457375da089b8a4a @olt olt committed Jul 10, 2012
Showing with 25 additions and 18 deletions.
  1. +3 −1 imposm/app.py
  2. +16 −16 imposm/reader.py
  3. +6 −1 imposm/util.py
View
@@ -202,8 +202,10 @@ def main(argv=None):
if options.quiet:
logger = imposm.util.QuietProgressLog
+ logger_parser = imposm.util.QuietParserProgress
else:
logger = imposm.util.ProgressLog
+ logger_parser = imposm.util.ParserProgress
imposm_timer = imposm.util.Timer('imposm', logger)
@@ -231,7 +233,7 @@ def main(argv=None):
sys.exit(2)
reader = ImposmReader(tag_mapping, cache=cache, merge=options.merge_cache,
- pool_size=options.concurrency, logger=logger)
+ pool_size=options.concurrency, logger=logger_parser)
reader.estimated_coords = imposm.util.estimate_records(args)
for arg in args:
logger.message('## reading %s' % arg)
View
@@ -1,11 +1,11 @@
# Copyright 2011 Omniscale (http://omniscale.com)
-#
+#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,7 @@
from multiprocessing import Process, JoinableQueue
from imposm.parser import OSMParser
-from imposm.util import ParserProgress, setproctitle
+from imposm.util import setproctitle
class ImposmReader(object):
def __init__(self, mapping, cache, pool_size=2, merge=False, logger=None):
@@ -33,22 +33,22 @@ def read(self, filename):
coords_queue = JoinableQueue(512)
ways_queue = JoinableQueue(128)
relations_queue = JoinableQueue(128)
-
- log_proc = ParserProgress()
+
+ log_proc = self.logger()
log_proc.start()
-
+
marshal = True
if self.merge:
# merging needs access to unmarshaled data
marshal = False
-
+
estimates = {
'coords': self.estimated_coords,
'nodes': self.estimated_coords//50,
'ways': self.estimated_coords//7,
'relations': self.estimated_coords//1000,
}
-
+
coords_writer = CacheWriterProcess(coords_queue, self.cache.coords_cache,
estimates['coords'], log=partial(log_proc.log, 'coords'),
marshaled_data=marshal)
@@ -64,27 +64,27 @@ def read(self, filename):
estimates['ways'], merge=self.merge, log=partial(log_proc.log, 'ways'),
marshaled_data=marshal)
ways_writer.start()
-
+
relations_writer = CacheWriterProcess(relations_queue, self.cache.relations_cache,
estimates['relations'], merge=self.merge, log=partial(log_proc.log, 'relations'),
marshaled_data=marshal)
relations_writer.start()
-
+
log_proc.message('coords: %dk nodes: %dk ways: %dk relations: %dk (estimated)' % (
estimates['coords']/1000, estimates['nodes']/1000, estimates['ways']/1000,
estimates['relations']/1000)
)
-
+
# keep one CPU free for writer proc on hosts with 4 or more CPUs
pool_size = self.pool_size if self.pool_size < 4 else self.pool_size - 1
-
+
parser = OSMParser(pool_size, nodes_callback=nodes_queue.put, coords_callback=coords_queue.put,
ways_callback=ways_queue.put, relations_callback=relations_queue.put, marshal_elem_data=marshal)
parser.nodes_tag_filter = self.mapper.tag_filter_for_nodes()
parser.ways_tag_filter = self.mapper.tag_filter_for_ways()
parser.relations_tag_filter = self.mapper.tag_filter_for_relations()
-
+
parser.parse(filename)
coords_queue.put(None)
@@ -111,7 +111,7 @@ def __init__(self, queue, cache, estimated_records=None, merge=False, log=None,
self.log = log
self.marshaled_data = marshaled_data
self.estimated_records = estimated_records
-
+
def run(self):
# print 'creating %s (%d)' % (self.filename, self.estimated_records or 0)
cache = self.cache(mode='w', estimated_records=self.estimated_records)
@@ -121,7 +121,7 @@ def run(self):
cache_put = cache.put
while True:
data = self.queue.get()
- if data is None:
+ if data is None:
self.queue.task_done()
break
if self.merge:
View
@@ -41,6 +41,8 @@ def stop(self):
self.logger.message('%s took %s' % (self.title, format_total_time(seconds)))
class ParserProgress(multiprocessing.Process):
+ log_every_seconds = 0.2
+
def __init__(self):
self.queue = multiprocessing.Queue()
multiprocessing.Process.__init__(self)
@@ -55,7 +57,7 @@ def run(self):
log_type, incr = log_statement
counters[log_type] += incr
- if time.time() - last_log > 0.2:
+ if time.time() - last_log > self.log_every_seconds:
last_log = time.time()
self.print_log(counters)
@@ -82,6 +84,9 @@ def stop(self):
sys.stderr.flush()
self.queue.put(None)
+class QuietParserProgress(ParserProgress):
+ log_every_seconds = 60
+
class ProgressLog(object):
log_every_seconds = 0.2

0 comments on commit 7e3dad2

Please sign in to comment.