Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak in Pending.journal. #50

Merged
merged 1 commit into from Feb 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/gofer/rmi/store.py
Expand Up @@ -75,7 +75,7 @@ def _read(path):
return request
except ValueError:
log.error('%s corrupt (discarded)', path)
os.unlink(path)
unlink(path)
finally:
fp.close()

Expand Down Expand Up @@ -160,7 +160,7 @@ def commit(self, sn):
:param sn: str
"""
try:
path = self.journal[sn]
path = self.journal.pop(sn)
unlink(path)
log.debug('%s committed', sn)
except KeyError:
Expand Down
19 changes: 19 additions & 0 deletions test/functional/server.py
Expand Up @@ -193,6 +193,23 @@ def test_performance():
sys.exit(0)


def test_memory():
N = 10000
with open(__file__) as fp:
content = fp.read()
agent = Agent(data=content)
dog = agent.Dog()
t = Timer()
t.start()
print 'testing memory ...'
for n in range(0, N):
dog.bark('hello!')
print 'tested %d' % n
t.stop()
print 'total=%s, percall=%f (ms)' % (t, (t.duration()/N)*1000)
sys.exit(0)


def test_triggers():
agent = Agent(trigger=1)
dog = agent.Dog()
Expand Down Expand Up @@ -445,6 +462,8 @@ def get_options():
Agent.address = address
Agent.base_options['authenticator'] = authenticator

# test_memory()

queue = Queue(address.split('/')[-1].upper())
queue.durable = False
queue.declare(url)
Expand Down
106 changes: 104 additions & 2 deletions test/unit/rmi/test_store.py
Expand Up @@ -10,7 +10,109 @@
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.

from unittest import TestCase
from mock import patch, Mock

from gofer.rmi.store import Pending, Sequential

class Test(TestCase):
pass

class TestPendingQueue(TestCase):

@patch('__builtin__.open')
def test_write(self, _open):
request = Mock()
path = '/tmp/123'
Pending._write(request, path)
_open.assert_called_once_with(path, 'w+')
_open.return_value.write.assert_called_once_with(request.dump.return_value)
_open.return_value.close.assert_called_once_with()

@patch('__builtin__.open')
@patch('gofer.rmi.store.unlink')
def test_read(self, unlink, _open):
body = '{"A": 1}'
_open.return_value.read.return_value = body
path = '/tmp/123'
document = Pending._read(path)
_open.assert_called_once_with(path)
_open.return_value.read.assert_called_once_with()
_open.return_value.close.assert_called_once_with()
self.assertFalse(unlink.called)
self.assertEqual(document.__dict__, {'A': 1})

@patch('__builtin__.open')
@patch('gofer.rmi.store.unlink')
def test_read_invalid_json(self, unlink, _open):
body = '__invalid__'
_open.return_value.read.return_value = body
path = '/tmp/123'
document = Pending._read(path)
_open.assert_called_once_with(path)
_open.return_value.read.assert_called_once_with()
_open.return_value.close.assert_called_once_with()
unlink.assert_called_once_with(path)
self.assertEqual(document, None)

@patch('gofer.rmi.store.unlink')
@patch('gofer.rmi.store.Thread', Mock())
def test_commit(self, unlink):
sn = '123'
path = '/tmp/123'
p = Pending('')
p.journal = {sn: path}
p.commit(sn)
unlink.assert_called_once_with(path)
self.assertEqual(p.journal, {})

@patch('gofer.rmi.store.unlink')
@patch('gofer.rmi.store.Thread', Mock())
def test_commit_not_found(self, unlink):
sn = '123'
path = '/tmp/123'
p = Pending('')
p.journal = {sn: path}
p.commit('invalid')
self.assertFalse(unlink.called)
self.assertEqual(p.journal, {sn: path})


class TestSequential(TestCase):

def test_init(self):

s = Sequential()
self.assertEqual(s.n, 0)
self.assertEqual(s.last, 0.0)

@patch('gofer.rmi.store.time')
def test_next_time_stopped(self, time):
time.return_value = 3.14
s = Sequential()
values = [
s.next(),
s.next(),
s.next(),
]
self.assertEqual(
values,
[
'3-140000-0000.json',
'3-140000-0001.json',
'3-140000-0002.json'
])

@patch('gofer.rmi.store.time')
def test_next_time_moving(self, time):
time.side_effect = [3.14, 3.15, 3.16]
s = Sequential()
values = [
s.next(),
s.next(),
s.next(),
]
self.assertEqual(
values,
[
'3-140000-0000.json',
'3-150000-0000.json',
'3-160000-0000.json'
])