[MRG] Force unload data from SBT searches by default (and fix ZipStorage deallocation along the way) #1513

Merged · 6 commits · May 12, 2021
2 changes: 2 additions & 0 deletions src/sourmash/commands.py
@@ -420,6 +420,8 @@ def index(args):
 
     notify('loaded {} sigs; saving SBT under "{}"', n, args.sbt_name)
     tree.save(args.sbt_name, sparseness=args.sparseness)
+    if tree.storage:
+        tree.storage.close()
 
 
 def search(args):
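A note on the `commands.py` change above: `tree.save(...)` can leave the backing ZipStorage open, and a ZIP archive only becomes valid once its central directory is written on close. A minimal standard-library sketch of that failure mode (the file names here are hypothetical, not part of sourmash):

```python
import zipfile

path = "demo.sbt.zip"  # hypothetical scratch file, not a real SBT

# Writing entries buffers the central directory in memory...
zf = zipfile.ZipFile(path, mode="w", compression=zipfile.ZIP_STORED)
zf.writestr("demo.sbt.json", b"{}")
# ...and only close() writes it out; skipping this leaves a truncated archive.
zf.close()

# Reopening proves the archive is complete and readable.
with zipfile.ZipFile(path) as check:
    assert check.namelist() == ["demo.sbt.json"]
```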
5 changes: 3 additions & 2 deletions src/sourmash/sbt.py
@@ -297,7 +297,7 @@ def add_node(self, node):
     def _find_nodes(self, search_fn, *args, **kwargs):
         "Search the tree using `search_fn`."
 
-        unload_data = kwargs.get("unload_data", False)
+        unload_data = kwargs.get("unload_data", True)
 
         # initialize search queue with top node of tree
         matches = []
@@ -579,6 +579,7 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False):
                 subdir = '.sbt.{}'.format(name)
                 storage_args = FSStorage("", subdir).init_args()
                 storage.save(subdir + "/", b"")
+                storage.subdir = subdir
                 index_filename = os.path.abspath(path)
             else:  # path.endswith('.sbt.json')
                 assert path.endswith('.sbt.json')
@@ -658,7 +659,7 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False):
             tree_data = json.dumps(info).encode("utf-8")
             save_path = "{}.sbt.json".format(name)
             storage.save(save_path, tree_data)
-            storage.close()
+            storage.flush()
 
         elif kind == "FS":
             with open(index_filename, 'w') as fp:
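The `sbt.py` changes flip `unload_data` to default to `True` in `_find_nodes`, so a search releases each node's data after scoring it instead of keeping the whole tree in memory, and `save()` now calls `storage.flush()` so the storage stays usable afterwards. A hedged sketch of the unload pattern, with stand-in `Node`/`search` names rather than the real sourmash classes:

```python
class Node:
    """Stand-in for an SBT node; only metadata is kept when unloaded."""

    def __init__(self, name):
        self.name = name
        self.data = None

    def load(self):
        if self.data is None:
            self.data = b"..."  # stand-in for reading a sketch from storage
        return self.data

    def unload(self):
        self.data = None  # drop the payload, keep the cheap metadata


def search(nodes, match_fn, unload_data=True):
    matches = []
    for node in nodes:
        if match_fn(node.load()):
            matches.append(node)
        if unload_data:
            node.unload()  # new default: release data as we go
    return matches


# Usage: memory stays bounded even when every node is visited.
hits = search([Node(str(i)) for i in range(5)], lambda data: True)
```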
31 changes: 26 additions & 5 deletions src/sourmash/sbt_storage.py
@@ -178,12 +178,24 @@ def init_args(self):
         return {'path': self.path}
 
     def close(self):
         # TODO: this is not ideal; checking for zipfile.fp is looking at
         # internal implementation details from CPython...
+        if self.zipfile is not None or self.bufferzip is not None:
+            self.flush(keep_closed=True)
+            self.zipfile.close()
+            self.zipfile = None
+
+    def flush(self, *, keep_closed=False):
         # This is a bit complicated, but we have to deal with new data
         # (if the original zipfile is read-only) and possible duplicates.
 
         if self.bufferzip is None:
-            # The easy case: just close the zipfile, nothing else to do
-            self.zipfile.close()
+            # The easy case: close (to force flushing) and reopen the zipfile
+            if self.zipfile is not None:
+                self.zipfile.close()
+                if not keep_closed:
+                    self.zipfile = zipfile.ZipFile(self.path, mode='a',
+                                                   compression=zipfile.ZIP_STORED)
         else:
             # The complicated one. Need to consider:
             # - Is there data in the buffer?
@@ -200,7 +212,7 @@ def close(self):
                     # bad news, need to create new file...
                     # create a temporary file to write the final version,
                     # which will be copied to the right place later.
-                    tempfile = NamedTemporaryFile()
+                    tempfile = NamedTemporaryFile(delete=False)
                     final_file = zipfile.ZipFile(tempfile, mode="w")
                     all_data = buffer_names.union(zf_names)
 
@@ -219,16 +231,22 @@ def close(self):
                     final_file.close()
                     os.unlink(self.path)
                     shutil.move(tempfile.name, self.path)
+
+                    if not keep_closed:
+                        self.zipfile = zipfile.ZipFile(self.path, mode='a',
+                                                       compression=zipfile.ZIP_STORED)
                 elif new_data:
                     # Since there is no duplicated data, we can
                     # reopen self.zipfile in append mode and write the new data
                     self.zipfile.close()
-                    zf = zipfile.ZipFile(self.path, mode='a')
+                    if not keep_closed:
+                        zf = zipfile.ZipFile(self.path, mode='a',
+                                             compression=zipfile.ZIP_STORED)
                     for item in new_data:
                         zf.writestr(item, self.bufferzip.read(item))
                     self.zipfile = zf
             # finally, close the buffer and release memory
             self.bufferzip.close()
             self.bufferzip = None
 
     @staticmethod
     def can_open(location):
@@ -237,6 +255,9 @@ def can_open(location):
     def list_sbts(self):
         return [f for f in self.zipfile.namelist() if f.endswith(".sbt.json")]
 
+    def __del__(self):
+        self.close()
+
 
 class IPFSStorage(Storage):
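The `sbt_storage.py` refactor splits the old `close()` into `flush()` (write everything out, then reopen in append mode unless `keep_closed=True`) and `close()` (flush and stay closed), and adds `__del__` so dropping the last reference also releases the file handle. A simplified sketch of that split, assuming a toy `TinyZipStore` rather than the real `ZipStorage` (which additionally handles a read-only buffer and duplicate entries):

```python
import zipfile


class TinyZipStore:
    """Illustrative only: flush/close/__del__ shaped like ZipStorage."""

    def __init__(self, path):
        self.path = path
        self.zipfile = zipfile.ZipFile(path, mode="a",
                                       compression=zipfile.ZIP_STORED)

    def save(self, name, content):
        self.zipfile.writestr(name, content)

    def flush(self, *, keep_closed=False):
        # Closing forces the central directory to disk; reopen in append
        # mode so the store stays usable unless asked to stay closed.
        self.zipfile.close()
        self.zipfile = None
        if not keep_closed:
            self.zipfile = zipfile.ZipFile(self.path, mode="a",
                                           compression=zipfile.ZIP_STORED)

    def close(self):
        if getattr(self, "zipfile", None) is not None:
            self.flush(keep_closed=True)

    def __del__(self):
        self.close()  # mirrors the PR: deallocation also closes the file


store = TinyZipStore("tiny.zip")
store.save("a.txt", b"first")
store.flush()                # bytes are on disk; store is still writable
store.save("b.txt", b"second")
store.close()                # flushed and closed for good
```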