Skip to content

Commit

Permalink
iotests: add QMP event waiting queue
Browse files Browse the repository at this point in the history
A filter is added to allow callers to request very specific
events to be pulled from the event queue, while leaving undesired
events still in the stream.

This allows us to poll for completion data for multiple asynchronous
events in any arbitrary order.

A new timeout context is added to the qmp pull_event method's
wait parameter to allow tests to fail if they do not complete
within some expected period of time.

Also fixed is a bug in qmp.pull_event where we try to retrieve an event
from an empty list if we attempt to retrieve an event with wait=False
but no events have occurred.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Max Reitz <mreitz@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Message-id: 1429314609-29776-19-git-send-email-jsnow@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
  • Loading branch information
jnsnow authored and kevmw committed Apr 28, 2015
1 parent 9f7264f commit 7898f74
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 30 deletions.
95 changes: 65 additions & 30 deletions scripts/qmp/qmp.py
Expand Up @@ -21,6 +21,9 @@ class QMPConnectError(QMPError):
class QMPCapabilitiesError(QMPError):
pass

class QMPTimeoutError(QMPError):
pass

class QEMUMonitorProtocol:
def __init__(self, address, server=False):
"""
Expand Down Expand Up @@ -72,6 +75,44 @@ def __json_read(self, only_event=False):

error = socket.error

def __get_events(self, wait=False):
"""
Check for new events in the stream and cache them in __events.
@param wait (bool): block until an event is available.
@param wait (float): If wait is a float, treat it as a timeout value.
@raise QMPTimeoutError: If a timeout float is provided and the timeout
period elapses.
@raise QMPConnectError: If wait is True but no events could be retrieved
or if some other error occurred.
"""

# Check for new events regardless and pull them into the cache:
self.__sock.setblocking(0)
try:
self.__json_read()
except socket.error, err:
if err[0] == errno.EAGAIN:
# No data available
pass
self.__sock.setblocking(1)

# Wait for new events, if needed.
# if wait is 0.0, this means "no wait" and is also implicitly false.
if not self.__events and wait:
if isinstance(wait, float):
self.__sock.settimeout(wait)
try:
ret = self.__json_read(only_event=True)
except socket.timeout:
raise QMPTimeoutError("Timeout waiting for event")
except:
raise QMPConnectError("Error while reading from socket")
if ret is None:
raise QMPConnectError("Error while reading from socket")
self.__sock.settimeout(None)

def connect(self, negotiate=True):
"""
Connect to the QMP Monitor and perform capabilities negotiation.
Expand Down Expand Up @@ -140,43 +181,37 @@ def pull_event(self, wait=False):
"""
Get and delete the first available QMP event.
@param wait: block until an event is available (bool)
@param wait (bool): block until an event is available.
@param wait (float): If wait is a float, treat it as a timeout value.
@raise QMPTimeoutError: If a timeout float is provided and the timeout
period elapses.
@raise QMPConnectError: If wait is True but no events could be retrieved
or if some other error occurred.
@return The first available QMP event, or None.
"""
self.__sock.setblocking(0)
try:
self.__json_read()
except socket.error, err:
if err[0] == errno.EAGAIN:
# No data available
pass
self.__sock.setblocking(1)
if not self.__events and wait:
self.__json_read(only_event=True)
event = self.__events[0]
del self.__events[0]
return event
self.__get_events(wait)

if self.__events:
return self.__events.pop(0)
return None

def get_events(self, wait=False):
"""
Get a list of available QMP events.
@param wait: block until an event is available (bool)
"""
self.__sock.setblocking(0)
try:
self.__json_read()
except socket.error, err:
if err[0] == errno.EAGAIN:
# No data available
pass
self.__sock.setblocking(1)
if not self.__events and wait:
ret = self.__json_read(only_event=True)
if ret == None:
# We are in blocking mode, if don't get anything, something
# went wrong
raise QMPConnectError("Error while reading from socket")
@param wait (bool): block until an event is available.
@param wait (float): If wait is a float, treat it as a timeout value.
@raise QMPTimeoutError: If a timeout float is provided and the timeout
period elapses.
@raise QMPConnectError: If wait is True but no events could be retrieved
or if some other error occurred.
@return The list of available QMP events.
"""
self.__get_events(wait)
return self.__events

def clear_events(self):
Expand Down
38 changes: 38 additions & 0 deletions tests/qemu-iotests/iotests.py
Expand Up @@ -78,6 +78,23 @@ def create_image(name, size):
i = i + 512
file.close()

# Test if 'match' is a recursive subset of 'event'
def event_match(event, match=None):
if match is None:
return True

for key in match:
if key in event:
if isinstance(event[key], dict):
if not event_match(event[key], match[key]):
return False
elif event[key] != match[key]:
return False
else:
return False

return True

class VM(object):
'''A QEMU VM'''

Expand All @@ -92,6 +109,7 @@ def __init__(self):
'-machine', 'accel=qtest',
'-display', 'none', '-vga', 'none']
self._num_drives = 0
self._events = []

# This can be used to add an unused monitor instance.
def add_monitor_telnet(self, ip, port):
Expand Down Expand Up @@ -202,14 +220,34 @@ def qtest(self, cmd):

def get_qmp_event(self, wait=False):
'''Poll for one queued QMP events and return it'''
if len(self._events) > 0:
return self._events.pop(0)
return self._qmp.pull_event(wait=wait)

def get_qmp_events(self, wait=False):
'''Poll for queued QMP events and return a list of dicts'''
events = self._qmp.get_events(wait=wait)
events.extend(self._events)
del self._events[:]
self._qmp.clear_events()
return events

def event_wait(self, name='BLOCK_JOB_COMPLETED', timeout=60.0, match=None):
# Search cached events
for event in self._events:
if (event['event'] == name) and event_match(event, match):
self._events.remove(event)
return event

# Poll for new events
while True:
event = self._qmp.pull_event(wait=timeout)
if (event['event'] == name) and event_match(event, match):
return event
self._events.append(event)

return None

index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
Expand Down

0 comments on commit 7898f74

Please sign in to comment.