
Add persistence to cache.FileCache #90

Merged 2 commits on Dec 2, 2019
73 changes: 65 additions & 8 deletions chainerio/cache/file_cache.py
@@ -100,9 +100,6 @@ class FileCache(cache.Cache):
dir (str): The path to the directory to place cache data in
case home directory is not backed by fast storage device.

TODO(kuenishi): retain cache file in case of correct process
termination and reuse for future process re-invocation.

'''

def __init__(self, length, multithread_safe=False, do_pickle=False,
@@ -126,9 +123,7 @@ def __init__(self, length, multithread_safe=False, do_pickle=False,

self.closed = False
self.indexfp = tempfile.NamedTemporaryFile(delete=True, dir=self.dir)
self.indexfile = self.indexfp.name
self.datafp = tempfile.NamedTemporaryFile(delete=True, dir=self.dir)
self.datafile = self.datafp.name

# allocate space to store 2n 64bit unsigned integers
# 16 bytes * n chunks
@@ -142,15 +137,23 @@ def __init__(self, length, multithread_safe=False, do_pickle=False,
assert r == self.buflen
self.verbose = verbose
if self.verbose:
print('created index file:', self.indexfile)
print('created data file:', self.datafile)
print('created index file:', self.indexfp.name)
print('created data file:', self.datafp.name)

self._frozen = False

def __len__(self):
return self.length

@property
def frozen(self):
return self._frozen

@property
def multiprocess_safe(self):
return False
# If it's preserved/preloaded, then the file contents are
# fixed.
return self._frozen

@property
def multithread_safe(self):
@@ -178,6 +181,7 @@ def _get(self, i):
return data

def put(self, i, data):
assert not self._frozen
try:
if self.do_pickle:
data = pickle.dumps(data)
@@ -251,3 +255,56 @@ def close(self):
self.datafp.close()
self.indexfp = None
self.datafp = None

def preload(self, name):
Member:

Is it possible to split the preload and the freeze of the cache?
In the current implementation, the user has to do the following to freeze the cache and make it multiprocess-safe:
load => preserve => preload

If we can split these two functionalities, then we can simply do "load" => "freeze". Moreover, the cache could still be modified before "freeze".
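The split being proposed here can be sketched as a hypothetical standalone freeze() API. The class and method names below are illustrative only and are not part of this PR (the merged code instead freezes inside preserve()/preload()):

```python
# Hypothetical sketch of the proposed split: an in-memory stand-in for
# a cache that accepts writes until freeze() is called, after which the
# contents are fixed and can be shared read-only across worker processes.
class FreezableCache:
    def __init__(self):
        self._data = {}
        self._frozen = False

    def put(self, i, data):
        # Writes are rejected once the cache is frozen.
        assert not self._frozen
        self._data[i] = data

    def get(self, i):
        return self._data.get(i)

    def freeze(self):
        # After this point no new entries can be added, so concurrent
        # readers (e.g. forked workers) see a fixed set of contents.
        self._frozen = True


cache = FreezableCache()
cache.put(1, b'bar')   # "load": read from remote storage, then put
cache.freeze()         # no preserve() needed just for multiprocess reads
```

Under this sketch, a job that fails during a put dies before freeze() is ever called, which is the reviewer's argument that the design does not add broken-cache risk.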

Member Author:

What do you mean by "load"?
If "load" means writing data into cache files, that amounts to creating a file cache with a specific name, if I understand correctly. But that is unacceptable, because it leads to partially written named files being left behind after a job failure, and those will definitely mess up the cache and any subsequent re-run of the job.

Also, the principle of the cache in this module is that data is immutable and deterministic; there should be no modification. What do you mean by "modified"?

Member:

Sorry for causing confusion: by "load" I mean reading data from remote storage. If I understand correctly, in order to use this module the user has to do the following:

  1. read data from remote storage
  2. preserve
  3. preload

My suggestion is to have a "freeze" function, so that we can instead do:

  1. read data from remote storage
  2. freeze

to support multiprocessing. I do not think it would increase the risk of having a broken cache.

By "modifying", I do not mean changing the data itself, but the ability to add new cache entries.

Member Author (@kuenishi, Nov 29, 2019):

> I do not think it would increase the risk of having a broken cache.

I believe it definitely happens. For example:

  1. Create a cache file named "foo.cache"
  2. Write 50% of the data to "foo.cache", e.g. put(1, b'bar')
  3. Job fails

Even in this case a partial write may happen, e.g. b'ba' was written to disk but b'r' wasn't. Then, on restarting the job:

  1. Open the 50%-written cache file named "foo.cache"
  2. Can't run put(1, b'bar'), as entry 1 is immutable
  3. Job fails

In this case it is neither possible to put the data nor to run get(1) if a partial write happened. This is a typical scenario of broken data, and broken cache files must be thrown away.

Member (@belldandyxtq, Nov 29, 2019):

I think we are misunderstanding each other. In your code you integrated the "freeze" into preserve, which I had misunderstood, so step 3 is not necessary. But that design forces the user to preserve the cache, which is not always necessary: for example, they may just want to enable multiprocessing, which only needs "freezing" the cache. Preserving the data disables the automatic deletion of the cache files.

In my example, the cache should not be frozen until all the data is loaded, and after "freeze" no data can be added to the cache. Then, when the job fails at a "put", which is necessarily before the "freeze", the cache will be neither frozen nor preserved. That is why I think such a design would not increase the risk of having a broken cache.

Sorry for the confusion.

Member Author:

Automatic deletion of the cache among multiple processes does not work correctly, because there is no synchronization on closing the file objects: child processes can't read the contents after the parent process exits. This is because Python's tempfile module explicitly unlinks named temporary files. Please try the following script:

import tempfile
import os
import time

with tempfile.NamedTemporaryFile() as f:
    f.write(b'foo')
    f.flush()
    name = f.name

    pid = os.fork()
    print('pid', pid)
    if 0 == pid:
        pname = 'child'
    else:
        pname = 'parent'

    f2 = open(name, 'rb')
    assert f2
    time.sleep(1)
    if pname == 'child':
        time.sleep(1)

You'll see this:

Traceback (most recent call last):
  File "t.py", line 22, in <module>
    time.sleep(1)
  File "/usr/lib/python3.7/tempfile.py", line 500, in __exit__
    self.close()
  File "/usr/lib/python3.7/tempfile.py", line 507, in close
    self._closer.close()
  File "/usr/lib/python3.7/tempfile.py", line 444, in close
    unlink(self.name)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmplpb16mgk'

Member:

I see, thank you for the explanation. Can you fix the other issues?

'''Load the cache saved by ``preserve()``

After loading the files, no data can be added to the cache.
``name`` is the prefix of the persistent files. To use cache
in ``multiprocessing`` environment, call this method at every
forked process, except the process that called ``preserve()``.

.. note:: This feature is experimental.

'''
if self._frozen:
return

indexfile = os.path.join(self.dir, '{}.cachei'.format(name))
datafile = os.path.join(self.dir, '{}.cached'.format(name))

with self.lock.wrlock():
# Hard link and save them
self.indexfp.close()
self.datafp.close()

self.indexfp = open(indexfile, 'rb')
self.datafp = open(datafile, 'rb')
self._frozen = True

def preserve(self, name):
'''Preserve the cache as persistent files on the disk

Once the cache is preserved, cache files will not be removed
at cache close. To read data from preserved files, use
``preload()`` method. After preservation, no data can be added
to the cache. ``name`` is the prefix of the persistent
files.

.. note:: This feature is experimental.

'''

indexfile = os.path.join(self.dir, '{}.cachei'.format(name))
datafile = os.path.join(self.dir, '{}.cached'.format(name))

with self.lock.wrlock():
# Hard link and save them
os.link(self.indexfp.name, indexfile)
os.link(self.datafp.name, datafile)
self.indexfp.close()
self.datafp.close()

self.indexfp = open(indexfile, 'rb')
self.datafp = open(datafile, 'rb')
self._frozen = True
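The core of preserve() above is the os.link() call: the anonymous temporary file gains a second, named directory entry, so unlinking the temporary name no longer deletes the data. A minimal standalone sketch of that trick, independent of chainerio (the file names here are illustrative):

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    # Anonymous cache file; tempfile normally unlinks it on close.
    tmp = tempfile.NamedTemporaryFile(delete=True, dir=d)
    tmp.write(b'cached bytes')
    tmp.flush()

    # Hard link: a second name pointing at the same inode.
    persistent = os.path.join(d, 'preserved.cached')
    os.link(tmp.name, persistent)

    # Closing unlinks only the temporary name; the data survives
    # under the persistent name.
    tmp.close()
    with open(persistent, 'rb') as f:
        recovered = f.read()
```

This is why a preserved cache outlives close(): the inode keeps a positive link count even after the temporary name is gone.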
2 changes: 1 addition & 1 deletion docs/source/reference.rst
@@ -40,7 +40,7 @@ Cache API
:members:

.. autoclass:: FileCache
:members:
:members: preserve, preload

Chainer Extensions API
----------------------
23 changes: 23 additions & 0 deletions tests/cache_tests/test_file_cache.py
@@ -1,5 +1,6 @@
from chainerio.cache import FileCache
import os
import tempfile

import pytest

@@ -28,3 +29,25 @@ def mock_pread(_fd, _buf, _offset):

with pytest.raises(OSError):
cache.put(4, str(4))


def test_preservation():
with tempfile.TemporaryDirectory() as d:
cache = FileCache(10, dir=d, do_pickle=True)

for i in range(10):
cache.put(i, str(i))

cache.preserve('preserved')

for i in range(10):
assert str(i) == cache.get(i)

cache.close()

# Imitating a new process, fresh load
cache2 = FileCache(10, dir=d, do_pickle=True)

cache2.preload('preserved')
for i in range(10):
assert str(i) == cache2.get(i)