Skip to content

Commit

Permalink
Port to Jeepney, a pure Python D-Bus implementation
Browse files Browse the repository at this point in the history
The older dbus-python library is not easy to install from PyPI,
and not actively developed.

This is an initial port and it misses some features, such as
asynchronous (non-blocking) prompts support. Also the documentation
needs some updates.

Fixes #10.
  • Loading branch information
mitya57 committed Jan 22, 2018
1 parent 9ae085f commit 711c931
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 245 deletions.
4 changes: 1 addition & 3 deletions .travis.yml
Expand Up @@ -8,10 +8,8 @@ addons:
apt:
packages:
- dbus-x11
- libdbus-glib-1-dev
install:
- sed -i "s|not getvar('Py_ENABLE_SHARED')|True|" $(which python)-config || true
- pip install cryptography dbus-python
- pip install jeepney cryptography
before_script:
- git clone git://git.gnome.org/libsecret.git
script:
Expand Down
2 changes: 0 additions & 2 deletions docs/util.rst
Expand Up @@ -6,6 +6,4 @@ Additional utility functions
:members:
format_secret,
exec_prompt,
exec_prompt_glib,
exec_prompt_qt,
unlock_objects
36 changes: 10 additions & 26 deletions secretstorage/__init__.py
Expand Up @@ -10,7 +10,7 @@
SecretStorage releases. Those functions are not recommended for use
in new software."""

import dbus
from jeepney.integrate.blocking import connect_and_authenticate
from secretstorage.collection import Collection, create_collection, \
get_all_collections, get_default_collection, get_any_collection, \
get_collection_by_alias, search_items
Expand All @@ -24,30 +24,14 @@
__version_tuple__ = (3, 0, 0)
__version__ = '.'.join(map(str, __version_tuple__))

def dbus_init(main_loop=True, use_qt_loop=False):
"""Returns new SessionBus_. If `main_loop` is :const:`True` and no
D-Bus main loop is registered yet, registers a default main loop
(PyQt5 main loop if `use_qt_loop` is :const:`True`, otherwise GLib
main loop).
def dbus_init(*args, **kwargs):
"""Returns a new connection to the session bus, instance of
:class:`jeepney.DBusConnection` instance. This connection can
then be passed to various SecretStorage functions, such as
:func:`~secretstorage.collection.get_default_collection`.
.. _SessionBus: https://www.freedesktop.org/wiki/IntroductionToDBus/#buses
.. note::
Qt uses GLib main loops on UNIX-like systems by default, so one
will rarely need to set `use_qt_loop` to :const:`True`.
.. versionchanged:: 3.0
Before the port to Jeepney, this function returned an
instance of :class:`dbus.SessionBus` class.
"""
if main_loop and not dbus.get_default_main_loop():
if use_qt_loop:
from dbus.mainloop.pyqt5 import DBusQtMainLoop
DBusQtMainLoop(set_as_default=True)
else:
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
try:
return dbus.SessionBus()
except dbus.exceptions.DBusException as e:
if e.get_dbus_name() in (DBUS_NOT_SUPPORTED,
DBUS_EXEC_FAILED, DBUS_NO_REPLY, DBUS_ACCESS_DENIED):
raise SecretServiceNotAvailableException(
e.get_dbus_message())
raise
return connect_and_authenticate()
82 changes: 32 additions & 50 deletions secretstorage/collection.py
Expand Up @@ -15,12 +15,11 @@
asynchronous). Creating new items and editing existing ones is possible
only in unlocked collection."""

import dbus
from secretstorage.defines import SS_PREFIX, SS_PATH
from secretstorage.exceptions import LockedException, ItemNotFoundException
from secretstorage.item import Item
from secretstorage.util import bus_get_object, InterfaceWrapper, \
exec_prompt_glib, format_secret, open_session, to_unicode, unlock_objects
from secretstorage.util import DBusAddressWrapper, exec_prompt, \
format_secret, open_session, unlock_objects

COLLECTION_IFACE = SS_PREFIX + 'Collection'
SERVICE_IFACE = SS_PREFIX + 'Service'
Expand All @@ -31,22 +30,16 @@ class Collection(object):
"""Represents a collection."""

def __init__(self, connection, collection_path=DEFAULT_COLLECTION, session=None):
collection_obj = bus_get_object(connection, collection_path)
self.connection = connection
self.session = session
self.collection_path = collection_path
self.collection_iface = InterfaceWrapper(collection_obj,
COLLECTION_IFACE)
self.collection_props_iface = InterfaceWrapper(collection_obj,
dbus.PROPERTIES_IFACE)
self.collection_props_iface.Get(COLLECTION_IFACE, 'Label',
signature='ss')
self._collection = DBusAddressWrapper(
collection_path, COLLECTION_IFACE, connection)

def is_locked(self):
"""Returns :const:`True` if item is locked, otherwise
:const:`False`."""
return bool(self.collection_props_iface.Get(
COLLECTION_IFACE, 'Locked', signature='ss'))
return bool(self._collection.get_property('Locked'))

def ensure_not_locked(self):
"""If collection is locked, raises
Expand All @@ -60,44 +53,39 @@ def unlock(self, callback=None):
:func:`~secretstorage.util.exec_prompt` description for details).
Otherwise, uses loop from GLib API and returns a boolean
representing whether the operation was dismissed."""
return unlock_objects(self.connection, [self.collection_path], callback)
return unlock_objects(self.connection, [self.collection_path])

def lock(self):
"""Locks the collection."""
service_obj = bus_get_object(self.connection, SS_PATH)
service_iface = InterfaceWrapper(service_obj, SERVICE_IFACE)
service_iface.Lock([self.collection_path], signature='ao')
service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, self.connection)
service.call('Lock', 'ao', [self.collection_path])

def delete(self):
"""Deletes the collection and all items inside it."""
self.ensure_not_locked()
self.collection_iface.Delete(signature='')
self._collection.call('Delete')

def get_all_items(self):
"""Returns a generator of all items in the collection."""
for item_path in self.collection_props_iface.Get(
COLLECTION_IFACE, 'Items', signature='ss'):
for item_path in self._collection.get_property('Items'):
yield Item(self.connection, item_path, self.session)

def search_items(self, attributes):
"""Returns a generator of items with the given attributes.
`attributes` should be a dictionary."""
result = self.collection_iface.SearchItems(attributes,
signature='a{ss}')
result, = self._collection.call('SearchItems', 'a{ss}', attributes)
for item_path in result:
yield Item(self.connection, item_path, self.session)

def get_label(self):
"""Returns the collection label."""
label = self.collection_props_iface.Get(COLLECTION_IFACE,
'Label', signature='ss')
return to_unicode(label)
label = self._collection.get_property('Label')
return label

def set_label(self, label):
"""Sets collection label to `label`."""
self.ensure_not_locked()
self.collection_props_iface.Set(COLLECTION_IFACE, 'Label',
label, signature='ssv')
self._collection.set_property('Label', 's', label)

def create_item(self, label, attributes, secret, replace=False,
content_type='text/plain'):
Expand All @@ -111,13 +99,12 @@ def create_item(self, label, attributes, secret, replace=False,
if not self.session:
self.session = open_session(self.connection)
secret = format_secret(self.session, secret, content_type)
attributes = dbus.Dictionary(attributes, signature='ss')
properties = {
SS_PREFIX+'Item.Label': label,
SS_PREFIX+'Item.Attributes': attributes
SS_PREFIX + 'Item.Label': ('s', label),
SS_PREFIX + 'Item.Attributes': ('a{ss}', attributes),
}
new_item, prompt = self.collection_iface.CreateItem(properties,
secret, replace, signature='a{sv}(oayays)b')
new_item, prompt = self._collection.call('CreateItem', 'a{sv}(oayays)b',
properties, secret, replace)
return Item(self.connection, new_item, self.session)

def create_collection(connection, label, alias='', session=None):
Expand All @@ -127,25 +114,23 @@ def create_collection(connection, label, alias='', session=None):
synchronous function, uses loop from GLib API."""
if not session:
session = open_session(connection)
properties = {SS_PREFIX+'Collection.Label': label}
service_obj = bus_get_object(connection, SS_PATH)
service_iface = dbus.Interface(service_obj, SERVICE_IFACE)
collection_path, prompt = service_iface.CreateCollection(properties,
alias, signature='a{sv}s')
properties = {SS_PREFIX + 'Collection.Label': ('s', label)}
service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
collection_path, prompt = service.call('CreateCollection', 'a{sv}s',
properties, alias)
if len(collection_path) > 1:
return Collection(connection, collection_path, session=session)
dismissed, unlocked = exec_prompt_glib(connection, prompt)
dismissed, result = exec_prompt(connection, prompt)
if dismissed:
raise ItemNotFoundException('Prompt dismissed.')
return Collection(connection, unlocked, session=session)
signature, collection_path = result
assert signature == 'o'
return Collection(connection, collection_path, session=session)

