Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

audio: Cleanup scanner code and support live sources #599

Merged
merged 1 commit into from
Dec 4, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions mopidy/audio/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ class Scanner(object):
"""

def __init__(self, timeout=1000, min_duration=100):
self.timeout_ms = timeout
self.min_duration_ms = min_duration
self._timeout_ms = timeout
self._min_duration_ms = min_duration

sink = gst.element_factory_make('fakesink')

audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float')
pad_added = lambda src, pad: pad.link(sink.get_pad('sink'))

self.uribin = gst.element_factory_make('uridecodebin')
self.uribin.set_property('caps', audio_caps)
self.uribin.connect('pad-added', pad_added)
self._uribin = gst.element_factory_make('uridecodebin')
self._uribin.set_property('caps', audio_caps)
self._uribin.connect('pad-added', pad_added)

self.pipe = gst.element_factory_make('pipeline')
self.pipe.add(self.uribin)
self.pipe.add(sink)
self._pipe = gst.element_factory_make('pipeline')
self._pipe.add(self._uribin)
self._pipe.add(sink)

self.bus = self.pipe.get_bus()
self.bus.set_flushing(True)
self._bus = self._pipe.get_bus()
self._bus.set_flushing(True)

def scan(self, uri):
"""
Expand All @@ -54,34 +54,40 @@ def scan(self, uri):
try:
self._setup(uri)
data = self._collect()
# Make sure uri and duration does not come from tags.
# Make sure uri, mtime and duration does not come from tags.
data[b'uri'] = uri
data[b'mtime'] = self._query_mtime(uri)
data[gst.TAG_DURATION] = self._query_duration()
finally:
self._reset()

if data[gst.TAG_DURATION] < self.min_duration_ms * gst.MSECOND:
raise exceptions.ScannerError('Rejecting file with less than %dms '
'audio data.' % self.min_duration_ms)
return data
if self._min_duration_ms is None:
return data
elif data[gst.TAG_DURATION] >= self._min_duration_ms * gst.MSECOND:
return data

raise exceptions.ScannerError('Rejecting file with less than %dms '
'audio data.' % self._min_duration_ms)

def _setup(self, uri):
"""Primes the pipeline for collection."""
self.pipe.set_state(gst.STATE_READY)
self.uribin.set_property(b'uri', uri)
self.bus.set_flushing(False)
self.pipe.set_state(gst.STATE_PAUSED)
self._pipe.set_state(gst.STATE_READY)
self._uribin.set_property(b'uri', uri)
self._bus.set_flushing(False)
result = self._pipe.set_state(gst.STATE_PAUSED)
if result == gst.STATE_CHANGE_NO_PREROLL:
# Live sources don't pre-roll, so set to playing to get data.
self._pipe.set_state(gst.STATE_PLAYING)

def _collect(self):
"""Polls for messages to collect data."""
start = time.time()
timeout_s = self.timeout_ms / float(1000)
timeout_s = self._timeout_ms / float(1000)
poll_timeout_ns = 1000
data = {}

while time.time() - start < timeout_s:
message = self.bus.poll(gst.MESSAGE_ANY, poll_timeout_ns)
message = self._bus.poll(gst.MESSAGE_ANY, poll_timeout_ns)

if message is None:
pass # polling the bus timed out.
Expand All @@ -90,23 +96,23 @@ def _collect(self):
elif message.type == gst.MESSAGE_EOS:
return data
elif message.type == gst.MESSAGE_ASYNC_DONE:
if message.src == self.pipe:
if message.src == self._pipe:
return data
elif message.type == gst.MESSAGE_TAG:
taglist = message.parse_tag()
for key in taglist.keys():
data[key] = taglist[key]

raise exceptions.ScannerError('Timeout after %dms' % self.timeout_ms)
raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms)

def _reset(self):
"""Ensures we cleanup child elements and flush the bus."""
self.bus.set_flushing(True)
self.pipe.set_state(gst.STATE_NULL)
self._bus.set_flushing(True)
self._pipe.set_state(gst.STATE_NULL)

def _query_duration(self):
try:
return self.pipe.query_duration(gst.FORMAT_TIME, None)[0]
return self._pipe.query_duration(gst.FORMAT_TIME, None)[0]
except gst.QueryError:
return None

Expand Down