Skip to content

Commit

Permalink
Fix #18: Asynchronous operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
tkem committed Aug 14, 2015
1 parent 91d5490 commit 5b2e613
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 102 deletions.
16 changes: 9 additions & 7 deletions mopidy_dleyna/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,30 @@ def setup(self, registry):
registry.add('backend', dLeynaBackend)

def validate_environment(self):
from .dleyna import SERVER_BUS_NAME, SERVER_ROOT_PATH
import dbus
try:
bus = self.__session_bus()
if 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
bus = dbus.SessionBus()
else:
bus = self.__start_bus()
except Exception as e:
raise exceptions.ExtensionError(str(e))
from .dleyna import SERVER_BUS_NAME, SERVER_ROOT_PATH
try:
bus.get_object(SERVER_BUS_NAME, SERVER_ROOT_PATH)
except Exception as e:
raise exceptions.ExtensionError(str(e))

def __session_bus(self):
def __start_bus(self):
import dbus
import subprocess
if 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
return dbus.SessionBus()
logger.info('Starting D-Bus session bus')
launch = subprocess.Popen('dbus-launch', stdout=subprocess.PIPE)
for line in map(str.strip, launch.stdout):
for line in launch.stdout:
name, sep, value = line.partition(b'=')
if sep:
logger.debug('dbus-launch output: %s=%s', name, value)
os.environ[name] = value
os.environ[name.strip()] = value.strip()
else:
logger.warn('Unexpected dbus-launch output: %s', line)
launch.wait()
Expand Down
5 changes: 4 additions & 1 deletion mopidy_dleyna/backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import absolute_import, unicode_literals

import os

from mopidy import backend

import pykka
Expand All @@ -16,6 +18,7 @@ class dLeynaBackend(pykka.ThreadingActor, backend.Backend):

def __init__(self, config, audio):
super(dLeynaBackend, self).__init__()
self.dleyna = dLeynaClient()
# FIXME: how to use session bus?
self.dleyna = dLeynaClient(os.environ['DBUS_SESSION_BUS_ADDRESS'])
self.library = dLeynaLibraryProvider(self)
self.playback = dLeynaPlaybackProvider(audio, self)
164 changes: 109 additions & 55 deletions mopidy_dleyna/dleyna.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
from __future__ import absolute_import, unicode_literals

import logging
import sys
import threading
import time

import dbus

import pykka


MEDIA_CONTAINER_IFACE = 'org.gnome.UPnP.MediaContainer2'

MEDIA_DEVICE_IFACE = 'com.intel.dLeynaServer.MediaDevice'

MEDIA_ITEM_IFACE = 'org.gnome.UPnP.MediaItem2'

SERVER_BUS_NAME = 'com.intel.dleyna-server'
Expand All @@ -18,88 +25,135 @@
logger = logging.getLogger(__name__)


def _future(value):
future = pykka.Future()
future.set_get_hook(lambda timeout: value)
return future


def _exc_future(exc_info):
future = pykka.ThreadingFuture()
future.set_exception(exc_info=exc_info)
return future


def _dbus_future(func, *args, **kwargs):
# TODO: remove timing info
future = pykka.ThreadingFuture()
t = time.time()

def on_reply(result):
logger.info('%s reply after %.3fs', func, time.time() - t)
future.set(result)

def on_error(e):
logger.info('%s error after %.3fs', func, time.time() - t)
future.set_exception(exc_info=(type(e), e, None))

func(*args, reply_handler=on_reply, error_handler=on_error, **kwargs)
return future


class dLeynaClient(object):

def __init__(self):
self.__bus = bus = self.__session_bus()
self.__lock = threading.RLock()
def __init__(self, address=None, mainloop=None):
if address:
bus = dbus.bus.BusConnection(address, mainloop=mainloop)
else:
bus = dbus.SessionBus(mainloop=mainloop)
self.__bus = bus
self.__bypath = {}
self.__byudn = {}
self.__manager = mgr = self.get_object(
SERVER_ROOT_PATH,
SERVER_MANAGER_IFACE
)
logger.debug('dleyna-server version %s', mgr.GetVersion())
self.__lock = threading.RLock()

def reply_handler(paths):
for path in paths:
self.__found_server(path)

def error_handler(e):
logger.info('Error retrieving DLNA servers: %s', e)

bus.add_signal_receiver(
self.found_server, 'FoundServer',
self.__found_server, 'FoundServer',
bus_name=SERVER_BUS_NAME
)
bus.add_signal_receiver(
self.lost_server, 'LostServer',
self.__lost_server, 'LostServer',
bus_name=SERVER_BUS_NAME
)
# TODO: delay until later?
for path in mgr.GetServers():
self.found_server(path)