def get_all_collections(connection):
"""Returns a generator of all available collections."""
service_obj = bus_get_object(connection, SS_PATH)
service_props_iface = dbus.Interface(service_obj,
dbus.PROPERTIES_IFACE)
for collection_path in service_props_iface.Get(SERVICE_IFACE,
'Collections', signature='ss'):
service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
for collection_path in service.get_property('Collections'):
yield Collection(connection, collection_path)

def get_default_collection(connection, session=None):
Expand Down Expand Up @@ -183,19 +168,16 @@ def get_collection_by_alias(connection, alias):
"""Returns the collection with the given `alias`. If there is no
such collection, raises
:exc:`~secretstorage.exceptions.ItemNotFoundException`."""
service_obj = bus_get_object(connection, SS_PATH)
service_iface = dbus.Interface(service_obj, SERVICE_IFACE)
collection_path = service_iface.ReadAlias(alias, signature='s')
service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
collection_path, = service.call('ReadAlias', 's', alias)
if len(collection_path) <= 1:
raise ItemNotFoundException('No collection with such alias.')
return Collection(connection, collection_path)

def search_items(connection, attributes):
"""Returns a generator of items in all collections with the given
attributes. `attributes` should be a dictionary."""
service_obj = bus_get_object(connection, SS_PATH)
service_iface = dbus.Interface(service_obj, SERVICE_IFACE)
locked, unlocked = service_iface.SearchItems(attributes,
signature='a{ss}')
service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
locked, unlocked = service.call('SearchItems', 'a{ss}', attributes)
for item_path in locked + unlocked:
yield Item(connection, item_path)
49 changes: 17 additions & 32 deletions secretstorage/item.py
Expand Up @@ -9,11 +9,10 @@
the item is unlocked. The collection can be unlocked using collection's
:meth:`~secretstorage.collection.Collection.unlock` method."""

import dbus
from secretstorage.defines import SS_PREFIX
from secretstorage.exceptions import LockedException
from secretstorage.util import InterfaceWrapper, bus_get_object, \
open_session, format_secret, to_unicode, unlock_objects
from secretstorage.util import DBusAddressWrapper, \
open_session, format_secret, unlock_objects
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

Expand All @@ -24,22 +23,18 @@ class Item(object):

def __init__(self, connection, item_path, session=None):
self.item_path = item_path
item_obj = bus_get_object(connection, item_path)
self._item = DBusAddressWrapper(item_path, ITEM_IFACE, connection)
self._item.get_property('Label')
self.session = session
self.connection = connection
self.item_iface = InterfaceWrapper(item_obj, ITEM_IFACE)
self.item_props_iface = InterfaceWrapper(item_obj,
dbus.PROPERTIES_IFACE)
self.item_props_iface.Get(ITEM_IFACE, 'Label', signature='ss')

def __eq__(self, other):
return self.item_path == other.item_path

def is_locked(self):
"""Returns :const:`True` if item is locked, otherwise
:const:`False`."""
return bool(self.item_props_iface.Get(ITEM_IFACE, 'Locked',
signature='ss'))
return bool(self._item.get_property('Locked'))

def ensure_not_locked(self):
"""If collection is locked, raises
Expand All @@ -57,44 +52,37 @@ def unlock(self, callback=None):
boolean representing whether the operation was dismissed.
.. versionadded:: 2.1.2"""
return unlock_objects(self.connection, [self.item_path], callback)
return unlock_objects(self.connection, [self.item_path])

def get_attributes(self):
"""Returns item attributes (dictionary)."""
attrs = self.item_props_iface.Get(ITEM_IFACE, 'Attributes',
signature='ss')
return {to_unicode(key): to_unicode(value)
for key, value in attrs.items()}
attrs = self._item.get_property('Attributes')
return dict(attrs)

def set_attributes(self, attributes):
"""Sets item attributes to `attributes` (dictionary)."""
self.item_props_iface.Set(ITEM_IFACE, 'Attributes', attributes,
signature='ssv')
self._item.set_property('Attributes', 'a{ss}', attributes)

def get_label(self):
"""Returns item label (unicode string)."""
label = self.item_props_iface.Get(ITEM_IFACE, 'Label',
signature='ss')
return to_unicode(label)
return self._item.get_property('Label')

def set_label(self, label):
"""Sets item label to `label`."""
self.ensure_not_locked()
self.item_props_iface.Set(ITEM_IFACE, 'Label', label,
signature='ssv')
self._item.set_property('Label', 's', label)

def delete(self):
"""Deletes the item."""
self.ensure_not_locked()
return self.item_iface.Delete(signature='')
return self._item.call('Delete')

def get_secret(self):
"""Returns item secret (bytestring)."""
self.ensure_not_locked()
if not self.session:
self.session = open_session(self.connection)
secret = self.item_iface.GetSecret(self.session.object_path,
signature='o')
secret, = self._item.call('GetSecret', 'o', self.session.object_path)
if not self.session.encrypted:
return bytes(secret[2])
aes = algorithms.AES(self.session.aes_key)
Expand All @@ -109,8 +97,7 @@ def get_secret_content_type(self):
self.ensure_not_locked()
if not self.session:
self.session = open_session(self.connection)
secret = self.item_iface.GetSecret(self.session.object_path,
signature='o')
secret, = self._item.call('GetSecret', 'o', self.session.object_path)
return str(secret[3])

def set_secret(self, secret, content_type='text/plain'):
Expand All @@ -121,21 +108,19 @@ def set_secret(self, secret, content_type='text/plain'):
if not self.session:
self.session = open_session(self.connection)
secret = format_secret(self.session, secret, content_type)
self.item_iface.SetSecret(secret, signature='(oayays)')
self._item.call('SetSecret', '(oayays)', secret)

def get_created(self):
"""Returns UNIX timestamp (integer) representing the time
when the item was created.
.. versionadded:: 1.1"""
return int(self.item_props_iface.Get(ITEM_IFACE, 'Created',
signature='ss'))
return self._item.get_property('Created')

def get_modified(self):
"""Returns UNIX timestamp (integer) representing the time
when the item was last modified."""
return int(self.item_props_iface.Get(ITEM_IFACE, 'Modified',
signature='ss'))
return self._item.get_property('Modified')

def to_tuple(self):
"""Returns (*attributes*, *secret*) tuple representing the
Expand Down

0 comments on commit 711c931

Please sign in to comment.