Skip to content

Commit

Permalink
Fixes #7 - added MemProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
kamikaze committed Sep 28, 2019
1 parent cfcc6c0 commit 7898316
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
14 changes: 10 additions & 4 deletions src/aiofm/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,20 @@ async def ls(self, path: Union[str, PurePath], pattern: str = None, *args, **kwa

@asynccontextmanager
async def open(self, path: Union[str, PurePath], *args, **kwargs):
mode = kwargs.get('mode', args[0] if len(args) else 'r')
mode = kwargs.pop('mode', args[0] if len(args) else 'r')

try:
f = self._get_tree_item(self.tree, path)
io_class = ContextualBytesIO if 'b' in mode else ContextualStringIO
f = io_class(f, *args, **kwargs)
item = self._get_tree_item(self.tree, path)

if 'b' in mode:
f = ContextualBytesIO(item)
else:
f = ContextualStringIO(item.decode(kwargs.get('encoding', 'utf-8')))

yield f

if 'w' in mode:
self._set_tree_item(self.tree, path, f.getvalue())
except FileNotFoundError:
if 'w' in mode or 'a' in mode:
f = ContextualStringIO(None, *args, **kwargs)
Expand Down
27 changes: 24 additions & 3 deletions tests/aiofm/test_mem_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,27 @@ async def test_open_existing_file_for_read():
fs = MemProtocol()
fs.tree = {'/': {'tmp': {'xxx': {}, 'a.txt': b'data data data'}}}

with pytest.raises(FileNotFoundError):
async with fs.open('/tmp/a.txt') as f:
assert f.readall() == 'data data data'
async with fs.open('/tmp/a.txt') as f:
assert f.read() == 'data data data'


@pytest.mark.asyncio
async def test_unclosed_file_does_not_change_fs():
fs = MemProtocol()
fs.tree = {'/': {'tmp': {'xxx': {}, 'a.txt': b'data data data'}}}

async with fs.open('/tmp/a.txt', mode='w') as f:
f.write('TEST TEST TEST')

assert fs.tree['/']['tmp']['a.txt'] == b'data data data'


@pytest.mark.asyncio
async def test_closed_file_changes_fs():
fs = MemProtocol()
fs.tree = {'/': {'tmp': {'xxx': {}, 'a.txt': b'data data data'}}}

async with fs.open('/tmp/a.txt', mode='w') as f:
f.write('TEST TEST TEST')

assert fs.tree['/']['tmp']['a.txt'] == b'TEST TEST TEST'

0 comments on commit 7898316

Please sign in to comment.