def get_object(self, path, iface=None):
obj = self.__bus.get_object(SERVER_BUS_NAME, path)
return dbus.Interface(obj, iface) if iface else obj
bus.get_object(SERVER_BUS_NAME, SERVER_ROOT_PATH).GetServers(
dbus_interface=SERVER_MANAGER_IFACE,
reply_handler=reply_handler,
error_handler=error_handler
)

def get_properties(self, path):
return self.get_object(path, dbus.PROPERTIES_IFACE).GetAll('')
def children(self, path, offset=0, limit=0, filter=['*']):
return _dbus_future(
self.__bus.get_object(SERVER_BUS_NAME, path).ListChildren,
dbus.UInt32(offset), dbus.UInt32(limit), filter,
dbus_interface=MEDIA_CONTAINER_IFACE
)

def get_container(self, path):
return self.get_object(path, MEDIA_CONTAINER_IFACE)
def properties(self, path, iface=''):
return _dbus_future(
self.__bus.get_object(SERVER_BUS_NAME, path).GetAll,
iface, dbus_interface=dbus.PROPERTIES_IFACE
)

def get_item_url(self, path):
obj = self.__bus.get_object(SERVER_BUS_NAME, path)
props = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
urls = props.Get(MEDIA_ITEM_IFACE, 'URLs')
return urls[0]
def rescan(self):
return _dbus_future(
self.__bus.get_object(SERVER_BUS_NAME, SERVER_ROOT_PATH).Rescan,
dbus_interface=SERVER_MANAGER_IFACE
)

def servers(self):
with self.__lock:
return list(self.__bypath.values())
def search(self, path, query, offset=0, limit=0, filter=['*']):
return _dbus_future(
self.__bus.get_object(SERVER_BUS_NAME, path).SearchObjects,
query, dbus.UInt32(offset), dbus.UInt32(limit), filter,
dbus_interface=MEDIA_CONTAINER_IFACE
)

def get_server(self, udn):
def server(self, udn):
try:
with self.__lock:
return self.__byudn[udn]
return _future(self.__byudn[udn])
except KeyError:
raise LookupError('DLNA media server not found: %s' % udn)
e = LookupError('DLNA media server not found: %s' % udn)
return _exc_future((type(e), e, sys.exc_info()[2]))

def found_server(self, path):
try:
props = self.get_properties(path)
def servers(self):
with self.__lock:
return _future(list(self.__bypath.values()))

def __found_server(self, path):
def reply_handler(properties):
udn = properties['UDN']
name = properties['FriendlyName']
logger.info('Found DLNA media server %s [%s]', name, udn)
with self.__lock:
self.__bypath[path] = self.__byudn[props['UDN']] = props
except dbus.DBusException as e:
logger.warn('Skipping %s: %s', path, e.get_dbus_message())
except Exception:
logger.error('Error adding %s', path, exc_info=True)
else:
logger.info('Found DLNA media server %s [%s]',
props['FriendlyName'], props['UDN'])
self.__bypath[path] = self.__byudn[udn] = properties

def error_handler(e):
logger.error('Cannot access DLNA media server %s: %s', path, e)

def lost_server(self, path):
self.__bus.get_object(SERVER_BUS_NAME, path).GetAll(
'', # all interfaces
dbus_interface=dbus.PROPERTIES_IFACE,
reply_handler=reply_handler,
error_handler=error_handler
)

def __lost_server(self, path):
try:
props = self.__bypath[path]
name = props['FriendlyName']
udn = props['UDN']
with self.__lock:
del self.__byudn[props['UDN']]
del self.__byudn[udn]
del self.__bypath[path]
except KeyError:
logger.debug('Unknown DLNA server path %s', path)
except Exception:
logger.error('Error removing %s', path, exc_info=True)
else:
logger.info('Lost DLNA media server %s [%s]',
props['FriendlyName'], props['UDN'])

def rescan(self):
self.__manager.Rescan()

def __session_bus(self):
import os
# FIXME: dbus.SessionBus() ignores DBUS_SESSION_BUS_ADDRESS?
return dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS'])
logger.info('Lost DLNA media server %s [%s]', name, udn)
53 changes: 19 additions & 34 deletions mopidy_dleyna/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import logging
import os

import dbus

from mopidy import backend
from mopidy.models import Album, Artist, Ref, SearchResult, Track

Expand Down Expand Up @@ -85,8 +83,6 @@

