Commit 08d6cbc
Now mikado serialise will load lazily (rather than in a single block) the data from the temporary msgpack files. This should reduce the memory usage. Also, reduce the maximum number of objects per process (EI-CoreBioinformatics#280).

lucventurini committed Apr 6, 2020
1 parent 6fcad94 commit 08d6cbc
Showing 2 changed files with 12 additions and 12 deletions.
1 change: 0 additions & 1 deletion Mikado/parsers/bed12.py
@@ -1695,7 +1695,6 @@ def gff_next(self, line, sequence):
         return bed12
 
     def run(self, *args, **kwargs):
-        print("Started", self.__identifier)
         self.handler = logging_handlers.QueueHandler(self.logging_queue)
         self.logger = logging.getLogger(self.name)
         self.logger.addHandler(self.handler)
23 changes: 12 additions & 11 deletions Mikado/serializers/blast_serializer/tabular_utils.py
@@ -298,9 +298,6 @@ def run(self):
         with open(self.params_file, "rb") as pfile:
             params = msgpack.loads(pfile.read(), raw=False, strict_map_key=False)
         self.columns = params["columns"]
-        with open(self.index_file, "rb") as index_handle:
-            self.indexes = msgpack.loads(index_handle.read(), raw=False, strict_map_key=False)
-        os.remove(self.index_file)  # Clean it up
         prep_hit = partial(prepare_tab_hit,
                            columns=self.columns, qmult=self.qmult, tmult=self.tmult,
                            matrix_name=self.matrix_name)
@@ -316,12 +313,15 @@
         session = Session(bind=self.engine)
         self.session = session
         hits, hsps = [], []
-        for key, rows in self.indexes:
-            curr_hit, curr_hsps = prep_hit(key, rows)
-            hits.append(curr_hit)
-            hsps += curr_hsps
-            hits, hsps = load_into_db(self, hits, hsps, force=False, raw=True)
+        with open(self.index_file, "rb") as index_handle:
+            for key, rows in msgpack.Unpacker(index_handle, raw=False, strict_map_key=False):
+                curr_hit, curr_hsps = prep_hit(key, rows)
+                hits.append(curr_hit)
+                hsps += curr_hsps
+                hits, hsps = load_into_db(self, hits, hsps, force=False, raw=True)
         _, _ = load_into_db(self, hits, hsps, force=True, raw=True)
         self.logger.debug("Finished %s", self.identifier)
+        os.remove(self.index_file)  # Clean it up
         return
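The core of this hunk is swapping one whole-file msgpack.loads for an iterating msgpack.Unpacker, which decodes one packed object at a time from the open stream. A minimal, self-contained sketch of that round trip (the file name and record contents below are illustrative, not Mikado's actual data):

    import os
    import tempfile

    import msgpack

    fd, tmp = tempfile.mkstemp(suffix=".msgpack")
    os.close(fd)

    # Producer side: append one packed (key, rows) object per record,
    # never materialising the full list of records in memory.
    with open(tmp, "wb") as out:
        for key in ("q1", "q2", "q3"):
            msgpack.pack((key, [[1, 2], [3, 4]]), out)

    # Consumer side: the Unpacker yields one decoded object per iteration,
    # so peak memory is bounded by a single record, not the whole file.
    with open(tmp, "rb") as handle:
        for key, rows in msgpack.Unpacker(handle, raw=False, strict_map_key=False):
            print(key, rows)

    os.remove(tmp)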


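The hunk below also shrinks each worker's flush threshold. With procs workers each allowed to hold maxobjects records before flushing to the database, the combined cache could grow to procs times the configured limit; dividing by procs keeps the total near that limit, and the max(..., 1) floor keeps the budget usable when there are more workers than objects. A toy illustration (the numbers are made up):

    maxobjects, procs = 100000, 8
    per_proc = max(int(maxobjects / procs), 1)
    print(per_proc)  # 12500: each worker flushes after ~12500 objects

    # Degenerate case: more workers than the global cap still yields a
    # budget of 1 rather than 0, which would break the flush threshold.
    print(max(int(3 / 8), 1))  # 1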
@@ -349,7 +349,7 @@ def parse_tab_blast(self,
         index_files = dict((idx, tempfile.mktemp(suffix=".csv")) for idx in
                            range(procs))
         kwargs = {"conf": conf,
-                  "maxobjects": int(self.maxobjects),
+                  "maxobjects": max(int(self.maxobjects / procs), 1),
                   "lock": lock,
                   "matrix_name": matrix_name,
                   "qmult": qmult,
@@ -392,8 +392,9 @@
         # Split the indices
         for idx, split in enumerate(np.array_split(np.array(list(groups.items())), procs)):
             with open(index_files[idx], "wb") as index:
-                vals = [(tuple(item[0]), values[item[1], :].tolist()) for item in split]
-                index.write(msgpack.dumps(vals))
+                for item in split:
+                    vals = (tuple(item[0]), values[item[1], :].tolist())
+                    msgpack.dump(vals, index)
             assert os.path.exists(index_files[idx])
             processes[idx].start()
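The writer mirrors the reader: instead of building each worker's full list of (key, rows) tuples and serialising it with a single msgpack.dumps, every tuple is packed onto the worker's file as it is produced. A sketch of the split-and-stream idea (the keys, values array, and file names are illustrative stand-ins for the real groups):

    import msgpack
    import numpy as np

    keys = ["q%d" % i for i in range(10)]   # stand-ins for group keys
    values = np.arange(20).reshape(10, 2)   # stand-ins for the hit rows
    procs = 3

    # Hand each worker a contiguous slice of the records, streaming them
    # to its index file one packed object at a time, not one big blob.
    for idx, chunk in enumerate(np.array_split(np.arange(len(keys)), procs)):
        with open("index_%d.msgpack" % idx, "wb") as index:
            for i in chunk:
                msgpack.dump((keys[i], values[i, :].tolist()), index)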