SCHEME = Extension.ext_name

ZERO = dbus.UInt32(0)


def _quote(s):
return '"' + s.replace('\\', '\\\\').replace('"', '\\"') + '"'
Expand Down Expand Up @@ -208,15 +204,16 @@ def browse(self, uri):
refs = []
dleyna = self.backend.dleyna
if uri == self.root_directory.uri:
for server in dleyna.servers():
name = server.get('FriendlyName', server['DisplayName'])
for server in dleyna.servers().get():
name = server['FriendlyName']
uri = uricompose(SCHEME, host=server['UDN'])
refs.append(Ref.directory(name=name, uri=uri))
else:
parts = urisplit(uri)
server = dleyna.get_server(parts.gethost())
container = dleyna.get_container(server['Path'] + parts.getpath())
for obj in container.ListChildren(ZERO, ZERO, BROWSE_FILTER):
server = dleyna.server(parts.gethost()).get()
path = server['Path'] + parts.getpath()
future = dleyna.children(path, filter=BROWSE_FILTER)
for obj in future.get():
ref = _properties_to_ref(server, obj)
if ref:
refs.append(ref)
Expand All @@ -230,18 +227,15 @@ def get_images(self, uris):
def lookup(self, uri):
parts = urisplit(uri)
dleyna = self.backend.dleyna
server = dleyna.get_server(parts.gethost())
server = dleyna.server(parts.gethost()).get()
path = server['Path'] + parts.getpath()
props = dleyna.get_properties(path)
props = dleyna.properties(path).get()
type = props['Type']

tracks = []
# TODO: test on iface?
if type == 'container':
container = dleyna.get_container(path)
for obj in container.SearchObjects(
LOOKUP_QUERY, ZERO, ZERO, SEARCH_FILTER
):
future = dleyna.search(path, LOOKUP_QUERY, filter=SEARCH_FILTER)
for obj in future.get():
track = _properties_to_track(server, obj)
if track:
tracks.append(track)
Expand All @@ -254,6 +248,7 @@ def lookup(self, uri):
return tracks

def refresh(self, uri=None):
logger.info('library.refresh')
self.backend.dleyna.rescan()

def search(self, query=None, uris=None, exact=False):
Expand All @@ -270,18 +265,19 @@ def search(self, query=None, uris=None, exact=False):
query = '*'
logger.debug('dLeyna search query: %s', query)

# TODO: refactor this (no future.map from reply_handler?)
futures = []
for uri in uris or [self.root_directory.uri]:
if uri == self.root_directory.uri:
for server in self.backend.dleyna.servers():
for server in self.backend.dleyna.servers().get():
uri = uricompose(SCHEME, host=server['UDN'])
futures.append(self.__search(query, uri))
else:
futures.append(self.__search(query, uri))

results = collections.defaultdict(list)
for server, objs in pykka.get_all(futures):
for obj in objs:
for objs in pykka.get_all(futures):
for server, obj in objs:
model = _properties_to_model(server, obj)
if model:
results[type(model)].append(model)
Expand All @@ -297,18 +293,7 @@ def search(self, query=None, uris=None, exact=False):
def __search(self, query, uri):
parts = urisplit(uri)
dleyna = self.backend.dleyna
server = dleyna.get_server(parts.gethost())
future = pykka.ThreadingFuture()

def reply_handler(objs):
future.set((server, objs))

def error_handler(e):
future.set_exception(exc_info=(type(e), e, None))

dleyna.get_container(server['Path'] + parts.getpath()).SearchObjects(
query, ZERO, ZERO, SEARCH_FILTER,
reply_handler=reply_handler,
error_handler=error_handler
)
return future
server = dleyna.server(parts.gethost()).get()
path = server['Path'] + parts.getpath()
future = dleyna.search(path, query, filter=SEARCH_FILTER)
return future.map(lambda result: (server, result))
7 changes: 5 additions & 2 deletions mopidy_dleyna/playback.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ class dLeynaPlaybackProvider(backend.PlaybackProvider):
def translate_uri(self, uri):
parts = urisplit(uri)
dleyna = self.backend.dleyna
server = dleyna.get_server(parts.gethost())
return dleyna.get_item_url(server['Path'] + parts.getpath())
server = dleyna.server(parts.gethost()).get()
path = server['Path'] + parts.getpath()
# TODO: single prop, MEDIA_ITEM_IFACE, compatible resources...
future = dleyna.properties(path)
return future.get()['URLs'][0]

0 comments on commit 5b2e613

Please sign in to comment.